1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
25 vlog = ovs.vlog.Vlog("idl")
27 __pychecker__ = 'no-classattr no-objattrs'
31 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
33 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
34 requests to an OVSDB database server and parses the responses, converting
35 raw JSON into data structures that are easier for clients to digest.
37 The IDL also assists with issuing database transactions. The client
38 creates a transaction, manipulates the IDL data structures, and commits or
39 aborts the transaction. The IDL then composes and issues the necessary
40 JSON-RPC requests and reports to the client whether the transaction
41 completed successfully.
43 The client is allowed to access the following attributes directly, in a
46 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
48 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
51 The client may directly read and write the Row objects referenced by the
52 'rows' map values. Refer to Row for more details.
54 - 'change_seqno': A number that represents the IDL's state. When the IDL
55 is updated (by Idl.run()), its value changes.
57 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58 if no lock is configured.
60 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61 lock, and False otherwise.
63 Locking and unlocking happens asynchronously from the database client's
64 point of view, so the information is only useful for optimization
65 (e.g. if the client doesn't have the lock then there's no point in trying
66 to write to the database).
68 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69 the database server has indicated that some other client already owns the
70 requested lock, and False otherwise.
72 - 'txn': The ovs.db.idl.Transaction object for the database transaction
73 currently being constructed, if there is one, or None otherwise.
76 def __init__(self, remote, schema):
77 """Creates and returns a connection to the database described by 'schema' on
78 'remote', which should be in a form acceptable to
79 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
80 replica of the remote database.
82 'schema' should be the schema for the remote database. The caller may
83 have cut it down by removing tables or columns that are not of
84 interest. The IDL will only replicate the tables and columns that
85 remain. The caller may also add a attribute named 'alert' to selected
86 remaining columns, setting its value to False; if so, then changes to
87 those columns will not be considered changes to the database for the
88 purpose of the return value of Idl.run() and Idl.change_seqno. This is
89 useful for columns that the IDL's client will write but not read.
91 The IDL uses and modifies 'schema' directly."""
93 self.tables = schema.tables
95 self._session = ovs.jsonrpc.Session.open(remote)
96 self._monitor_request_id = None
97 self._last_seqno = None
101 self.lock_name = None # Name of lock we need, None if none.
102 self.has_lock = False # Has db server said we have the lock?
103 self.is_lock_contended = False # Has db server said we can't get lock?
104 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
106 # Transaction support.
108 self._outstanding_txns = {}
110 for table in schema.tables.itervalues():
111 for column in table.columns.itervalues():
112 if not hasattr(column, 'alert'):
114 table.need_table = False
119 """Closes the connection to the database. The IDL will no longer
121 self._session.close()
124 """Processes a batch of messages from the database server. Returns
125 True if the database as seen through the IDL changed, False if it did
126 not change. The initial fetch of the entire contents of the remote
127 database is considered to be one kind of change. If the IDL has been
128 configured to acquire a database lock (with Idl.set_lock()), then
129 successfully acquiring the lock is also considered to be a change.
131 This function can return occasional false positives, that is, report
132 that the database changed even though it didn't. This happens if the
133 connection to the database drops and reconnects, which causes the
134 database contents to be reloaded even if they didn't change. (It could
135 also happen if the database server sends out a "change" that reflects
136 what we already thought was in the database, but the database server is
137 not supposed to do that.)
139 As an alternative to checking the return value, the client may check
140 for changes in self.change_seqno."""
142 initial_change_seqno = self.change_seqno
147 if not self._session.is_connected():
150 seqno = self._session.get_seqno()
151 if seqno != self._last_seqno:
152 self._last_seqno = seqno
153 self.__txn_abort_all()
154 self.__send_monitor_request()
156 self.__send_lock_request()
159 msg = self._session.recv()
162 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
163 and msg.method == "update"
164 and len(msg.params) == 2
165 and msg.params[0] == None):
166 # Database contents changed.
167 self.__parse_update(msg.params[1])
168 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
169 and self._monitor_request_id is not None
170 and self._monitor_request_id == msg.id):
171 # Reply to our "monitor" request.
173 self.change_seqno += 1
174 self._monitor_request_id = None
176 self.__parse_update(msg.result)
177 except error.Error, e:
178 vlog.err("%s: parse error in received schema: %s"
179 % (self._session.get_name(), e))
181 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
182 and self._lock_request_id is not None
183 and self._lock_request_id == msg.id):
184 # Reply to our "lock" request.
185 self.__parse_lock_reply(msg.result)
186 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
187 and msg.method == "locked"):
189 self.__parse_lock_notify(msg.params, True)
190 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
191 and msg.method == "stolen"):
192 # Someone else stole our lock.
193 self.__parse_lock_notify(msg.params, False)
194 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
195 # Reply to our echo request. Ignore it.
197 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
198 ovs.jsonrpc.Message.T_REPLY)
199 and self.__txn_process_reply(msg)):
200 # __txn_process_reply() did everything needed.
203 # This can happen if a transaction is destroyed before we
204 # receive the reply, so keep the log level low.
205 vlog.dbg("%s: received unexpected %s message"
206 % (self._session.get_name(),
207 ovs.jsonrpc.Message.type_to_string(msg.type)))
209 return initial_change_seqno != self.change_seqno
def wait(self, poller):
    """Registers this IDL's wakeup conditions with 'poller'.

    Afterward, poller.block() wakes up when self.run() has something to do
    or when activity occurs on a transaction on 'self'."""
    session = self._session
    session.wait(poller)
    session.recv_wait(poller)
def has_ever_connected(self):
    """Reports whether the IDL has ever retrieved the database's contents.

    Returns True once the IDL has successfully connected to the remote
    database and retrieved its contents, even if that connection has since
    dropped and is in the process of reconnecting.  In that case the IDL
    holds an atomic snapshot of the database's contents, although the
    snapshot might be arbitrarily old if the connection dropped.

    Returns False if the IDL has never connected or never retrieved the
    database's contents, in which case the IDL is empty."""
    never_changed = self.change_seqno == 0
    return not never_changed
