1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
24 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
26 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
27 requests to an OVSDB database server and parses the responses, converting
28 raw JSON into data structures that are easier for clients to digest.
30 The IDL also assists with issuing database transactions. The client
31 creates a transaction, manipulates the IDL data structures, and commits or
32 aborts the transaction. The IDL then composes and issues the necessary
33 JSON-RPC requests and reports to the client whether the transaction
34 completed successfully.
36 If 'schema_cb' is provided, it should be a callback function that accepts
37 an ovs.db.schema.DbSchema as its argument. It should determine whether the
38 schema is acceptable and raise an ovs.db.error.Error if it is not. It may
39 also delete any tables or columns from the schema that the client has no
40 interest in monitoring, to save time and bandwidth during monitoring. Its
41 return value is ignored."""
43 def __init__(self, remote, db_name, schema_cb=None):
44 """Creates and returns a connection to the database named 'db_name' on
45 'remote', which should be in a form acceptable to
46 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
47 replica of the remote database."""
49 self.session = ovs.jsonrpc.Session.open(remote)
50 self.db_name = db_name
51 self.last_seqno = None
56 self.schema_cb = schema_cb
62 """Processes a batch of messages from the database server. Returns
63 True if the database as seen through the IDL changed, False if it did
64 not change. The initial fetch of the entire contents of the remote
65 database is considered to be one kind of change.
67 This function can return occasional false positives, that is, report
68 that the database changed even though it didn't. This happens if the
69 connection to the database drops and reconnects, which causes the
70 database contents to be reloaded even if they didn't change. (It could
71 also happen if the database server sends out a "change" that reflects
72 what we already thought was in the database, but the database server is
73 not supposed to do that.)
75 As an alternative to checking the return value, the client may check
76 for changes in the value returned by self.get_seqno()."""
77 initial_change_seqno = self.change_seqno
79 if self.session.is_connected():
80 seqno = self.session.get_seqno()
81 if seqno != self.last_seqno:
82 self.last_seqno = seqno
83 self.state = (self.__send_schema_request, None)
86 return initial_change_seqno != self.change_seqno
88 def wait(self, poller):
89 """Arranges for poller.block() to wake up when self.run() has something
90 to do or when activity occurs on a transaction on 'self'."""
91 self.session.wait(poller)
92 if self.state and self.state[1]:
96 """Returns a number that represents the IDL's state. When the IDL
97 updated (by self.run()), the return value changes."""
98 return self.change_seqno
100 def __send_schema_request(self):
101 msg = ovs.jsonrpc.Message.create_request("get_schema", [self.db_name])
102 self.session.send(msg)
103 self.state = (lambda: self.__recv_schema(msg.id), self.__recv_wait)
105 def __recv_schema(self, id):
106 msg = self.session.recv()
107 if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
109 self.schema = ovs.db.schema.DbSchema.from_json(msg.result)
110 except error.Error, e:
111 logging.error("%s: parse error in received schema: %s"
118 self.schema_cb(self.schema)
119 except error.Error, e:
120 logging.error("%s: error validating schema: %s"
125 self.__send_monitor_request()
127 logging.error("%s: unexpected message expecting schema: %s"
128 % (self.remote, msg))
131 def __recv_wait(self, poller):
132 self.session.recv_wait(poller)
134 def __send_monitor_request(self):
135 monitor_requests = {}
136 for table in self.schema.tables.itervalues():
137 monitor_requests[table.name] = {"columns": table.columns.keys()}
138 msg = ovs.jsonrpc.Message.create_request(
139 "monitor", [self.db_name, None, monitor_requests])
140 self.session.send(msg)
141 self.state = (lambda: self.__recv_monitor_reply(msg.id),
144 def __recv_monitor_reply(self, id):
145 msg = self.session.recv()
146 if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
148 self.change_seqno += 1
149 self.state = (self.__recv_update, self.__recv_wait)
151 self.__parse_update(msg.result)
152 except error.Error, e:
153 logging.error("%s: parse error in received schema: %s"
157 logging.error("%s: unexpected message expecting schema: %s"
158 % (self.remote, msg))
161 def __recv_update(self):
162 msg = self.session.recv()
163 if (msg and msg.type == ovs.jsonrpc.Message.T_NOTIFY and
164 type(msg.params) == list and len(msg.params) == 2 and
165 msg.params[0] is None):
166 self.__parse_update(msg.params[1])
168 logging.error("%s: unexpected message expecting update: %s"
169 % (self.remote, msg))
173 self.session.force_reconnect()
175 def __parse_update(self, update):
177 self.__do_parse_update(update)
178 except error.Error, e:
179 logging.error("%s: error parsing update: %s" % (self.remote, e))
181 def __do_parse_update(self, table_updates):
182 if type(table_updates) != dict:
183 raise error.Error("<table-updates> is not an object",
186 for table_name, table_update in table_updates.iteritems():
187 table = self.schema.tables.get(table_name)
189 raise error.Error('<table-updates> includes unknown '
190 'table "%s"' % table_name)
192 if type(table_update) != dict:
193 raise error.Error('<table-update> for table "%s" is not '
194 'an object' % table_name, table_update)
196 for uuid_string, row_update in table_update.iteritems():
197 if not ovs.ovsuuid.UUID.is_valid_string(uuid_string):
198 raise error.Error('<table-update> for table "%s" '
199 'contains bad UUID "%s" as member '
200 'name' % (table_name, uuid_string),
202 uuid = ovs.ovsuuid.UUID.from_string(uuid_string)
204 if type(row_update) != dict:
205 raise error.Error('<table-update> for table "%s" '
206 'contains <row-update> for %s that '
208 % (table_name, uuid_string))
210 parser = ovs.db.parser.Parser(row_update, "row-update")
211 old = parser.get_optional("old", [dict])
212 new = parser.get_optional("new", [dict])
215 if not old and not new:
216 raise error.Error('<row-update> missing "old" and '
217 '"new" members', row_update)
219 if self.__parse_row_update(table, uuid, old, new):
220 self.change_seqno += 1
222 def __parse_row_update(self, table, uuid, old, new):
223 """Returns True if a column changed, False otherwise."""
224 row = self.data[table.name].get(uuid)
229 del self.data[table.name][uuid]
233 logging.warning("cannot delete missing row %s from table %s"
234 % (uuid, table.name))
238 row = self.__create_row(table, uuid)
242 logging.warning("cannot add existing row %s to table %s"
243 % (uuid, table.name))
244 if self.__modify_row(table, row, new):
248 row = self.__create_row(table, uuid)
251 logging.warning("cannot modify missing row %s in table %s"
252 % (uuid, table_name))
253 if self.__modify_row(table, row, new):
257 def __modify_row(self, table, row, row_json):
259 for column_name, datum_json in row_json.iteritems():
260 column = table.columns.get(column_name)
263 logging.warning("unknown column %s updating table %s"
264 % (column_name, table.name))
268 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
269 except error.Error, e:
271 logging.warning("error parsing column %s in table %s: %s"
272 % (column_name, table_name, e))
275 if datum != getattr(row, column_name):
276 setattr(row, column_name, datum)
279 # Didn't really change but the OVSDB monitor protocol always
280 # includes every value in a row.
286 for table_name in self.schema.tables:
287 if self.data[table_name] != {}:
288 self.change_seqno += 1
292 for table_name in self.schema.tables:
293 self.data[table_name] = {}
295 def __create_row(self, table, uuid):
296 row = self.data[table.name][uuid] = Row()
297 for column in table.columns.itervalues():
298 setattr(row, column.name, ovs.db.data.Datum.default(column.type))
301 def force_reconnect(self):
302 """Forces the IDL to drop its connection to the database and reconnect.
303 In the meantime, the contents of the IDL will not change."""
304 self.session.force_reconnect()
307 """A row within an Idl. Data for each column is stored as an attribute
308 with the same name as the column and using an ovs.db.data.Datum as the