1 # Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
25 vlog = ovs.vlog.Vlog("idl")
27 __pychecker__ = 'no-classattr no-objattrs'
31 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
33 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
34 requests to an OVSDB database server and parses the responses, converting
35 raw JSON into data structures that are easier for clients to digest.
37 The IDL also assists with issuing database transactions. The client
38 creates a transaction, manipulates the IDL data structures, and commits or
39 aborts the transaction. The IDL then composes and issues the necessary
40 JSON-RPC requests and reports to the client whether the transaction
41 completed successfully.
43 The client is allowed to access the following attributes directly, in a
46 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
48 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
51 The client may directly read and write the Row objects referenced by the
52 'rows' map values. Refer to Row for more details.
54 - 'change_seqno': A number that represents the IDL's state. When the IDL
55 is updated (by Idl.run()), its value changes.
57 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
58 if no lock is configured.
60 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
61 lock, and False otherwise.
63 Locking and unlocking happens asynchronously from the database client's
64 point of view, so the information is only useful for optimization
65 (e.g. if the client doesn't have the lock then there's no point in trying
66 to write to the database).
68 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
69 the database server has indicated that some other client already owns the
70 requested lock, and False otherwise.
72 - 'txn': The ovs.db.idl.Transaction object for the database transaction
73 currently being constructed, if there is one, or None otherwise.
76 def __init__(self, remote, schema):
77 """Creates and returns a connection to the database named 'db_name' on
78 'remote', which should be in a form acceptable to
79 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
80 replica of the remote database.
82 'schema' should be the schema for the remote database. The caller may
83 have cut it down by removing tables or columns that are not of
84 interest. The IDL will only replicate the tables and columns that
85 remain. The caller may also add an attribute named 'alert' to selected
86 remaining columns, setting its value to False; if so, then changes to
87 those columns will not be considered changes to the database for the
88 purpose of the return value of Idl.run() and Idl.change_seqno. This is
89 useful for columns that the IDL's client will write but not read.
91 As a convenience to users, 'schema' may also be an instance of the
94 The IDL uses and modifies 'schema' directly."""
96 assert isinstance(schema, SchemaHelper)
97 schema = schema.get_idl_schema()
99 self.tables = schema.tables
101 self._session = ovs.jsonrpc.Session.open(remote)
102 self._monitor_request_id = None
103 self._last_seqno = None
104 self.change_seqno = 0
107 self.lock_name = None # Name of lock we need, None if none.
108 self.has_lock = False # Has db server said we have the lock?
109 self.is_lock_contended = False # Has db server said we can't get lock?
110 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
112 # Transaction support.
114 self._outstanding_txns = {}
116 for table in schema.tables.itervalues():
117 for column in table.columns.itervalues():
118 if not hasattr(column, 'alert'):
120 table.need_table = False
125 """Closes the connection to the database. The IDL will no longer
127 self._session.close()
130 """Processes a batch of messages from the database server. Returns
131 True if the database as seen through the IDL changed, False if it did
132 not change. The initial fetch of the entire contents of the remote
133 database is considered to be one kind of change. If the IDL has been
134 configured to acquire a database lock (with Idl.set_lock()), then
135 successfully acquiring the lock is also considered to be a change.
137 This function can return occasional false positives, that is, report
138 that the database changed even though it didn't. This happens if the
139 connection to the database drops and reconnects, which causes the
140 database contents to be reloaded even if they didn't change. (It could
141 also happen if the database server sends out a "change" that reflects
142 what we already thought was in the database, but the database server is
143 not supposed to do that.)
145 As an alternative to checking the return value, the client may check
146 for changes in self.change_seqno."""
148 initial_change_seqno = self.change_seqno
153 if not self._session.is_connected():
156 seqno = self._session.get_seqno()
157 if seqno != self._last_seqno:
158 self._last_seqno = seqno
159 self.__txn_abort_all()
160 self.__send_monitor_request()
162 self.__send_lock_request()
165 msg = self._session.recv()
168 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
169 and msg.method == "update"
170 and len(msg.params) == 2
171 and msg.params[0] == None):
172 # Database contents changed.
173 self.__parse_update(msg.params[1])
174 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
175 and self._monitor_request_id is not None
176 and self._monitor_request_id == msg.id):
177 # Reply to our "monitor" request.
179 self.change_seqno += 1
180 self._monitor_request_id = None
182 self.__parse_update(msg.result)
183 except error.Error, e:
184 vlog.err("%s: parse error in received schema: %s"
185 % (self._session.get_name(), e))
187 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
188 and self._lock_request_id is not None
189 and self._lock_request_id == msg.id):
190 # Reply to our "lock" request.
191 self.__parse_lock_reply(msg.result)
192 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
193 and msg.method == "locked"):
195 self.__parse_lock_notify(msg.params, True)
196 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
197 and msg.method == "stolen"):
198 # Someone else stole our lock.
199 self.__parse_lock_notify(msg.params, False)
200 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
201 # Reply to our echo request. Ignore it.
203 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
204 ovs.jsonrpc.Message.T_REPLY)
205 and self.__txn_process_reply(msg)):
206 # __txn_process_reply() did everything needed.
209 # This can happen if a transaction is destroyed before we
210 # receive the reply, so keep the log level low.
211 vlog.dbg("%s: received unexpected %s message"
212 % (self._session.get_name(),
213 ovs.jsonrpc.Message.type_to_string(msg.type)))
215 return initial_change_seqno != self.change_seqno
def wait(self, poller):
    """Registers this IDL's wake-up conditions with 'poller'.

    After this call, poller.block() wakes up when self.run() has something
    to do or when activity occurs on a transaction on 'self'."""
    session = self._session
    session.wait(poller)
    session.recv_wait(poller)
def has_ever_connected(self):
    """Reports whether the IDL has ever fetched the remote database.

    Returns True once the IDL has connected to the remote database and
    retrieved its contents, even if that connection subsequently dropped
    and is being re-established.  In that case the IDL holds an atomic
    snapshot of the database's contents, although the snapshot might be
    arbitrarily old.

    Returns False if the IDL has never connected or never retrieved the
    database's contents, in which case the IDL is empty."""
    # change_seqno is still at its initial value of 0 until the first change.
    untouched = self.change_seqno == 0
    return not untouched
