1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from ovs.db import error
25 __pychecker__ = 'no-classattr no-objattrs'
29 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
31 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
32 requests to an OVSDB database server and parses the responses, converting
33 raw JSON into data structures that are easier for clients to digest.
35 The IDL also assists with issuing database transactions. The client
36 creates a transaction, manipulates the IDL data structures, and commits or
37 aborts the transaction. The IDL then composes and issues the necessary
38 JSON-RPC requests and reports to the client whether the transaction
39 completed successfully.
41 The client is allowed to access the following attributes directly, in a
44 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
45 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
46 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
49 The client may directly read and write the Row objects referenced by the
50 'rows' map values. Refer to Row for more details.
52 - 'change_seqno': A number that represents the IDL's state. When the IDL
53 is updated (by Idl.run()), its value changes.
55 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
56 if no lock is configured.
58 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
59 lock, and False otherwise.
61 Locking and unlocking happens asynchronously from the database client's
62 point of view, so the information is only useful for optimization
63 (e.g. if the client doesn't have the lock then there's no point in trying
64 to write to the database).
66 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
67 the database server has indicated that some other client already owns the
68 requested lock, and False otherwise.
70 - 'txn': The ovs.db.idl.Transaction object for the database transaction
71 currently being constructed, if there is one, or None otherwise.
74 def __init__(self, remote, schema):
75 """Creates and returns a connection to the database named 'db_name' on
76 'remote', which should be in a form acceptable to
77 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
78 replica of the remote database.
80 'schema' should be the schema for the remote database. The caller may
81 have cut it down by removing tables or columns that are not of
82 interest. The IDL will only replicate the tables and columns that
83 remain. The caller may also add an attribute named 'alert' to selected
84 remaining columns, setting its value to False; if so, then changes to
85 those columns will not be considered changes to the database for the
86 purpose of the return value of Idl.run() and Idl.change_seqno. This is
87 useful for columns that the IDL's client will write but not read.
89 The IDL uses and modifies 'schema' directly."""
91 self.tables = schema.tables
93 self._session = ovs.jsonrpc.Session.open(remote)
94 self._monitor_request_id = None
95 self._last_seqno = None
99 self.lock_name = None # Name of lock we need, None if none.
100 self.has_lock = False # Has db server said we have the lock?
101 self.is_lock_contended = False # Has db server said we can't get lock?
102 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
104 # Transaction support.
106 self._outstanding_txns = {}
108 for table in schema.tables.itervalues():
109 for column in table.columns.itervalues():
110 if not hasattr(column, 'alert'):
112 table.need_table = False
117 """Closes the connection to the database. The IDL will no longer
119 self._session.close()
122 """Processes a batch of messages from the database server. Returns
123 True if the database as seen through the IDL changed, False if it did
124 not change. The initial fetch of the entire contents of the remote
125 database is considered to be one kind of change. If the IDL has been
126 configured to acquire a database lock (with Idl.set_lock()), then
127 successfully acquiring the lock is also considered to be a change.
129 This function can return occasional false positives, that is, report
130 that the database changed even though it didn't. This happens if the
131 connection to the database drops and reconnects, which causes the
132 database contents to be reloaded even if they didn't change. (It could
133 also happen if the database server sends out a "change" that reflects
134 what we already thought was in the database, but the database server is
135 not supposed to do that.)
137 As an alternative to checking the return value, the client may check
138 for changes in self.change_seqno."""
140 initial_change_seqno = self.change_seqno
145 if not self._session.is_connected():
148 seqno = self._session.get_seqno()
149 if seqno != self._last_seqno:
150 self._last_seqno = seqno
151 self.__txn_abort_all()
152 self.__send_monitor_request()
154 self.__send_lock_request()
157 msg = self._session.recv()
160 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
161 and msg.method == "update"
162 and len(msg.params) == 2
163 and msg.params[0] == None):
164 # Database contents changed.
165 self.__parse_update(msg.params[1])
166 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
167 and self._monitor_request_id is not None
168 and self._monitor_request_id == msg.id):
169 # Reply to our "monitor" request.
171 self.change_seqno += 1
172 self._monitor_request_id = None
174 self.__parse_update(msg.result)
175 except error.Error, e:
176 logging.error("%s: parse error in received schema: %s"
177 % (self._session.get_name(), e))
179 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
180 and self._lock_request_id is not None
181 and self._lock_request_id == msg.id):
182 # Reply to our "lock" request.
183 self.__parse_lock_reply(msg.result)
184 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
185 and msg.method == "locked"):
187 self.__parse_lock_notify(msg.params, True)
188 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
189 and msg.method == "stolen"):
190 # Someone else stole our lock.
191 self.__parse_lock_notify(msg.params, False)
192 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
193 # Reply to our echo request. Ignore it.
195 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
196 ovs.jsonrpc.Message.T_REPLY)
197 and self.__txn_process_reply(msg)):
198 # __txn_process_reply() did everything needed.
201 # This can happen if a transaction is destroyed before we
202 # receive the reply, so keep the log level low.
203 logging.debug("%s: received unexpected %s message"
204 % (self._session.get_name(),
205 ovs.jsonrpc.Message.type_to_string(msg.type)))
207 return initial_change_seqno != self.change_seqno
def wait(self, poller):
    """Registers this IDL's wake-up conditions with 'poller', so that
    poller.block() returns when self.run() has something to do or when
    activity occurs on a transaction on 'self'."""
    session = self._session
    session.wait(poller)
    session.recv_wait(poller)
def has_ever_connected(self):
    """Reports whether the IDL has ever held the database's contents.

    Returns True once the IDL has connected to the remote database and
    retrieved its contents, even if the connection later dropped and is
    being re-established.  In that case the IDL holds an atomic snapshot
    of the database, which may be arbitrarily old.

    Returns False if the IDL has never connected or never retrieved the
    contents, in which case the IDL is empty."""
    seen_contents = self.change_seqno != 0
    return seen_contents
def force_reconnect(self):
    """Makes the IDL drop its database connection and reconnect.  The
    contents of the IDL do not change in the meantime."""
    session = self._session
    session.force_reconnect()
231 def set_lock(self, lock_name):
232 """If 'lock_name' is not None, configures the IDL to obtain the named
233 lock from the database server and to avoid modifying the database when
234 the lock cannot be acquired (that is, when another client has the same
237 If 'lock_name' is None, drops the locking requirement and releases the
240 assert not self._outstanding_txns
242 if self.lock_name and (not lock_name or lock_name != self.lock_name):
243 # Release previous lock.
244 self.__send_unlock_request()
245 self.lock_name = None
246 self.is_lock_contended = False
248 if lock_name and not self.lock_name:
250 self.lock_name = lock_name
251 self.__send_lock_request()
256 for table in self.tables.itervalues():
262 self.change_seqno += 1
def __update_has_lock(self, new_has_lock):
    """Records whether the database server says we own the lock.

    Newly acquiring the lock counts as a change to the IDL, so
    change_seqno is bumped, and the lock is no longer contended."""
    newly_acquired = new_has_lock and not self.has_lock
    if newly_acquired:
        # While a monitor reply is still pending, do not signal a change
        # here: the monitor reply will increment change_seqno anyhow.
        if self._monitor_request_id is None:
            self.change_seqno += 1
        self.is_lock_contended = False
    self.has_lock = new_has_lock
276 def __do_send_lock_request(self, method):
277 self.__update_has_lock(False)
278 self._lock_request_id = None
279 if self._session.is_connected():
280 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
282 self._session.send(msg)
def __send_lock_request(self):
    """Issues a "lock" request and remembers the ID of the in-flight
    request in self._lock_request_id."""
    request_id = self.__do_send_lock_request("lock")
    self._lock_request_id = request_id
def __send_unlock_request(self):
    """Issues an "unlock" request.  The ID of the request is not kept."""
    method = "unlock"
    self.__do_send_lock_request(method)
def __parse_lock_reply(self, result):
    """Handles the reply to our "lock" request.

    The lock is ours only if 'result' is a JSON object whose "locked"
    member is exactly True; any other reply marks the lock as contended."""
    self._lock_request_id = None
    if type(result) == dict and result.get("locked") is True:
        self.__update_has_lock(True)
    else:
        self.__update_has_lock(False)
        self.is_lock_contended = True
def __parse_lock_notify(self, params, new_has_lock):
    """Handles a "locked" or "stolen" notification from the database
    server.

    'params' is the notification's parameters.  They are acted on only if
    a lock is configured and 'params' is a non-empty list or tuple whose
    first element is the name of that lock; anything else is ignored.

    'new_has_lock' is True for a "locked" notification and False for a
    "stolen" one."""
    if (self.lock_name is not None
        and type(params) in (list, tuple)
        and params
        and params[0] == self.lock_name):
        # __update_has_lock() is a bound method: passing 'self' again made
        # this call raise TypeError for every matching notification.
        self.__update_has_lock(new_has_lock)
        if not new_has_lock:
            # Our lock was stolen, so some other client holds it now.
            self.is_lock_contended = True
def __send_monitor_request(self):
    """Sends a "monitor" request that covers every column of every table
    in self.tables, and records the request's ID in
    self._monitor_request_id."""
    requests = dict((table.name, {"columns": table.columns.keys()})
                    for table in self.tables.itervalues())
    request = ovs.jsonrpc.Message.create_request(
        "monitor", [self._db.name, None, requests])
    self._monitor_request_id = request.id
    self._session.send(request)
318 def __parse_update(self, update):
320 self.__do_parse_update(update)
321 except error.Error, e:
322 logging.error("%s: error parsing update: %s"
323 % (self._session.get_name(), e))
325 def __do_parse_update(self, table_updates):
326 if type(table_updates) != dict:
327 raise error.Error("<table-updates> is not an object",
330 for table_name, table_update in table_updates.iteritems():
331 table = self.tables.get(table_name)
333 raise error.Error('<table-updates> includes unknown '
334 'table "%s"' % table_name)
336 if type(table_update) != dict:
337 raise error.Error('<table-update> for table "%s" is not '
338 'an object' % table_name, table_update)
340 for uuid_string, row_update in table_update.iteritems():
341 if not ovs.ovsuuid.is_valid_string(uuid_string):
342 raise error.Error('<table-update> for table "%s" '
343 'contains bad UUID "%s" as member '
344 'name' % (table_name, uuid_string),
346 uuid = ovs.ovsuuid.from_string(uuid_string)
348 if type(row_update) != dict:
349 raise error.Error('<table-update> for table "%s" '
350 'contains <row-update> for %s that '
352 % (table_name, uuid_string))
354 parser = ovs.db.parser.Parser(row_update, "row-update")
355 old = parser.get_optional("old", [dict])
356 new = parser.get_optional("new", [dict])
359 if not old and not new:
360 raise error.Error('<row-update> missing "old" and '
361 '"new" members', row_update)
363 if self.__process_update(table, uuid, old, new):
364 self.change_seqno += 1
366 def __process_update(self, table, uuid, old, new):
367 """Returns True if a column changed, False otherwise."""
368 row = table.rows.get(uuid)
377 logging.warning("cannot delete missing row %s from table %s"
378 % (uuid, table.name))
382 row = self.__create_row(table, uuid)
386 logging.warning("cannot add existing row %s to table %s"
387 % (uuid, table.name))
388 if self.__row_update(table, row, new):
392 row = self.__create_row(table, uuid)
395 logging.warning("cannot modify missing row %s in table %s"
396 % (uuid, table.name))
397 if self.__row_update(table, row, new):
401 def __row_update(self, table, row, row_json):
403 for column_name, datum_json in row_json.iteritems():
404 column = table.columns.get(column_name)
407 logging.warning("unknown column %s updating table %s"
408 % (column_name, table.name))
412 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
413 except error.Error, e:
415 logging.warning("error parsing column %s in table %s: %s"
416 % (column_name, table.name, e))
419 if datum != row._data[column_name]:
420 row._data[column_name] = datum
424 # Didn't really change but the OVSDB monitor protocol always
425 # includes every value in a row.
429 def __create_row(self, table, uuid):
431 for column in table.columns.itervalues():
432 data[column.name] = ovs.db.data.Datum.default(column.type)
433 row = table.rows[uuid] = Row(self, table, uuid, data)
437 self._session.force_reconnect()
def __txn_abort_all(self):
    """Forgets every outstanding transaction, setting the status of each
    one to Transaction.TRY_AGAIN."""
    pending = self._outstanding_txns
    while pending:
        _request_id, txn = pending.popitem()
        txn._status = Transaction.TRY_AGAIN
444 def __txn_process_reply(self, msg):
445 txn = self._outstanding_txns.pop(msg.id, None)
447 txn._process_reply(msg)
450 def _uuid_to_row(atom, base):
452 return base.ref_table.rows.get(atom)
457 def _row_to_uuid(value):
458 if type(value) == Row:
465 """A row within an IDL.
467 The client may access the following attributes directly:
469 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
471 - An attribute for each column in the Row's table, named for the column,
472 whose values are as returned by Datum.to_python() for the column's type.
474 If some error occurs (e.g. the database server's idea of the column is
475 different from the IDL's idea), then the attribute value is the
476 "default" value returned by Datum.default() for the column's type. (It is
477 important to know this because the default value may violate constraints
478 for the column's type, e.g. the default integer value is 0 even if column
479 constraints require the column's value to be positive.)
481 When a transaction is active, column attributes may also be assigned new
482 values. Committing the transaction will then cause the new value to be
483 stored into the database.
485 *NOTE*: In the current implementation, the value of a column is a *copy*
486 of the value in the database. This means that modifying its value
487 directly will have no useful effect. For example, the following:
488 row.mycolumn["a"] = "b" # don't do this
489 will not change anything in the database, even after commit. To modify
490 the column, instead assign the modified column value back to the column:
def __init__(self, idl, table, uuid, data):
    # Ordinary attribute assignment on a Row goes through
    # Row.__setattr__(), which treats the attribute name as a column to
    # write.  The real attributes are therefore stored straight into the
    # instance dictionary.
    attrs = self.__dict__

    attrs["uuid"] = uuid
    attrs["_idl"] = idl
    attrs["_table"] = table

    # _data holds the committed data:
    #
    #   - a dictionary mapping every column name to a Datum, if the row
    #     exists in the committed form of the database;
    #
    #   - None, if the row is newly inserted within the active transaction
    #     and so has no committed form.
    attrs["_data"] = data

    # _changes holds this row's changes within the active transaction:
    #
    #   - {} if no transaction is active or the row has not yet been
    #     changed within this transaction;
    #
    #   - a dictionary mapping a column name to its new Datum, if the
    #     active transaction changes those columns' values;
    #
    #   - a dictionary mapping every column name to a Datum, if the row is
    #     newly inserted within the active transaction;
    #
    #   - None, if the transaction deletes this row.
    attrs["_changes"] = {}

    # _prereqs is keyed by the names of the columns that must be verified
    # as prerequisites when the transaction commits.  Every value is None.
    attrs["_prereqs"] = {}
def __getattr__(self, column_name):
    """Returns the value of column 'column_name' as a Python object.

    A value written within the active transaction takes precedence over
    the committed value.  The row must not have been deleted."""
    assert self._changes is not None

    pending = self._changes.get(column_name)
    if pending is not None:
        datum = pending
    else:
        datum = self._data[column_name]

    return datum.to_python(_uuid_to_row)
541 def __setattr__(self, column_name, value):
542 assert self._changes is not None
545 column = self._table.columns[column_name]
547 datum = ovs.db.data.Datum.from_python(column.type, value,
549 except error.Error, e:
551 logging.error("attempting to write bad value to column %s (%s)"
554 self._idl.txn._write(self, column, datum)
def verify(self, column_name):
    """Makes the original contents of column 'column_name' in this row a
    prerequisite of the active transaction.

    If 'column_name' changed in this row (or this row was deleted) between
    the time the IDL read it and the time the transaction commits, the
    transaction aborts and Transaction.commit() returns
    Transaction.TRY_AGAIN.  To keep transactions from committing on the
    basis of dirty reads, call Row.verify() on every data item read as
    part of a read-modify-write operation.

    The call is a no-op when the current value is already known, that is,
    when this row was created by the current transaction or when the
    column has already been modified within it.  Because of the latter,
    always call Row.verify() *before* modifying the column.

    A transaction must be in progress."""
    assert self._changes is not None

    already_known = not self._data or column_name in self._changes
    if not already_known:
        self._prereqs[column_name] = None
589 """Deletes this row from its table.
591 A transaction must be in progress."""
593 assert self._changes is not None
594 if self._data is None:
595 del self._idl.txn._txn_rows[self.uuid]
596 self.__dict__["_changes"] = None
597 del self._table.rows[self.uuid]
def _uuid_name_from_uuid(uuid):
    """Returns the symbolic name used for 'uuid' in "uuid-name" and
    "named-uuid" JSON: "row" followed by the UUID's string form with each
    "-" replaced by "_"."""
    text = str(uuid)
    return "row" + text.replace("-", "_")
def _where_uuid_equals(uuid):
    """Returns a "where" clause, in JSON form, that matches the row whose
    "_uuid" column equals 'uuid'."""
    uuid_atom = ["uuid", str(uuid)]
    condition = ["_uuid", "==", uuid_atom]
    return [condition]
608 class _InsertedRow(object):
609 def __init__(self, op_index):
610 self.op_index = op_index
614 class Transaction(object):
615 # Status values that Transaction.commit() can return.
616 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
617 UNCHANGED = "unchanged" # Transaction didn't include any changes.
618 INCOMPLETE = "incomplete" # Commit in progress, please wait.
619 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
620 SUCCESS = "success" # Commit successful.
621 TRY_AGAIN = "try again" # Commit failed because a "verify" operation
622 # reported an inconsistency, due to a network
623 # problem, or other transient failure.
624 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
625 ERROR = "error" # Commit failed due to a hard error.
628 def status_to_string(status):
629 """Converts one of the status values that Transaction.commit() can
630 return into a human-readable string.
632 (The status values are in fact such strings already, so
633 there's nothing to do.)"""
636 def __init__(self, idl):
637 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
638 A given Idl may only have a single active transaction at a time.
640 A Transaction may modify the contents of a database by assigning new
641 values to columns (attributes of Row), deleting rows (with
642 Row.delete()), or inserting rows (with Transaction.insert()). It may
643 also check that columns in the database have not changed with
646 When a transaction is complete (which must be before the next call to
647 Idl.run()), call Transaction.commit() or Transaction.abort()."""
648 assert idl.txn is None
651 self._request_id = None
655 self._status = Transaction.UNCOMMITTED
659 self._inc_table = None
660 self._inc_column = None
661 self._inc_where = None
663 self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
    """Appends 'comment' to the comments that will be passed to the OVSDB
    server when this transaction is committed.  (The comment will be
    committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
    relatively human-readable form.)"""
    comments = self._comments
    comments.append(comment)
def increment(self, table, column, where):
    """Records that committing this transaction should also add 1 to
    'column' in the rows of 'table' selected by 'where'.

    At most one increment may be requested per transaction."""
    assert not self._inc_table
    self._inc_table, self._inc_column, self._inc_where = (
        table, column, where)
def wait(self, poller):
    """Makes 'poller' wake up immediately unless this transaction is still
    uncommitted or its commit is still in progress."""
    still_pending = self._status in (Transaction.UNCOMMITTED,
                                     Transaction.INCOMPLETE)
    if still_pending:
        return
    poller.immediate_wake()
683 def _substitute_uuids(self, json):
684 if type(json) in (list, tuple):
686 and json[0] == 'uuid'
687 and ovs.ovsuuid.is_valid_string(json[1])):
688 uuid = ovs.ovsuuid.from_string(json[1])
689 row = self._txn_rows.get(uuid, None)
690 if row and row._data is None:
691 return ["named-uuid", _uuid_name_from_uuid(uuid)]
694 def __disassemble(self):
697 for row in self._txn_rows.itervalues():
698 if row._changes is None:
699 row._table.rows[row.uuid] = row
700 elif row._data is None:
701 del row._table.rows[row.uuid]
702 row.__dict__["_changes"] = {}
703 row.__dict__["_prereqs"] = {}
707 """Attempts to commit this transaction and returns the status of the
708 commit operation, one of the constants declared as class attributes.
709 If the return value is Transaction.INCOMPLETE, then the transaction is
710 not yet complete and the caller should try calling again later, after
711 calling Idl.run() to run the Idl.
713 Committing a transaction rolls back all of the changes that it made to
714 the Idl's copy of the database. If the transaction commits
715 successfully, then the database server will send an update and, thus,
716 the Idl will be updated with the committed changes."""
717 # The status can only change if we're the active transaction.
718 # (Otherwise, our status will change only in Idl.run().)
719 if self != self.idl.txn:
722 # If we need a lock but don't have it, give up quickly.
723 if self.idl.lock_name and not self.idl.has_lock():
724 self._status = Transaction.NOT_LOCKED
728 operations = [self.idl._db.name]
730 # Assert that we have the required lock (avoiding a race).
731 if self.idl.lock_name:
732 operations.append({"op": "assert",
733 "lock": self.idl.lock_name})
735 # Add prerequisites and declarations of new rows.
736 for row in self._txn_rows.itervalues():
740 for column_name in row._prereqs:
741 columns.append(column_name)
742 rows[column_name] = row._data[column_name].to_json()
743 operations.append({"op": "wait",
744 "table": row._table.name,
746 "where": _where_uuid_equals(row.uuid),
753 for row in self._txn_rows.itervalues():
754 if row._changes is None:
755 if row._table.is_root:
756 operations.append({"op": "delete",
757 "table": row._table.name,
758 "where": _where_uuid_equals(row.uuid)})
761 # Let ovsdb-server decide whether to really delete it.
764 op = {"table": row._table.name}
765 if row._data is None:
767 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
770 op_index = len(operations) - 1
771 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
774 op["where"] = _where_uuid_equals(row.uuid)
779 for column_name, datum in row._changes.iteritems():
780 if row._data is not None or not datum.is_default():
781 row_json[column_name] = (
782 self._substitute_uuids(datum.to_json()))
784 # If anything really changed, consider it an update.
785 # We can't suppress not-really-changed values earlier
786 # or transactions would become nonatomic (see the big
787 # comment inside Transaction._write()).
788 if (not any_updates and row._data is not None and
789 row._data[column_name] != datum):
792 if row._data is None or row_json:
793 operations.append(op)
796 if self._inc_table and any_updates:
797 self._inc_index = len(operations) - 1
799 operations.append({"op": "mutate",
800 "table": self._inc_table,
801 "where": self._substitute_uuids(
803 "mutations": [[self._inc_column, "+=", 1]]})
804 operations.append({"op": "select",
805 "table": self._inc_table,
806 "where": self._substitute_uuids(
808 "columns": [self._inc_column]})
812 operations.append({"op": "comment",
813 "comment": "\n".join(self._comments)})
817 operations.append({"op": "abort"})
820 self._status = Transaction.UNCHANGED
822 msg = ovs.jsonrpc.Message.create_request("transact", operations)
823 self._request_id = msg.id
824 if not self.idl._session.send(msg):
825 self.idl._outstanding_txns[self._request_id] = self
826 self._status = Transaction.INCOMPLETE
828 self._status = Transaction.TRY_AGAIN
833 def commit_block(self):
835 status = self.commit()
836 if status != Transaction.INCOMPLETE:
841 poller = ovs.poller.Poller()
842 self.idl.wait(poller)
def get_increment_new_value(self):
    """Returns the new value of the incremented column, as recorded from
    the server's reply.

    This transaction must have completed successfully."""
    assert self._status == Transaction.SUCCESS
    new_value = self._inc_new_value
    return new_value
851 """Aborts this transaction. If Transaction.commit() has already been
852 called then the transaction might get committed anyhow."""
854 if self._status in (Transaction.UNCOMMITTED,
855 Transaction.INCOMPLETE):
856 self._status = Transaction.ABORTED
859 """Returns a string representing this transaction's current status,
860 suitable for use in log messages."""
861 if self._status != Transaction.ERROR:
862 return Transaction.status_to_string(self._status)
866 return "no error details available"
def __set_error_json(self, json):
    """Records 'json', converted to a string, as this transaction's error
    details.  Only the first error recorded is kept."""
    if self._error is not None:
        return
    self._error = ovs.json.to_string(json)
872 def get_insert_uuid(self, uuid):
873 """Finds and returns the permanent UUID that the database assigned to a
874 newly inserted row, given the UUID that Transaction.insert() assigned
877 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
878 or if it was assigned by that function and then deleted by Row.delete()
879 within the same transaction. (Rows that are inserted and then deleted
880 within a single transaction are never sent to the database server, so
881 it never assigns them a permanent UUID.)
883 This transaction must have completed successfully."""
884 assert self._status in (Transaction.SUCCESS,
885 Transaction.UNCHANGED)
886 inserted_row = self._inserted_rows.get(uuid)
888 return inserted_row.real
def _write(self, row, column, datum):
    """Records, in the transaction active on the row's IDL, that 'column'
    of 'row' is to be set to 'datum'.

    'row' must not have been deleted.  A copy of 'datum' is stored in the
    row's pending changes and the row is registered with the transaction.
    A write that repeats the committed value of a column whose 'alert'
    attribute is false is dropped, provided that no different value is
    pending for that column."""
    assert row._changes is not None

    txn = row._idl.txn

    # If this is a write-only column and the datum being written is the
    # same as the one already there, just skip the update entirely.  This
    # is worth optimizing because we have a lot of columns that get
    # periodically refreshed into the database but don't actually change.
    #
    # We don't do this for read/write columns because that would break
    # atomicity of transactions--some other client might have written a
    # different value in that column since we read it.  (But if a whole
    # transaction only does writes of existing values, without making any
    # real changes, Transaction.commit() drops the whole transaction
    # later.)
    #
    # A row inserted by this transaction has no committed data (_data is
    # None), so there is nothing to compare against and the write must
    # always be recorded.
    if (not column.alert
        and row._data is not None
        and row._data.get(column.name) == datum):
        new_value = row._changes.get(column.name)
        if new_value is None or new_value == datum:
            return

    txn._txn_rows[row.uuid] = row
    row._changes[column.name] = datum.copy()
def insert(self, table, new_uuid=None):
    """Inserts and returns a new row in 'table', which must be one of the
    ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

    The new row is assigned a provisional UUID.  If 'new_uuid' is None
    then one is randomly generated; otherwise 'new_uuid' should specify a
    randomly generated uuid.UUID not otherwise in use.  ovsdb-server will
    assign a different UUID when the transaction is committed, but the IDL
    will replace any uses of the provisional UUID in the data to be
    committed by the UUID assigned by ovsdb-server."""
    assert self._status == Transaction.UNCOMMITTED

    provisional = new_uuid
    if provisional is None:
        provisional = uuid.uuid4()

    # The row has no committed data yet, hence None.
    row = Row(self.idl, table, provisional, None)
    key = row.uuid
    table.rows[key] = row
    self._txn_rows[key] = row
    return row
934 def _process_reply(self, msg):
935 if msg.type == ovs.jsonrpc.Message.T_ERROR:
936 self._status = Transaction.ERROR
937 elif type(msg.result) not in (list, tuple):
939 logging.warning('reply to "transact" is not JSON array')
948 # This isn't an error in itself but indicates that some
949 # prior operation failed, so make sure that we know about
952 elif type(op) == dict:
953 error = op.get("error")
954 if error is not None:
955 if error == "timed out":
957 elif error == "not owner":
959 elif error == "aborted":
963 self.__set_error_json(op)
966 self.__set_error_json(op)
968 logging.warning("operation reply is not JSON null or "
971 if not soft_errors and not hard_errors and not lock_errors:
972 if self._inc_table and not self.__process_inc_reply(ops):
975 for insert in self._inserted_rows.itervalues():
976 if not self.__process_insert_reply(insert, ops):
980 self._status = Transaction.ERROR
982 self._status = Transaction.NOT_LOCKED
984 self._status = Transaction.TRY_AGAIN
986 self._status = Transaction.SUCCESS
989 def __check_json_type(json, types, name):
992 logging.warning("%s is missing" % name)
994 elif type(json) not in types:
996 logging.warning("%s has unexpected type %s" % (name, type(json)))
1001 def __process_inc_reply(self, ops):
1002 if self._inc_index + 2 > len(ops):
1004 logging.warning("reply does not contain enough operations for "
1005 "increment (has %d, needs %d)" %
1006 (len(ops), self._inc_index + 2))
1008 # We know that this is a JSON object because the loop in
1009 # __process_reply() already checked.
1010 mutate = ops[self._inc_index]
1011 count = mutate.get("count")
1012 if not Transaction.__check_json_type(count, (int, long),
1013 '"mutate" reply "count"'):
1017 logging.warning('"mutate" reply "count" is %d instead of 1'
1021 select = ops[self._inc_index + 1]
1022 rows = select.get("rows")
1023 if not Transaction.__check_json_type(rows, (list, tuple),
1024 '"select" reply "rows"'):
1028 logging.warning('"select" reply "rows" has %d elements '
1029 'instead of 1' % len(rows))
1032 if not Transaction.__check_json_type(row, (dict,),
1033 '"select" reply row'):
1035 column = row.get(self._inc_column)
1036 if not Transaction.__check_json_type(column, (int, long),
1037 '"select" reply inc column'):
1039 self._inc_new_value = column
1042 def __process_insert_reply(self, insert, ops):
1043 if insert.op_index >= len(ops):
1045 logging.warning("reply does not contain enough operations "
1046 "for insert (has %d, needs %d)"
1047 % (len(ops), insert.op_index))
1050 # We know that this is a JSON object because the loop in
1051 # __process_reply() already checked.
1052 reply = ops[insert.op_index]
1053 json_uuid = reply.get("uuid")
1054 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1055 '"insert" reply "uuid"'):
1059 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1062 logging.warning('"insert" reply "uuid" is not a JSON UUID')