def force_reconnect(self):
    """Makes the IDL drop its database connection and reconnect.

    The contents of the IDL will not change in the meantime."""
    session = self._session
    session.force_reconnect()
233 def set_lock(self, lock_name):
234 """If 'lock_name' is not None, configures the IDL to obtain the named
235 lock from the database server and to avoid modifying the database when
236 the lock cannot be acquired (that is, when another client has the same
239 If 'lock_name' is None, drops the locking requirement and releases the
242 assert not self._outstanding_txns
244 if self.lock_name and (not lock_name or lock_name != self.lock_name):
245 # Release previous lock.
246 self.__send_unlock_request()
247 self.lock_name = None
248 self.is_lock_contended = False
250 if lock_name and not self.lock_name:
252 self.lock_name = lock_name
253 self.__send_lock_request()
258 for table in self.tables.itervalues():
264 self.change_seqno += 1
266 def __update_has_lock(self, new_has_lock):
267 if new_has_lock and not self.has_lock:
268 if self._monitor_request_id is None:
269 self.change_seqno += 1
271 # We're waiting for a monitor reply, so don't signal that the
272 # database changed. The monitor reply will increment
273 # change_seqno anyhow.
275 self.is_lock_contended = False
276 self.has_lock = new_has_lock
278 def __do_send_lock_request(self, method):
279 self.__update_has_lock(False)
280 self._lock_request_id = None
281 if self._session.is_connected():
282 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
284 self._session.send(msg)
def __send_lock_request(self):
    """Issues a "lock" request and remembers what the helper returns.

    The result of __do_send_lock_request() is kept in
    self._lock_request_id, which run() compares against the ID of incoming
    replies to recognize the answer to this request."""
    request_id = self.__do_send_lock_request("lock")
    self._lock_request_id = request_id
def __send_unlock_request(self):
    """Issues an "unlock" request; the helper's return value is discarded."""
    method = "unlock"
    self.__do_send_lock_request(method)
295 def __parse_lock_reply(self, result):
296 self._lock_request_id = None
297 got_lock = type(result) == dict and result.get("locked") is True
298 self.__update_has_lock(got_lock)
300 self.is_lock_contended = True
def __parse_lock_notify(self, params, new_has_lock):
    """Handles a "locked" or "stolen" notification from the database server.

    'params' is the notification's parameter list.  'new_has_lock' is True
    for a "locked" notification and False for a "stolen" one.  The
    notification is ignored unless a lock is configured on this IDL and the
    first parameter names that lock."""
    # NOTE(review): the non-empty check on 'params' and the
    # "not new_has_lock" guard below are reconstructed from context --
    # confirm against the complete file.
    if (self.lock_name is not None
        and type(params) in (list, tuple)
        and params
        and params[0] == self.lock_name):
        # __update_has_lock() is a bound method, so 'self' must not be
        # passed explicitly; doing so raised TypeError on every matching
        # notification.
        self.__update_has_lock(new_has_lock)
        if not new_has_lock:
            # Another client now holds the lock that we want.
            self.is_lock_contended = True