def force_reconnect(self):
    """Makes the IDL drop its database connection and connect again.

    The contents of the IDL will not change in the meantime."""
    session = self._session
    session.force_reconnect()
239 def set_lock(self, lock_name):
240 """If 'lock_name' is not None, configures the IDL to obtain the named
241 lock from the database server and to avoid modifying the database when
242 the lock cannot be acquired (that is, when another client has the same
245 If 'lock_name' is None, drops the locking requirement and releases the
248 assert not self._outstanding_txns
250 if self.lock_name and (not lock_name or lock_name != self.lock_name):
251 # Release previous lock.
252 self.__send_unlock_request()
253 self.lock_name = None
254 self.is_lock_contended = False
256 if lock_name and not self.lock_name:
258 self.lock_name = lock_name
259 self.__send_lock_request()
264 for table in self.tables.itervalues():
270 self.change_seqno += 1
272 def __update_has_lock(self, new_has_lock):
273 if new_has_lock and not self.has_lock:
274 if self._monitor_request_id is None:
275 self.change_seqno += 1
277 # We're waiting for a monitor reply, so don't signal that the
278 # database changed. The monitor reply will increment
279 # change_seqno anyhow.
281 self.is_lock_contended = False
282 self.has_lock = new_has_lock
284 def __do_send_lock_request(self, method):
285 self.__update_has_lock(False)
286 self._lock_request_id = None
287 if self._session.is_connected():
288 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
290 self._session.send(msg)
def __send_lock_request(self):
    """Issues a "lock" request and records the value returned by
    __do_send_lock_request() as the ID of the in-flight lock request."""
    request_id = self.__do_send_lock_request("lock")
    self._lock_request_id = request_id
def __send_unlock_request(self):
    """Issues an "unlock" request through __do_send_lock_request().

    The helper's return value is deliberately discarded."""
    method = "unlock"
    self.__do_send_lock_request(method)
