X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=python%2Fovs%2Fdb%2Fidl.py;h=3a8dec2828cac76e559901dfd47c5d5cbdb648b9;hb=225b582a8c218eec242921b0eed291cf6ec19b76;hp=d01fde803b9aa74e068ea273cd395a1db2225c62;hpb=8cdf0349740c3e1a73af9aa6209bb22be952cd37;p=openvswitch diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py index d01fde80..3a8dec28 100644 --- a/python/ovs/db/idl.py +++ b/python/ovs/db/idl.py @@ -1,4 +1,4 @@ -# Copyright (c) 2009, 2010, 2011 Nicira Networks +# Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import uuid import ovs.jsonrpc @@ -21,6 +20,12 @@ import ovs.db.schema from ovs.db import error import ovs.ovsuuid import ovs.poller +import ovs.vlog + +vlog = ovs.vlog.Vlog("idl") + +__pychecker__ = 'no-classattr no-objattrs' + class Idl: """Open vSwitch Database Interface Definition Language (OVSDB IDL). @@ -47,7 +52,13 @@ class Idl: 'rows' map values. Refer to Row for more details. - 'change_seqno': A number that represents the IDL's state. When the IDL - is updated (by Idl.run()), its value changes. + is updated (by Idl.run()), its value changes. The sequence number can + occasionally change even if the database does not. This happens if the + connection to the database drops and reconnects, which causes the + database contents to be reloaded even if they didn't change. (It could + also happen if the database server sends out a "change" that reflects + what the IDL already thought was in the database. The database server is + not supposed to do that, but bugs could in theory cause it to do so.) - 'lock_name': The name of the lock configured with Idl.set_lock(), or None if no lock is configured. @@ -83,8 +94,14 @@ class Idl: purpose of the return value of Idl.run() and Idl.change_seqno. This is useful for columns that the IDL's client will write but not read. + As a convenience to users, 'schema' may also be an instance of the + SchemaHelper class. + The IDL uses and modifies 'schema' directly.""" + assert isinstance(schema, SchemaHelper) + schema = schema.get_idl_schema() + self.tables = schema.tables self._db = schema self._session = ovs.jsonrpc.Session.open(remote) @@ -95,7 +112,7 @@ class Idl: # Database locking. self.lock_name = None # Name of lock we need, None if none. self.has_lock = False # Has db server said we have the lock? - self.is_lock_contended = False # Has db server said we can't get lock? + self.is_lock_contended = False # Has db server said we can't get lock? self._lock_request_id = None # JSON-RPC ID of in-flight lock request. # Transaction support. @@ -170,8 +187,8 @@ class Idl: self.__clear() self.__parse_update(msg.result) except error.Error, e: - logging.error("%s: parse error in received schema: %s" - % (self._session.get_name(), e)) + vlog.err("%s: parse error in received schema: %s" + % (self._session.get_name(), e)) self.__error() elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._lock_request_id is not None @@ -197,9 +214,9 @@ class Idl: else: # This can happen if a transaction is destroyed before we # receive the reply, so keep the log level low. - logging.debug("%s: received unexpected %s message" - % (self._session.get_name(), - ovs.jsonrpc.Message.type_to_string(msg.type))) + vlog.dbg("%s: received unexpected %s message" + % (self._session.get_name(), + ovs.jsonrpc.Message.type_to_string(msg.type))) return initial_change_seqno != self.change_seqno @@ -316,8 +333,8 @@ class Idl: try: self.__do_parse_update(update) except error.Error, e: - logging.error("%s: error parsing update: %s" - % (self._session.get_name(), e)) + vlog.err("%s: error parsing update: %s" + % (self._session.get_name(), e)) def __do_parse_update(self, table_updates): if type(table_updates) != dict: @@ -371,8 +388,8 @@ class Idl: changed = True else: # XXX rate-limit - logging.warning("cannot delete missing row %s from table %s" - % (uuid, table.name)) + vlog.warn("cannot delete missing row %s from table %s" + % (uuid, table.name)) elif not old: # Insert row. if not row: @@ -380,8 +397,8 @@ class Idl: changed = True else: # XXX rate-limit - logging.warning("cannot add existing row %s to table %s" - % (uuid, table.name)) + vlog.warn("cannot add existing row %s to table %s" + % (uuid, table.name)) if self.__row_update(table, row, new): changed = True else: @@ -389,8 +406,8 @@ class Idl: row = self.__create_row(table, uuid) changed = True # XXX rate-limit - logging.warning("cannot modify missing row %s in table %s" - % (uuid, table.name)) + vlog.warn("cannot modify missing row %s in table %s" + % (uuid, table.name)) if self.__row_update(table, row, new): changed = True return changed @@ -401,16 +418,16 @@ class Idl: column = table.columns.get(column_name) if not column: # XXX rate-limit - logging.warning("unknown column %s updating table %s" - % (column_name, table.name)) + vlog.warn("unknown column %s updating table %s" + % (column_name, table.name)) continue try: datum = ovs.db.data.Datum.from_json(column.type, datum_json) except error.Error, e: # XXX rate-limit - logging.warning("error parsing column %s in table %s: %s" - % (column_name, table.name, e)) + vlog.warn("error parsing column %s in table %s: %s" + % (column_name, table.name, e)) continue if datum != row._data[column_name]: @@ -443,18 +460,21 @@ class Idl: if txn: txn._process_reply(msg) + def _uuid_to_row(atom, base): if base.ref_table: return base.ref_table.rows.get(atom) else: return atom + def _row_to_uuid(value): if type(value) == Row: return value.uuid else: return value + class Row(object): """A row within an IDL. @@ -542,8 +562,8 @@ class Row(object): _row_to_uuid) except error.Error, e: # XXX rate-limit - logging.error("attempting to write bad value to column %s (%s)" - % (column_name, e)) + vlog.err("attempting to write bad value to column %s (%s)" + % (column_name, e)) return self._idl.txn._write(self, column, datum) @@ -590,29 +610,92 @@ class Row(object): self.__dict__["_changes"] = None del self._table.rows[self.uuid] + def increment(self, column_name): + """Causes the transaction, when committed, to increment the value of + 'column_name' within this row by 1. 'column_name' must have an integer + type. After the transaction commits successfully, the client may + retrieve the final (incremented) value of 'column_name' with + Transaction.get_increment_new_value(). + + The client could accomplish something similar by reading and writing + and verify()ing columns. However, increment() will never (by itself) + cause a transaction to fail because of a verify error. + + The intended use is for incrementing the "next_cfg" column in + the Open_vSwitch table.""" + self._idl.txn._increment(self, column_name) + + def _uuid_name_from_uuid(uuid): return "row%s" % str(uuid).replace("-", "_") + def _where_uuid_equals(uuid): return [["_uuid", "==", ["uuid", str(uuid)]]] + class _InsertedRow(object): def __init__(self, op_index): self.op_index = op_index self.real = None + class Transaction(object): + """A transaction may modify the contents of a database by modifying the + values of columns, deleting rows, inserting rows, or adding checks that + columns in the database have not changed ("verify" operations), through + Row methods. + + Reading and writing columns and inserting and deleting rows are all + straightforward. The reasons to verify columns are less obvious. + Verification is the key to maintaining transactional integrity. Because + OVSDB handles multiple clients, it can happen that between the time that + OVSDB client A reads a column and writes a new value, OVSDB client B has + written that column. Client A's write should not ordinarily overwrite + client B's, especially if the column in question is a "map" column that + contains several more or less independent data items. If client A adds a + "verify" operation before it writes the column, then the transaction fails + in case client B modifies it first. Client A will then see the new value + of the column and compose a new transaction based on the new contents + written by client B. + + When a transaction is complete, which must be before the next call to + Idl.run(), call Transaction.commit() or Transaction.abort(). + + The life-cycle of a transaction looks like this: + + 1. Create the transaction and record the initial sequence number: + + seqno = idl.change_seqno(idl) + txn = Transaction(idl) + + 2. Modify the database with Row and Transaction methods. + + 3. Commit the transaction by calling Transaction.commit(). The first call + to this function probably returns Transaction.INCOMPLETE. The client + must keep calling again along as this remains true, calling Idl.run() in + between to let the IDL do protocol processing. (If the client doesn't + have anything else to do in the meantime, it can use + Transaction.commit_block() to avoid having to loop itself.) + + 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno + to change from the saved 'seqno' (it's possible that it's already + changed, in which case the client should not wait at all), then start + over from step 1. Only a call to Idl.run() will change the return value + of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)""" + # Status values that Transaction.commit() can return. - UNCOMMITTED = "uncommitted" # Not yet committed or aborted. - UNCHANGED = "unchanged" # Transaction didn't include any changes. - INCOMPLETE = "incomplete" # Commit in progress, please wait. - ABORTED = "aborted" # ovsdb_idl_txn_abort() called. - SUCCESS = "success" # Commit successful. - TRY_AGAIN = "try again" # Commit failed because a "verify" operation - # reported an inconsistency, due to a network - # problem, or other transient failure. - NOT_LOCKED = "not locked" # Server hasn't given us the lock yet. - ERROR = "error" # Commit failed due to a hard error. + UNCOMMITTED = "uncommitted" # Not yet committed or aborted. + UNCHANGED = "unchanged" # Transaction didn't include any changes. + INCOMPLETE = "incomplete" # Commit in progress, please wait. + ABORTED = "aborted" # ovsdb_idl_txn_abort() called. + SUCCESS = "success" # Commit successful. + TRY_AGAIN = "try again" # Commit failed because a "verify" operation + # reported an inconsistency, due to a network + # problem, or other transient failure. Wait + # for a change, then try again. + NOT_LOCKED = "not locked" # Server hasn't given us the lock yet. + ERROR = "error" # Commit failed due to a hard error. @staticmethod def status_to_string(status): @@ -645,12 +728,12 @@ class Transaction(object): self._status = Transaction.UNCOMMITTED self._error = None self._comments = [] + self._commit_seqno = self.idl.change_seqno - self._inc_table = None + self._inc_row = None self._inc_column = None - self._inc_where = None - self._inserted_rows = {} # Map from UUID to _InsertedRow + self._inserted_rows = {} # Map from UUID to _InsertedRow def add_comment(self, comment): """Appens 'comment' to the comments that will be passed to the OVSDB @@ -659,13 +742,9 @@ class Transaction(object): relatively human-readable form.)""" self._comments.append(comment) - def increment(self, table, column, where): - assert not self._inc_table - self._inc_table = table - self._inc_column = column - self._inc_where = where - def wait(self, poller): + """Causes poll_block() to wake up if this transaction has completed + committing.""" if self._status not in (Transaction.UNCOMMITTED, Transaction.INCOMPLETE): poller.immediate_wake() @@ -679,6 +758,8 @@ class Transaction(object): row = self._txn_rows.get(uuid, None) if row and row._data is None: return ["named-uuid", _uuid_name_from_uuid(uuid)] + else: + return [self._substitute_uuids(elem) for elem in json] return json def __disassemble(self): @@ -694,16 +775,56 @@ class Transaction(object): self._txn_rows = {} def commit(self): - """Attempts to commit this transaction and returns the status of the - commit operation, one of the constants declared as class attributes. - If the return value is Transaction.INCOMPLETE, then the transaction is - not yet complete and the caller should try calling again later, after - calling Idl.run() to run the Idl. + """Attempts to commit 'txn'. Returns the status of the commit + operation, one of the following constants: + + Transaction.INCOMPLETE: + + The transaction is in progress, but not yet complete. The caller + should call again later, after calling Idl.run() to let the + IDL do OVSDB protocol processing. + + Transaction.UNCHANGED: + + The transaction is complete. (It didn't actually change the + database, so the IDL didn't send any request to the database + server.) + + Transaction.ABORTED: + + The caller previously called Transaction.abort(). + + Transaction.SUCCESS: + + The transaction was successful. The update made by the + transaction (and possibly other changes made by other database + clients) should already be visible in the IDL. + + Transaction.TRY_AGAIN: + + The transaction failed for some transient reason, e.g. because a + "verify" operation reported an inconsistency or due to a network + problem. The caller should wait for a change to the database, + then compose a new transaction, and commit the new transaction. + + Use Idl.change_seqno to wait for a change in the database. It is + important to use its value *before* the initial call to + Transaction.commit() as the baseline for this purpose, because + the change that one should wait for can happen after the initial + call but before the call that returns Transaction.TRY_AGAIN, and + using some other baseline value in that situation could cause an + indefinite wait if the database rarely changes. + + Transaction.NOT_LOCKED: + + The transaction failed because the IDL has been configured to + require a database lock (with Idl.set_lock()) but didn't + get it yet or has already lost it. Committing a transaction rolls back all of the changes that it made to - the Idl's copy of the database. If the transaction commits + the IDL's copy of the database. If the transaction commits successfully, then the database server will send an update and, thus, - the Idl will be updated with the committed changes.""" + the IDL will be updated with the committed changes.""" # The status can only change if we're the active transaction. # (Otherwise, our status will change only in Idl.run().) if self != self.idl.txn: @@ -768,7 +889,8 @@ class Transaction(object): for column_name, datum in row._changes.iteritems(): if row._data is not None or not datum.is_default(): - row_json[column_name] = self._substitute_uuids(datum.to_json()) + row_json[column_name] = ( + self._substitute_uuids(datum.to_json())) # If anything really changed, consider it an update. # We can't suppress not-really-changed values earlier @@ -782,16 +904,18 @@ class Transaction(object): operations.append(op) # Add increment. - if self._inc_table and any_updates: + if self._inc_row and any_updates: self._inc_index = len(operations) - 1 operations.append({"op": "mutate", - "table": self._inc_table, - "where": self._substitute_uuids(self._inc_where), + "table": self._inc_row._table.name, + "where": self._substitute_uuids( + _where_uuid_equals(self._inc_row.uuid)), "mutations": [[self._inc_column, "+=", 1]]}) operations.append({"op": "select", - "table": self._inc_table, - "where": self._substitute_uuids(self._inc_where), + "table": self._inc_row._table.name, + "where": self._substitute_uuids( + _where_uuid_equals(self._inc_row.uuid)), "columns": [self._inc_column]}) # Add comment. @@ -818,6 +942,12 @@ class Transaction(object): return self._status def commit_block(self): + """Attempts to commit this transaction, blocking until the commit + either succeeds or fails. Returns the final commit status, which may + be any Transaction.* value other than Transaction.INCOMPLETE. + + This function calls Idl.run() on this transaction'ss IDL, so it may + cause Idl.change_seqno to change.""" while True: status = self.commit() if status != Transaction.INCOMPLETE: @@ -831,6 +961,9 @@ class Transaction(object): poller.block() def get_increment_new_value(self): + """Returns the final (incremented) value of the column in this + transaction that was set to be incremented by Row.increment. This + transaction must have committed successfully.""" assert self._status == Transaction.SUCCESS return self._inc_new_value @@ -875,6 +1008,11 @@ class Transaction(object): return inserted_row.real return None + def _increment(self, row, column): + assert not self._inc_row + self._inc_row = row + self._inc_column = column + def _write(self, row, column, datum): assert row._changes is not None @@ -923,7 +1061,7 @@ class Transaction(object): self._status = Transaction.ERROR elif type(msg.result) not in (list, tuple): # XXX rate-limit - logging.warning('reply to "transact" is not JSON array') + vlog.warn('reply to "transact" is not JSON array') else: hard_errors = False soft_errors = False @@ -952,11 +1090,10 @@ class Transaction(object): hard_errors = True self.__set_error_json(op) # XXX rate-limit - logging.warning("operation reply is not JSON null or " - "object") + vlog.warn("operation reply is not JSON null or object") if not soft_errors and not hard_errors and not lock_errors: - if self._inc_table and not self.__process_inc_reply(ops): + if self._inc_row and not self.__process_inc_reply(ops): hard_errors = True for insert in self._inserted_rows.itervalues(): @@ -976,11 +1113,11 @@ class Transaction(object): def __check_json_type(json, types, name): if not json: # XXX rate-limit - logging.warning("%s is missing" % name) + vlog.warn("%s is missing" % name) return False elif type(json) not in types: # XXX rate-limit - logging.warning("%s has unexpected type %s" % (name, type(json))) + vlog.warn("%s has unexpected type %s" % (name, type(json))) return False else: return True @@ -988,9 +1125,9 @@ class Transaction(object): def __process_inc_reply(self, ops): if self._inc_index + 2 > len(ops): # XXX rate-limit - logging.warning("reply does not contain enough operations for " - "increment (has %d, needs %d)" % - (len(ops), self._inc_index + 2)) + vlog.warn("reply does not contain enough operations for " + "increment (has %d, needs %d)" % + (len(ops), self._inc_index + 2)) # We know that this is a JSON object because the loop in # __process_reply() already checked. @@ -1001,8 +1138,7 @@ class Transaction(object): return False if count != 1: # XXX rate-limit - logging.warning('"mutate" reply "count" is %d instead of 1' - % count) + vlog.warn('"mutate" reply "count" is %d instead of 1' % count) return False select = ops[self._inc_index + 1] @@ -1012,8 +1148,8 @@ class Transaction(object): return False if len(rows) != 1: # XXX rate-limit - logging.warning('"select" reply "rows" has %d elements ' - 'instead of 1' % len(rows)) + vlog.warn('"select" reply "rows" has %d elements ' + 'instead of 1' % len(rows)) return False row = rows[0] if not Transaction.__check_json_type(row, (dict,), @@ -1029,9 +1165,9 @@ class Transaction(object): def __process_insert_reply(self, insert, ops): if insert.op_index >= len(ops): # XXX rate-limit - logging.warning("reply does not contain enough operations " - "for insert (has %d, needs %d)" - % (len(ops), insert.op_index)) + vlog.warn("reply does not contain enough operations " + "for insert (has %d, needs %d)" + % (len(ops), insert.op_index)) return False # We know that this is a JSON object because the loop in @@ -1046,8 +1182,103 @@ class Transaction(object): uuid_ = ovs.ovsuuid.from_json(json_uuid) except error.Error: # XXX rate-limit - logging.warning('"insert" reply "uuid" is not a JSON UUID') + vlog.warn('"insert" reply "uuid" is not a JSON UUID') return False insert.real = uuid_ return True + + +class SchemaHelper(object): + """IDL Schema helper. + + This class encapsulates the logic required to generate schemas suitable + for creating 'ovs.db.idl.Idl' objects. Clients should register columns + they are interested in using register_columns(). When finished, the + get_idl_schema() function may be called. + + The location on disk of the schema used may be found in the + 'schema_location' variable.""" + + def __init__(self, location=None, schema_json=None): + """Creates a new Schema object. + + 'location' file path to ovs schema. None means default location + 'schema_json' schema in json preresentation in memory + """ + + if location and schema_json: + raise ValueError("both location and schema_json can't be " + "specified. it's ambiguous.") + if schema_json is None: + if location is None: + location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR + schema_json = ovs.json.from_file(location) + + self.schema_json = schema_json + self._tables = {} + self._all = False + + def register_columns(self, table, columns): + """Registers interest in the given 'columns' of 'table'. Future calls + to get_idl_schema() will include 'table':column for each column in + 'columns'. This function automatically avoids adding duplicate entries + to the schema. + + 'table' must be a string. + 'columns' must be a list of strings. + """ + + assert type(table) is str + assert type(columns) is list + + columns = set(columns) | self._tables.get(table, set()) + self._tables[table] = columns + + def register_table(self, table): + """Registers interest in the given all columns of 'table'. Future calls + to get_idl_schema() will include all columns of 'table'. + + 'table' must be a string + """ + assert type(table) is str + self._tables[table] = set() # empty set means all columns in the table + + def register_all(self): + """Registers interest in every column of every table.""" + self._all = True + + def get_idl_schema(self): + """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL' + object based on columns registered using the register_columns() + function.""" + + schema = ovs.db.schema.DbSchema.from_json(self.schema_json) + self.schema_json = None + + if not self._all: + schema_tables = {} + for table, columns in self._tables.iteritems(): + schema_tables[table] = ( + self._keep_table_columns(schema, table, columns)) + + schema.tables = schema_tables + return schema + + def _keep_table_columns(self, schema, table_name, columns): + assert table_name in schema.tables + table = schema.tables[table_name] + + if not columns: + # empty set means all columns in the table + return table + + new_columns = {} + for column_name in columns: + assert type(column_name) is str + assert column_name in table.columns + + new_columns[column_name] = table.columns[column_name] + + table.columns = new_columns + return table