9760fc64d319b6d7ca279b7605abefad7b26e211
[openvswitch] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import uuid
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22 import ovs.poller
23 import ovs.vlog
24
25 vlog = ovs.vlog.Vlog("idl")
26
27 __pychecker__ = 'no-classattr no-objattrs'
28
29
30 class Idl:
31     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
32
33     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
34     requests to an OVSDB database server and parses the responses, converting
35     raw JSON into data structures that are easier for clients to digest.
36
37     The IDL also assists with issuing database transactions.  The client
38     creates a transaction, manipulates the IDL data structures, and commits or
39     aborts the transaction.  The IDL then composes and issues the necessary
40     JSON-RPC requests and reports to the client whether the transaction
41     completed successfully.
42
43     The client is allowed to access the following attributes directly, in a
44     read-only fashion:
45
46     - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47       to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
48       annotated with a new attribute 'rows', which is a dict from a uuid.UUID
49       to a Row object.
50
51       The client may directly read and write the Row objects referenced by the
52       'rows' map values.  Refer to Row for more details.
53
54     - 'change_seqno': A number that represents the IDL's state.  When the IDL
55       is updated (by Idl.run()), its value changes.
56
57     - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58       if no lock is configured.
59
60     - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61       lock, and False otherwise.
62
63       Locking and unlocking happens asynchronously from the database client's
64       point of view, so the information is only useful for optimization
65       (e.g. if the client doesn't have the lock then there's no point in trying
66       to write to the database).
67
68     - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69       the database server has indicated that some other client already owns the
70       requested lock, and False otherwise.
71
72     - 'txn': The ovs.db.idl.Transaction object for the database transaction
73       currently being constructed, if there is one, or None otherwise.
74 """
75
76     def __init__(self, remote, schema):
77         """Creates and returns a connection to the database named 'db_name' on
78         'remote', which should be in a form acceptable to
79         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
80         replica of the remote database.
81
82         'schema' should be the schema for the remote database.  The caller may
83         have cut it down by removing tables or columns that are not of
84         interest.  The IDL will only replicate the tables and columns that
85         remain.  The caller may also add a attribute named 'alert' to selected
86         remaining columns, setting its value to False; if so, then changes to
87         those columns will not be considered changes to the database for the
88         purpose of the return value of Idl.run() and Idl.change_seqno.  This is
89         useful for columns that the IDL's client will write but not read.
90
91         As a convenience to users, 'schema' may also be an instance of the
92         SchemaHelper class.
93
94         The IDL uses and modifies 'schema' directly."""
95
96         assert isinstance(schema, SchemaHelper)
97         schema = schema.get_idl_schema()
98
99         self.tables = schema.tables
100         self._db = schema
101         self._session = ovs.jsonrpc.Session.open(remote)
102         self._monitor_request_id = None
103         self._last_seqno = None
104         self.change_seqno = 0
105
106         # Database locking.
107         self.lock_name = None          # Name of lock we need, None if none.
108         self.has_lock = False          # Has db server said we have the lock?
109         self.is_lock_contended = False  # Has db server said we can't get lock?
110         self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.
111
112         # Transaction support.
113         self.txn = None
114         self._outstanding_txns = {}
115
116         for table in schema.tables.itervalues():
117             for column in table.columns.itervalues():
118                 if not hasattr(column, 'alert'):
119                     column.alert = True
120             table.need_table = False
121             table.rows = {}
122             table.idl = self
123
124     def close(self):
125         """Closes the connection to the database.  The IDL will no longer
126         update."""
127         self._session.close()
128
129     def run(self):
130         """Processes a batch of messages from the database server.  Returns
131         True if the database as seen through the IDL changed, False if it did
132         not change.  The initial fetch of the entire contents of the remote
133         database is considered to be one kind of change.  If the IDL has been
134         configured to acquire a database lock (with Idl.set_lock()), then
135         successfully acquiring the lock is also considered to be a change.
136
137         This function can return occasional false positives, that is, report
138         that the database changed even though it didn't.  This happens if the
139         connection to the database drops and reconnects, which causes the
140         database contents to be reloaded even if they didn't change.  (It could
141         also happen if the database server sends out a "change" that reflects
142         what we already thought was in the database, but the database server is
143         not supposed to do that.)
144
145         As an alternative to checking the return value, the client may check
146         for changes in self.change_seqno."""
147         assert not self.txn
148         initial_change_seqno = self.change_seqno
149         self._session.run()
150         i = 0
151         while i < 50:
152             i += 1
153             if not self._session.is_connected():
154                 break
155
156             seqno = self._session.get_seqno()
157             if seqno != self._last_seqno:
158                 self._last_seqno = seqno
159                 self.__txn_abort_all()
160                 self.__send_monitor_request()
161                 if self.lock_name:
162                     self.__send_lock_request()
163                 break
164
165             msg = self._session.recv()
166             if msg is None:
167                 break
168             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
169                 and msg.method == "update"
170                 and len(msg.params) == 2
171                 and msg.params[0] == None):
172                 # Database contents changed.
173                 self.__parse_update(msg.params[1])
174             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
175                   and self._monitor_request_id is not None
176                   and self._monitor_request_id == msg.id):
177                 # Reply to our "monitor" request.
178                 try:
179                     self.change_seqno += 1
180                     self._monitor_request_id = None
181                     self.__clear()
182                     self.__parse_update(msg.result)
183                 except error.Error, e:
184                     vlog.err("%s: parse error in received schema: %s"
185                               % (self._session.get_name(), e))
186                     self.__error()
187             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
188                   and self._lock_request_id is not None
189                   and self._lock_request_id == msg.id):
190                 # Reply to our "lock" request.
191                 self.__parse_lock_reply(msg.result)
192             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
193                   and msg.method == "locked"):
194                 # We got our lock.
195                 self.__parse_lock_notify(msg.params, True)
196             elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
197                   and msg.method == "stolen"):
198                 # Someone else stole our lock.
199                 self.__parse_lock_notify(msg.params, False)
200             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
201                 # Reply to our echo request.  Ignore it.
202                 pass
203             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
204                                ovs.jsonrpc.Message.T_REPLY)
205                   and self.__txn_process_reply(msg)):
206                 # __txn_process_reply() did everything needed.
207                 pass
208             else:
209                 # This can happen if a transaction is destroyed before we
210                 # receive the reply, so keep the log level low.
211                 vlog.dbg("%s: received unexpected %s message"
212                          % (self._session.get_name(),
213                              ovs.jsonrpc.Message.type_to_string(msg.type)))
214
215         return initial_change_seqno != self.change_seqno
216
217     def wait(self, poller):
218         """Arranges for poller.block() to wake up when self.run() has something
219         to do or when activity occurs on a transaction on 'self'."""
220         self._session.wait(poller)
221         self._session.recv_wait(poller)
222
223     def has_ever_connected(self):
224         """Returns True, if the IDL successfully connected to the remote
225         database and retrieved its contents (even if the connection
226         subsequently dropped and is in the process of reconnecting).  If so,
227         then the IDL contains an atomic snapshot of the database's contents
228         (but it might be arbitrarily old if the connection dropped).
229
230         Returns False if the IDL has never connected or retrieved the
231         database's contents.  If so, the IDL is empty."""
232         return self.change_seqno != 0
233
234     def force_reconnect(self):
235         """Forces the IDL to drop its connection to the database and reconnect.
236         In the meantime, the contents of the IDL will not change."""
237         self._session.force_reconnect()
238
239     def set_lock(self, lock_name):
240         """If 'lock_name' is not None, configures the IDL to obtain the named
241         lock from the database server and to avoid modifying the database when
242         the lock cannot be acquired (that is, when another client has the same
243         lock).
244
245         If 'lock_name' is None, drops the locking requirement and releases the
246         lock."""
247         assert not self.txn
248         assert not self._outstanding_txns
249
250         if self.lock_name and (not lock_name or lock_name != self.lock_name):
251             # Release previous lock.
252             self.__send_unlock_request()
253             self.lock_name = None
254             self.is_lock_contended = False
255
256         if lock_name and not self.lock_name:
257             # Acquire new lock.
258             self.lock_name = lock_name
259             self.__send_lock_request()
260
261     def __clear(self):
262         changed = False
263
264         for table in self.tables.itervalues():
265             if table.rows:
266                 changed = True
267                 table.rows = {}
268
269         if changed:
270             self.change_seqno += 1
271
272     def __update_has_lock(self, new_has_lock):
273         if new_has_lock and not self.has_lock:
274             if self._monitor_request_id is None:
275                 self.change_seqno += 1
276             else:
277                 # We're waiting for a monitor reply, so don't signal that the
278                 # database changed.  The monitor reply will increment
279                 # change_seqno anyhow.
280                 pass
281             self.is_lock_contended = False
282         self.has_lock = new_has_lock
283
284     def __do_send_lock_request(self, method):
285         self.__update_has_lock(False)
286         self._lock_request_id = None
287         if self._session.is_connected():
288             msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
289             msg_id = msg.id
290             self._session.send(msg)
291         else:
292             msg_id = None
293         return msg_id
294
295     def __send_lock_request(self):
296         self._lock_request_id = self.__do_send_lock_request("lock")
297
298     def __send_unlock_request(self):
299         self.__do_send_lock_request("unlock")
300
301     def __parse_lock_reply(self, result):
302         self._lock_request_id = None
303         got_lock = type(result) == dict and result.get("locked") is True
304         self.__update_has_lock(got_lock)
305         if not got_lock:
306             self.is_lock_contended = True
307
308     def __parse_lock_notify(self, params, new_has_lock):
309         if (self.lock_name is not None
310             and type(params) in (list, tuple)
311             and params
312             and params[0] == self.lock_name):
313             self.__update_has_lock(self, new_has_lock)
314             if not new_has_lock:
315                 self.is_lock_contended = True
316
317     def __send_monitor_request(self):
318         monitor_requests = {}
319         for table in self.tables.itervalues():
320             monitor_requests[table.name] = {"columns": table.columns.keys()}
321         msg = ovs.jsonrpc.Message.create_request(
322             "monitor", [self._db.name, None, monitor_requests])
323         self._monitor_request_id = msg.id
324         self._session.send(msg)
325
326     def __parse_update(self, update):
327         try:
328             self.__do_parse_update(update)
329         except error.Error, e:
330             vlog.err("%s: error parsing update: %s"
331                      % (self._session.get_name(), e))
332
333     def __do_parse_update(self, table_updates):
334         if type(table_updates) != dict:
335             raise error.Error("<table-updates> is not an object",
336                               table_updates)
337
338         for table_name, table_update in table_updates.iteritems():
339             table = self.tables.get(table_name)
340             if not table:
341                 raise error.Error('<table-updates> includes unknown '
342                                   'table "%s"' % table_name)
343
344             if type(table_update) != dict:
345                 raise error.Error('<table-update> for table "%s" is not '
346                                   'an object' % table_name, table_update)
347
348             for uuid_string, row_update in table_update.iteritems():
349                 if not ovs.ovsuuid.is_valid_string(uuid_string):
350                     raise error.Error('<table-update> for table "%s" '
351                                       'contains bad UUID "%s" as member '
352                                       'name' % (table_name, uuid_string),
353                                       table_update)
354                 uuid = ovs.ovsuuid.from_string(uuid_string)
355
356                 if type(row_update) != dict:
357                     raise error.Error('<table-update> for table "%s" '
358                                       'contains <row-update> for %s that '
359                                       'is not an object'
360                                       % (table_name, uuid_string))
361
362                 parser = ovs.db.parser.Parser(row_update, "row-update")
363                 old = parser.get_optional("old", [dict])
364                 new = parser.get_optional("new", [dict])
365                 parser.finish()
366
367                 if not old and not new:
368                     raise error.Error('<row-update> missing "old" and '
369                                       '"new" members', row_update)
370
371                 if self.__process_update(table, uuid, old, new):
372                     self.change_seqno += 1
373
374     def __process_update(self, table, uuid, old, new):
375         """Returns True if a column changed, False otherwise."""
376         row = table.rows.get(uuid)
377         changed = False
378         if not new:
379             # Delete row.
380             if row:
381                 del table.rows[uuid]
382                 changed = True
383             else:
384                 # XXX rate-limit
385                 vlog.warn("cannot delete missing row %s from table %s"
386                           % (uuid, table.name))
387         elif not old:
388             # Insert row.
389             if not row:
390                 row = self.__create_row(table, uuid)
391                 changed = True
392             else:
393                 # XXX rate-limit
394                 vlog.warn("cannot add existing row %s to table %s"
395                           % (uuid, table.name))
396             if self.__row_update(table, row, new):
397                 changed = True
398         else:
399             if not row:
400                 row = self.__create_row(table, uuid)
401                 changed = True
402                 # XXX rate-limit
403                 vlog.warn("cannot modify missing row %s in table %s"
404                           % (uuid, table.name))
405             if self.__row_update(table, row, new):
406                 changed = True
407         return changed
408
409     def __row_update(self, table, row, row_json):
410         changed = False
411         for column_name, datum_json in row_json.iteritems():
412             column = table.columns.get(column_name)
413             if not column:
414                 # XXX rate-limit
415                 vlog.warn("unknown column %s updating table %s"
416                           % (column_name, table.name))
417                 continue
418
419             try:
420                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
421             except error.Error, e:
422                 # XXX rate-limit
423                 vlog.warn("error parsing column %s in table %s: %s"
424                           % (column_name, table.name, e))
425                 continue
426
427             if datum != row._data[column_name]:
428                 row._data[column_name] = datum
429                 if column.alert:
430                     changed = True
431             else:
432                 # Didn't really change but the OVSDB monitor protocol always
433                 # includes every value in a row.
434                 pass
435         return changed
436
437     def __create_row(self, table, uuid):
438         data = {}
439         for column in table.columns.itervalues():
440             data[column.name] = ovs.db.data.Datum.default(column.type)
441         row = table.rows[uuid] = Row(self, table, uuid, data)
442         return row
443
444     def __error(self):
445         self._session.force_reconnect()
446
447     def __txn_abort_all(self):
448         while self._outstanding_txns:
449             txn = self._outstanding_txns.popitem()[1]
450             txn._status = Transaction.TRY_AGAIN
451
452     def __txn_process_reply(self, msg):
453         txn = self._outstanding_txns.pop(msg.id, None)
454         if txn:
455             txn._process_reply(msg)
456
457
458 def _uuid_to_row(atom, base):
459     if base.ref_table:
460         return base.ref_table.rows.get(atom)
461     else:
462         return atom
463
464
465 def _row_to_uuid(value):
466     if type(value) == Row:
467         return value.uuid
468     else:
469         return value
470
471
472 class Row(object):
473     """A row within an IDL.
474
475     The client may access the following attributes directly:
476
477     - 'uuid': a uuid.UUID object whose value is the row's database UUID.
478
479     - An attribute for each column in the Row's table, named for the column,
480       whose values are as returned by Datum.to_python() for the column's type.
481
482       If some error occurs (e.g. the database server's idea of the column is
483       different from the IDL's idea), then the attribute values is the
484       "default" value return by Datum.default() for the column's type.  (It is
485       important to know this because the default value may violate constraints
486       for the column's type, e.g. the default integer value is 0 even if column
487       contraints require the column's value to be positive.)
488
489       When a transaction is active, column attributes may also be assigned new
490       values.  Committing the transaction will then cause the new value to be
491       stored into the database.
492
493       *NOTE*: In the current implementation, the value of a column is a *copy*
494       of the value in the database.  This means that modifying its value
495       directly will have no useful effect.  For example, the following:
496         row.mycolumn["a"] = "b"              # don't do this
497       will not change anything in the database, even after commit.  To modify
498       the column, instead assign the modified column value back to the column:
499         d = row.mycolumn
500         d["a"] = "b"
501         row.mycolumn = d
502 """
503     def __init__(self, idl, table, uuid, data):
504         # All of the explicit references to self.__dict__ below are required
505         # to set real attributes with invoking self.__getattr__().
506         self.__dict__["uuid"] = uuid
507
508         self.__dict__["_idl"] = idl
509         self.__dict__["_table"] = table
510
511         # _data is the committed data.  It takes the following values:
512         #
513         #   - A dictionary that maps every column name to a Datum, if the row
514         #     exists in the committed form of the database.
515         #
516         #   - None, if this row is newly inserted within the active transaction
517         #     and thus has no committed form.
518         self.__dict__["_data"] = data
519
520         # _changes describes changes to this row within the active transaction.
521         # It takes the following values:
522         #
523         #   - {}, the empty dictionary, if no transaction is active or if the
524         #     row has yet not been changed within this transaction.
525         #
526         #   - A dictionary that maps a column name to its new Datum, if an
527         #     active transaction changes those columns' values.
528         #
529         #   - A dictionary that maps every column name to a Datum, if the row
530         #     is newly inserted within the active transaction.
531         #
532         #   - None, if this transaction deletes this row.
533         self.__dict__["_changes"] = {}
534
535         # A dictionary whose keys are the names of columns that must be
536         # verified as prerequisites when the transaction commits.  The values
537         # in the dictionary are all None.
538         self.__dict__["_prereqs"] = {}
539
540     def __getattr__(self, column_name):
541         assert self._changes is not None
542
543         datum = self._changes.get(column_name)
544         if datum is None:
545             datum = self._data[column_name]
546
547         return datum.to_python(_uuid_to_row)
548
549     def __setattr__(self, column_name, value):
550         assert self._changes is not None
551         assert self._idl.txn
552
553         column = self._table.columns[column_name]
554         try:
555             datum = ovs.db.data.Datum.from_python(column.type, value,
556                                                   _row_to_uuid)
557         except error.Error, e:
558             # XXX rate-limit
559             vlog.err("attempting to write bad value to column %s (%s)"
560                      % (column_name, e))
561             return
562         self._idl.txn._write(self, column, datum)
563
564     def verify(self, column_name):
565         """Causes the original contents of column 'column_name' in this row to
566         be verified as a prerequisite to completing the transaction.  That is,
567         if 'column_name' changed in this row (or if this row was deleted)
568         between the time that the IDL originally read its contents and the time
569         that the transaction commits, then the transaction aborts and
570         Transaction.commit() returns Transaction.TRY_AGAIN.
571
572         The intention is that, to ensure that no transaction commits based on
573         dirty reads, an application should call Row.verify() on each data item
574         read as part of a read-modify-write operation.
575
576         In some cases Row.verify() reduces to a no-op, because the current
577         value of the column is already known:
578
579           - If this row is a row created by the current transaction (returned
580             by Transaction.insert()).
581
582           - If the column has already been modified within the current
583             transaction.
584
585         Because of the latter property, always call Row.verify() *before*
586         modifying the column, for a given read-modify-write.
587
588         A transaction must be in progress."""
589         assert self._idl.txn
590         assert self._changes is not None
591         if not self._data or column_name in self._changes:
592             return
593
594         self._prereqs[column_name] = None
595
596     def delete(self):
597         """Deletes this row from its table.
598
599         A transaction must be in progress."""
600         assert self._idl.txn
601         assert self._changes is not None
602         if self._data is None:
603             del self._idl.txn._txn_rows[self.uuid]
604         self.__dict__["_changes"] = None
605         del self._table.rows[self.uuid]
606
607
608 def _uuid_name_from_uuid(uuid):
609     return "row%s" % str(uuid).replace("-", "_")
610
611
612 def _where_uuid_equals(uuid):
613     return [["_uuid", "==", ["uuid", str(uuid)]]]
614
615
616 class _InsertedRow(object):
617     def __init__(self, op_index):
618         self.op_index = op_index
619         self.real = None
620
621
622 class Transaction(object):
623     # Status values that Transaction.commit() can return.
624     UNCOMMITTED = "uncommitted"  # Not yet committed or aborted.
625     UNCHANGED = "unchanged"      # Transaction didn't include any changes.
626     INCOMPLETE = "incomplete"    # Commit in progress, please wait.
627     ABORTED = "aborted"          # ovsdb_idl_txn_abort() called.
628     SUCCESS = "success"          # Commit successful.
629     TRY_AGAIN = "try again"      # Commit failed because a "verify" operation
630                                  # reported an inconsistency, due to a network
631                                  # problem, or other transient failure.  Wait
632                                  # for a change, then try again.
633     NOT_LOCKED = "not locked"    # Server hasn't given us the lock yet.
634     ERROR = "error"              # Commit failed due to a hard error.
635
636     @staticmethod
637     def status_to_string(status):
638         """Converts one of the status values that Transaction.commit() can
639         return into a human-readable string.
640
641         (The status values are in fact such strings already, so
642         there's nothing to do.)"""
643         return status
644
645     def __init__(self, idl):
646         """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
647         A given Idl may only have a single active transaction at a time.
648
649         A Transaction may modify the contents of a database by assigning new
650         values to columns (attributes of Row), deleting rows (with
651         Row.delete()), or inserting rows (with Transaction.insert()).  It may
652         also check that columns in the database have not changed with
653         Row.verify().
654
655         When a transaction is complete (which must be before the next call to
656         Idl.run()), call Transaction.commit() or Transaction.abort()."""
657         assert idl.txn is None
658
659         idl.txn = self
660         self._request_id = None
661         self.idl = idl
662         self.dry_run = False
663         self._txn_rows = {}
664         self._status = Transaction.UNCOMMITTED
665         self._error = None
666         self._comments = []
667         self._commit_seqno = self.idl.change_seqno
668
669         self._inc_table = None
670         self._inc_column = None
671         self._inc_where = None
672
673         self._inserted_rows = {}  # Map from UUID to _InsertedRow
674
675     def add_comment(self, comment):
676         """Appens 'comment' to the comments that will be passed to the OVSDB
677         server when this transaction is committed.  (The comment will be
678         committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
679         relatively human-readable form.)"""
680         self._comments.append(comment)
681
682     def increment(self, table, column, where):
683         assert not self._inc_table
684         self._inc_table = table
685         self._inc_column = column
686         self._inc_where = where
687
688     def wait(self, poller):
689         if self._status not in (Transaction.UNCOMMITTED,
690                                 Transaction.INCOMPLETE):
691             poller.immediate_wake()
692
693     def _substitute_uuids(self, json):
694         if type(json) in (list, tuple):
695             if (len(json) == 2
696                 and json[0] == 'uuid'
697                 and ovs.ovsuuid.is_valid_string(json[1])):
698                 uuid = ovs.ovsuuid.from_string(json[1])
699                 row = self._txn_rows.get(uuid, None)
700                 if row and row._data is None:
701                     return ["named-uuid", _uuid_name_from_uuid(uuid)]
702         return json
703
704     def __disassemble(self):
705         self.idl.txn = None
706
707         for row in self._txn_rows.itervalues():
708             if row._changes is None:
709                 row._table.rows[row.uuid] = row
710             elif row._data is None:
711                 del row._table.rows[row.uuid]
712             row.__dict__["_changes"] = {}
713             row.__dict__["_prereqs"] = {}
714         self._txn_rows = {}
715
716     def commit(self):
717         """Attempts to commit this transaction and returns the status of the
718         commit operation, one of the constants declared as class attributes.
719         If the return value is Transaction.INCOMPLETE, then the transaction is
720         not yet complete and the caller should try calling again later, after
721         calling Idl.run() to run the Idl.
722
723         Committing a transaction rolls back all of the changes that it made to
724         the Idl's copy of the database.  If the transaction commits
725         successfully, then the database server will send an update and, thus,
726         the Idl will be updated with the committed changes."""
727         # The status can only change if we're the active transaction.
728         # (Otherwise, our status will change only in Idl.run().)
729         if self != self.idl.txn:
730             return self._status
731
732         # If we need a lock but don't have it, give up quickly.
733         if self.idl.lock_name and not self.idl.has_lock():
734             self._status = Transaction.NOT_LOCKED
735             self.__disassemble()
736             return self._status
737
738         operations = [self.idl._db.name]
739
740         # Assert that we have the required lock (avoiding a race).
741         if self.idl.lock_name:
742             operations.append({"op": "assert",
743                                "lock": self.idl.lock_name})
744
745         # Add prerequisites and declarations of new rows.
746         for row in self._txn_rows.itervalues():
747             if row._prereqs:
748                 rows = {}
749                 columns = []
750                 for column_name in row._prereqs:
751                     columns.append(column_name)
752                     rows[column_name] = row._data[column_name].to_json()
753                 operations.append({"op": "wait",
754                                    "table": row._table.name,
755                                    "timeout": 0,
756                                    "where": _where_uuid_equals(row.uuid),
757                                    "until": "==",
758                                    "columns": columns,
759                                    "rows": [rows]})
760
761         # Add updates.
762         any_updates = False
763         for row in self._txn_rows.itervalues():
764             if row._changes is None:
765                 if row._table.is_root:
766                     operations.append({"op": "delete",
767                                        "table": row._table.name,
768                                        "where": _where_uuid_equals(row.uuid)})
769                     any_updates = True
770                 else:
771                     # Let ovsdb-server decide whether to really delete it.
772                     pass
773             elif row._changes:
774                 op = {"table": row._table.name}
775                 if row._data is None:
776                     op["op"] = "insert"
777                     op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
778                     any_updates = True
779
780                     op_index = len(operations) - 1
781                     self._inserted_rows[row.uuid] = _InsertedRow(op_index)
782                 else:
783                     op["op"] = "update"
784                     op["where"] = _where_uuid_equals(row.uuid)
785
786                 row_json = {}
787                 op["row"] = row_json
788
789                 for column_name, datum in row._changes.iteritems():
790                     if row._data is not None or not datum.is_default():
791                         row_json[column_name] = (
792                                 self._substitute_uuids(datum.to_json()))
793
794                         # If anything really changed, consider it an update.
795                         # We can't suppress not-really-changed values earlier
796                         # or transactions would become nonatomic (see the big
797                         # comment inside Transaction._write()).
798                         if (not any_updates and row._data is not None and
799                             row._data[column_name] != datum):
800                             any_updates = True
801
802                 if row._data is None or row_json:
803                     operations.append(op)
804
805         # Add increment.
806         if self._inc_table and any_updates:
807             self._inc_index = len(operations) - 1
808
809             operations.append({"op": "mutate",
810                                "table": self._inc_table,
811                                "where": self._substitute_uuids(
812                                    self._inc_where),
813                                "mutations": [[self._inc_column, "+=", 1]]})
814             operations.append({"op": "select",
815                                "table": self._inc_table,
816                                "where": self._substitute_uuids(
817                                    self._inc_where),
818                                "columns": [self._inc_column]})
819
820         # Add comment.
821         if self._comments:
822             operations.append({"op": "comment",
823                                "comment": "\n".join(self._comments)})
824
825         # Dry run?
826         if self.dry_run:
827             operations.append({"op": "abort"})
828
829         if not any_updates:
830             self._status = Transaction.UNCHANGED
831         else:
832             msg = ovs.jsonrpc.Message.create_request("transact", operations)
833             self._request_id = msg.id
834             if not self.idl._session.send(msg):
835                 self.idl._outstanding_txns[self._request_id] = self
836                 self._status = Transaction.INCOMPLETE
837             else:
838                 self._status = Transaction.TRY_AGAIN
839
840         self.__disassemble()
841         return self._status
842
843     def commit_block(self):
844         while True:
845             status = self.commit()
846             if status != Transaction.INCOMPLETE:
847                 return status
848
849             self.idl.run()
850
851             poller = ovs.poller.Poller()
852             self.idl.wait(poller)
853             self.wait(poller)
854             poller.block()
855
856     def get_increment_new_value(self):
857         assert self._status == Transaction.SUCCESS
858         return self._inc_new_value
859
860     def abort(self):
861         """Aborts this transaction.  If Transaction.commit() has already been
862         called then the transaction might get committed anyhow."""
863         self.__disassemble()
864         if self._status in (Transaction.UNCOMMITTED,
865                             Transaction.INCOMPLETE):
866             self._status = Transaction.ABORTED
867
868     def get_error(self):
869         """Returns a string representing this transaction's current status,
870         suitable for use in log messages."""
871         if self._status != Transaction.ERROR:
872             return Transaction.status_to_string(self._status)
873         elif self._error:
874             return self._error
875         else:
876             return "no error details available"
877
878     def __set_error_json(self, json):
879         if self._error is None:
880             self._error = ovs.json.to_string(json)
881
882     def get_insert_uuid(self, uuid):
883         """Finds and returns the permanent UUID that the database assigned to a
884         newly inserted row, given the UUID that Transaction.insert() assigned
885         locally to that row.
886
887         Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
888         or if it was assigned by that function and then deleted by Row.delete()
889         within the same transaction.  (Rows that are inserted and then deleted
890         within a single transaction are never sent to the database server, so
891         it never assigns them a permanent UUID.)
892
893         This transaction must have completed successfully."""
894         assert self._status in (Transaction.SUCCESS,
895                                 Transaction.UNCHANGED)
896         inserted_row = self._inserted_rows.get(uuid)
897         if inserted_row:
898             return inserted_row.real
899         return None
900
901     def _write(self, row, column, datum):
902         assert row._changes is not None
903
904         txn = row._idl.txn
905
906         # If this is a write-only column and the datum being written is the
907         # same as the one already there, just skip the update entirely.  This
908         # is worth optimizing because we have a lot of columns that get
909         # periodically refreshed into the database but don't actually change
910         # that often.
911         #
912         # We don't do this for read/write columns because that would break
913         # atomicity of transactions--some other client might have written a
914         # different value in that column since we read it.  (But if a whole
915         # transaction only does writes of existing values, without making any
916         # real changes, we will drop the whole transaction later in
917         # ovsdb_idl_txn_commit().)
918         if not column.alert and row._data.get(column.name) == datum:
919             new_value = row._changes.get(column.name)
920             if new_value is None or new_value == datum:
921                 return
922
923         txn._txn_rows[row.uuid] = row
924         row._changes[column.name] = datum.copy()
925
926     def insert(self, table, new_uuid=None):
927         """Inserts and returns a new row in 'table', which must be one of the
928         ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
929
930         The new row is assigned a provisional UUID.  If 'uuid' is None then one
931         is randomly generated; otherwise 'uuid' should specify a randomly
932         generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
933         different UUID when 'txn' is committed, but the IDL will replace any
934         uses of the provisional UUID in the data to be to be committed by the
935         UUID assigned by ovsdb-server."""
936         assert self._status == Transaction.UNCOMMITTED
937         if new_uuid is None:
938             new_uuid = uuid.uuid4()
939         row = Row(self.idl, table, new_uuid, None)
940         table.rows[row.uuid] = row
941         self._txn_rows[row.uuid] = row
942         return row
943
944     def _process_reply(self, msg):
945         if msg.type == ovs.jsonrpc.Message.T_ERROR:
946             self._status = Transaction.ERROR
947         elif type(msg.result) not in (list, tuple):
948             # XXX rate-limit
949             vlog.warn('reply to "transact" is not JSON array')
950         else:
951             hard_errors = False
952             soft_errors = False
953             lock_errors = False
954
955             ops = msg.result
956             for op in ops:
957                 if op is None:
958                     # This isn't an error in itself but indicates that some
959                     # prior operation failed, so make sure that we know about
960                     # it.
961                     soft_errors = True
962                 elif type(op) == dict:
963                     error = op.get("error")
964                     if error is not None:
965                         if error == "timed out":
966                             soft_errors = True
967                         elif error == "not owner":
968                             lock_errors = True
969                         elif error == "aborted":
970                             pass
971                         else:
972                             hard_errors = True
973                             self.__set_error_json(op)
974                 else:
975                     hard_errors = True
976                     self.__set_error_json(op)
977                     # XXX rate-limit
978                     vlog.warn("operation reply is not JSON null or object")
979
980             if not soft_errors and not hard_errors and not lock_errors:
981                 if self._inc_table and not self.__process_inc_reply(ops):
982                     hard_errors = True
983
984                 for insert in self._inserted_rows.itervalues():
985                     if not self.__process_insert_reply(insert, ops):
986                         hard_errors = True
987
988             if hard_errors:
989                 self._status = Transaction.ERROR
990             elif lock_errors:
991                 self._status = Transaction.NOT_LOCKED
992             elif soft_errors:
993                 self._status = Transaction.TRY_AGAIN
994             else:
995                 self._status = Transaction.SUCCESS
996
997     @staticmethod
998     def __check_json_type(json, types, name):
999         if not json:
1000             # XXX rate-limit
1001             vlog.warn("%s is missing" % name)
1002             return False
1003         elif type(json) not in types:
1004             # XXX rate-limit
1005             vlog.warn("%s has unexpected type %s" % (name, type(json)))
1006             return False
1007         else:
1008             return True
1009
1010     def __process_inc_reply(self, ops):
1011         if self._inc_index + 2 > len(ops):
1012             # XXX rate-limit
1013             vlog.warn("reply does not contain enough operations for "
1014                       "increment (has %d, needs %d)" %
1015                       (len(ops), self._inc_index + 2))
1016
1017         # We know that this is a JSON object because the loop in
1018         # __process_reply() already checked.
1019         mutate = ops[self._inc_index]
1020         count = mutate.get("count")
1021         if not Transaction.__check_json_type(count, (int, long),
1022                                              '"mutate" reply "count"'):
1023             return False
1024         if count != 1:
1025             # XXX rate-limit
1026             vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1027             return False
1028
1029         select = ops[self._inc_index + 1]
1030         rows = select.get("rows")
1031         if not Transaction.__check_json_type(rows, (list, tuple),
1032                                              '"select" reply "rows"'):
1033             return False
1034         if len(rows) != 1:
1035             # XXX rate-limit
1036             vlog.warn('"select" reply "rows" has %d elements '
1037                       'instead of 1' % len(rows))
1038             return False
1039         row = rows[0]
1040         if not Transaction.__check_json_type(row, (dict,),
1041                                              '"select" reply row'):
1042             return False
1043         column = row.get(self._inc_column)
1044         if not Transaction.__check_json_type(column, (int, long),
1045                                              '"select" reply inc column'):
1046             return False
1047         self._inc_new_value = column
1048         return True
1049
1050     def __process_insert_reply(self, insert, ops):
1051         if insert.op_index >= len(ops):
1052             # XXX rate-limit
1053             vlog.warn("reply does not contain enough operations "
1054                       "for insert (has %d, needs %d)"
1055                       % (len(ops), insert.op_index))
1056             return False
1057
1058         # We know that this is a JSON object because the loop in
1059         # __process_reply() already checked.
1060         reply = ops[insert.op_index]
1061         json_uuid = reply.get("uuid")
1062         if not Transaction.__check_json_type(json_uuid, (tuple, list),
1063                                              '"insert" reply "uuid"'):
1064             return False
1065
1066         try:
1067             uuid_ = ovs.ovsuuid.from_json(json_uuid)
1068         except error.Error:
1069             # XXX rate-limit
1070             vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1071             return False
1072
1073         insert.real = uuid_
1074         return True
1075
1076
1077 class SchemaHelper(object):
1078     """IDL Schema helper.
1079
1080     This class encapsulates the logic required to generate schemas suitable
1081     for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
1082     they are interested in using register_columns().  When finished, the
1083     get_idl_schema() function may be called.
1084
1085     The location on disk of the schema used may be found in the
1086     'schema_location' variable."""
1087
1088     def __init__(self, location=None):
1089         """Creates a new Schema object."""
1090
1091         if location is None:
1092             location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1093
1094         self.schema_location = location
1095         self._tables = {}
1096         self._all = False
1097
1098     def register_columns(self, table, columns):
1099         """Registers interest in the given 'columns' of 'table'.  Future calls
1100         to get_idl_schema() will include 'table':column for each column in
1101         'columns'. This function automatically avoids adding duplicate entries
1102         to the schema.
1103
1104         'table' must be a string.
1105         'columns' must be a list of strings.
1106         """
1107
1108         assert type(table) is str
1109         assert type(columns) is list
1110
1111         columns = set(columns) | self._tables.get(table, set())
1112         self._tables[table] = columns
1113
1114     def register_all(self):
1115         """Registers interest in every column of every table."""
1116         self._all = True
1117
1118     def get_idl_schema(self):
1119         """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1120         object based on columns registered using the register_columns()
1121         function."""
1122
1123         schema = ovs.db.schema.DbSchema.from_json(
1124             ovs.json.from_file(self.schema_location))
1125
1126         if not self._all:
1127             schema_tables = {}
1128             for table, columns in self._tables.iteritems():
1129                 schema_tables[table] = (
1130                     self._keep_table_columns(schema, table, columns))
1131
1132             schema.tables = schema_tables
1133         return schema
1134
1135     def _keep_table_columns(self, schema, table_name, columns):
1136         assert table_name in schema.tables
1137         table = schema.tables[table_name]
1138
1139         new_columns = {}
1140         for column_name in columns:
1141             assert type(column_name) is str
1142             assert column_name in table.columns
1143
1144             new_columns[column_name] = table.columns[column_name]
1145
1146         table.columns = new_columns
1147         return table