def __send_monitor_request(self):
    """Asks the server to monitor every table and column the IDL tracks.

    Builds one {"columns": [...]} entry per table in self.tables, sends the
    resulting "monitor" request over the session, and remembers the
    request's ID in self._monitor_request_id so that run() can recognize
    the reply."""
    columns_by_table = dict(
        (tbl.name, {"columns": tbl.columns.keys()})
        for tbl in self.tables.itervalues())
    request = ovs.jsonrpc.Message.create_request(
        "monitor", [self._db.name, None, columns_by_table])
    self._monitor_request_id = request.id
    self._session.send(request)
320 def __parse_update(self, update):
322 self.__do_parse_update(update)
323 except error.Error, e:
324 vlog.err("%s: error parsing update: %s"
325 % (self._session.get_name(), e))
327 def __do_parse_update(self, table_updates):
328 if type(table_updates) != dict:
329 raise error.Error("<table-updates> is not an object",
332 for table_name, table_update in table_updates.iteritems():
333 table = self.tables.get(table_name)
335 raise error.Error('<table-updates> includes unknown '
336 'table "%s"' % table_name)
338 if type(table_update) != dict:
339 raise error.Error('<table-update> for table "%s" is not '
340 'an object' % table_name, table_update)
342 for uuid_string, row_update in table_update.iteritems():
343 if not ovs.ovsuuid.is_valid_string(uuid_string):
344 raise error.Error('<table-update> for table "%s" '
345 'contains bad UUID "%s" as member '
346 'name' % (table_name, uuid_string),
348 uuid = ovs.ovsuuid.from_string(uuid_string)
350 if type(row_update) != dict:
351 raise error.Error('<table-update> for table "%s" '
352 'contains <row-update> for %s that '
354 % (table_name, uuid_string))
356 parser = ovs.db.parser.Parser(row_update, "row-update")
357 old = parser.get_optional("old", [dict])
358 new = parser.get_optional("new", [dict])
361 if not old and not new:
362 raise error.Error('<row-update> missing "old" and '
363 '"new" members', row_update)
365 if self.__process_update(table, uuid, old, new):
366 self.change_seqno += 1
368 def __process_update(self, table, uuid, old, new):
369 """Returns True if a column changed, False otherwise."""
370 row = table.rows.get(uuid)
379 vlog.warn("cannot delete missing row %s from table %s"
380 % (uuid, table.name))
384 row = self.__create_row(table, uuid)
388 vlog.warn("cannot add existing row %s to table %s"
389 % (uuid, table.name))
390 if self.__row_update(table, row, new):
394 row = self.__create_row(table, uuid)
397 vlog.warn("cannot modify missing row %s in table %s"
398 % (uuid, table.name))
399 if self.__row_update(table, row, new):
403 def __row_update(self, table, row, row_json):
405 for column_name, datum_json in row_json.iteritems():
406 column = table.columns.get(column_name)
409 vlog.warn("unknown column %s updating table %s"
410 % (column_name, table.name))
414 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
415 except error.Error, e:
417 vlog.warn("error parsing column %s in table %s: %s"
418 % (column_name, table.name, e))
421 if datum != row._data[column_name]:
422 row._data[column_name] = datum
426 # Didn't really change but the OVSDB monitor protocol always
427 # includes every value in a row.
431 def __create_row(self, table, uuid):
433 for column in table.columns.itervalues():
434 data[column.name] = ovs.db.data.Datum.default(column.type)
435 row = table.rows[uuid] = Row(self, table, uuid, data)
439 self._session.force_reconnect()
def __txn_abort_all(self):
    """Fails every outstanding transaction with Transaction.TRY_AGAIN.

    Each transaction is removed from self._outstanding_txns as it is
    marked.  run() calls this when the session's sequence number changes."""
    pending = self._outstanding_txns
    while pending:
        _, txn = pending.popitem()
        txn._status = Transaction.TRY_AGAIN
446 def __txn_process_reply(self, msg):
447 txn = self._outstanding_txns.pop(msg.id, None)
449 txn._process_reply(msg)
452 def _uuid_to_row(atom, base):
454 return base.ref_table.rows.get(atom)
459 def _row_to_uuid(value):
460 if type(value) == Row:
467 """A row within an IDL.
469 The client may access the following attributes directly:
471 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
473 - An attribute for each column in the Row's table, named for the column,
474 whose values are as returned by Datum.to_python() for the column's type.
476 If some error occurs (e.g. the database server's idea of the column is
477 different from the IDL's idea), then the attribute value is the
478 "default" value returned by Datum.default() for the column's type. (It is
479 important to know this because the default value may violate constraints
480 for the column's type, e.g. the default integer value is 0 even if column
481 constraints require the column's value to be positive.)
483 When a transaction is active, column attributes may also be assigned new
484 values. Committing the transaction will then cause the new value to be
485 stored into the database.
487 *NOTE*: In the current implementation, the value of a column is a *copy*
488 of the value in the database. This means that modifying its value
489 directly will have no useful effect. For example, the following:
490 row.mycolumn["a"] = "b" # don't do this
491 will not change anything in the database, even after commit. To modify
492 the column, instead assign the modified column value back to the column:
def __init__(self, idl, table, uuid, data):
    # Every attribute below is stored directly in the instance dictionary
    # instead of by plain assignment.  Plain assignment would go through
    # the __setattr__() defined alongside this method, which treats the
    # attribute name as a column and writes it into the active transaction.
    state = self.__dict__
    state["uuid"] = uuid
    state["_idl"] = idl
    state["_table"] = table

    # _data holds the committed contents of the row:
    #
    #   - A dictionary mapping every column name to a Datum, if the row
    #     exists in the committed form of the database.
    #
    #   - None, if the row was newly inserted within the active transaction
    #     and so has no committed form.
    state["_data"] = data

    # _changes holds what the active transaction has done to the row:
    #
    #   - {}, the empty dictionary, if no transaction is active or if the
    #     row has not yet been changed within this transaction.
    #
    #   - A dictionary mapping a column name to its new Datum, if an active
    #     transaction changes those columns' values.
    #
    #   - A dictionary mapping every column name to a Datum, if the row is
    #     newly inserted within the active transaction.
    #
    #   - None, if this transaction deletes the row.
    state["_changes"] = {}

    # _prereqs is a dictionary whose keys name the columns that must be
    # verified as prerequisites when the transaction commits.  Every value
    # in the dictionary is None.
    state["_prereqs"] = {}
534 def __getattr__(self, column_name):
535 assert self._changes is not None
537 datum = self._changes.get(column_name)
539 datum = self._data[column_name]
541 return datum.to_python(_uuid_to_row)
543 def __setattr__(self, column_name, value):
544 assert self._changes is not None
547 column = self._table.columns[column_name]
549 datum = ovs.db.data.Datum.from_python(column.type, value,
551 except error.Error, e:
553 vlog.err("attempting to write bad value to column %s (%s)"
556 self._idl.txn._write(self, column, datum)
558 def verify(self, column_name):
559 """Causes the original contents of column 'column_name' in this row to
560 be verified as a prerequisite to completing the transaction. That is,
561 if 'column_name' changed in this row (or if this row was deleted)
562 between the time that the IDL originally read its contents and the time
563 that the transaction commits, then the transaction aborts and
564 Transaction.commit() returns Transaction.TRY_AGAIN.
566 The intention is that, to ensure that no transaction commits based on
567 dirty reads, an application should call Row.verify() on each data item
568 read as part of a read-modify-write operation.
570 In some cases Row.verify() reduces to a no-op, because the current
571 value of the column is already known:
573 - If this row is a row created by the current transaction (returned
574 by Transaction.insert()).
576 - If the column has already been modified within the current
579 Because of the latter property, always call Row.verify() *before*
580 modifying the column, for a given read-modify-write.
582 A transaction must be in progress."""
584 assert self._changes is not None
585 if not self._data or column_name in self._changes:
588 self._prereqs[column_name] = None
591 """Deletes this row from its table.
593 A transaction must be in progress."""
595 assert self._changes is not None
596 if self._data is None:
597 del self._idl.txn._txn_rows[self.uuid]
598 self.__dict__["_changes"] = None
599 del self._table.rows[self.uuid]
def _uuid_name_from_uuid(uuid):
    """Returns the name that stands for 'uuid' in "uuid-name" and
    "named-uuid" members: "row" followed by the string form of 'uuid' with
    every dash replaced by an underscore."""
    text = str(uuid)
    return "row" + text.replace("-", "_")
def _where_uuid_equals(uuid):
    """Returns a "where" clause with a single condition: the "_uuid" column
    equals 'uuid', given in its string form."""
    uuid_json = ["uuid", str(uuid)]
    condition = ["_uuid", "==", uuid_json]
    return [condition]
610 class _InsertedRow(object):
611 def __init__(self, op_index):
612 self.op_index = op_index
616 class Transaction(object):
617 # Status values that Transaction.commit() can return.
618 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
619 UNCHANGED = "unchanged" # Transaction didn't include any changes.
620 INCOMPLETE = "incomplete" # Commit in progress, please wait.
621 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
622 SUCCESS = "success" # Commit successful.
623 TRY_AGAIN = "try again" # Commit failed because a "verify" operation
624 # reported an inconsistency, due to a network
625 # problem, or other transient failure.
626 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
627 ERROR = "error" # Commit failed due to a hard error.
630 def status_to_string(status):
631 """Converts one of the status values that Transaction.commit() can
632 return into a human-readable string.
634 (The status values are in fact such strings already, so
635 there's nothing to do.)"""
638 def __init__(self, idl):
639 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
640 A given Idl may only have a single active transaction at a time.
642 A Transaction may modify the contents of a database by assigning new
643 values to columns (attributes of Row), deleting rows (with
644 Row.delete()), or inserting rows (with Transaction.insert()). It may
645 also check that columns in the database have not changed with
648 When a transaction is complete (which must be before the next call to
649 Idl.run()), call Transaction.commit() or Transaction.abort()."""
650 assert idl.txn is None
653 self._request_id = None
657 self._status = Transaction.UNCOMMITTED
661 self._inc_table = None
662 self._inc_column = None
663 self._inc_where = None
665 self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
    """Appends 'comment' to the comments that will be passed to the OVSDB
    server when this transaction is committed.  (The comment is committed
    to the OVSDB log, which "ovsdb-tool show-log" can print in a relatively
    human-readable form.)"""
    comments = self._comments
    comments.append(comment)
def increment(self, table, column, where):
    """Records an increment of 'column' in 'table' for commit() to perform.

    When the transaction is committed with other updates, commit() adds a
    "mutate" operation that adds 1 to the column, followed by a "select" of
    the column.  'where' is stored alongside -- presumably the condition
    selecting the row to increment; confirm against commit().

    Only one increment may be requested per transaction."""
    assert not self._inc_table
    self._inc_table, self._inc_column, self._inc_where = table, column, where
def wait(self, poller):
    """Makes poller.block() return immediately if this transaction's status
    is anything other than UNCOMMITTED or INCOMPLETE."""
    still_pending = (Transaction.UNCOMMITTED, Transaction.INCOMPLETE)
    if self._status in still_pending:
        return
    poller.immediate_wake()
685 def _substitute_uuids(self, json):
686 if type(json) in (list, tuple):
688 and json[0] == 'uuid'
689 and ovs.ovsuuid.is_valid_string(json[1])):
690 uuid = ovs.ovsuuid.from_string(json[1])
691 row = self._txn_rows.get(uuid, None)
692 if row and row._data is None:
693 return ["named-uuid", _uuid_name_from_uuid(uuid)]
696 def __disassemble(self):
699 for row in self._txn_rows.itervalues():
700 if row._changes is None:
701 row._table.rows[row.uuid] = row
702 elif row._data is None:
703 del row._table.rows[row.uuid]
704 row.__dict__["_changes"] = {}
705 row.__dict__["_prereqs"] = {}
709 """Attempts to commit this transaction and returns the status of the
710 commit operation, one of the constants declared as class attributes.
711 If the return value is Transaction.INCOMPLETE, then the transaction is
712 not yet complete and the caller should try calling again later, after
713 calling Idl.run() to run the Idl.
715 Committing a transaction rolls back all of the changes that it made to
716 the Idl's copy of the database. If the transaction commits
717 successfully, then the database server will send an update and, thus,
718 the Idl will be updated with the committed changes."""
719 # The status can only change if we're the active transaction.
720 # (Otherwise, our status will change only in Idl.run().)
721 if self != self.idl.txn:
724 # If we need a lock but don't have it, give up quickly.
725 if self.idl.lock_name and not self.idl.has_lock():
726 self._status = Transaction.NOT_LOCKED
730 operations = [self.idl._db.name]
732 # Assert that we have the required lock (avoiding a race).
733 if self.idl.lock_name:
734 operations.append({"op": "assert",
735 "lock": self.idl.lock_name})
737 # Add prerequisites and declarations of new rows.
738 for row in self._txn_rows.itervalues():
742 for column_name in row._prereqs:
743 columns.append(column_name)
744 rows[column_name] = row._data[column_name].to_json()
745 operations.append({"op": "wait",
746 "table": row._table.name,
748 "where": _where_uuid_equals(row.uuid),
755 for row in self._txn_rows.itervalues():
756 if row._changes is None:
757 if row._table.is_root:
758 operations.append({"op": "delete",
759 "table": row._table.name,
760 "where": _where_uuid_equals(row.uuid)})
763 # Let ovsdb-server decide whether to really delete it.
766 op = {"table": row._table.name}
767 if row._data is None:
769 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
772 op_index = len(operations) - 1
773 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
776 op["where"] = _where_uuid_equals(row.uuid)
781 for column_name, datum in row._changes.iteritems():
782 if row._data is not None or not datum.is_default():
783 row_json[column_name] = (
784 self._substitute_uuids(datum.to_json()))
786 # If anything really changed, consider it an update.
787 # We can't suppress not-really-changed values earlier
788 # or transactions would become nonatomic (see the big
789 # comment inside Transaction._write()).
790 if (not any_updates and row._data is not None and
791 row._data[column_name] != datum):
794 if row._data is None or row_json:
795 operations.append(op)
798 if self._inc_table and any_updates:
799 self._inc_index = len(operations) - 1
801 operations.append({"op": "mutate",
802 "table": self._inc_table,
803 "where": self._substitute_uuids(
805 "mutations": [[self._inc_column, "+=", 1]]})
806 operations.append({"op": "select",
807 "table": self._inc_table,
808 "where": self._substitute_uuids(
810 "columns": [self._inc_column]})
814 operations.append({"op": "comment",
815 "comment": "\n".join(self._comments)})
819 operations.append({"op": "abort"})
822 self._status = Transaction.UNCHANGED
824 msg = ovs.jsonrpc.Message.create_request("transact", operations)
825 self._request_id = msg.id
826 if not self.idl._session.send(msg):
827 self.idl._outstanding_txns[self._request_id] = self
828 self._status = Transaction.INCOMPLETE
830 self._status = Transaction.TRY_AGAIN
835 def commit_block(self):
837 status = self.commit()
838 if status != Transaction.INCOMPLETE:
843 poller = ovs.poller.Poller()
844 self.idl.wait(poller)
def get_increment_new_value(self):
    """Returns the column value that __process_inc_reply() recorded from
    the "select" reply.

    This transaction must have completed successfully."""
    succeeded = self._status == Transaction.SUCCESS
    assert succeeded
    return self._inc_new_value
853 """Aborts this transaction. If Transaction.commit() has already been
854 called then the transaction might get committed anyhow."""
856 if self._status in (Transaction.UNCOMMITTED,
857 Transaction.INCOMPLETE):
858 self._status = Transaction.ABORTED
861 """Returns a string representing this transaction's current status,
862 suitable for use in log messages."""
863 if self._status != Transaction.ERROR:
864 return Transaction.status_to_string(self._status)
868 return "no error details available"
def __set_error_json(self, json):
    """Records 'json', serialized with ovs.json.to_string(), as this
    transaction's error details.

    Only the first error is kept: if details were already recorded, this
    call does nothing."""
    if self._error is not None:
        return
    self._error = ovs.json.to_string(json)
874 def get_insert_uuid(self, uuid):
875 """Finds and returns the permanent UUID that the database assigned to a
876 newly inserted row, given the UUID that Transaction.insert() assigned
879 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
880 or if it was assigned by that function and then deleted by Row.delete()
881 within the same transaction. (Rows that are inserted and then deleted
882 within a single transaction are never sent to the database server, so
883 it never assigns them a permanent UUID.)
885 This transaction must have completed successfully."""
886 assert self._status in (Transaction.SUCCESS,
887 Transaction.UNCHANGED)
888 inserted_row = self._inserted_rows.get(uuid)
890 return inserted_row.real
893 def _write(self, row, column, datum):
894 assert row._changes is not None
898 # If this is a write-only column and the datum being written is the
899 # same as the one already there, just skip the update entirely. This
900 # is worth optimizing because we have a lot of columns that get
901 # periodically refreshed into the database but don't actually change
904 # We don't do this for read/write columns because that would break
905 # atomicity of transactions--some other client might have written a
906 # different value in that column since we read it. (But if a whole
907 # transaction only does writes of existing values, without making any
908 # real changes, we will drop the whole transaction later in
909 # ovsdb_idl_txn_commit().)
910 if not column.alert and row._data.get(column.name) == datum:
911 new_value = row._changes.get(column.name)
912 if new_value is None or new_value == datum:
915 txn._txn_rows[row.uuid] = row
916 row._changes[column.name] = datum.copy()
918 def insert(self, table, new_uuid=None):
919 """Inserts and returns a new row in 'table', which must be one of the
920 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
922 The new row is assigned a provisional UUID. If 'new_uuid' is None then one
923 is randomly generated; otherwise 'new_uuid' should specify a randomly
924 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
925 different UUID when this transaction is committed, but the IDL will replace any
926 uses of the provisional UUID in the data to be committed by the
927 UUID assigned by ovsdb-server."""
928 assert self._status == Transaction.UNCOMMITTED
930 new_uuid = uuid.uuid4()
931 row = Row(self.idl, table, new_uuid, None)
932 table.rows[row.uuid] = row
933 self._txn_rows[row.uuid] = row
936 def _process_reply(self, msg):
937 if msg.type == ovs.jsonrpc.Message.T_ERROR:
938 self._status = Transaction.ERROR
939 elif type(msg.result) not in (list, tuple):
941 vlog.warn('reply to "transact" is not JSON array')
950 # This isn't an error in itself but indicates that some
951 # prior operation failed, so make sure that we know about
954 elif type(op) == dict:
955 error = op.get("error")
956 if error is not None:
957 if error == "timed out":
959 elif error == "not owner":
961 elif error == "aborted":
965 self.__set_error_json(op)
968 self.__set_error_json(op)
970 vlog.warn("operation reply is not JSON null or object")
972 if not soft_errors and not hard_errors and not lock_errors:
973 if self._inc_table and not self.__process_inc_reply(ops):
976 for insert in self._inserted_rows.itervalues():
977 if not self.__process_insert_reply(insert, ops):
981 self._status = Transaction.ERROR
983 self._status = Transaction.NOT_LOCKED
985 self._status = Transaction.TRY_AGAIN
987 self._status = Transaction.SUCCESS
990 def __check_json_type(json, types, name):
993 vlog.warn("%s is missing" % name)
995 elif type(json) not in types:
997 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1002 def __process_inc_reply(self, ops):
1003 if self._inc_index + 2 > len(ops):
1005 vlog.warn("reply does not contain enough operations for "
1006 "increment (has %d, needs %d)" %
1007 (len(ops), self._inc_index + 2))
1009 # We know that this is a JSON object because the loop in
1010 # __process_reply() already checked.
1011 mutate = ops[self._inc_index]
1012 count = mutate.get("count")
1013 if not Transaction.__check_json_type(count, (int, long),
1014 '"mutate" reply "count"'):
1018 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1021 select = ops[self._inc_index + 1]
1022 rows = select.get("rows")
1023 if not Transaction.__check_json_type(rows, (list, tuple),
1024 '"select" reply "rows"'):
1028 vlog.warn('"select" reply "rows" has %d elements '
1029 'instead of 1' % len(rows))
1032 if not Transaction.__check_json_type(row, (dict,),
1033 '"select" reply row'):
1035 column = row.get(self._inc_column)
1036 if not Transaction.__check_json_type(column, (int, long),
1037 '"select" reply inc column'):
1039 self._inc_new_value = column
1042 def __process_insert_reply(self, insert, ops):
1043 if insert.op_index >= len(ops):
1045 vlog.warn("reply does not contain enough operations "
1046 "for insert (has %d, needs %d)"
1047 % (len(ops), insert.op_index))
1050 # We know that this is a JSON object because the loop in
1051 # __process_reply() already checked.
1052 reply = ops[insert.op_index]
1053 json_uuid = reply.get("uuid")
1054 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1055 '"insert" reply "uuid"'):
1059 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1062 vlog.warn('"insert" reply "uuid" is not a JSON UUID')