1 # Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
25 vlog = ovs.vlog.Vlog("idl")
27 __pychecker__ = 'no-classattr no-objattrs'
31 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
33 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
34 requests to an OVSDB database server and parses the responses, converting
35 raw JSON into data structures that are easier for clients to digest.
37 The IDL also assists with issuing database transactions. The client
38 creates a transaction, manipulates the IDL data structures, and commits or
39 aborts the transaction. The IDL then composes and issues the necessary
40 JSON-RPC requests and reports to the client whether the transaction
41 completed successfully.
43 The client is allowed to access the following attributes directly, in a
46 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
48 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
51 The client may directly read and write the Row objects referenced by the
52 'rows' map values. Refer to Row for more details.
54 - 'change_seqno': A number that represents the IDL's state. When the IDL
55 is updated (by Idl.run()), its value changes.
57 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58 if no lock is configured.
60 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61 lock, and False otherwise.
63 Locking and unlocking happens asynchronously from the database client's
64 point of view, so the information is only useful for optimization
65 (e.g. if the client doesn't have the lock then there's no point in trying
66 to write to the database).
68 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69 the database server has indicated that some other client already owns the
70 requested lock, and False otherwise.
72 - 'txn': The ovs.db.idl.Transaction object for the database transaction
73 currently being constructed, if there is one, or None otherwise.
76 def __init__(self, remote, schema):
77 """Creates and returns a connection to the database named 'db_name' on
78 'remote', which should be in a form acceptable to
79 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
80 replica of the remote database.
82 'schema' should be the schema for the remote database. The caller may
83 have cut it down by removing tables or columns that are not of
84 interest. The IDL will only replicate the tables and columns that
85 remain. The caller may also add a attribute named 'alert' to selected
86 remaining columns, setting its value to False; if so, then changes to
87 those columns will not be considered changes to the database for the
88 purpose of the return value of Idl.run() and Idl.change_seqno. This is
89 useful for columns that the IDL's client will write but not read.
91 As a convenience to users, 'schema' may also be an instance of the
94 The IDL uses and modifies 'schema' directly."""
96 if isinstance(schema, SchemaHelper):
97 schema = schema.get_idl_schema()
99 self.tables = schema.tables
101 self._session = ovs.jsonrpc.Session.open(remote)
102 self._monitor_request_id = None
103 self._last_seqno = None
104 self.change_seqno = 0
107 self.lock_name = None # Name of lock we need, None if none.
108 self.has_lock = False # Has db server said we have the lock?
109 self.is_lock_contended = False # Has db server said we can't get lock?
110 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
112 # Transaction support.
114 self._outstanding_txns = {}
116 for table in schema.tables.itervalues():
117 for column in table.columns.itervalues():
118 if not hasattr(column, 'alert'):
120 table.need_table = False
125 """Closes the connection to the database. The IDL will no longer
127 self._session.close()
130 """Processes a batch of messages from the database server. Returns
131 True if the database as seen through the IDL changed, False if it did
132 not change. The initial fetch of the entire contents of the remote
133 database is considered to be one kind of change. If the IDL has been
134 configured to acquire a database lock (with Idl.set_lock()), then
135 successfully acquiring the lock is also considered to be a change.
137 This function can return occasional false positives, that is, report
138 that the database changed even though it didn't. This happens if the
139 connection to the database drops and reconnects, which causes the
140 database contents to be reloaded even if they didn't change. (It could
141 also happen if the database server sends out a "change" that reflects
142 what we already thought was in the database, but the database server is
143 not supposed to do that.)
145 As an alternative to checking the return value, the client may check
146 for changes in self.change_seqno."""
148 initial_change_seqno = self.change_seqno
153 if not self._session.is_connected():
156 seqno = self._session.get_seqno()
157 if seqno != self._last_seqno:
158 self._last_seqno = seqno
159 self.__txn_abort_all()
160 self.__send_monitor_request()
162 self.__send_lock_request()
165 msg = self._session.recv()
168 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
169 and msg.method == "update"
170 and len(msg.params) == 2
171 and msg.params[0] == None):
172 # Database contents changed.
173 self.__parse_update(msg.params[1])
174 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
175 and self._monitor_request_id is not None
176 and self._monitor_request_id == msg.id):
177 # Reply to our "monitor" request.
179 self.change_seqno += 1
180 self._monitor_request_id = None
182 self.__parse_update(msg.result)
183 except error.Error, e:
184 vlog.err("%s: parse error in received schema: %s"
185 % (self._session.get_name(), e))
187 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
188 and self._lock_request_id is not None
189 and self._lock_request_id == msg.id):
190 # Reply to our "lock" request.
191 self.__parse_lock_reply(msg.result)
192 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
193 and msg.method == "locked"):
195 self.__parse_lock_notify(msg.params, True)
196 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
197 and msg.method == "stolen"):
198 # Someone else stole our lock.
199 self.__parse_lock_notify(msg.params, False)
200 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
201 # Reply to our echo request. Ignore it.
203 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
204 ovs.jsonrpc.Message.T_REPLY)
205 and self.__txn_process_reply(msg)):
206 # __txn_process_reply() did everything needed.
209 # This can happen if a transaction is destroyed before we
210 # receive the reply, so keep the log level low.
211 vlog.dbg("%s: received unexpected %s message"
212 % (self._session.get_name(),
213 ovs.jsonrpc.Message.type_to_string(msg.type)))
215 return initial_change_seqno != self.change_seqno
217 def wait(self, poller):
218 """Arranges for poller.block() to wake up when self.run() has something
219 to do or when activity occurs on a transaction on 'self'."""
220 self._session.wait(poller)
221 self._session.recv_wait(poller)
223 def has_ever_connected(self):
224 """Returns True, if the IDL successfully connected to the remote
225 database and retrieved its contents (even if the connection
226 subsequently dropped and is in the process of reconnecting). If so,
227 then the IDL contains an atomic snapshot of the database's contents
228 (but it might be arbitrarily old if the connection dropped).
230 Returns False if the IDL has never connected or retrieved the
231 database's contents. If so, the IDL is empty."""
232 return self.change_seqno != 0
234 def force_reconnect(self):
235 """Forces the IDL to drop its connection to the database and reconnect.
236 In the meantime, the contents of the IDL will not change."""
237 self._session.force_reconnect()
239 def set_lock(self, lock_name):
240 """If 'lock_name' is not None, configures the IDL to obtain the named
241 lock from the database server and to avoid modifying the database when
242 the lock cannot be acquired (that is, when another client has the same
245 If 'lock_name' is None, drops the locking requirement and releases the
248 assert not self._outstanding_txns
250 if self.lock_name and (not lock_name or lock_name != self.lock_name):
251 # Release previous lock.
252 self.__send_unlock_request()
253 self.lock_name = None
254 self.is_lock_contended = False
256 if lock_name and not self.lock_name:
258 self.lock_name = lock_name
259 self.__send_lock_request()
264 for table in self.tables.itervalues():
270 self.change_seqno += 1
272 def __update_has_lock(self, new_has_lock):
273 if new_has_lock and not self.has_lock:
274 if self._monitor_request_id is None:
275 self.change_seqno += 1
277 # We're waiting for a monitor reply, so don't signal that the
278 # database changed. The monitor reply will increment
279 # change_seqno anyhow.
281 self.is_lock_contended = False
282 self.has_lock = new_has_lock
284 def __do_send_lock_request(self, method):
285 self.__update_has_lock(False)
286 self._lock_request_id = None
287 if self._session.is_connected():
288 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
290 self._session.send(msg)
295 def __send_lock_request(self):
296 self._lock_request_id = self.__do_send_lock_request("lock")
298 def __send_unlock_request(self):
299 self.__do_send_lock_request("unlock")
301 def __parse_lock_reply(self, result):
302 self._lock_request_id = None
303 got_lock = type(result) == dict and result.get("locked") is True
304 self.__update_has_lock(got_lock)
306 self.is_lock_contended = True
308 def __parse_lock_notify(self, params, new_has_lock):
309 if (self.lock_name is not None
310 and type(params) in (list, tuple)
312 and params[0] == self.lock_name):
313 self.__update_has_lock(self, new_has_lock)
315 self.is_lock_contended = True
317 def __send_monitor_request(self):
318 monitor_requests = {}
319 for table in self.tables.itervalues():
320 monitor_requests[table.name] = {"columns": table.columns.keys()}
321 msg = ovs.jsonrpc.Message.create_request(
322 "monitor", [self._db.name, None, monitor_requests])
323 self._monitor_request_id = msg.id
324 self._session.send(msg)
326 def __parse_update(self, update):
328 self.__do_parse_update(update)
329 except error.Error, e:
330 vlog.err("%s: error parsing update: %s"
331 % (self._session.get_name(), e))
333 def __do_parse_update(self, table_updates):
334 if type(table_updates) != dict:
335 raise error.Error("<table-updates> is not an object",
338 for table_name, table_update in table_updates.iteritems():
339 table = self.tables.get(table_name)
341 raise error.Error('<table-updates> includes unknown '
342 'table "%s"' % table_name)
344 if type(table_update) != dict:
345 raise error.Error('<table-update> for table "%s" is not '
346 'an object' % table_name, table_update)
348 for uuid_string, row_update in table_update.iteritems():
349 if not ovs.ovsuuid.is_valid_string(uuid_string):
350 raise error.Error('<table-update> for table "%s" '
351 'contains bad UUID "%s" as member '
352 'name' % (table_name, uuid_string),
354 uuid = ovs.ovsuuid.from_string(uuid_string)
356 if type(row_update) != dict:
357 raise error.Error('<table-update> for table "%s" '
358 'contains <row-update> for %s that '
360 % (table_name, uuid_string))
362 parser = ovs.db.parser.Parser(row_update, "row-update")
363 old = parser.get_optional("old", [dict])
364 new = parser.get_optional("new", [dict])
367 if not old and not new:
368 raise error.Error('<row-update> missing "old" and '
369 '"new" members', row_update)
371 if self.__process_update(table, uuid, old, new):
372 self.change_seqno += 1
374 def __process_update(self, table, uuid, old, new):
375 """Returns True if a column changed, False otherwise."""
376 row = table.rows.get(uuid)
385 vlog.warn("cannot delete missing row %s from table %s"
386 % (uuid, table.name))
390 row = self.__create_row(table, uuid)
394 vlog.warn("cannot add existing row %s to table %s"
395 % (uuid, table.name))
396 if self.__row_update(table, row, new):
400 row = self.__create_row(table, uuid)
403 vlog.warn("cannot modify missing row %s in table %s"
404 % (uuid, table.name))
405 if self.__row_update(table, row, new):
409 def __row_update(self, table, row, row_json):
411 for column_name, datum_json in row_json.iteritems():
412 column = table.columns.get(column_name)
415 vlog.warn("unknown column %s updating table %s"
416 % (column_name, table.name))
420 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
421 except error.Error, e:
423 vlog.warn("error parsing column %s in table %s: %s"
424 % (column_name, table.name, e))
427 if datum != row._data[column_name]:
428 row._data[column_name] = datum
432 # Didn't really change but the OVSDB monitor protocol always
433 # includes every value in a row.
437 def __create_row(self, table, uuid):
439 for column in table.columns.itervalues():
440 data[column.name] = ovs.db.data.Datum.default(column.type)
441 row = table.rows[uuid] = Row(self, table, uuid, data)
445 self._session.force_reconnect()
447 def __txn_abort_all(self):
448 while self._outstanding_txns:
449 txn = self._outstanding_txns.popitem()[1]
450 txn._status = Transaction.AGAIN_WAIT
452 def __txn_process_reply(self, msg):
453 txn = self._outstanding_txns.pop(msg.id, None)
455 txn._process_reply(msg)
458 def _uuid_to_row(atom, base):
460 return base.ref_table.rows.get(atom)
465 def _row_to_uuid(value):
466 if type(value) == Row:
473 """A row within an IDL.
475 The client may access the following attributes directly:
477 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
479 - An attribute for each column in the Row's table, named for the column,
480 whose values are as returned by Datum.to_python() for the column's type.
482 If some error occurs (e.g. the database server's idea of the column is
483 different from the IDL's idea), then the attribute values is the
484 "default" value return by Datum.default() for the column's type. (It is
485 important to know this because the default value may violate constraints
486 for the column's type, e.g. the default integer value is 0 even if column
487 contraints require the column's value to be positive.)
489 When a transaction is active, column attributes may also be assigned new
490 values. Committing the transaction will then cause the new value to be
491 stored into the database.
493 *NOTE*: In the current implementation, the value of a column is a *copy*
494 of the value in the database. This means that modifying its value
495 directly will have no useful effect. For example, the following:
496 row.mycolumn["a"] = "b" # don't do this
497 will not change anything in the database, even after commit. To modify
498 the column, instead assign the modified column value back to the column:
503 def __init__(self, idl, table, uuid, data):
504 # All of the explicit references to self.__dict__ below are required
505 # to set real attributes with invoking self.__getattr__().
506 self.__dict__["uuid"] = uuid
508 self.__dict__["_idl"] = idl
509 self.__dict__["_table"] = table
511 # _data is the committed data. It takes the following values:
513 # - A dictionary that maps every column name to a Datum, if the row
514 # exists in the committed form of the database.
516 # - None, if this row is newly inserted within the active transaction
517 # and thus has no committed form.
518 self.__dict__["_data"] = data
520 # _changes describes changes to this row within the active transaction.
521 # It takes the following values:
523 # - {}, the empty dictionary, if no transaction is active or if the
524 # row has yet not been changed within this transaction.
526 # - A dictionary that maps a column name to its new Datum, if an
527 # active transaction changes those columns' values.
529 # - A dictionary that maps every column name to a Datum, if the row
530 # is newly inserted within the active transaction.
532 # - None, if this transaction deletes this row.
533 self.__dict__["_changes"] = {}
535 # A dictionary whose keys are the names of columns that must be
536 # verified as prerequisites when the transaction commits. The values
537 # in the dictionary are all None.
538 self.__dict__["_prereqs"] = {}
540 def __getattr__(self, column_name):
541 assert self._changes is not None
543 datum = self._changes.get(column_name)
545 datum = self._data[column_name]
547 return datum.to_python(_uuid_to_row)
549 def __setattr__(self, column_name, value):
550 assert self._changes is not None
553 column = self._table.columns[column_name]
555 datum = ovs.db.data.Datum.from_python(column.type, value,
557 except error.Error, e:
559 vlog.err("attempting to write bad value to column %s (%s)"
562 self._idl.txn._write(self, column, datum)
564 def verify(self, column_name):
565 """Causes the original contents of column 'column_name' in this row to
566 be verified as a prerequisite to completing the transaction. That is,
567 if 'column_name' changed in this row (or if this row was deleted)
568 between the time that the IDL originally read its contents and the time
569 that the transaction commits, then the transaction aborts and
570 Transaction.commit() returns Transaction.AGAIN_WAIT or
571 Transaction.AGAIN_NOW (depending on whether the database change has
572 already been received).
574 The intention is that, to ensure that no transaction commits based on
575 dirty reads, an application should call Row.verify() on each data item
576 read as part of a read-modify-write operation.
578 In some cases Row.verify() reduces to a no-op, because the current
579 value of the column is already known:
581 - If this row is a row created by the current transaction (returned
582 by Transaction.insert()).
584 - If the column has already been modified within the current
587 Because of the latter property, always call Row.verify() *before*
588 modifying the column, for a given read-modify-write.
590 A transaction must be in progress."""
592 assert self._changes is not None
593 if not self._data or column_name in self._changes:
596 self._prereqs[column_name] = None
599 """Deletes this row from its table.
601 A transaction must be in progress."""
603 assert self._changes is not None
604 if self._data is None:
605 del self._idl.txn._txn_rows[self.uuid]
606 self.__dict__["_changes"] = None
607 del self._table.rows[self.uuid]
610 def _uuid_name_from_uuid(uuid):
611 return "row%s" % str(uuid).replace("-", "_")
614 def _where_uuid_equals(uuid):
615 return [["_uuid", "==", ["uuid", str(uuid)]]]
618 class _InsertedRow(object):
619 def __init__(self, op_index):
620 self.op_index = op_index
624 class Transaction(object):
625 # Status values that Transaction.commit() can return.
626 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
627 UNCHANGED = "unchanged" # Transaction didn't include any changes.
628 INCOMPLETE = "incomplete" # Commit in progress, please wait.
629 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
630 SUCCESS = "success" # Commit successful.
631 AGAIN_WAIT = "wait then try again"
632 # Commit failed because a "verify" operation
633 # reported an inconsistency, due to a network
634 # problem, or other transient failure. Wait
635 # for a change, then try again.
636 AGAIN_NOW = "try again now" # Same as AGAIN_WAIT but try again right away.
637 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
638 ERROR = "error" # Commit failed due to a hard error.
641 def status_to_string(status):
642 """Converts one of the status values that Transaction.commit() can
643 return into a human-readable string.
645 (The status values are in fact such strings already, so
646 there's nothing to do.)"""
649 def __init__(self, idl):
650 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
651 A given Idl may only have a single active transaction at a time.
653 A Transaction may modify the contents of a database by assigning new
654 values to columns (attributes of Row), deleting rows (with
655 Row.delete()), or inserting rows (with Transaction.insert()). It may
656 also check that columns in the database have not changed with
659 When a transaction is complete (which must be before the next call to
660 Idl.run()), call Transaction.commit() or Transaction.abort()."""
661 assert idl.txn is None
664 self._request_id = None
668 self._status = Transaction.UNCOMMITTED
671 self._commit_seqno = self.idl.change_seqno
673 self._inc_table = None
674 self._inc_column = None
675 self._inc_where = None
677 self._inserted_rows = {} # Map from UUID to _InsertedRow
679 def add_comment(self, comment):
680 """Appens 'comment' to the comments that will be passed to the OVSDB
681 server when this transaction is committed. (The comment will be
682 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
683 relatively human-readable form.)"""
684 self._comments.append(comment)
686 def increment(self, table, column, where):
687 assert not self._inc_table
688 self._inc_table = table
689 self._inc_column = column
690 self._inc_where = where
692 def wait(self, poller):
693 if self._status not in (Transaction.UNCOMMITTED,
694 Transaction.INCOMPLETE):
695 poller.immediate_wake()
697 def _substitute_uuids(self, json):
698 if type(json) in (list, tuple):
700 and json[0] == 'uuid'
701 and ovs.ovsuuid.is_valid_string(json[1])):
702 uuid = ovs.ovsuuid.from_string(json[1])
703 row = self._txn_rows.get(uuid, None)
704 if row and row._data is None:
705 return ["named-uuid", _uuid_name_from_uuid(uuid)]
708 def __disassemble(self):
711 for row in self._txn_rows.itervalues():
712 if row._changes is None:
713 row._table.rows[row.uuid] = row
714 elif row._data is None:
715 del row._table.rows[row.uuid]
716 row.__dict__["_changes"] = {}
717 row.__dict__["_prereqs"] = {}
721 """Attempts to commit this transaction and returns the status of the
722 commit operation, one of the constants declared as class attributes.
723 If the return value is Transaction.INCOMPLETE, then the transaction is
724 not yet complete and the caller should try calling again later, after
725 calling Idl.run() to run the Idl.
727 Committing a transaction rolls back all of the changes that it made to
728 the Idl's copy of the database. If the transaction commits
729 successfully, then the database server will send an update and, thus,
730 the Idl will be updated with the committed changes."""
731 # The status can only change if we're the active transaction.
732 # (Otherwise, our status will change only in Idl.run().)
733 if self != self.idl.txn:
736 # If we need a lock but don't have it, give up quickly.
737 if self.idl.lock_name and not self.idl.has_lock():
738 self._status = Transaction.NOT_LOCKED
742 operations = [self.idl._db.name]
744 # Assert that we have the required lock (avoiding a race).
745 if self.idl.lock_name:
746 operations.append({"op": "assert",
747 "lock": self.idl.lock_name})
749 # Add prerequisites and declarations of new rows.
750 for row in self._txn_rows.itervalues():
754 for column_name in row._prereqs:
755 columns.append(column_name)
756 rows[column_name] = row._data[column_name].to_json()
757 operations.append({"op": "wait",
758 "table": row._table.name,
760 "where": _where_uuid_equals(row.uuid),
767 for row in self._txn_rows.itervalues():
768 if row._changes is None:
769 if row._table.is_root:
770 operations.append({"op": "delete",
771 "table": row._table.name,
772 "where": _where_uuid_equals(row.uuid)})
775 # Let ovsdb-server decide whether to really delete it.
778 op = {"table": row._table.name}
779 if row._data is None:
781 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
784 op_index = len(operations) - 1
785 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
788 op["where"] = _where_uuid_equals(row.uuid)
793 for column_name, datum in row._changes.iteritems():
794 if row._data is not None or not datum.is_default():
795 row_json[column_name] = (
796 self._substitute_uuids(datum.to_json()))
798 # If anything really changed, consider it an update.
799 # We can't suppress not-really-changed values earlier
800 # or transactions would become nonatomic (see the big
801 # comment inside Transaction._write()).
802 if (not any_updates and row._data is not None and
803 row._data[column_name] != datum):
806 if row._data is None or row_json:
807 operations.append(op)
810 if self._inc_table and any_updates:
811 self._inc_index = len(operations) - 1
813 operations.append({"op": "mutate",
814 "table": self._inc_table,
815 "where": self._substitute_uuids(
817 "mutations": [[self._inc_column, "+=", 1]]})
818 operations.append({"op": "select",
819 "table": self._inc_table,
820 "where": self._substitute_uuids(
822 "columns": [self._inc_column]})
826 operations.append({"op": "comment",
827 "comment": "\n".join(self._comments)})
831 operations.append({"op": "abort"})
834 self._status = Transaction.UNCHANGED
836 msg = ovs.jsonrpc.Message.create_request("transact", operations)
837 self._request_id = msg.id
838 if not self.idl._session.send(msg):
839 self.idl._outstanding_txns[self._request_id] = self
840 self._status = Transaction.INCOMPLETE
842 self._status = Transaction.AGAIN_WAIT
847 def commit_block(self):
849 status = self.commit()
850 if status != Transaction.INCOMPLETE:
855 poller = ovs.poller.Poller()
856 self.idl.wait(poller)
860 def get_increment_new_value(self):
861 assert self._status == Transaction.SUCCESS
862 return self._inc_new_value
865 """Aborts this transaction. If Transaction.commit() has already been
866 called then the transaction might get committed anyhow."""
868 if self._status in (Transaction.UNCOMMITTED,
869 Transaction.INCOMPLETE):
870 self._status = Transaction.ABORTED
873 """Returns a string representing this transaction's current status,
874 suitable for use in log messages."""
875 if self._status != Transaction.ERROR:
876 return Transaction.status_to_string(self._status)
880 return "no error details available"
882 def __set_error_json(self, json):
883 if self._error is None:
884 self._error = ovs.json.to_string(json)
886 def get_insert_uuid(self, uuid):
887 """Finds and returns the permanent UUID that the database assigned to a
888 newly inserted row, given the UUID that Transaction.insert() assigned
891 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
892 or if it was assigned by that function and then deleted by Row.delete()
893 within the same transaction. (Rows that are inserted and then deleted
894 within a single transaction are never sent to the database server, so
895 it never assigns them a permanent UUID.)
897 This transaction must have completed successfully."""
898 assert self._status in (Transaction.SUCCESS,
899 Transaction.UNCHANGED)
900 inserted_row = self._inserted_rows.get(uuid)
902 return inserted_row.real
905 def _write(self, row, column, datum):
906 assert row._changes is not None
910 # If this is a write-only column and the datum being written is the
911 # same as the one already there, just skip the update entirely. This
912 # is worth optimizing because we have a lot of columns that get
913 # periodically refreshed into the database but don't actually change
916 # We don't do this for read/write columns because that would break
917 # atomicity of transactions--some other client might have written a
918 # different value in that column since we read it. (But if a whole
919 # transaction only does writes of existing values, without making any
920 # real changes, we will drop the whole transaction later in
921 # ovsdb_idl_txn_commit().)
922 if not column.alert and row._data.get(column.name) == datum:
923 new_value = row._changes.get(column.name)
924 if new_value is None or new_value == datum:
927 txn._txn_rows[row.uuid] = row
928 row._changes[column.name] = datum.copy()
930 def insert(self, table, new_uuid=None):
931 """Inserts and returns a new row in 'table', which must be one of the
932 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
934 The new row is assigned a provisional UUID. If 'uuid' is None then one
935 is randomly generated; otherwise 'uuid' should specify a randomly
936 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
937 different UUID when 'txn' is committed, but the IDL will replace any
938 uses of the provisional UUID in the data to be to be committed by the
939 UUID assigned by ovsdb-server."""
940 assert self._status == Transaction.UNCOMMITTED
942 new_uuid = uuid.uuid4()
943 row = Row(self.idl, table, new_uuid, None)
944 table.rows[row.uuid] = row
945 self._txn_rows[row.uuid] = row
948 def _process_reply(self, msg):
949 if msg.type == ovs.jsonrpc.Message.T_ERROR:
950 self._status = Transaction.ERROR
951 elif type(msg.result) not in (list, tuple):
953 vlog.warn('reply to "transact" is not JSON array')
962 # This isn't an error in itself but indicates that some
963 # prior operation failed, so make sure that we know about
966 elif type(op) == dict:
967 error = op.get("error")
968 if error is not None:
969 if error == "timed out":
971 elif error == "not owner":
973 elif error == "aborted":
977 self.__set_error_json(op)
980 self.__set_error_json(op)
982 vlog.warn("operation reply is not JSON null or object")
984 if not soft_errors and not hard_errors and not lock_errors:
985 if self._inc_table and not self.__process_inc_reply(ops):
988 for insert in self._inserted_rows.itervalues():
989 if not self.__process_insert_reply(insert, ops):
993 self._status = Transaction.ERROR
995 self._status = Transaction.NOT_LOCKED
997 if self._commit_seqno == self.idl.change_seqno:
998 self._status = Transaction.AGAIN_WAIT
1000 self._status = Transaction.AGAIN_NOW
1002 self._status = Transaction.SUCCESS
1005 def __check_json_type(json, types, name):
1008 vlog.warn("%s is missing" % name)
1010 elif type(json) not in types:
1012 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1017 def __process_inc_reply(self, ops):
1018 if self._inc_index + 2 > len(ops):
1020 vlog.warn("reply does not contain enough operations for "
1021 "increment (has %d, needs %d)" %
1022 (len(ops), self._inc_index + 2))
1024 # We know that this is a JSON object because the loop in
1025 # __process_reply() already checked.
1026 mutate = ops[self._inc_index]
1027 count = mutate.get("count")
1028 if not Transaction.__check_json_type(count, (int, long),
1029 '"mutate" reply "count"'):
1033 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1036 select = ops[self._inc_index + 1]
1037 rows = select.get("rows")
1038 if not Transaction.__check_json_type(rows, (list, tuple),
1039 '"select" reply "rows"'):
1043 vlog.warn('"select" reply "rows" has %d elements '
1044 'instead of 1' % len(rows))
1047 if not Transaction.__check_json_type(row, (dict,),
1048 '"select" reply row'):
1050 column = row.get(self._inc_column)
1051 if not Transaction.__check_json_type(column, (int, long),
1052 '"select" reply inc column'):
1054 self._inc_new_value = column
1057 def __process_insert_reply(self, insert, ops):
1058 if insert.op_index >= len(ops):
1060 vlog.warn("reply does not contain enough operations "
1061 "for insert (has %d, needs %d)"
1062 % (len(ops), insert.op_index))
1065 # We know that this is a JSON object because the loop in
1066 # __process_reply() already checked.
1067 reply = ops[insert.op_index]
1068 json_uuid = reply.get("uuid")
1069 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1070 '"insert" reply "uuid"'):
1074 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1077 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1084 class SchemaHelper(object):
1085 """IDL Schema helper.
1087 This class encapsulates the logic required to generate schemas suitable
1088 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1089 they are interested in using register_columns(). When finished, the
1090 get_idl_schema() function may be called.
1092 The location on disk of the schema used may be found in the
1093 'schema_location' variable."""
1095 def __init__(self, location=None):
1096 """Creates a new Schema object."""
1098 if location is None:
1099 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1101 self.schema_location = location
1104 def register_columns(self, table, columns):
1105 """Registers interest in the given 'columns' of 'table'. Future calls
1106 to get_idl_schema() will include 'table':column for each column in
1107 'columns'. This function automatically avoids adding duplicate entries
1110 'table' must be a string.
1111 'columns' must be a list of strings.
1114 assert type(table) is str
1115 assert type(columns) is list
1117 columns = set(columns) | self._tables.get(table, set())
1118 self._tables[table] = columns
1120 def get_idl_schema(self):
1121 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1122 object based on columns registered using the register_columns()
1125 schema = ovs.db.schema.DbSchema.from_json(
1126 ovs.json.from_file(self.schema_location))
1128 for table, columns in self._tables.iteritems():
1129 schema_tables[table] = (
1130 self._keep_table_columns(schema, table, columns))
1132 schema.tables = schema_tables
1135 def _keep_table_columns(self, schema, table_name, columns):
1136 assert table_name in schema.tables
1137 table = schema.tables[table_name]
1140 for column_name in columns:
1141 assert type(column_name) is str
1142 assert column_name in table.columns
1144 new_columns[column_name] = table.columns[column_name]
1146 table.columns = new_columns