1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
25 vlog = ovs.vlog.Vlog("idl")
27 __pychecker__ = 'no-classattr no-objattrs'
31 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
33 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
34 requests to an OVSDB database server and parses the responses, converting
35 raw JSON into data structures that are easier for clients to digest.
37 The IDL also assists with issuing database transactions. The client
38 creates a transaction, manipulates the IDL data structures, and commits or
39 aborts the transaction. The IDL then composes and issues the necessary
40 JSON-RPC requests and reports to the client whether the transaction
41 completed successfully.
43 The client is allowed to access the following attributes directly, in a
46 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
48 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
51 The client may directly read and write the Row objects referenced by the
52 'rows' map values. Refer to Row for more details.
54 - 'change_seqno': A number that represents the IDL's state. When the IDL
55 is updated (by Idl.run()), its value changes.
57 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58 if no lock is configured.
60 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61 lock, and False otherwise.
63 Locking and unlocking happens asynchronously from the database client's
64 point of view, so the information is only useful for optimization
65 (e.g. if the client doesn't have the lock then there's no point in trying
66 to write to the database).
68 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69 the database server has indicated that some other client already owns the
70 requested lock, and False otherwise.
72 - 'txn': The ovs.db.idl.Transaction object for the database transaction
73 currently being constructed, if there is one, or None otherwise.
76 def __init__(self, remote, schema):
77 """Creates and returns a connection to the database named 'db_name' on
78 'remote', which should be in a form acceptable to
79 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
80 replica of the remote database.
82 'schema' should be the schema for the remote database. The caller may
83 have cut it down by removing tables or columns that are not of
84 interest. The IDL will only replicate the tables and columns that
85 remain. The caller may also add an attribute named 'alert' to selected
86 remaining columns, setting its value to False; if so, then changes to
87 those columns will not be considered changes to the database for the
88 purpose of the return value of Idl.run() and Idl.change_seqno. This is
89 useful for columns that the IDL's client will write but not read.
91 The IDL uses and modifies 'schema' directly."""
93 self.tables = schema.tables
95 self._session = ovs.jsonrpc.Session.open(remote)
96 self._monitor_request_id = None
97 self._last_seqno = None
101 self.lock_name = None # Name of lock we need, None if none.
102 self.has_lock = False # Has db server said we have the lock?
103 self.is_lock_contended = False # Has db server said we can't get lock?
104 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
106 # Transaction support.
108 self._outstanding_txns = {}
110 for table in schema.tables.itervalues():
111 for column in table.columns.itervalues():
112 if not hasattr(column, 'alert'):
114 table.need_table = False
119 """Closes the connection to the database. The IDL will no longer
121 self._session.close()
124 """Processes a batch of messages from the database server. Returns
125 True if the database as seen through the IDL changed, False if it did
126 not change. The initial fetch of the entire contents of the remote
127 database is considered to be one kind of change. If the IDL has been
128 configured to acquire a database lock (with Idl.set_lock()), then
129 successfully acquiring the lock is also considered to be a change.
131 This function can return occasional false positives, that is, report
132 that the database changed even though it didn't. This happens if the
133 connection to the database drops and reconnects, which causes the
134 database contents to be reloaded even if they didn't change. (It could
135 also happen if the database server sends out a "change" that reflects
136 what we already thought was in the database, but the database server is
137 not supposed to do that.)
139 As an alternative to checking the return value, the client may check
140 for changes in self.change_seqno."""
142 initial_change_seqno = self.change_seqno
147 if not self._session.is_connected():
150 seqno = self._session.get_seqno()
151 if seqno != self._last_seqno:
152 self._last_seqno = seqno
153 self.__txn_abort_all()
154 self.__send_monitor_request()
156 self.__send_lock_request()
159 msg = self._session.recv()
162 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
163 and msg.method == "update"
164 and len(msg.params) == 2
165 and msg.params[0] == None):
166 # Database contents changed.
167 self.__parse_update(msg.params[1])
168 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
169 and self._monitor_request_id is not None
170 and self._monitor_request_id == msg.id):
171 # Reply to our "monitor" request.
173 self.change_seqno += 1
174 self._monitor_request_id = None
176 self.__parse_update(msg.result)
177 except error.Error, e:
178 vlog.err("%s: parse error in received schema: %s"
179 % (self._session.get_name(), e))
181 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
182 and self._lock_request_id is not None
183 and self._lock_request_id == msg.id):
184 # Reply to our "lock" request.
185 self.__parse_lock_reply(msg.result)
186 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
187 and msg.method == "locked"):
189 self.__parse_lock_notify(msg.params, True)
190 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
191 and msg.method == "stolen"):
192 # Someone else stole our lock.
193 self.__parse_lock_notify(msg.params, False)
194 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
195 # Reply to our echo request. Ignore it.
197 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
198 ovs.jsonrpc.Message.T_REPLY)
199 and self.__txn_process_reply(msg)):
200 # __txn_process_reply() did everything needed.
203 # This can happen if a transaction is destroyed before we
204 # receive the reply, so keep the log level low.
205 vlog.dbg("%s: received unexpected %s message"
206 % (self._session.get_name(),
207 ovs.jsonrpc.Message.type_to_string(msg.type)))
209 return initial_change_seqno != self.change_seqno
def wait(self, poller):
    """Registers this IDL's wake-up conditions with 'poller'.

    Afterward, poller.block() wakes up when self.run() has something to do
    or when activity occurs on a transaction on 'self'."""
    session = self._session
    session.wait(poller)
    session.recv_wait(poller)
def has_ever_connected(self):
    """Reports whether the IDL has ever held a copy of the database.

    Returns True if the IDL successfully connected to the remote database
    and retrieved its contents, even if the connection subsequently dropped
    and is in the process of reconnecting.  In that case the IDL contains an
    atomic snapshot of the database's contents, although it might be
    arbitrarily old if the connection dropped.

    Returns False if the IDL has never connected or retrieved the database's
    contents, in which case the IDL is empty."""
    never_loaded = self.change_seqno == 0
    return not never_loaded
def force_reconnect(self):
    """Makes the IDL drop its connection to the database and reconnect.

    The contents of the IDL will not change in the meantime."""
    session = self._session
    session.force_reconnect()
233 def set_lock(self, lock_name):
234 """If 'lock_name' is not None, configures the IDL to obtain the named
235 lock from the database server and to avoid modifying the database when
236 the lock cannot be acquired (that is, when another client has the same
239 If 'lock_name' is None, drops the locking requirement and releases the
242 assert not self._outstanding_txns
244 if self.lock_name and (not lock_name or lock_name != self.lock_name):
245 # Release previous lock.
246 self.__send_unlock_request()
247 self.lock_name = None
248 self.is_lock_contended = False
250 if lock_name and not self.lock_name:
252 self.lock_name = lock_name
253 self.__send_lock_request()
258 for table in self.tables.itervalues():
264 self.change_seqno += 1
def __update_has_lock(self, new_has_lock):
    """Sets self.has_lock to 'new_has_lock'.

    When the lock is newly acquired, is_lock_contended is cleared and,
    unless a monitor reply is still awaited, change_seqno is incremented."""
    newly_acquired = new_has_lock and not self.has_lock
    if newly_acquired:
        # While we're waiting for a monitor reply, don't signal that the
        # database changed.  The monitor reply will increment change_seqno
        # anyhow.
        if self._monitor_request_id is None:
            self.change_seqno += 1
        self.is_lock_contended = False
    self.has_lock = new_has_lock
278 def __do_send_lock_request(self, method):
279 self.__update_has_lock(False)
280 self._lock_request_id = None
281 if self._session.is_connected():
282 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
284 self._session.send(msg)
def __send_lock_request(self):
    """Issues a "lock" request and stores whatever
    __do_send_lock_request() returns as the ID of the in-flight lock
    request."""
    request_id = self.__do_send_lock_request("lock")
    self._lock_request_id = request_id
def __send_unlock_request(self):
    """Issues an "unlock" request via __do_send_lock_request().  The value
    that helper returns is deliberately not recorded."""
    method = "unlock"
    self.__do_send_lock_request(method)
def __parse_lock_reply(self, result):
    """Processes 'result', the reply to our "lock" request.

    The lock counts as obtained only if 'result' is a dict whose "locked"
    member is True; otherwise the lock is marked as contended."""
    self._lock_request_id = None
    if type(result) is dict:
        got_lock = result.get("locked") is True
    else:
        got_lock = False
    self.__update_has_lock(got_lock)
    if not got_lock:
        self.is_lock_contended = True
def __parse_lock_notify(self, params, new_has_lock):
    """Handles a "locked" or "stolen" notification from the server.

    'params' is the notification's parameter list and 'new_has_lock' is
    True for "locked" and False for "stolen".  The notification is ignored
    unless a lock is configured and the first element of 'params' names
    that lock.  Losing the lock also marks it as contended."""
    if self.lock_name is None:
        return
    if not isinstance(params, (list, tuple)) or not params:
        # Malformed or empty parameter list: nothing to match against.
        return
    if params[0] != self.lock_name:
        return

    # Bound-method call: 'self' must not be passed a second time.
    self.__update_has_lock(new_has_lock)
    if not new_has_lock:
        self.is_lock_contended = True
def __send_monitor_request(self):
    """Sends a "monitor" request that names, for every table in
    self.tables, that table's columns, and records the request's ID in
    self._monitor_request_id."""
    requests = dict((tbl.name, {"columns": tbl.columns.keys()})
                    for tbl in self.tables.itervalues())
    params = [self._db.name, None, requests]
    msg = ovs.jsonrpc.Message.create_request("monitor", params)
    self._monitor_request_id = msg.id
    self._session.send(msg)
320 def __parse_update(self, update):
322 self.__do_parse_update(update)
323 except error.Error, e:
324 vlog.err("%s: error parsing update: %s"
325 % (self._session.get_name(), e))
327 def __do_parse_update(self, table_updates):
328 if type(table_updates) != dict:
329 raise error.Error("<table-updates> is not an object",
332 for table_name, table_update in table_updates.iteritems():
333 table = self.tables.get(table_name)
335 raise error.Error('<table-updates> includes unknown '
336 'table "%s"' % table_name)
338 if type(table_update) != dict:
339 raise error.Error('<table-update> for table "%s" is not '
340 'an object' % table_name, table_update)
342 for uuid_string, row_update in table_update.iteritems():
343 if not ovs.ovsuuid.is_valid_string(uuid_string):
344 raise error.Error('<table-update> for table "%s" '
345 'contains bad UUID "%s" as member '
346 'name' % (table_name, uuid_string),
348 uuid = ovs.ovsuuid.from_string(uuid_string)
350 if type(row_update) != dict:
351 raise error.Error('<table-update> for table "%s" '
352 'contains <row-update> for %s that '
354 % (table_name, uuid_string))
356 parser = ovs.db.parser.Parser(row_update, "row-update")
357 old = parser.get_optional("old", [dict])
358 new = parser.get_optional("new", [dict])
361 if not old and not new:
362 raise error.Error('<row-update> missing "old" and '
363 '"new" members', row_update)
365 if self.__process_update(table, uuid, old, new):
366 self.change_seqno += 1
368 def __process_update(self, table, uuid, old, new):
369 """Returns True if a column changed, False otherwise."""
370 row = table.rows.get(uuid)
379 vlog.warn("cannot delete missing row %s from table %s"
380 % (uuid, table.name))
384 row = self.__create_row(table, uuid)
388 vlog.warn("cannot add existing row %s to table %s"
389 % (uuid, table.name))
390 if self.__row_update(table, row, new):
394 row = self.__create_row(table, uuid)
397 vlog.warn("cannot modify missing row %s in table %s"
398 % (uuid, table.name))
399 if self.__row_update(table, row, new):
403 def __row_update(self, table, row, row_json):
405 for column_name, datum_json in row_json.iteritems():
406 column = table.columns.get(column_name)
409 vlog.warn("unknown column %s updating table %s"
410 % (column_name, table.name))
414 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
415 except error.Error, e:
417 vlog.warn("error parsing column %s in table %s: %s"
418 % (column_name, table.name, e))
421 if datum != row._data[column_name]:
422 row._data[column_name] = datum
426 # Didn't really change but the OVSDB monitor protocol always
427 # includes every value in a row.
431 def __create_row(self, table, uuid):
433 for column in table.columns.itervalues():
434 data[column.name] = ovs.db.data.Datum.default(column.type)
435 row = table.rows[uuid] = Row(self, table, uuid, data)
439 self._session.force_reconnect()
def __txn_abort_all(self):
    """Empties self._outstanding_txns, setting the status of each removed
    transaction to Transaction.AGAIN_WAIT."""
    outstanding = self._outstanding_txns
    while outstanding:
        _request_id, txn = outstanding.popitem()
        txn._status = Transaction.AGAIN_WAIT
446 def __txn_process_reply(self, msg):
447 txn = self._outstanding_txns.pop(msg.id, None)
449 txn._process_reply(msg)
452 def _uuid_to_row(atom, base):
454 return base.ref_table.rows.get(atom)
459 def _row_to_uuid(value):
460 if type(value) == Row:
467 """A row within an IDL.
469 The client may access the following attributes directly:
471 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
473 - An attribute for each column in the Row's table, named for the column,
474 whose values are as returned by Datum.to_python() for the column's type.
476 If some error occurs (e.g. the database server's idea of the column is
477 different from the IDL's idea), then the attribute's value is the
478 "default" value returned by Datum.default() for the column's type. (It is
479 important to know this because the default value may violate constraints
480 for the column's type, e.g. the default integer value is 0 even if column
481 constraints require the column's value to be positive.)
483 When a transaction is active, column attributes may also be assigned new
484 values. Committing the transaction will then cause the new value to be
485 stored into the database.
487 *NOTE*: In the current implementation, the value of a column is a *copy*
488 of the value in the database. This means that modifying its value
489 directly will have no useful effect. For example, the following:
490 row.mycolumn["a"] = "b" # don't do this
491 will not change anything in the database, even after commit. To modify
492 the column, instead assign the modified column value back to the column:
def __init__(self, idl, table, uuid, data):
    # Every attribute below is stored through self.__dict__ rather than by
    # plain assignment, so that real attributes are set without invoking
    # self.__setattr__().
    attrs = self.__dict__

    attrs["uuid"] = uuid
    attrs["_idl"] = idl
    attrs["_table"] = table

    # _data is the committed data.  It takes the following values:
    #
    #   - A dictionary that maps every column name to a Datum, if the row
    #     exists in the committed form of the database.
    #
    #   - None, if this row is newly inserted within the active transaction
    #     and thus has no committed form.
    attrs["_data"] = data

    # _changes describes changes to this row within the active transaction.
    # It takes the following values:
    #
    #   - {}, the empty dictionary, if no transaction is active or if the
    #     row has not yet been changed within this transaction.
    #
    #   - A dictionary that maps a column name to its new Datum, if an
    #     active transaction changes those columns' values.
    #
    #   - A dictionary that maps every column name to a Datum, if the row
    #     is newly inserted within the active transaction.
    #
    #   - None, if this transaction deletes this row.
    attrs["_changes"] = {}

    # A dictionary whose keys are the names of columns that must be
    # verified as prerequisites when the transaction commits.  The values
    # in the dictionary are all None.
    attrs["_prereqs"] = {}
534 def __getattr__(self, column_name):
535 assert self._changes is not None
537 datum = self._changes.get(column_name)
539 datum = self._data[column_name]
541 return datum.to_python(_uuid_to_row)
543 def __setattr__(self, column_name, value):
544 assert self._changes is not None
547 column = self._table.columns[column_name]
549 datum = ovs.db.data.Datum.from_python(column.type, value,
551 except error.Error, e:
553 vlog.err("attempting to write bad value to column %s (%s)"
556 self._idl.txn._write(self, column, datum)
558 def verify(self, column_name):
559 """Causes the original contents of column 'column_name' in this row to
560 be verified as a prerequisite to completing the transaction. That is,
561 if 'column_name' changed in this row (or if this row was deleted)
562 between the time that the IDL originally read its contents and the time
563 that the transaction commits, then the transaction aborts and
564 Transaction.commit() returns Transaction.AGAIN_WAIT or
565 Transaction.AGAIN_NOW (depending on whether the database change has
566 already been received).
568 The intention is that, to ensure that no transaction commits based on
569 dirty reads, an application should call Row.verify() on each data item
570 read as part of a read-modify-write operation.
572 In some cases Row.verify() reduces to a no-op, because the current
573 value of the column is already known:
575 - If this row is a row created by the current transaction (returned
576 by Transaction.insert()).
578 - If the column has already been modified within the current
581 Because of the latter property, always call Row.verify() *before*
582 modifying the column, for a given read-modify-write.
584 A transaction must be in progress."""
586 assert self._changes is not None
587 if not self._data or column_name in self._changes:
590 self._prereqs[column_name] = None
593 """Deletes this row from its table.
595 A transaction must be in progress."""
597 assert self._changes is not None
598 if self._data is None:
599 del self._idl.txn._txn_rows[self.uuid]
600 self.__dict__["_changes"] = None
601 del self._table.rows[self.uuid]
def _uuid_name_from_uuid(uuid):
    """Returns the name derived from 'uuid': the string "row" followed by
    str(uuid) with each "-" replaced by "_"."""
    return "row" + str(uuid).replace("-", "_")
def _where_uuid_equals(uuid):
    """Returns a one-condition "where" clause requiring the "_uuid" column
    to equal 'uuid'."""
    uuid_json = ["uuid", str(uuid)]
    condition = ["_uuid", "==", uuid_json]
    return [condition]
class _InsertedRow(object):
    """Bookkeeping for one row inserted within a transaction."""

    def __init__(self, op_index):
        # Operation index supplied by the caller when the insert is queued.
        self.op_index = op_index
        # Read by Transaction.get_insert_uuid().  Initialized here so that
        # reading it before any value has been stored yields None instead
        # of raising AttributeError.
        self.real = None
618 class Transaction(object):
619 # Status values that Transaction.commit() can return.
620 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
621 UNCHANGED = "unchanged" # Transaction didn't include any changes.
622 INCOMPLETE = "incomplete" # Commit in progress, please wait.
623 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
624 SUCCESS = "success" # Commit successful.
625 AGAIN_WAIT = "wait then try again"
626 # Commit failed because a "verify" operation
627 # reported an inconsistency, due to a network
628 # problem, or other transient failure. Wait
629 # for a change, then try again.
630 AGAIN_NOW = "try again now" # Same as AGAIN_WAIT but try again right away.
631 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
632 ERROR = "error" # Commit failed due to a hard error.
635 def status_to_string(status):
636 """Converts one of the status values that Transaction.commit() can
637 return into a human-readable string.
639 (The status values are in fact such strings already, so
640 there's nothing to do.)"""
643 def __init__(self, idl):
644 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
645 A given Idl may only have a single active transaction at a time.
647 A Transaction may modify the contents of a database by assigning new
648 values to columns (attributes of Row), deleting rows (with
649 Row.delete()), or inserting rows (with Transaction.insert()). It may
650 also check that columns in the database have not changed with
653 When a transaction is complete (which must be before the next call to
654 Idl.run()), call Transaction.commit() or Transaction.abort()."""
655 assert idl.txn is None
658 self._request_id = None
662 self._status = Transaction.UNCOMMITTED
665 self._commit_seqno = self.idl.change_seqno
667 self._inc_table = None
668 self._inc_column = None
669 self._inc_where = None
671 self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
    """Appends 'comment' to the comments that will be passed to the OVSDB
    server when this transaction is committed.  (The comment will be
    committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
    relatively human-readable form.)"""
    comments = self._comments
    comments.append(comment)
def increment(self, table, column, where):
    """Records 'table', 'column' and 'where' as this transaction's
    increment request.  At most one increment may be requested per
    transaction."""
    assert not self._inc_table
    self._inc_table, self._inc_column, self._inc_where = table, column, where
def wait(self, poller):
    """Makes 'poller' wake up immediately unless this transaction's status
    is still Transaction.UNCOMMITTED or Transaction.INCOMPLETE."""
    in_progress = (Transaction.UNCOMMITTED, Transaction.INCOMPLETE)
    if self._status in in_progress:
        return
    poller.immediate_wake()
691 def _substitute_uuids(self, json):
692 if type(json) in (list, tuple):
694 and json[0] == 'uuid'
695 and ovs.ovsuuid.is_valid_string(json[1])):
696 uuid = ovs.ovsuuid.from_string(json[1])
697 row = self._txn_rows.get(uuid, None)
698 if row and row._data is None:
699 return ["named-uuid", _uuid_name_from_uuid(uuid)]
702 def __disassemble(self):
705 for row in self._txn_rows.itervalues():
706 if row._changes is None:
707 row._table.rows[row.uuid] = row
708 elif row._data is None:
709 del row._table.rows[row.uuid]
710 row.__dict__["_changes"] = {}
711 row.__dict__["_prereqs"] = {}
715 """Attempts to commit this transaction and returns the status of the
716 commit operation, one of the constants declared as class attributes.
717 If the return value is Transaction.INCOMPLETE, then the transaction is
718 not yet complete and the caller should try calling again later, after
719 calling Idl.run() to run the Idl.
721 Committing a transaction rolls back all of the changes that it made to
722 the Idl's copy of the database. If the transaction commits
723 successfully, then the database server will send an update and, thus,
724 the Idl will be updated with the committed changes."""
725 # The status can only change if we're the active transaction.
726 # (Otherwise, our status will change only in Idl.run().)
727 if self != self.idl.txn:
730 # If we need a lock but don't have it, give up quickly.
731 if self.idl.lock_name and not self.idl.has_lock():
732 self._status = Transaction.NOT_LOCKED
736 operations = [self.idl._db.name]
738 # Assert that we have the required lock (avoiding a race).
739 if self.idl.lock_name:
740 operations.append({"op": "assert",
741 "lock": self.idl.lock_name})
743 # Add prerequisites and declarations of new rows.
744 for row in self._txn_rows.itervalues():
748 for column_name in row._prereqs:
749 columns.append(column_name)
750 rows[column_name] = row._data[column_name].to_json()
751 operations.append({"op": "wait",
752 "table": row._table.name,
754 "where": _where_uuid_equals(row.uuid),
761 for row in self._txn_rows.itervalues():
762 if row._changes is None:
763 if row._table.is_root:
764 operations.append({"op": "delete",
765 "table": row._table.name,
766 "where": _where_uuid_equals(row.uuid)})
769 # Let ovsdb-server decide whether to really delete it.
772 op = {"table": row._table.name}
773 if row._data is None:
775 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
778 op_index = len(operations) - 1
779 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
782 op["where"] = _where_uuid_equals(row.uuid)
787 for column_name, datum in row._changes.iteritems():
788 if row._data is not None or not datum.is_default():
789 row_json[column_name] = (
790 self._substitute_uuids(datum.to_json()))
792 # If anything really changed, consider it an update.
793 # We can't suppress not-really-changed values earlier
794 # or transactions would become nonatomic (see the big
795 # comment inside Transaction._write()).
796 if (not any_updates and row._data is not None and
797 row._data[column_name] != datum):
800 if row._data is None or row_json:
801 operations.append(op)
804 if self._inc_table and any_updates:
805 self._inc_index = len(operations) - 1
807 operations.append({"op": "mutate",
808 "table": self._inc_table,
809 "where": self._substitute_uuids(
811 "mutations": [[self._inc_column, "+=", 1]]})
812 operations.append({"op": "select",
813 "table": self._inc_table,
814 "where": self._substitute_uuids(
816 "columns": [self._inc_column]})
820 operations.append({"op": "comment",
821 "comment": "\n".join(self._comments)})
825 operations.append({"op": "abort"})
828 self._status = Transaction.UNCHANGED
830 msg = ovs.jsonrpc.Message.create_request("transact", operations)
831 self._request_id = msg.id
832 if not self.idl._session.send(msg):
833 self.idl._outstanding_txns[self._request_id] = self
834 self._status = Transaction.INCOMPLETE
836 self._status = Transaction.AGAIN_WAIT
841 def commit_block(self):
843 status = self.commit()
844 if status != Transaction.INCOMPLETE:
849 poller = ovs.poller.Poller()
850 self.idl.wait(poller)
def get_increment_new_value(self):
    """Returns the value stored in self._inc_new_value.  The transaction's
    status must be Transaction.SUCCESS."""
    status = self._status
    assert status == Transaction.SUCCESS
    return self._inc_new_value
859 """Aborts this transaction. If Transaction.commit() has already been
860 called then the transaction might get committed anyhow."""
862 if self._status in (Transaction.UNCOMMITTED,
863 Transaction.INCOMPLETE):
864 self._status = Transaction.ABORTED
867 """Returns a string representing this transaction's current status,
868 suitable for use in log messages."""
869 if self._status != Transaction.ERROR:
870 return Transaction.status_to_string(self._status)
874 return "no error details available"
def __set_error_json(self, json):
    """Stores 'json', serialized with ovs.json.to_string(), as this
    transaction's error details.  Only the first error is kept: later
    calls leave an already-recorded error untouched."""
    if self._error is not None:
        return
    self._error = ovs.json.to_string(json)
880 def get_insert_uuid(self, uuid):
881 """Finds and returns the permanent UUID that the database assigned to a
882 newly inserted row, given the UUID that Transaction.insert() assigned
885 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
886 or if it was assigned by that function and then deleted by Row.delete()
887 within the same transaction. (Rows that are inserted and then deleted
888 within a single transaction are never sent to the database server, so
889 it never assigns them a permanent UUID.)
891 This transaction must have completed successfully."""
892 assert self._status in (Transaction.SUCCESS,
893 Transaction.UNCHANGED)
894 inserted_row = self._inserted_rows.get(uuid)
896 return inserted_row.real
def _write(self, row, column, datum):
    """Records, in the IDL's active transaction, that 'column' of 'row' is
    to be set to 'datum'.

    'row' must not have been deleted within the transaction.  A copy of
    'datum' is stored in row._changes and 'row' is registered in the
    transaction's _txn_rows, unless the write is suppressed as described
    in the comment below."""
    assert row._changes is not None

    txn = row._idl.txn

    # If this is a write-only column and the datum being written is the
    # same as the one already there, just skip the update entirely.  This
    # is worth optimizing because we have a lot of columns that get
    # periodically refreshed into the database but don't actually change.
    #
    # We don't do this for read/write columns because that would break
    # atomicity of transactions--some other client might have written a
    # different value in that column since we read it.  (But if a whole
    # transaction only does writes of existing values, without making any
    # real changes, we will drop the whole transaction later in
    # Transaction.commit().)
    #
    # A row inserted within this transaction has no committed data
    # (row._data is None), so there is nothing to compare against and the
    # write must always be recorded.
    if (not column.alert and row._data is not None
            and row._data.get(column.name) == datum):
        new_value = row._changes.get(column.name)
        if new_value is None or new_value == datum:
            return

    txn._txn_rows[row.uuid] = row
    row._changes[column.name] = datum.copy()
924 def insert(self, table, new_uuid=None):
925 """Inserts and returns a new row in 'table', which must be one of the
926 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
928 The new row is assigned a provisional UUID. If 'new_uuid' is None then one
929 is randomly generated; otherwise 'new_uuid' should specify a randomly
930 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
931 different UUID when 'txn' is committed, but the IDL will replace any
932 uses of the provisional UUID in the data to be committed by the
933 UUID assigned by ovsdb-server."""
934 assert self._status == Transaction.UNCOMMITTED
936 new_uuid = uuid.uuid4()
937 row = Row(self.idl, table, new_uuid, None)
938 table.rows[row.uuid] = row
939 self._txn_rows[row.uuid] = row
942 def _process_reply(self, msg):
943 if msg.type == ovs.jsonrpc.Message.T_ERROR:
944 self._status = Transaction.ERROR
945 elif type(msg.result) not in (list, tuple):
947 vlog.warn('reply to "transact" is not JSON array')
956 # This isn't an error in itself but indicates that some
957 # prior operation failed, so make sure that we know about
960 elif type(op) == dict:
961 error = op.get("error")
962 if error is not None:
963 if error == "timed out":
965 elif error == "not owner":
967 elif error == "aborted":
971 self.__set_error_json(op)
974 self.__set_error_json(op)
976 vlog.warn("operation reply is not JSON null or object")
978 if not soft_errors and not hard_errors and not lock_errors:
979 if self._inc_table and not self.__process_inc_reply(ops):
982 for insert in self._inserted_rows.itervalues():
983 if not self.__process_insert_reply(insert, ops):
987 self._status = Transaction.ERROR
989 self._status = Transaction.NOT_LOCKED
991 if self._commit_seqno == self.idl.change_seqno:
992 self._status = Transaction.AGAIN_WAIT
994 self._status = Transaction.AGAIN_NOW
996 self._status = Transaction.SUCCESS
999 def __check_json_type(json, types, name):
1002 vlog.warn("%s is missing" % name)
1004 elif type(json) not in types:
1006 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1011 def __process_inc_reply(self, ops):
1012 if self._inc_index + 2 > len(ops):
1014 vlog.warn("reply does not contain enough operations for "
1015 "increment (has %d, needs %d)" %
1016 (len(ops), self._inc_index + 2))
1018 # We know that this is a JSON object because the loop in
1019 # __process_reply() already checked.
1020 mutate = ops[self._inc_index]
1021 count = mutate.get("count")
1022 if not Transaction.__check_json_type(count, (int, long),
1023 '"mutate" reply "count"'):
1027 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1030 select = ops[self._inc_index + 1]
1031 rows = select.get("rows")
1032 if not Transaction.__check_json_type(rows, (list, tuple),
1033 '"select" reply "rows"'):
1037 vlog.warn('"select" reply "rows" has %d elements '
1038 'instead of 1' % len(rows))
1041 if not Transaction.__check_json_type(row, (dict,),
1042 '"select" reply row'):
1044 column = row.get(self._inc_column)
1045 if not Transaction.__check_json_type(column, (int, long),
1046 '"select" reply inc column'):
1048 self._inc_new_value = column
1051 def __process_insert_reply(self, insert, ops):
1052 if insert.op_index >= len(ops):
1054 vlog.warn("reply does not contain enough operations "
1055 "for insert (has %d, needs %d)"
1056 % (len(ops), insert.op_index))
1059 # We know that this is a JSON object because the loop in
1060 # __process_reply() already checked.
1061 reply = ops[insert.op_index]
1062 json_uuid = reply.get("uuid")
1063 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1064 '"insert" reply "uuid"'):
1068 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1071 vlog.warn('"insert" reply "uuid" is not a JSON UUID')