1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
24 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
26 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
27 requests to an OVSDB database server and parses the responses, converting
28 raw JSON into data structures that are easier for clients to digest.
30 The IDL also assists with issuing database transactions. The client
31 creates a transaction, manipulates the IDL data structures, and commits or
32 aborts the transaction. The IDL then composes and issues the necessary
33 JSON-RPC requests and reports to the client whether the transaction
34 completed successfully.
36 If 'schema_cb' is provided, it should be a callback function that accepts
37 an ovs.db.schema.DbSchema as its argument. It should determine whether the
38 schema is acceptable and raise an ovs.db.error.Error if it is not. It may
39 also delete any tables or columns from the schema that the client has no
40 interest in monitoring, to save time and bandwidth during monitoring. Its
41 return value is ignored."""
43 def __init__(self, remote, db_name, schema_cb=None):
44 """Creates and returns a connection to the database named 'db_name' on
45 'remote', which should be in a form acceptable to
46 ovs.jsonrpc.Session.open(). The connection will maintain an in-memory
47 replica of the remote database."""
49 self.session = ovs.jsonrpc.Session.open(remote)
50 self.db_name = db_name
51 self.last_seqno = None
56 self.schema_cb = schema_cb
62 """Processes a batch of messages from the database server. Returns
63 True if the database as seen through the IDL changed, False if it did
64 not change. The initial fetch of the entire contents of the remote
65 database is considered to be one kind of change.
67 This function can return occasional false positives, that is, report
68 that the database changed even though it didn't. This happens if the
69 connection to the database drops and reconnects, which causes the
70 database contents to be reloaded even if they didn't change. (It could
71 also happen if the database server sends out a "change" that reflects
72 what we already thought was in the database, but the database server is
73 not supposed to do that.)
75 As an alternative to checking the return value, the client may check
76 for changes in the value returned by self.get_seqno()."""
77 initial_change_seqno = self.change_seqno
79 if self.session.is_connected():
80 seqno = self.session.get_seqno()
81 if seqno != self.last_seqno:
82 self.last_seqno = seqno
83 self.state = (self.__send_schema_request, None)
86 return initial_change_seqno != self.change_seqno
def wait(self, poller):
    """Arranges for poller.block() to wake up when self.run() has something
    to do or when activity occurs on a transaction on 'self'.

    'poller' is passed to the JSON-RPC session and, if the current state has
    a wait callback, to that callback as well."""
    self.session.wait(poller)

    # self.state is a (run_callback, wait_callback) pair.  wait_callback is
    # None for a state that only sends (the schema-request state), so it
    # must be checked before it is called.
    # NOTE(review): the statement under this condition was absent from the
    # stored listing.  Calling the wait callback with 'poller' is restored
    # from the signature of __recv_wait(self, poller), the only wait
    # callback that is ever installed in self.state.
    if self.state and self.state[1]:
        self.state[1](poller)
96 """Returns a number that represents the IDL's state. When the IDL
97 is updated (by self.run()), the return value changes."""
98 return self.change_seqno
def __send_schema_request(self):
    """Sends a "get_schema" request for self.db_name and moves to the state
    that waits for the matching reply."""
    request = ovs.jsonrpc.Message.create_request("get_schema",
                                                 [self.db_name])
    self.session.send(request)

    # self.state is a (run_callback, wait_callback) pair.  The run callback
    # carries the request id so that __recv_schema() can match the reply
    # against it.
    self.state = (lambda: self.__recv_schema(request.id), self.__recv_wait)
105 def __recv_schema(self, id):
106 msg = self.session.recv()
107 if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
109 self.schema = ovs.db.schema.DbSchema.from_json(msg.result)
110 except error.Error, e:
111 logging.error("%s: parse error in received schema: %s"
118 self.schema_cb(self.schema)
119 except error.Error, e:
120 logging.error("%s: error validating schema: %s"
125 self.__send_monitor_request()
127 logging.error("%s: unexpected message expecting schema: %s"
128 % (self.remote, msg))
def __recv_wait(self, poller):
    """Wait callback for the states that expect a message from the server:
    hands 'poller' to the session's recv_wait()."""
    self.session.recv_wait(poller)