301 def __parse_lock_reply(self, result):
302 self._lock_request_id = None
303 got_lock = type(result) == dict and result.get("locked") is True
304 self.__update_has_lock(got_lock)
306 self.is_lock_contended = True
308 def __parse_lock_notify(self, params, new_has_lock):
309 if (self.lock_name is not None
310 and type(params) in (list, tuple)
312 and params[0] == self.lock_name):
313 self.__update_has_lock(self, new_has_lock)
315 self.is_lock_contended = True
def __send_monitor_request(self):
    """Sends a "monitor" request that names every column of every table in
    self.tables, and remembers the request's ID in
    self._monitor_request_id."""
    columns_by_table = {}
    for schema_table in self.tables.itervalues():
        wanted = {"columns": schema_table.columns.keys()}
        columns_by_table[schema_table.name] = wanted

    params = [self._db.name, None, columns_by_table]
    request = ovs.jsonrpc.Message.create_request("monitor", params)
    self._monitor_request_id = request.id
    self._session.send(request)
326 def __parse_update(self, update):
328 self.__do_parse_update(update)
329 except error.Error, e:
330 vlog.err("%s: error parsing update: %s"
331 % (self._session.get_name(), e))
333 def __do_parse_update(self, table_updates):
334 if type(table_updates) != dict:
335 raise error.Error("<table-updates> is not an object",
338 for table_name, table_update in table_updates.iteritems():
339 table = self.tables.get(table_name)
341 raise error.Error('<table-updates> includes unknown '
342 'table "%s"' % table_name)
344 if type(table_update) != dict:
345 raise error.Error('<table-update> for table "%s" is not '
346 'an object' % table_name, table_update)
348 for uuid_string, row_update in table_update.iteritems():
349 if not ovs.ovsuuid.is_valid_string(uuid_string):
350 raise error.Error('<table-update> for table "%s" '
351 'contains bad UUID "%s" as member '
352 'name' % (table_name, uuid_string),
354 uuid = ovs.ovsuuid.from_string(uuid_string)
356 if type(row_update) != dict:
357 raise error.Error('<table-update> for table "%s" '
358 'contains <row-update> for %s that '
360 % (table_name, uuid_string))
362 parser = ovs.db.parser.Parser(row_update, "row-update")
363 old = parser.get_optional("old", [dict])
364 new = parser.get_optional("new", [dict])
367 if not old and not new:
368 raise error.Error('<row-update> missing "old" and '
369 '"new" members', row_update)
371 if self.__process_update(table, uuid, old, new):
372 self.change_seqno += 1
374 def __process_update(self, table, uuid, old, new):
375 """Returns True if a column changed, False otherwise."""
376 row = table.rows.get(uuid)
385 vlog.warn("cannot delete missing row %s from table %s"
386 % (uuid, table.name))
390 row = self.__create_row(table, uuid)
394 vlog.warn("cannot add existing row %s to table %s"
395 % (uuid, table.name))
396 if self.__row_update(table, row, new):
400 row = self.__create_row(table, uuid)
403 vlog.warn("cannot modify missing row %s in table %s"
404 % (uuid, table.name))
405 if self.__row_update(table, row, new):
409 def __row_update(self, table, row, row_json):
411 for column_name, datum_json in row_json.iteritems():
412 column = table.columns.get(column_name)
415 vlog.warn("unknown column %s updating table %s"
416 % (column_name, table.name))
420 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
421 except error.Error, e:
423 vlog.warn("error parsing column %s in table %s: %s"
424 % (column_name, table.name, e))
427 if datum != row._data[column_name]:
428 row._data[column_name] = datum
432 # Didn't really change but the OVSDB monitor protocol always
433 # includes every value in a row.
437 def __create_row(self, table, uuid):
439 for column in table.columns.itervalues():
440 data[column.name] = ovs.db.data.Datum.default(column.type)
441 row = table.rows[uuid] = Row(self, table, uuid, data)
445 self._session.force_reconnect()
def __txn_abort_all(self):
    """Empties self._outstanding_txns, marking each removed transaction
    with status Transaction.TRY_AGAIN."""
    pending = self._outstanding_txns
    while pending:
        _, dropped = pending.popitem()
        dropped._status = Transaction.TRY_AGAIN
452 def __txn_process_reply(self, msg):
453 txn = self._outstanding_txns.pop(msg.id, None)
455 txn._process_reply(msg)
458 def _uuid_to_row(atom, base):
460 return base.ref_table.rows.get(atom)
465 def _row_to_uuid(value):
466 if type(value) == Row:
473 """A row within an IDL.
475 The client may access the following attributes directly:
477 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
479 - An attribute for each column in the Row's table, named for the column,
480 whose values are as returned by Datum.to_python() for the column's type.
482 If some error occurs (e.g. the database server's idea of the column is
483 different from the IDL's idea), then the attribute value is the
484 "default" value returned by Datum.default() for the column's type. (It is
485 important to know this because the default value may violate constraints
486 for the column's type, e.g. the default integer value is 0 even if column
487 constraints require the column's value to be positive.)
489 When a transaction is active, column attributes may also be assigned new
490 values. Committing the transaction will then cause the new value to be
491 stored into the database.
493 *NOTE*: In the current implementation, the value of a column is a *copy*
494 of the value in the database. This means that modifying its value
495 directly will have no useful effect. For example, the following:
496 row.mycolumn["a"] = "b" # don't do this
497 will not change anything in the database, even after commit. To modify
498 the column, instead assign the modified column value back to the column:
def __init__(self, idl, table, uuid, data):
    # Every attribute is stored by writing into self.__dict__ directly, so
    # that real attributes are set without invoking the class's attribute
    # hooks.
    attrs = self.__dict__
    attrs["uuid"] = uuid

    attrs["_idl"] = idl
    attrs["_table"] = table

    # _data holds the committed data, which is one of:
    #
    #   - A dictionary mapping every column name to a Datum, when the row
    #     exists in the committed form of the database.
    #
    #   - None, when the row is newly inserted within the active
    #     transaction and therefore has no committed form.
    attrs["_data"] = data

    # _changes holds this row's changes within the active transaction,
    # which is one of:
    #
    #   - {}, the empty dictionary, when no transaction is active or the
    #     row has not yet been changed within this transaction.
    #
    #   - A dictionary mapping a column name to its new Datum, when an
    #     active transaction changes those columns' values.
    #
    #   - A dictionary mapping every column name to a Datum, when the row
    #     is newly inserted within the active transaction.
    #
    #   - None, when this transaction deletes this row.
    attrs["_changes"] = {}

    # _prereqs is a dictionary keyed by the names of the columns that must
    # be verified as prerequisites when the transaction commits.  All of
    # its values are None.
    attrs["_prereqs"] = {}
540 def __getattr__(self, column_name):
541 assert self._changes is not None
543 datum = self._changes.get(column_name)
545 datum = self._data[column_name]
547 return datum.to_python(_uuid_to_row)
549 def __setattr__(self, column_name, value):
550 assert self._changes is not None
553 column = self._table.columns[column_name]
555 datum = ovs.db.data.Datum.from_python(column.type, value,
557 except error.Error, e:
559 vlog.err("attempting to write bad value to column %s (%s)"
562 self._idl.txn._write(self, column, datum)
564 def verify(self, column_name):
565 """Causes the original contents of column 'column_name' in this row to
566 be verified as a prerequisite to completing the transaction. That is,
567 if 'column_name' changed in this row (or if this row was deleted)
568 between the time that the IDL originally read its contents and the time
569 that the transaction commits, then the transaction aborts and
570 Transaction.commit() returns Transaction.TRY_AGAIN.
572 The intention is that, to ensure that no transaction commits based on
573 dirty reads, an application should call Row.verify() on each data item
574 read as part of a read-modify-write operation.
576 In some cases Row.verify() reduces to a no-op, because the current
577 value of the column is already known:
579 - If this row is a row created by the current transaction (returned
580 by Transaction.insert()).
582 - If the column has already been modified within the current
585 Because of the latter property, always call Row.verify() *before*
586 modifying the column, for a given read-modify-write.
588 A transaction must be in progress."""
590 assert self._changes is not None
591 if not self._data or column_name in self._changes:
594 self._prereqs[column_name] = None
597 """Deletes this row from its table.
599 A transaction must be in progress."""
601 assert self._changes is not None
602 if self._data is None:
603 del self._idl.txn._txn_rows[self.uuid]
604 self.__dict__["_changes"] = None
605 del self._table.rows[self.uuid]
def increment(self, column_name):
    """Arranges for the transaction, when committed, to add 1 to the value
    of 'column_name' within this row.  'column_name' must have an integer
    type.  Once the transaction has committed successfully, the client may
    fetch the final (incremented) value of 'column_name' with
    Transaction.get_increment_new_value().

    A client could get a similar effect by reading, writing, and
    verify()ing columns.  Unlike that approach, increment() will never (by
    itself) make a transaction fail because of a verify error.

    The intended use is for incrementing the "next_cfg" column in the
    Open_vSwitch table."""
    txn = self._idl.txn
    txn._increment(self, column_name)
def _uuid_name_from_uuid(uuid):
    """Returns the string form of 'uuid' with every "-" replaced by "_" and
    "row" prepended."""
    pieces = str(uuid).split("-")
    return "row" + "_".join(pieces)
def _where_uuid_equals(uuid):
    """Returns a one-element list holding the condition
    ["_uuid", "==", ["uuid", <str(uuid)>]]."""
    uuid_json = ["uuid", str(uuid)]
    condition = ["_uuid", "==", uuid_json]
    return [condition]
631 class _InsertedRow(object):
632 def __init__(self, op_index):
633 self.op_index = op_index
637 class Transaction(object):
638 # Status values that Transaction.commit() can return.
639 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
640 UNCHANGED = "unchanged" # Transaction didn't include any changes.
641 INCOMPLETE = "incomplete" # Commit in progress, please wait.
642 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
643 SUCCESS = "success" # Commit successful.
644 TRY_AGAIN = "try again" # Commit failed because a "verify" operation
645 # reported an inconsistency, due to a network
646 # problem, or other transient failure. Wait
647 # for a change, then try again.
648 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
649 ERROR = "error" # Commit failed due to a hard error.
652 def status_to_string(status):
653 """Converts one of the status values that Transaction.commit() can
654 return into a human-readable string.
656 (The status values are in fact such strings already, so
657 there's nothing to do.)"""
660 def __init__(self, idl):
661 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
662 A given Idl may only have a single active transaction at a time.
664 A Transaction may modify the contents of a database by assigning new
665 values to columns (attributes of Row), deleting rows (with
666 Row.delete()), or inserting rows (with Transaction.insert()). It may
667 also check that columns in the database have not changed with
670 When a transaction is complete (which must be before the next call to
671 Idl.run()), call Transaction.commit() or Transaction.abort()."""
672 assert idl.txn is None
675 self._request_id = None
679 self._status = Transaction.UNCOMMITTED
682 self._commit_seqno = self.idl.change_seqno
685 self._inc_column = None
687 self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
    """Appends 'comment' to the comments that will be passed to the OVSDB
    server when this transaction is committed.  (The comment will be
    committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
    relatively human-readable form.)"""
    comments = self._comments
    comments.append(comment)
def wait(self, poller):
    """Calls poller.immediate_wake() unless this transaction's status is
    still Transaction.UNCOMMITTED or Transaction.INCOMPLETE."""
    unfinished = (Transaction.UNCOMMITTED, Transaction.INCOMPLETE)
    if self._status in unfinished:
        return
    poller.immediate_wake()
701 def _substitute_uuids(self, json):
702 if type(json) in (list, tuple):
704 and json[0] == 'uuid'
705 and ovs.ovsuuid.is_valid_string(json[1])):
706 uuid = ovs.ovsuuid.from_string(json[1])
707 row = self._txn_rows.get(uuid, None)
708 if row and row._data is None:
709 return ["named-uuid", _uuid_name_from_uuid(uuid)]
712 def __disassemble(self):
715 for row in self._txn_rows.itervalues():
716 if row._changes is None:
717 row._table.rows[row.uuid] = row
718 elif row._data is None:
719 del row._table.rows[row.uuid]
720 row.__dict__["_changes"] = {}
721 row.__dict__["_prereqs"] = {}
725 """Attempts to commit this transaction and returns the status of the
726 commit operation, one of the constants declared as class attributes.
727 If the return value is Transaction.INCOMPLETE, then the transaction is
728 not yet complete and the caller should try calling again later, after
729 calling Idl.run() to run the Idl.
731 Committing a transaction rolls back all of the changes that it made to
732 the Idl's copy of the database. If the transaction commits
733 successfully, then the database server will send an update and, thus,
734 the Idl will be updated with the committed changes."""
735 # The status can only change if we're the active transaction.
736 # (Otherwise, our status will change only in Idl.run().)
737 if self != self.idl.txn:
740 # If we need a lock but don't have it, give up quickly.
741 if self.idl.lock_name and not self.idl.has_lock():
742 self._status = Transaction.NOT_LOCKED
746 operations = [self.idl._db.name]
748 # Assert that we have the required lock (avoiding a race).
749 if self.idl.lock_name:
750 operations.append({"op": "assert",
751 "lock": self.idl.lock_name})
753 # Add prerequisites and declarations of new rows.
754 for row in self._txn_rows.itervalues():
758 for column_name in row._prereqs:
759 columns.append(column_name)
760 rows[column_name] = row._data[column_name].to_json()
761 operations.append({"op": "wait",
762 "table": row._table.name,
764 "where": _where_uuid_equals(row.uuid),
771 for row in self._txn_rows.itervalues():
772 if row._changes is None:
773 if row._table.is_root:
774 operations.append({"op": "delete",
775 "table": row._table.name,
776 "where": _where_uuid_equals(row.uuid)})
779 # Let ovsdb-server decide whether to really delete it.
782 op = {"table": row._table.name}
783 if row._data is None:
785 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
788 op_index = len(operations) - 1
789 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
792 op["where"] = _where_uuid_equals(row.uuid)
797 for column_name, datum in row._changes.iteritems():
798 if row._data is not None or not datum.is_default():
799 row_json[column_name] = (
800 self._substitute_uuids(datum.to_json()))
802 # If anything really changed, consider it an update.
803 # We can't suppress not-really-changed values earlier
804 # or transactions would become nonatomic (see the big
805 # comment inside Transaction._write()).
806 if (not any_updates and row._data is not None and
807 row._data[column_name] != datum):
810 if row._data is None or row_json:
811 operations.append(op)
814 if self._inc_row and any_updates:
815 self._inc_index = len(operations) - 1
817 operations.append({"op": "mutate",
818 "table": self._inc_row._table.name,
819 "where": self._substitute_uuids(
820 _where_uuid_equals(self._inc_row.uuid)),
821 "mutations": [[self._inc_column, "+=", 1]]})
822 operations.append({"op": "select",
823 "table": self._inc_row._table.name,
824 "where": self._substitute_uuids(
825 _where_uuid_equals(self._inc_row.uuid)),
826 "columns": [self._inc_column]})
830 operations.append({"op": "comment",
831 "comment": "\n".join(self._comments)})
835 operations.append({"op": "abort"})
838 self._status = Transaction.UNCHANGED
840 msg = ovs.jsonrpc.Message.create_request("transact", operations)
841 self._request_id = msg.id
842 if not self.idl._session.send(msg):
843 self.idl._outstanding_txns[self._request_id] = self
844 self._status = Transaction.INCOMPLETE
846 self._status = Transaction.TRY_AGAIN
851 def commit_block(self):
853 status = self.commit()
854 if status != Transaction.INCOMPLETE:
859 poller = ovs.poller.Poller()
860 self.idl.wait(poller)
def get_increment_new_value(self):
    """Returns the value stored in self._inc_new_value.

    The transaction's status must be Transaction.SUCCESS."""
    committed = self._status == Transaction.SUCCESS
    assert committed
    return self._inc_new_value
869 """Aborts this transaction. If Transaction.commit() has already been
870 called then the transaction might get committed anyhow."""
872 if self._status in (Transaction.UNCOMMITTED,
873 Transaction.INCOMPLETE):
874 self._status = Transaction.ABORTED
877 """Returns a string representing this transaction's current status,
878 suitable for use in log messages."""
879 if self._status != Transaction.ERROR:
880 return Transaction.status_to_string(self._status)
884 return "no error details available"
def __set_error_json(self, json):
    """Stores the string form of 'json' in self._error, unless an error has
    already been stored there (only the first one is kept)."""
    if self._error is not None:
        return
    self._error = ovs.json.to_string(json)
890 def get_insert_uuid(self, uuid):
891 """Finds and returns the permanent UUID that the database assigned to a
892 newly inserted row, given the UUID that Transaction.insert() assigned
895 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
896 or if it was assigned by that function and then deleted by Row.delete()
897 within the same transaction. (Rows that are inserted and then deleted
898 within a single transaction are never sent to the database server, so
899 it never assigns them a permanent UUID.)
901 This transaction must have completed successfully."""
902 assert self._status in (Transaction.SUCCESS,
903 Transaction.UNCHANGED)
904 inserted_row = self._inserted_rows.get(uuid)
906 return inserted_row.real
909 def _increment(self, row, column):
910 assert not self._inc_row
912 self._inc_column = column
914 def _write(self, row, column, datum):
915 assert row._changes is not None
919 # If this is a write-only column and the datum being written is the
920 # same as the one already there, just skip the update entirely. This
921 # is worth optimizing because we have a lot of columns that get
922 # periodically refreshed into the database but don't actually change
925 # We don't do this for read/write columns because that would break
926 # atomicity of transactions--some other client might have written a
927 # different value in that column since we read it. (But if a whole
928 # transaction only does writes of existing values, without making any
929 # real changes, we will drop the whole transaction later in
930 # ovsdb_idl_txn_commit().)
931 if not column.alert and row._data.get(column.name) == datum:
932 new_value = row._changes.get(column.name)
933 if new_value is None or new_value == datum:
936 txn._txn_rows[row.uuid] = row
937 row._changes[column.name] = datum.copy()
939 def insert(self, table, new_uuid=None):
940 """Inserts and returns a new row in 'table', which must be one of the
941 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
943 The new row is assigned a provisional UUID. If 'uuid' is None then one
944 is randomly generated; otherwise 'uuid' should specify a randomly
945 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
946 different UUID when 'txn' is committed, but the IDL will replace any
947 uses of the provisional UUID in the data to be committed by the
948 UUID assigned by ovsdb-server."""
949 assert self._status == Transaction.UNCOMMITTED
951 new_uuid = uuid.uuid4()
952 row = Row(self.idl, table, new_uuid, None)
953 table.rows[row.uuid] = row
954 self._txn_rows[row.uuid] = row
957 def _process_reply(self, msg):
958 if msg.type == ovs.jsonrpc.Message.T_ERROR:
959 self._status = Transaction.ERROR
960 elif type(msg.result) not in (list, tuple):
962 vlog.warn('reply to "transact" is not JSON array')
971 # This isn't an error in itself but indicates that some
972 # prior operation failed, so make sure that we know about
975 elif type(op) == dict:
976 error = op.get("error")
977 if error is not None:
978 if error == "timed out":
980 elif error == "not owner":
982 elif error == "aborted":
986 self.__set_error_json(op)
989 self.__set_error_json(op)
991 vlog.warn("operation reply is not JSON null or object")
993 if not soft_errors and not hard_errors and not lock_errors:
994 if self._inc_row and not self.__process_inc_reply(ops):
997 for insert in self._inserted_rows.itervalues():
998 if not self.__process_insert_reply(insert, ops):
1002 self._status = Transaction.ERROR
1004 self._status = Transaction.NOT_LOCKED
1006 self._status = Transaction.TRY_AGAIN
1008 self._status = Transaction.SUCCESS
1011 def __check_json_type(json, types, name):
1014 vlog.warn("%s is missing" % name)
1016 elif type(json) not in types:
1018 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1023 def __process_inc_reply(self, ops):
1024 if self._inc_index + 2 > len(ops):
1026 vlog.warn("reply does not contain enough operations for "
1027 "increment (has %d, needs %d)" %
1028 (len(ops), self._inc_index + 2))
1030 # We know that this is a JSON object because the loop in
1031 # __process_reply() already checked.
1032 mutate = ops[self._inc_index]
1033 count = mutate.get("count")
1034 if not Transaction.__check_json_type(count, (int, long),
1035 '"mutate" reply "count"'):
1039 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1042 select = ops[self._inc_index + 1]
1043 rows = select.get("rows")
1044 if not Transaction.__check_json_type(rows, (list, tuple),
1045 '"select" reply "rows"'):
1049 vlog.warn('"select" reply "rows" has %d elements '
1050 'instead of 1' % len(rows))
1053 if not Transaction.__check_json_type(row, (dict,),
1054 '"select" reply row'):
1056 column = row.get(self._inc_column)
1057 if not Transaction.__check_json_type(column, (int, long),
1058 '"select" reply inc column'):
1060 self._inc_new_value = column
1063 def __process_insert_reply(self, insert, ops):
1064 if insert.op_index >= len(ops):
1066 vlog.warn("reply does not contain enough operations "
1067 "for insert (has %d, needs %d)"
1068 % (len(ops), insert.op_index))
1071 # We know that this is a JSON object because the loop in
1072 # __process_reply() already checked.
1073 reply = ops[insert.op_index]
1074 json_uuid = reply.get("uuid")
1075 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1076 '"insert" reply "uuid"'):
1080 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1083 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1090 class SchemaHelper(object):
1091 """IDL Schema helper.
1093 This class encapsulates the logic required to generate schemas suitable
1094 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1095 they are interested in using register_columns(). When finished, the
1096 get_idl_schema() function may be called.
1098 The location on disk of the schema used may be found in the
1099 'schema_location' variable."""
1101 def __init__(self, location=None):
1102 """Creates a new Schema object."""
1104 if location is None:
1105 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1107 self.schema_location = location
1111 def register_columns(self, table, columns):
1112 """Registers interest in the given 'columns' of 'table'. Future calls
1113 to get_idl_schema() will include 'table':column for each column in
1114 'columns'. This function automatically avoids adding duplicate entries
1117 'table' must be a string.
1118 'columns' must be a list of strings.
1121 assert type(table) is str
1122 assert type(columns) is list
1124 columns = set(columns) | self._tables.get(table, set())
1125 self._tables[table] = columns
1127 def register_all(self):
1128 """Registers interest in every column of every table."""
1131 def get_idl_schema(self):
1132 """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
1133 object based on columns registered using the register_columns()
1136 schema = ovs.db.schema.DbSchema.from_json(
1137 ovs.json.from_file(self.schema_location))
1141 for table, columns in self._tables.iteritems():
1142 schema_tables[table] = (
1143 self._keep_table_columns(schema, table, columns))
1145 schema.tables = schema_tables
1148 def _keep_table_columns(self, schema, table_name, columns):
1149 assert table_name in schema.tables
1150 table = schema.tables[table_name]
1153 for column_name in columns:
1154 assert type(column_name) is str
1155 assert column_name in table.columns
1157 new_columns[column_name] = table.columns[column_name]
1159 table.columns = new_columns