1 # Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
25 vlog = ovs.vlog.Vlog("idl")
27 __pychecker__ = 'no-classattr no-objattrs'
31 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
33 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
34 requests to an OVSDB database server and parses the responses, converting
35 raw JSON into data structures that are easier for clients to digest.
37 The IDL also assists with issuing database transactions. The client
38 creates a transaction, manipulates the IDL data structures, and commits or
39 aborts the transaction. The IDL then composes and issues the necessary
40 JSON-RPC requests and reports to the client whether the transaction
41 completed successfully.
43 The client is allowed to access the following attributes directly, in a
46 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
48 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
51 The client may directly read and write the Row objects referenced by the
52 'rows' map values. Refer to Row for more details.
54 - 'change_seqno': A number that represents the IDL's state. When the IDL
55 is updated (by Idl.run()), its value changes.
57 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58 if no lock is configured.
60 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61 lock, and False otherwise.
63 Locking and unlocking happens asynchronously from the database client's
64 point of view, so the information is only useful for optimization
65 (e.g. if the client doesn't have the lock then there's no point in trying
66 to write to the database).
68 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69 the database server has indicated that some other client already owns the
70 requested lock, and False otherwise.
72 - 'txn': The ovs.db.idl.Transaction object for the database transaction
73 currently being constructed, if there is one, or None otherwise.
76 def __init__(self, remote, schema):
77 """Creates and returns a connection to the database named 'db_name' on
78 'remote', which should be in a form acceptable to
79 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
80 replica of the remote database.
82 'schema' should be the schema for the remote database. The caller may
83 have cut it down by removing tables or columns that are not of
84 interest. The IDL will only replicate the tables and columns that
85 remain. The caller may also add a attribute named 'alert' to selected
86 remaining columns, setting its value to False; if so, then changes to
87 those columns will not be considered changes to the database for the
88 purpose of the return value of Idl.run() and Idl.change_seqno. This is
89 useful for columns that the IDL's client will write but not read.
91 As a convenience to users, 'schema' may also be an instance of the
94 The IDL uses and modifies 'schema' directly."""
96 assert isinstance(schema, SchemaHelper)
97 schema = schema.get_idl_schema()
99 self.tables = schema.tables
101 self._session = ovs.jsonrpc.Session.open(remote)
102 self._monitor_request_id = None
103 self._last_seqno = None
104 self.change_seqno = 0
107 self.lock_name = None # Name of lock we need, None if none.
108 self.has_lock = False # Has db server said we have the lock?
109 self.is_lock_contended = False # Has db server said we can't get lock?
110 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
112 # Transaction support.
114 self._outstanding_txns = {}
116 for table in schema.tables.itervalues():
117 for column in table.columns.itervalues():
118 if not hasattr(column, 'alert'):
120 table.need_table = False
125 """Closes the connection to the database. The IDL will no longer
127 self._session.close()
130 """Processes a batch of messages from the database server. Returns
131 True if the database as seen through the IDL changed, False if it did
132 not change. The initial fetch of the entire contents of the remote
133 database is considered to be one kind of change. If the IDL has been
134 configured to acquire a database lock (with Idl.set_lock()), then
135 successfully acquiring the lock is also considered to be a change.
137 This function can return occasional false positives, that is, report
138 that the database changed even though it didn't. This happens if the
139 connection to the database drops and reconnects, which causes the
140 database contents to be reloaded even if they didn't change. (It could
141 also happen if the database server sends out a "change" that reflects
142 what we already thought was in the database, but the database server is
143 not supposed to do that.)
145 As an alternative to checking the return value, the client may check
146 for changes in self.change_seqno."""
148 initial_change_seqno = self.change_seqno
153 if not self._session.is_connected():
156 seqno = self._session.get_seqno()
157 if seqno != self._last_seqno:
158 self._last_seqno = seqno
159 self.__txn_abort_all()
160 self.__send_monitor_request()
162 self.__send_lock_request()
165 msg = self._session.recv()
168 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
169 and msg.method == "update"
170 and len(msg.params) == 2
171 and msg.params[0] == None):
172 # Database contents changed.
173 self.__parse_update(msg.params[1])
174 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
175 and self._monitor_request_id is not None
176 and self._monitor_request_id == msg.id):
177 # Reply to our "monitor" request.
179 self.change_seqno += 1
180 self._monitor_request_id = None
182 self.__parse_update(msg.result)
183 except error.Error, e:
184 vlog.err("%s: parse error in received schema: %s"
185 % (self._session.get_name(), e))
187 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
188 and self._lock_request_id is not None
189 and self._lock_request_id == msg.id):
190 # Reply to our "lock" request.
191 self.__parse_lock_reply(msg.result)
192 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
193 and msg.method == "locked"):
195 self.__parse_lock_notify(msg.params, True)
196 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
197 and msg.method == "stolen"):
198 # Someone else stole our lock.
199 self.__parse_lock_notify(msg.params, False)
200 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
201 # Reply to our echo request. Ignore it.
203 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
204 ovs.jsonrpc.Message.T_REPLY)
205 and self.__txn_process_reply(msg)):
206 # __txn_process_reply() did everything needed.
209 # This can happen if a transaction is destroyed before we
210 # receive the reply, so keep the log level low.
211 vlog.dbg("%s: received unexpected %s message"
212 % (self._session.get_name(),
213 ovs.jsonrpc.Message.type_to_string(msg.type)))
215 return initial_change_seqno != self.change_seqno
217 def wait(self, poller):
218 """Arranges for poller.block() to wake up when self.run() has something
219 to do or when activity occurs on a transaction on 'self'."""
220 self._session.wait(poller)
221 self._session.recv_wait(poller)
223 def has_ever_connected(self):
224 """Returns True, if the IDL successfully connected to the remote
225 database and retrieved its contents (even if the connection
226 subsequently dropped and is in the process of reconnecting). If so,
227 then the IDL contains an atomic snapshot of the database's contents
228 (but it might be arbitrarily old if the connection dropped).
230 Returns False if the IDL has never connected or retrieved the
231 database's contents. If so, the IDL is empty."""
232 return self.change_seqno != 0
234 def force_reconnect(self):
235 """Forces the IDL to drop its connection to the database and reconnect.
236 In the meantime, the contents of the IDL will not change."""
237 self._session.force_reconnect()
239 def set_lock(self, lock_name):
240 """If 'lock_name' is not None, configures the IDL to obtain the named
241 lock from the database server and to avoid modifying the database when
242 the lock cannot be acquired (that is, when another client has the same
245 If 'lock_name' is None, drops the locking requirement and releases the
248 assert not self._outstanding_txns
250 if self.lock_name and (not lock_name or lock_name != self.lock_name):
251 # Release previous lock.
252 self.__send_unlock_request()
253 self.lock_name = None
254 self.is_lock_contended = False
256 if lock_name and not self.lock_name:
258 self.lock_name = lock_name
259 self.__send_lock_request()
264 for table in self.tables.itervalues():
270 self.change_seqno += 1
272 def __update_has_lock(self, new_has_lock):
273 if new_has_lock and not self.has_lock:
274 if self._monitor_request_id is None:
275 self.change_seqno += 1
277 # We're waiting for a monitor reply, so don't signal that the
278 # database changed. The monitor reply will increment
279 # change_seqno anyhow.
281 self.is_lock_contended = False
282 self.has_lock = new_has_lock
284 def __do_send_lock_request(self, method):
285 self.__update_has_lock(False)
286 self._lock_request_id = None
287 if self._session.is_connected():
288 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
290 self._session.send(msg)
295 def __send_lock_request(self):
296 self._lock_request_id = self.__do_send_lock_request("lock")
298 def __send_unlock_request(self):
299 self.__do_send_lock_request("unlock")
301 def __parse_lock_reply(self, result):
302 self._lock_request_id = None
303 got_lock = type(result) == dict and result.get("locked") is True
304 self.__update_has_lock(got_lock)
306 self.is_lock_contended = True
308 def __parse_lock_notify(self, params, new_has_lock):
309 if (self.lock_name is not None
310 and type(params) in (list, tuple)
312 and params[0] == self.lock_name):
313 self.__update_has_lock(self, new_has_lock)
315 self.is_lock_contended = True
317 def __send_monitor_request(self):
318 monitor_requests = {}
319 for table in self.tables.itervalues():
320 monitor_requests[table.name] = {"columns": table.columns.keys()}
321 msg = ovs.jsonrpc.Message.create_request(
322 "monitor", [self._db.name, None, monitor_requests])
323 self._monitor_request_id = msg.id
324 self._session.send(msg)
326 def __parse_update(self, update):
328 self.__do_parse_update(update)
329 except error.Error, e:
330 vlog.err("%s: error parsing update: %s"
331 % (self._session.get_name(), e))
333 def __do_parse_update(self, table_updates):
334 if type(table_updates) != dict:
335 raise error.Error("<table-updates> is not an object",
338 for table_name, table_update in table_updates.iteritems():
339 table = self.tables.get(table_name)
341 raise error.Error('<table-updates> includes unknown '
342 'table "%s"' % table_name)
344 if type(table_update) != dict:
345 raise error.Error('<table-update> for table "%s" is not '
346 'an object' % table_name, table_update)
348 for uuid_string, row_update in table_update.iteritems():
349 if not ovs.ovsuuid.is_valid_string(uuid_string):
350 raise error.Error('<table-update> for table "%s" '
351 'contains bad UUID "%s" as member '
352 'name' % (table_name, uuid_string),
354 uuid = ovs.ovsuuid.from_string(uuid_string)
356 if type(row_update) != dict:
357 raise error.Error('<table-update> for table "%s" '
358 'contains <row-update> for %s that '
360 % (table_name, uuid_string))
362 parser = ovs.db.parser.Parser(row_update, "row-update")
363 old = parser.get_optional("old", [dict])
364 new = parser.get_optional("new", [dict])
367 if not old and not new:
368 raise error.Error('<row-update> missing "old" and '
369 '"new" members', row_update)
371 if self.__process_update(table, uuid, old, new):
372 self.change_seqno += 1
374 def __process_update(self, table, uuid, old, new):
375 """Returns True if a column changed, False otherwise."""
376 row = table.rows.get(uuid)
385 vlog.warn("cannot delete missing row %s from table %s"
386 % (uuid, table.name))
390 row = self.__create_row(table, uuid)
394 vlog.warn("cannot add existing row %s to table %s"
395 % (uuid, table.name))
396 if self.__row_update(table, row, new):
400 row = self.__create_row(table, uuid)
403 vlog.warn("cannot modify missing row %s in table %s"
404 % (uuid, table.name))
405 if self.__row_update(table, row, new):
409 def __row_update(self, table, row, row_json):
411 for column_name, datum_json in row_json.iteritems():
412 column = table.columns.get(column_name)
415 vlog.warn("unknown column %s updating table %s"
416 % (column_name, table.name))
420 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
421 except error.Error, e:
423 vlog.warn("error parsing column %s in table %s: %s"
424 % (column_name, table.name, e))
427 if datum != row._data[column_name]:
428 row._data[column_name] = datum
432 # Didn't really change but the OVSDB monitor protocol always
433 # includes every value in a row.
437 def __create_row(self, table, uuid):
439 for column in table.columns.itervalues():
440 data[column.name] = ovs.db.data.Datum.default(column.type)
441 row = table.rows[uuid] = Row(self, table, uuid, data)
445 self._session.force_reconnect()
447 def __txn_abort_all(self):
448 while self._outstanding_txns:
449 txn = self._outstanding_txns.popitem()[1]
450 txn._status = Transaction.TRY_AGAIN
452 def __txn_process_reply(self, msg):
453 txn = self._outstanding_txns.pop(msg.id, None)
455 txn._process_reply(msg)
458 def _uuid_to_row(atom, base):
460 return base.ref_table.rows.get(atom)
465 def _row_to_uuid(value):
466 if type(value) == Row:
473 """A row within an IDL.
475 The client may access the following attributes directly:
477 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
479 - An attribute for each column in the Row's table, named for the column,
480 whose values are as returned by Datum.to_python() for the column's type.
482 If some error occurs (e.g. the database server's idea of the column is
483 different from the IDL's idea), then the attribute values is the
484 "default" value return by Datum.default() for the column's type. (It is
485 important to know this because the default value may violate constraints
486 for the column's type, e.g. the default integer value is 0 even if column
487 contraints require the column's value to be positive.)
489 When a transaction is active, column attributes may also be assigned new
490 values. Committing the transaction will then cause the new value to be
491 stored into the database.
493 *NOTE*: In the current implementation, the value of a column is a *copy*
494 of the value in the database. This means that modifying its value
495 directly will have no useful effect. For example, the following:
496 row.mycolumn["a"] = "b" # don't do this
497 will not change anything in the database, even after commit. To modify
498 the column, instead assign the modified column value back to the column:
503 def __init__(self, idl, table, uuid, data):
504 # All of the explicit references to self.__dict__ below are required
505 # to set real attributes with invoking self.__getattr__().
506 self.__dict__["uuid"] = uuid
508 self.__dict__["_idl"] = idl
509 self.__dict__["_table"] = table
511 # _data is the committed data. It takes the following values:
513 # - A dictionary that maps every column name to a Datum, if the row
514 # exists in the committed form of the database.
516 # - None, if this row is newly inserted within the active transaction
517 # and thus has no committed form.
518 self.__dict__["_data"] = data
520 # _changes describes changes to this row within the active transaction.
521 # It takes the following values:
523 # - {}, the empty dictionary, if no transaction is active or if the
524 # row has yet not been changed within this transaction.
526 # - A dictionary that maps a column name to its new Datum, if an
527 # active transaction changes those columns' values.
529 # - A dictionary that maps every column name to a Datum, if the row
530 # is newly inserted within the active transaction.
532 # - None, if this transaction deletes this row.
533 self.__dict__["_changes"] = {}
535 # A dictionary whose keys are the names of columns that must be
536 # verified as prerequisites when the transaction commits. The values
537 # in the dictionary are all None.
538 self.__dict__["_prereqs"] = {}
540 def __getattr__(self, column_name):
541 assert self._changes is not None
543 datum = self._changes.get(column_name)
545 datum = self._data[column_name]
547 return datum.to_python(_uuid_to_row)
549 def __setattr__(self, column_name, value):
550 assert self._changes is not None
553 column = self._table.columns[column_name]
555 datum = ovs.db.data.Datum.from_python(column.type, value,
557 except error.Error, e:
559 vlog.err("attempting to write bad value to column %s (%s)"
562 self._idl.txn._write(self, column, datum)
564 def verify(self, column_name):
565 """Causes the original contents of column 'column_name' in this row to
566 be verified as a prerequisite to completing the transaction. That is,
567 if 'column_name' changed in this row (or if this row was deleted)
568 between the time that the IDL originally read its contents and the time
569 that the transaction commits, then the transaction aborts and
570 Transaction.commit() returns Transaction.TRY_AGAIN.
572 The intention is that, to ensure that no transaction commits based on
573 dirty reads, an application should call Row.verify() on each data item
574 read as part of a read-modify-write operation.
576 In some cases Row.verify() reduces to a no-op, because the current
577 value of the column is already known:
579 - If this row is a row created by the current transaction (returned
580 by Transaction.insert()).
582 - If the column has already been modified within the current
585 Because of the latter property, always call Row.verify() *before*
586 modifying the column, for a given read-modify-write.
588 A transaction must be in progress."""
590 assert self._changes is not None
591 if not self._data or column_name in self._changes:
594 self._prereqs[column_name] = None
597 """Deletes this row from its table.
599 A transaction must be in progress."""
601 assert self._changes is not None
602 if self._data is None:
603 del self._idl.txn._txn_rows[self.uuid]
604 self.__dict__["_changes"] = None
605 del self._table.rows[self.uuid]
608 def _uuid_name_from_uuid(uuid):
609 return "row%s" % str(uuid).replace("-", "_")
612 def _where_uuid_equals(uuid):
613 return [["_uuid", "==", ["uuid", str(uuid)]]]
616 class _InsertedRow(object):
617 def __init__(self, op_index):
618 self.op_index = op_index
622 class Transaction(object):
623 # Status values that Transaction.commit() can return.
624 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
625 UNCHANGED = "unchanged" # Transaction didn't include any changes.
626 INCOMPLETE = "incomplete" # Commit in progress, please wait.
627 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
628 SUCCESS = "success" # Commit successful.
629 TRY_AGAIN = "try again" # Commit failed because a "verify" operation
630 # reported an inconsistency, due to a network
631 # problem, or other transient failure. Wait
632 # for a change, then try again.
633 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
634 ERROR = "error" # Commit failed due to a hard error.
637 def status_to_string(status):
638 """Converts one of the status values that Transaction.commit() can
639 return into a human-readable string.
641 (The status values are in fact such strings already, so
642 there's nothing to do.)"""
645 def __init__(self, idl):
646 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
647 A given Idl may only have a single active transaction at a time.
649 A Transaction may modify the contents of a database by assigning new
650 values to columns (attributes of Row), deleting rows (with
651 Row.delete()), or inserting rows (with Transaction.insert()). It may
652 also check that columns in the database have not changed with
655 When a transaction is complete (which must be before the next call to
656 Idl.run()), call Transaction.commit() or Transaction.abort()."""
657 assert idl.txn is None
660 self._request_id = None
664 self._status = Transaction.UNCOMMITTED
667 self._commit_seqno = self.idl.change_seqno
669 self._inc_table = None
670 self._inc_column = None
671 self._inc_where = None
673 self._inserted_rows = {} # Map from UUID to _InsertedRow
675 def add_comment(self, comment):
676 """Appens 'comment' to the comments that will be passed to the OVSDB
677 server when this transaction is committed. (The comment will be
678 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
679 relatively human-readable form.)"""
680 self._comments.append(comment)
682 def increment(self, table, column, where):
683 assert not self._inc_table
684 self._inc_table = table
685 self._inc_column = column
686 self._inc_where = where
688 def wait(self, poller):
689 if self._status not in (Transaction.UNCOMMITTED,
690 Transaction.INCOMPLETE):
691 poller.immediate_wake()
693 def _substitute_uuids(self, json):
694 if type(json) in (list, tuple):
696 and json[0] == 'uuid'
697 and ovs.ovsuuid.is_valid_string(json[1])):
698 uuid = ovs.ovsuuid.from_string(json[1])
699 row = self._txn_rows.get(uuid, None)
700 if row and row._data is None:
701 return ["named-uuid", _uuid_name_from_uuid(uuid)]
704 def __disassemble(self):
707 for row in self._txn_rows.itervalues():
708 if row._changes is None:
709 row._table.rows[row.uuid] = row
710 elif row._data is None:
711 del row._table.rows[row.uuid]
712 row.__dict__["_changes"] = {}
713 row.__dict__["_prereqs"] = {}
717 """Attempts to commit this transaction and returns the status of the
718 commit operation, one of the constants declared as class attributes.
719 If the return value is Transaction.INCOMPLETE, then the transaction is
720 not yet complete and the caller should try calling again later, after
721 calling Idl.run() to run the Idl.
723 Committing a transaction rolls back all of the changes that it made to
724 the Idl's copy of the database. If the transaction commits
725 successfully, then the database server will send an update and, thus,
726 the Idl will be updated with the committed changes."""
727 # The status can only change if we're the active transaction.
728 # (Otherwise, our status will change only in Idl.run().)
729 if self != self.idl.txn:
732 # If we need a lock but don't have it, give up quickly.
733 if self.idl.lock_name and not self.idl.has_lock():
734 self._status = Transaction.NOT_LOCKED
738 operations = [self.idl._db.name]
740 # Assert that we have the required lock (avoiding a race).
741 if self.idl.lock_name:
742 operations.append({"op": "assert",
743 "lock": self.idl.lock_name})
745 # Add prerequisites and declarations of new rows.
746 for row in self._txn_rows.itervalues():
750 for column_name in row._prereqs:
751 columns.append(column_name)
752 rows[column_name] = row._data[column_name].to_json()
753 operations.append({"op": "wait",
754 "table": row._table.name,
756 "where": _where_uuid_equals(row.uuid),
763 for row in self._txn_rows.itervalues():
764 if row._changes is None:
765 if row._table.is_root:
766 operations.append({"op": "delete",
767 "table": row._table.name,
768 "where": _where_uuid_equals(row.uuid)})
771 # Let ovsdb-server decide whether to really delete it.
774 op = {"table": row._table.name}
775 if row._data is None:
777 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
780 op_index = len(operations) - 1
781 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
784 op["where"] = _where_uuid_equals(row.uuid)
789 for column_name, datum in row._changes.iteritems():
790 if row._data is not None or not datum.is_default():
791 row_json[column_name] = (
792 self._substitute_uuids(datum.to_json()))
794 # If anything really changed, consider it an update.
795 # We can't suppress not-really-changed values earlier
796 # or transactions would become nonatomic (see the big
797 # comment inside Transaction._write()).
798 if (not any_updates and row._data is not None and
799 row._data[column_name] != datum):
802 if row._data is None or row_json:
803 operations.append(op)
806 if self._inc_table and any_updates:
807 self._inc_index = len(operations) - 1
809 operations.append({"op": "mutate",
810 "table": self._inc_table,
811 "where": self._substitute_uuids(
813 "mutations": [[self._inc_column, "+=", 1]]})
814 operations.append({"op": "select",
815 "table": self._inc_table,
816 "where": self._substitute_uuids(
818 "columns": [self._inc_column]})
822 operations.append({"op": "comment",
823 "comment": "\n".join(self._comments)})
827 operations.append({"op": "abort"})
830 self._status = Transaction.UNCHANGED
832 msg = ovs.jsonrpc.Message.create_request("transact", operations)
833 self._request_id = msg.id
834 if not self.idl._session.send(msg):
835 self.idl._outstanding_txns[self._request_id] = self
836 self._status = Transaction.INCOMPLETE
838 self._status = Transaction.TRY_AGAIN
843 def commit_block(self):
845 status = self.commit()
846 if status != Transaction.INCOMPLETE:
851 poller = ovs.poller.Poller()
852 self.idl.wait(poller)
856 def get_increment_new_value(self):
857 assert self._status == Transaction.SUCCESS
858 return self._inc_new_value
861 """Aborts this transaction. If Transaction.commit() has already been
862 called then the transaction might get committed anyhow."""
864 if self._status in (Transaction.UNCOMMITTED,
865 Transaction.INCOMPLETE):
866 self._status = Transaction.ABORTED
869 """Returns a string representing this transaction's current status,
870 suitable for use in log messages."""
871 if self._status != Transaction.ERROR:
872 return Transaction.status_to_string(self._status)
876 return "no error details available"
878 def __set_error_json(self, json):
879 if self._error is None:
880 self._error = ovs.json.to_string(json)
882 def get_insert_uuid(self, uuid):
883 """Finds and returns the permanent UUID that the database assigned to a
884 newly inserted row, given the UUID that Transaction.insert() assigned
887 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
888 or if it was assigned by that function and then deleted by Row.delete()
889 within the same transaction. (Rows that are inserted and then deleted
890 within a single transaction are never sent to the database server, so
891 it never assigns them a permanent UUID.)
893 This transaction must have completed successfully."""
894 assert self._status in (Transaction.SUCCESS,
895 Transaction.UNCHANGED)
896 inserted_row = self._inserted_rows.get(uuid)
898 return inserted_row.real
901 def _write(self, row, column, datum):
902 assert row._changes is not None
906 # If this is a write-only column and the datum being written is the
907 # same as the one already there, just skip the update entirely. This
908 # is worth optimizing because we have a lot of columns that get
909 # periodically refreshed into the database but don't actually change
912 # We don't do this for read/write columns because that would break
913 # atomicity of transactions--some other client might have written a
914 # different value in that column since we read it. (But if a whole
915 # transaction only does writes of existing values, without making any
916 # real changes, we will drop the whole transaction later in
917 # ovsdb_idl_txn_commit().)
918 if not column.alert and row._data.get(column.name) == datum:
919 new_value = row._changes.get(column.name)
920 if new_value is None or new_value == datum:
923 txn._txn_rows[row.uuid] = row
924 row._changes[column.name] = datum.copy()
926 def insert(self, table, new_uuid=None):
927 """Inserts and returns a new row in 'table', which must be one of the
928 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
930 The new row is assigned a provisional UUID. If 'uuid' is None then one
931 is randomly generated; otherwise 'uuid' should specify a randomly
932 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
933 different UUID when 'txn' is committed, but the IDL will replace any
934 uses of the provisional UUID in the data to be to be committed by the
935 UUID assigned by ovsdb-server."""
936 assert self._status == Transaction.UNCOMMITTED
938 new_uuid = uuid.uuid4()
939 row = Row(self.idl, table, new_uuid, None)
940 table.rows[row.uuid] = row
941 self._txn_rows[row.uuid] = row
944 def _process_reply(self, msg):
945 if msg.type == ovs.jsonrpc.Message.T_ERROR:
946 self._status = Transaction.ERROR
947 elif type(msg.result) not in (list, tuple):
949 vlog.warn('reply to "transact" is not JSON array')
958 # This isn't an error in itself but indicates that some
959 # prior operation failed, so make sure that we know about
962 elif type(op) == dict:
963 error = op.get("error")
964 if error is not None:
965 if error == "timed out":
967 elif error == "not owner":
969 elif error == "aborted":
973 self.__set_error_json(op)
976 self.__set_error_json(op)
978 vlog.warn("operation reply is not JSON null or object")
980 if not soft_errors and not hard_errors and not lock_errors:
981 if self._inc_table and not self.__process_inc_reply(ops):
984 for insert in self._inserted_rows.itervalues():
985 if not self.__process_insert_reply(insert, ops):
989 self._status = Transaction.ERROR
991 self._status = Transaction.NOT_LOCKED
993 self._status = Transaction.TRY_AGAIN
995 self._status = Transaction.SUCCESS
998 def __check_json_type(json, types, name):
1001 vlog.warn("%s is missing" % name)
1003 elif type(json) not in types:
1005 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1010 def __process_inc_reply(self, ops):
1011 if self._inc_index + 2 > len(ops):
1013 vlog.warn("reply does not contain enough operations for "
1014 "increment (has %d, needs %d)" %
1015 (len(ops), self._inc_index + 2))
1017 # We know that this is a JSON object because the loop in
1018 # __process_reply() already checked.
1019 mutate = ops[self._inc_index]
1020 count = mutate.get("count")
1021 if not Transaction.__check_json_type(count, (int, long),
1022 '"mutate" reply "count"'):
1026 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1029 select = ops[self._inc_index + 1]
1030 rows = select.get("rows")
1031 if not Transaction.__check_json_type(rows, (list, tuple),
1032 '"select" reply "rows"'):
1036 vlog.warn('"select" reply "rows" has %d elements '
1037 'instead of 1' % len(rows))
1040 if not Transaction.__check_json_type(row, (dict,),
1041 '"select" reply row'):
1043 column = row.get(self._inc_column)
1044 if not Transaction.__check_json_type(column, (int, long),
1045 '"select" reply inc column'):
1047 self._inc_new_value = column
1050 def __process_insert_reply(self, insert, ops):
1051 if insert.op_index >= len(ops):
1053 vlog.warn("reply does not contain enough operations "
1054 "for insert (has %d, needs %d)"
1055 % (len(ops), insert.op_index))
1058 # We know that this is a JSON object because the loop in
1059 # __process_reply() already checked.
1060 reply = ops[insert.op_index]
1061 json_uuid = reply.get("uuid")
1062 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1063 '"insert" reply "uuid"'):
1067 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1070 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1077 class SchemaHelper(object):
1078 """IDL Schema helper.
1080 This class encapsulates the logic required to generate schemas suitable
1081 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1082 they are interested in using register_columns(). When finished, the
1083 get_idl_schema() function may be called.
1085 The location on disk of the schema used may be found in the
1086 'schema_location' variable."""
1088 def __init__(self, location=None):
1089 """Creates a new Schema object."""
1091 if location is None:
1092 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1094 self.schema_location = location
1098 def register_columns(self, table, columns):
1099 """Registers interest in the given 'columns' of 'table'. Future calls
1100 to get_idl_schema() will include 'table':column for each column in
1101 'columns'. This function automatically avoids adding duplicate entries
1104 'table' must be a string.
1105 'columns' must be a list of strings.
1108 assert type(table) is str
1109 assert type(columns) is list
1111 columns = set(columns) | self._tables.get(table, set())
1112 self._tables[table] = columns
1114 def register_all(self):
1115 """Registers interest in every column of every table."""
1118 def get_idl_schema(self):
1119 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1120 object based on columns registered using the register_columns()
1123 schema = ovs.db.schema.DbSchema.from_json(
1124 ovs.json.from_file(self.schema_location))
1128 for table, columns in self._tables.iteritems():
1129 schema_tables[table] = (
1130 self._keep_table_columns(schema, table, columns))
1132 schema.tables = schema_tables
1135 def _keep_table_columns(self, schema, table_name, columns):
1136 assert table_name in schema.tables
1137 table = schema.tables[table_name]
1140 for column_name in columns:
1141 assert type(column_name) is str
1142 assert column_name in table.columns
1144 new_columns[column_name] = table.columns[column_name]
1146 table.columns = new_columns