def __send_monitor_request(self):
    """Sends a "monitor" request that covers every column of every table in
    self.schema, then moves to the state that waits for the reply."""
    # One <monitor-request> per table, naming all of that table's columns.
    # list() yields a plain list of column names, the same value keys()
    # produces under Python 2.
    monitor_requests = dict(
        (table.name, {"columns": list(table.columns)})
        for table in self.schema.tables.values())

    request = ovs.jsonrpc.Message.create_request(
        "monitor", [self.db_name, None, monitor_requests])
    self.session.send(request)

    # NOTE(review): the second element of this tuple was cut off in the
    # stored listing.  self.__recv_wait is restored to match the other two
    # states that wait for a message (schema reply and update notifications).
    self.state = (lambda: self.__recv_monitor_reply(request.id),
                  self.__recv_wait)
144 def __recv_monitor_reply(self, id):
145 msg = self.session.recv()
146 if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
148 self.change_seqno += 1
149 self.state = (self.__recv_update, self.__recv_wait)
151 self.__parse_update(msg.result)
152 except error.Error, e:
153 logging.error("%s: parse error in received schema: %s"
157 logging.error("%s: unexpected message expecting schema: %s"
158 % (self.remote, msg))
161 def __recv_update(self):
162 msg = self.session.recv()
163 if (msg and msg.type == ovs.jsonrpc.Message.T_NOTIFY and
164 type(msg.params) == list and len(msg.params) == 2 and
165 msg.params[0] is None):
166 self.__parse_update(msg.params[1])
168 logging.error("%s: unexpected message expecting update: %s"
169 % (self.remote, msg))
173 self.session.force_reconnect()
def __parse_update(self, update):
    """Applies the <table-updates> object 'update' to the replica.

    A malformed update is logged and not propagated as an exception.  Row
    updates that were applied before the malformed part was reached stay
    applied."""
    try:
        self.__do_parse_update(update)
    except error.Error as e:
        logging.error("%s: error parsing update: %s" % (self.remote, e))
def __do_parse_update(self, table_updates):
    """Validates the <table-updates> object 'table_updates' and applies each
    <row-update> it contains to the replica.

    self.change_seqno is incremented once for every row update that
    __parse_row_update() reports as a change.

    Raises error.Error if 'table_updates' is malformed: a value that should
    be a JSON object is not one, a table is not in self.schema, a member
    name is not a valid UUID, or a <row-update> has neither "old" nor
    "new"."""
    if not isinstance(table_updates, dict):
        raise error.Error("<table-updates> is not an object",
                          table_updates)

    for table_name, table_update in table_updates.items():
        table = self.schema.tables.get(table_name)
        if table is None:
            raise error.Error('<table-updates> includes unknown '
                              'table "%s"' % table_name)

        if not isinstance(table_update, dict):
            raise error.Error('<table-update> for table "%s" is not '
                              'an object' % table_name, table_update)

        for uuid_string, row_update in table_update.items():
            if not ovs.ovsuuid.UUID.is_valid_string(uuid_string):
                raise error.Error('<table-update> for table "%s" '
                                  'contains bad UUID "%s" as member '
                                  'name' % (table_name, uuid_string),
                                  table_update)
            uuid = ovs.ovsuuid.UUID.from_string(uuid_string)

            if not isinstance(row_update, dict):
                # NOTE(review): the tail of this message was absent from the
                # stored listing; 'is not an object' follows the wording of
                # the two sibling checks above.
                raise error.Error('<table-update> for table "%s" '
                                  'contains <row-update> for %s that '
                                  'is not an object'
                                  % (table_name, uuid_string))

            # The parser has to be given the <row-update> itself.  The
            # stored code passed the undefined name 'json' here, which
            # raises NameError on the first row of every update.
            parser = ovs.db.parser.Parser(row_update, "row-update")
            old = parser.get_optional("old", [dict])
            new = parser.get_optional("new", [dict])
            # NOTE(review): the listing's numbering skips a statement at
            # this point.  If it finalized the parser (for example to
            # reject unexpected members), restore it from the upstream file.

            if not old and not new:
                raise error.Error('<row-update> missing "old" and '
                                  '"new" members', row_update)

            if self.__parse_row_update(table, uuid, old, new):
                self.change_seqno += 1
def __parse_row_update(self, table, uuid, old, new):
    """Applies one <row-update> for row 'uuid' of 'table' to the replica.

    'old' and 'new' are the optional "old" and "new" members of the
    <row-update>; the caller has already rejected updates that have
    neither.

    Returns True if a column changed, False otherwise."""
    # NOTE(review): the branch conditions and return statements of this
    # method were absent from the stored listing.  They are reconstructed
    # from the three log messages (delete / add / modify) and from the
    # docstring; confirm against the upstream file.
    row = self.data[table.name].get(uuid)
    if not new:
        # No "new" member: the row was deleted.
        if row:
            del self.data[table.name][uuid]
        else:
            logging.warning("cannot delete missing row %s from table %s"
                            % (uuid, table.name))
            # Nothing was removed, so the replica did not change.
            return False
    elif not old:
        # No "old" member: the row was inserted.
        if not row:
            row = self.__create_row(table, uuid)
        else:
            logging.warning("cannot add existing row %s to table %s"
                            % (uuid, table.name))
        self.__modify_row(table, row, new)
    else:
        # Both members present: the row was modified.
        if not row:
            row = self.__create_row(table, uuid)
            # The stored code formatted this message with the undefined
            # name 'table_name', raising NameError instead of logging.
            logging.warning("cannot modify missing row %s in table %s"
                            % (uuid, table.name))
        self.__modify_row(table, row, new)
    return True
def __modify_row(self, table, row, row_json):
    """Stores the column values found in 'row_json' into 'row'.

    'row_json' maps column names to JSON values.  Each value is parsed
    with the type of the like-named column of 'table' and assigned to the
    attribute of 'row' with that name.  A column that 'table' does not
    have, or a value that fails to parse, is logged and skipped.

    Returns True if at least one attribute of 'row' received a new value,
    False otherwise."""
    # NOTE(review): the 'if'/'try'/'continue' skeleton and the return value
    # were absent from the stored listing and are reconstructed.  No
    # visible caller uses the return value.
    changed = False
    for column_name, datum_json in row_json.items():
        column = table.columns.get(column_name)
        if not column:
            logging.warning("unknown column %s updating table %s"
                            % (column_name, table.name))
            continue

        try:
            datum = ovs.db.data.Datum.from_json(column.type, datum_json)
        except error.Error as e:
            # The stored code formatted this message with the undefined
            # name 'table_name', so a parse error surfaced as NameError.
            logging.warning("error parsing column %s in table %s: %s"
                            % (column_name, table.name, e))
            continue

        if datum != getattr(row, column_name):
            setattr(row, column_name, datum)
            changed = True
        # Otherwise: didn't really change but the OVSDB monitor protocol
        # always includes every value in a row.
    return changed
281 for table_name in self.schema.tables:
282 if self.data[table_name] != {}:
283 self.change_seqno += 1
287 for table_name in self.schema.tables:
288 self.data[table_name] = {}
290 def __create_row(self, table, uuid):
293 row = self.data[table.name][uuid] = Row()
294 for column in table.columns.itervalues():
295 setattr(row, column.name, ovs.db.data.Datum.default(column.type))
def force_reconnect(self):
    """Forces the IDL to drop its connection to the database and reconnect.
    In the meantime, the contents of the IDL will not change.

    Delegates to the JSON-RPC session's force_reconnect()."""
    self.session.force_reconnect()