1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from ovs.db import error
23 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
25 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
26 requests to an OVSDB database server and parses the responses, converting
27 raw JSON into data structures that are easier for clients to digest.
29 The IDL also assists with issuing database transactions. The client
30 creates a transaction, manipulates the IDL data structures, and commits or
31 aborts the transaction. The IDL then composes and issues the necessary
32 JSON-RPC requests and reports to the client whether the transaction
33 completed successfully.
35 If 'schema_cb' is provided, it should be a callback function that accepts
36 an ovs.db.schema.DbSchema as its argument. It should determine whether the
37 schema is acceptable and raise an ovs.db.error.Error if it is not. It may
38 also delete any tables or columns from the schema that the client has no
39 interest in monitoring, to save time and bandwidth during monitoring. Its
40 return value is ignored."""
42 def __init__(self, remote, db_name, schema_cb=None):
43 """Creates and returns a connection to the database named 'db_name' on
44 'remote', which should be in a form acceptable to
45 ovs.jsonrpc.Session.open(). The connection will maintain an in-memory
46 replica of the remote database."""
48 self.session = ovs.jsonrpc.Session.open(remote)
49 self.db_name = db_name
50 self.last_seqno = None
55 self.schema_cb = schema_cb
61 """Processes a batch of messages from the database server. Returns
62 True if the database as seen through the IDL changed, False if it did
63 not change. The initial fetch of the entire contents of the remote
64 database is considered to be one kind of change.
66 This function can return occasional false positives, that is, report
67 that the database changed even though it didn't. This happens if the
68 connection to the database drops and reconnects, which causes the
69 database contents to be reloaded even if they didn't change. (It could
70 also happen if the database server sends out a "change" that reflects
71 what we already thought was in the database, but the database server is
72 not supposed to do that.)
74 As an alternative to checking the return value, the client may check
75 for changes in the value returned by self.get_seqno()."""
76 initial_change_seqno = self.change_seqno
78 if self.session.is_connected():
79 seqno = self.session.get_seqno()
80 if seqno != self.last_seqno:
81 self.last_seqno = seqno
82 self.state = (self.__send_schema_request, None)
85 return initial_change_seqno != self.change_seqno
87 def wait(self, poller):
88 """Arranges for poller.block() to wake up when self.run() has something
89 to do or when activity occurs on a transaction on 'self'."""
90 self.session.wait(poller)
91 if self.state and self.state[1]:
95 """Returns a number that represents the IDL's state. When the IDL
96 is updated (by self.run()), the return value changes."""
97 return self.change_seqno
def __send_schema_request(self):
    """Requests the schema of self.db_name and arms the reply handler.

    Sends a "get_schema" JSON-RPC request, with [self.db_name] as its
    parameters, over self.session.  Then stores a pair of callables in
    self.state: the first calls self.__recv_schema() with the id of the
    request just sent, and the second is self.__recv_wait."""
    request = ovs.jsonrpc.Message.create_request("get_schema",
                                                 [self.db_name])
    self.session.send(request)

    def recv_schema_reply():
        # The id is looked up when the handler runs, not when it is created.
        return self.__recv_schema(request.id)

    self.state = (recv_schema_reply, self.__recv_wait)
104 def __recv_schema(self, id):
105 msg = self.session.recv()
106 if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
108 self.schema = ovs.db.schema.DbSchema.from_json(msg.result)
109 except error.Error, e:
110 logging.error("%s: parse error in received schema: %s"
117 self.schema_cb(self.schema)
118 except error.Error, e:
119 logging.error("%s: error validating schema: %s"
124 self.__send_monitor_request()
126 logging.error("%s: unexpected message expecting schema: %s"
127 % (self.remote, msg))
def __recv_wait(self, poller):
    """Passes 'poller' on to self.session.recv_wait().

    This method is stored as the second element of self.state while a reply
    or an update from the server is awaited."""
    session = self.session
    session.recv_wait(poller)
133 def __send_monitor_request(self):
134 monitor_requests = {}
135 for table in self.schema.tables.itervalues():
136 monitor_requests[table.name] = {"columns": table.columns.keys()}
137 msg = ovs.jsonrpc.Message.create_request(
138 "monitor", [self.db_name, None, monitor_requests])
139 self.session.send(msg)
140 self.state = (lambda: self.__recv_monitor_reply(msg.id),
143 def __recv_monitor_reply(self, id):
144 msg = self.session.recv()
145 if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
147 self.change_seqno += 1
148 self.state = (self.__recv_update, self.__recv_wait)
150 self.__parse_update(msg.result)
151 except error.Error, e:
152 logging.error("%s: parse error in received schema: %s"
156 logging.error("%s: unexpected message expecting schema: %s"
157 % (self.remote, msg))
160 def __recv_update(self):
161 msg = self.session.recv()
162 if (msg and msg.type == ovs.jsonrpc.Message.T_NOTIFY and
163 type(msg.params) == list and len(msg.params) == 2 and
164 msg.params[0] is None):
165 self.__parse_update(msg.params[1])
167 logging.error("%s: unexpected message expecting update: %s"
168 % (self.remote, msg))
172 self.session.force_reconnect()
174 def __parse_update(self, update):
176 self.__do_parse_update(update)
177 except error.Error, e:
178 logging.error("%s: error parsing update: %s" % (self.remote, e))
180 def __do_parse_update(self, table_updates):
181 if type(table_updates) != dict:
182 raise error.Error("<table-updates> is not an object",
185 for table_name, table_update in table_updates.iteritems():
186 table = self.schema.tables.get(table_name)
188 raise error.Error('<table-updates> includes unknown '
189 'table "%s"' % table_name)
191 if type(table_update) != dict:
192 raise error.Error('<table-update> for table "%s" is not '
193 'an object' % table_name, table_update)
195 for uuid_string, row_update in table_update.iteritems():
196 if not ovs.ovsuuid.UUID.is_valid_string(uuid_string):
197 raise error.Error('<table-update> for table "%s" '
198 'contains bad UUID "%s" as member '
199 'name' % (table_name, uuid_string),
201 uuid = ovs.ovsuuid.UUID.from_string(uuid_string)
203 if type(row_update) != dict:
204 raise error.Error('<table-update> for table "%s" '
205 'contains <row-update> for %s that '
207 % (table_name, uuid_string))
209 old = row_update.get("old", None)
210 new = row_update.get("new", None)
212 if old is not None and type(old) != dict:
213 raise error.Error('"old" <row> is not an object', old)
214 if new is not None and type(new) != dict:
215 raise error.Error('"new" <row> is not an object', new)
216 if (old is not None) + (new is not None) != len(row_update):
217 raise error.Error("<row-update> contains unexpected "
218 "member", row_update)
219 if not old and not new:
220 raise error.Error('<row-update> missing "old" and '
221 '"new" members', row_update)
223 if self.__parse_row_update(table, uuid, old, new):
224 self.change_seqno += 1
226 def __parse_row_update(self, table, uuid, old, new):
227 """Returns True if a column changed, False otherwise."""
228 row = self.data[table.name].get(uuid)
232 del self.data[table.name][uuid]
235 logging.warning("cannot delete missing row %s from table %s"
236 % (uuid, table.name))
241 row = self.__create_row(table, uuid)
244 logging.warning("cannot add existing row %s to table %s"
245 % (uuid, table.name))
246 self.__modify_row(table, row, new)
249 row = self.__create_row(table, uuid)
251 logging.warning("cannot modify missing row %s in table %s"
252 % (uuid, table_name))
253 self.__modify_row(table, row, new)
256 def __modify_row(self, table, row, row_json):
258 for column_name, datum_json in row_json.iteritems():
259 column = table.columns.get(column_name)
262 logging.warning("unknown column %s updating table %s"
263 % (column_name, table.name))
267 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
268 except error.Error, e:
270 logging.warning("error parsing column %s in table %s: %s"
271 % (column_name, table_name, e))
274 if datum != getattr(row, column_name):
275 setattr(row, column_name, datum)
278 # Didn't really change but the OVSDB monitor protocol always
279 # includes every value in a row.
285 for table_name in self.schema.tables:
286 if self.data[table_name] != {}:
287 self.change_seqno += 1
291 for table_name in self.schema.tables:
292 self.data[table_name] = {}
294 def __create_row(self, table, uuid):
297 row = self.data[table.name][uuid] = Row()
298 for column in table.columns.itervalues():
299 setattr(row, column.name, ovs.db.data.Datum.default(column.type))
def force_reconnect(self):
    """Makes the IDL drop its database connection and reconnect.

    The request is delegated to self.session.force_reconnect().  Until the
    connection is re-established, the contents of the IDL will not change."""
    session = self.session
    session.force_reconnect()