8ce17515abecec12f848758ea39e89141a0548ea
[openvswitch] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16
17 import ovs.jsonrpc
18 import ovs.db.schema
19 from ovs.db import error
20 import ovs.ovsuuid
21
22 class Idl:
23     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
24
25     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
26     requests to an OVSDB database server and parses the responses, converting
27     raw JSON into data structures that are easier for clients to digest.
28
29     The IDL also assists with issuing database transactions.  The client
30     creates a transaction, manipulates the IDL data structures, and commits or
31     aborts the transaction.  The IDL then composes and issues the necessary
32     JSON-RPC requests and reports to the client whether the transaction
33     completed successfully.
34
35     If 'schema_cb' is provided, it should be a callback function that accepts
36     an ovs.db.schema.DbSchema as its argument.  It should determine whether the
37     schema is acceptable and raise an ovs.db.error.Error if it is not.  It may
38     also delete any tables or columns from the schema that the client has no
39     interest in monitoring, to save time and bandwidth during monitoring.  Its
40     return value is ignored."""
41
42     def __init__(self, remote, db_name, schema_cb=None):
43         """Creates and returns a connection to the database named 'db_name' on
44         'remote', which should be in a form acceptable to
45         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
46         replica of the remote database."""
47         self.remote = remote
48         self.session = ovs.jsonrpc.Session.open(remote)
49         self.db_name = db_name
50         self.last_seqno = None
51         self.schema = None
52         self.state = None
53         self.change_seqno = 0
54         self.data = {}
55         self.schema_cb = schema_cb
56
57     def close(self):
58         self.session.close()
59
60     def run(self):
61         """Processes a batch of messages from the database server.  Returns
62         True if the database as seen through the IDL changed, False if it did
63         not change.  The initial fetch of the entire contents of the remote
64         database is considered to be one kind of change.
65
66         This function can return occasional false positives, that is, report
67         that the database changed even though it didn't.  This happens if the
68         connection to the database drops and reconnects, which causes the
69         database contents to be reloaded even if they didn't change.  (It could
70         also happen if the database server sends out a "change" that reflects
71         what we already thought was in the database, but the database server is
72         not supposed to do that.)
73
74         As an alternative to checking the return value, the client may check
75         for changes in the value returned by self.get_seqno()."""
76         initial_change_seqno = self.change_seqno
77         self.session.run()
78         if self.session.is_connected():
79             seqno = self.session.get_seqno()
80             if seqno != self.last_seqno:
81                 self.last_seqno = seqno
82                 self.state = (self.__send_schema_request, None)
83             if self.state:
84                 self.state[0]()
85         return initial_change_seqno != self.change_seqno
86
87     def wait(self, poller):
88         """Arranges for poller.block() to wake up when self.run() has something
89         to do or when activity occurs on a transaction on 'self'."""
90         self.session.wait(poller)
91         if self.state and self.state[1]:
92             self.state[1](poller)
93
94     def get_seqno(self):
95         """Returns a number that represents the IDL's state.  When the IDL
96         updated (by self.run()), the return value changes."""
97         return self.change_seqno
98         
99     def __send_schema_request(self):
100         msg = ovs.jsonrpc.Message.create_request("get_schema", [self.db_name])
101         self.session.send(msg)
102         self.state = (lambda: self.__recv_schema(msg.id), self.__recv_wait)
103
104     def __recv_schema(self, id):
105         msg = self.session.recv()
106         if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
107             try:
108                 self.schema = ovs.db.schema.DbSchema.from_json(msg.result)
109             except error.Error, e:
110                 logging.error("%s: parse error in received schema: %s"
111                               % (self.remote, e))
112                 self.__error()
113                 return
114
115             if self.schema_cb:
116                 try:
117                     self.schema_cb(self.schema)
118                 except error.Error, e:
119                     logging.error("%s: error validating schema: %s"
120                                   % (self.remote, e))
121                     self.__error()
122                     return
123
124             self.__send_monitor_request()
125         elif msg:
126             logging.error("%s: unexpected message expecting schema: %s"
127                           % (self.remote, msg))
128             self.__error()
129             
130     def __recv_wait(self, poller):
131         self.session.recv_wait(poller)
132
133     def __send_monitor_request(self):
134         monitor_requests = {}
135         for table in self.schema.tables.itervalues():
136             monitor_requests[table.name] = {"columns": table.columns.keys()}
137         msg = ovs.jsonrpc.Message.create_request(
138             "monitor", [self.db_name, None, monitor_requests])
139         self.session.send(msg)
140         self.state = (lambda: self.__recv_monitor_reply(msg.id),
141                       self.__recv_wait)
142
143     def __recv_monitor_reply(self, id):
144         msg = self.session.recv()
145         if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
146             try:
147                 self.change_seqno += 1
148                 self.state = (self.__recv_update, self.__recv_wait)
149                 self.__clear()
150                 self.__parse_update(msg.result)
151             except error.Error, e:
152                 logging.error("%s: parse error in received schema: %s"
153                               % (self.remote, e))
154                 self.__error()
155         elif msg:
156             logging.error("%s: unexpected message expecting schema: %s"
157                           % (self.remote, msg))
158             self.__error()
159
160     def __recv_update(self):
161         msg = self.session.recv()
162         if (msg and msg.type == ovs.jsonrpc.Message.T_NOTIFY and
163             type(msg.params) == list and len(msg.params) == 2 and
164             msg.params[0] is None):
165             self.__parse_update(msg.params[1])
166         elif msg:
167             logging.error("%s: unexpected message expecting update: %s"
168                           % (self.remote, msg))
169             self.__error()
170
171     def __error(self):
172         self.session.force_reconnect()
173
174     def __parse_update(self, update):
175         try:
176             self.__do_parse_update(update)
177         except error.Error, e:
178             logging.error("%s: error parsing update: %s" % (self.remote, e))
179
180     def __do_parse_update(self, table_updates):
181         if type(table_updates) != dict:
182             raise error.Error("<table-updates> is not an object",
183                               table_updates)
184
185         for table_name, table_update in table_updates.iteritems():
186             table = self.schema.tables.get(table_name)
187             if not table:
188                 raise error.Error('<table-updates> includes unknown '
189                                   'table "%s"' % table_name)
190
191             if type(table_update) != dict:
192                 raise error.Error('<table-update> for table "%s" is not '
193                                   'an object' % table_name, table_update)
194
195             for uuid_string, row_update in table_update.iteritems():
196                 if not ovs.ovsuuid.UUID.is_valid_string(uuid_string):
197                     raise error.Error('<table-update> for table "%s" '
198                                       'contains bad UUID "%s" as member '
199                                       'name' % (table_name, uuid_string),
200                                       table_update)
201                 uuid = ovs.ovsuuid.UUID.from_string(uuid_string)
202
203                 if type(row_update) != dict:
204                     raise error.Error('<table-update> for table "%s" '
205                                       'contains <row-update> for %s that '
206                                       'is not an object'
207                                       % (table_name, uuid_string))
208
209                 old = row_update.get("old", None)
210                 new = row_update.get("new", None)
211
212                 if old is not None and type(old) != dict:
213                     raise error.Error('"old" <row> is not an object', old)
214                 if new is not None and type(new) != dict:
215                     raise error.Error('"new" <row> is not an object', new)
216                 if (old is not None) + (new is not None) != len(row_update):
217                     raise error.Error("<row-update> contains unexpected "
218                                       "member", row_update)
219                 if not old and not new:
220                     raise error.Error('<row-update> missing "old" and '
221                                       '"new" members', row_update)
222
223                 if self.__parse_row_update(table, uuid, old, new):
224                     self.change_seqno += 1
225
226     def __parse_row_update(self, table, uuid, old, new):
227         """Returns True if a column changed, False otherwise."""
228         row = self.data[table.name].get(uuid)
229         if not new:
230             # Delete row.
231             if row:
232                 del self.data[table.name][uuid]
233             else:
234                 # XXX rate-limit
235                 logging.warning("cannot delete missing row %s from table %s"
236                                 % (uuid, table.name))
237                 return False
238         elif not old:
239             # Insert row.
240             if not row:
241                 row = self.__create_row(table, uuid)
242             else:
243                 # XXX rate-limit
244                 logging.warning("cannot add existing row %s to table %s"
245                                 % (uuid, table.name))
246             self.__modify_row(table, row, new)
247         else:
248             if not row:
249                 row = self.__create_row(table, uuid)
250                 # XXX rate-limit
251                 logging.warning("cannot modify missing row %s in table %s"
252                                 % (uuid, table_name))
253             self.__modify_row(table, row, new)
254         return True
255
256     def __modify_row(self, table, row, row_json):
257         changed = False
258         for column_name, datum_json in row_json.iteritems():
259             column = table.columns.get(column_name)
260             if not column:
261                 # XXX rate-limit
262                 logging.warning("unknown column %s updating table %s"
263                                 % (column_name, table.name))
264                 continue
265
266             try:
267                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
268             except error.Error, e:
269                 # XXX rate-limit
270                 logging.warning("error parsing column %s in table %s: %s"
271                                 % (column_name, table_name, e))
272                 continue
273
274             if datum != getattr(row, column_name):
275                 setattr(row, column_name, datum)
276                 changed = True
277             else:
278                 # Didn't really change but the OVSDB monitor protocol always
279                 # includes every value in a row.
280                 pass
281         return changed
282
283     def __clear(self):
284         if self.data != {}:
285             for table_name in self.schema.tables:
286                 if self.data[table_name] != {}:
287                     self.change_seqno += 1
288                     break
289
290         self.data = {}
291         for table_name in self.schema.tables:
292             self.data[table_name] = {}
293
294     def __create_row(self, table, uuid):
295         class Row(object):
296             pass
297         row = self.data[table.name][uuid] = Row()
298         for column in table.columns.itervalues():
299             setattr(row, column.name, ovs.db.data.Datum.default(column.type))
300         return row
301
302     def force_reconnect(self):
303         """Forces the IDL to drop its connection to the database and reconnect.
304         In the meantime, the contents of the IDL will not change."""
305         self.session.force_reconnect()