ovs.db.idl: Fix call to ovs.db.parser.Parser constructor.
[openvswitch] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16
17 import ovs.jsonrpc
18 import ovs.db.parser
19 import ovs.db.schema
20 from ovs.db import error
21 import ovs.ovsuuid
22
23 class Idl:
24     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
25
26     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
27     requests to an OVSDB database server and parses the responses, converting
28     raw JSON into data structures that are easier for clients to digest.
29
30     The IDL also assists with issuing database transactions.  The client
31     creates a transaction, manipulates the IDL data structures, and commits or
32     aborts the transaction.  The IDL then composes and issues the necessary
33     JSON-RPC requests and reports to the client whether the transaction
34     completed successfully.
35
36     If 'schema_cb' is provided, it should be a callback function that accepts
37     an ovs.db.schema.DbSchema as its argument.  It should determine whether the
38     schema is acceptable and raise an ovs.db.error.Error if it is not.  It may
39     also delete any tables or columns from the schema that the client has no
40     interest in monitoring, to save time and bandwidth during monitoring.  Its
41     return value is ignored."""
42
43     def __init__(self, remote, db_name, schema_cb=None):
44         """Creates and returns a connection to the database named 'db_name' on
45         'remote', which should be in a form acceptable to
46         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
47         replica of the remote database."""
48         self.remote = remote
49         self.session = ovs.jsonrpc.Session.open(remote)
50         self.db_name = db_name
51         self.last_seqno = None
52         self.schema = None
53         self.state = None
54         self.change_seqno = 0
55         self.data = {}
56         self.schema_cb = schema_cb
57
58     def close(self):
59         self.session.close()
60
61     def run(self):
62         """Processes a batch of messages from the database server.  Returns
63         True if the database as seen through the IDL changed, False if it did
64         not change.  The initial fetch of the entire contents of the remote
65         database is considered to be one kind of change.
66
67         This function can return occasional false positives, that is, report
68         that the database changed even though it didn't.  This happens if the
69         connection to the database drops and reconnects, which causes the
70         database contents to be reloaded even if they didn't change.  (It could
71         also happen if the database server sends out a "change" that reflects
72         what we already thought was in the database, but the database server is
73         not supposed to do that.)
74
75         As an alternative to checking the return value, the client may check
76         for changes in the value returned by self.get_seqno()."""
77         initial_change_seqno = self.change_seqno
78         self.session.run()
79         if self.session.is_connected():
80             seqno = self.session.get_seqno()
81             if seqno != self.last_seqno:
82                 self.last_seqno = seqno
83                 self.state = (self.__send_schema_request, None)
84             if self.state:
85                 self.state[0]()
86         return initial_change_seqno != self.change_seqno
87
88     def wait(self, poller):
89         """Arranges for poller.block() to wake up when self.run() has something
90         to do or when activity occurs on a transaction on 'self'."""
91         self.session.wait(poller)
92         if self.state and self.state[1]:
93             self.state[1](poller)
94
95     def get_seqno(self):
96         """Returns a number that represents the IDL's state.  When the IDL
97         updated (by self.run()), the return value changes."""
98         return self.change_seqno
99         
100     def __send_schema_request(self):
101         msg = ovs.jsonrpc.Message.create_request("get_schema", [self.db_name])
102         self.session.send(msg)
103         self.state = (lambda: self.__recv_schema(msg.id), self.__recv_wait)
104
105     def __recv_schema(self, id):
106         msg = self.session.recv()
107         if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
108             try:
109                 self.schema = ovs.db.schema.DbSchema.from_json(msg.result)
110             except error.Error, e:
111                 logging.error("%s: parse error in received schema: %s"
112                               % (self.remote, e))
113                 self.__error()
114                 return
115
116             if self.schema_cb:
117                 try:
118                     self.schema_cb(self.schema)
119                 except error.Error, e:
120                     logging.error("%s: error validating schema: %s"
121                                   % (self.remote, e))
122                     self.__error()
123                     return
124
125             self.__send_monitor_request()
126         elif msg:
127             logging.error("%s: unexpected message expecting schema: %s"
128                           % (self.remote, msg))
129             self.__error()
130             
131     def __recv_wait(self, poller):
132         self.session.recv_wait(poller)
133
134     def __send_monitor_request(self):
135         monitor_requests = {}
136         for table in self.schema.tables.itervalues():
137             monitor_requests[table.name] = {"columns": table.columns.keys()}
138         msg = ovs.jsonrpc.Message.create_request(
139             "monitor", [self.db_name, None, monitor_requests])
140         self.session.send(msg)
141         self.state = (lambda: self.__recv_monitor_reply(msg.id),
142                       self.__recv_wait)
143
144     def __recv_monitor_reply(self, id):
145         msg = self.session.recv()
146         if msg and msg.type == ovs.jsonrpc.Message.T_REPLY and msg.id == id:
147             try:
148                 self.change_seqno += 1
149                 self.state = (self.__recv_update, self.__recv_wait)
150                 self.__clear()
151                 self.__parse_update(msg.result)
152             except error.Error, e:
153                 logging.error("%s: parse error in received schema: %s"
154                               % (self.remote, e))
155                 self.__error()
156         elif msg:
157             logging.error("%s: unexpected message expecting schema: %s"
158                           % (self.remote, msg))
159             self.__error()
160
161     def __recv_update(self):
162         msg = self.session.recv()
163         if (msg and msg.type == ovs.jsonrpc.Message.T_NOTIFY and
164             type(msg.params) == list and len(msg.params) == 2 and
165             msg.params[0] is None):
166             self.__parse_update(msg.params[1])
167         elif msg:
168             logging.error("%s: unexpected message expecting update: %s"
169                           % (self.remote, msg))
170             self.__error()
171
172     def __error(self):
173         self.session.force_reconnect()
174
175     def __parse_update(self, update):
176         try:
177             self.__do_parse_update(update)
178         except error.Error, e:
179             logging.error("%s: error parsing update: %s" % (self.remote, e))
180
181     def __do_parse_update(self, table_updates):
182         if type(table_updates) != dict:
183             raise error.Error("<table-updates> is not an object",
184                               table_updates)
185
186         for table_name, table_update in table_updates.iteritems():
187             table = self.schema.tables.get(table_name)
188             if not table:
189                 raise error.Error('<table-updates> includes unknown '
190                                   'table "%s"' % table_name)
191
192             if type(table_update) != dict:
193                 raise error.Error('<table-update> for table "%s" is not '
194                                   'an object' % table_name, table_update)
195
196             for uuid_string, row_update in table_update.iteritems():
197                 if not ovs.ovsuuid.UUID.is_valid_string(uuid_string):
198                     raise error.Error('<table-update> for table "%s" '
199                                       'contains bad UUID "%s" as member '
200                                       'name' % (table_name, uuid_string),
201                                       table_update)
202                 uuid = ovs.ovsuuid.UUID.from_string(uuid_string)
203
204                 if type(row_update) != dict:
205                     raise error.Error('<table-update> for table "%s" '
206                                       'contains <row-update> for %s that '
207                                       'is not an object'
208                                       % (table_name, uuid_string))
209
210                 parser = ovs.db.parser.Parser(row_update, "row-update")
211                 old = parser.get_optional("old", [dict])
212                 new = parser.get_optional("new", [dict])
213                 parser.finish()
214
215                 if not old and not new:
216                     raise error.Error('<row-update> missing "old" and '
217                                       '"new" members', row_update)
218
219                 if self.__parse_row_update(table, uuid, old, new):
220                     self.change_seqno += 1
221
222     def __parse_row_update(self, table, uuid, old, new):
223         """Returns True if a column changed, False otherwise."""
224         row = self.data[table.name].get(uuid)
225         changed = False
226         if not new:
227             # Delete row.
228             if row:
229                 del self.data[table.name][uuid]
230                 changed = True
231             else:
232                 # XXX rate-limit
233                 logging.warning("cannot delete missing row %s from table %s"
234                                 % (uuid, table.name))
235         elif not old:
236             # Insert row.
237             if not row:
238                 row = self.__create_row(table, uuid)
239                 changed = True
240             else:
241                 # XXX rate-limit
242                 logging.warning("cannot add existing row %s to table %s"
243                                 % (uuid, table.name))
244             if self.__modify_row(table, row, new):
245                 changed = True
246         else:
247             if not row:
248                 row = self.__create_row(table, uuid)
249                 changed = True
250                 # XXX rate-limit
251                 logging.warning("cannot modify missing row %s in table %s"
252                                 % (uuid, table_name))
253             if self.__modify_row(table, row, new):
254                 changed = True
255         return changed
256
257     def __modify_row(self, table, row, row_json):
258         changed = False
259         for column_name, datum_json in row_json.iteritems():
260             column = table.columns.get(column_name)
261             if not column:
262                 # XXX rate-limit
263                 logging.warning("unknown column %s updating table %s"
264                                 % (column_name, table.name))
265                 continue
266
267             try:
268                 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
269             except error.Error, e:
270                 # XXX rate-limit
271                 logging.warning("error parsing column %s in table %s: %s"
272                                 % (column_name, table_name, e))
273                 continue
274
275             if datum != getattr(row, column_name):
276                 setattr(row, column_name, datum)
277                 changed = True
278             else:
279                 # Didn't really change but the OVSDB monitor protocol always
280                 # includes every value in a row.
281                 pass
282         return changed
283
284     def __clear(self):
285         if self.data != {}:
286             for table_name in self.schema.tables:
287                 if self.data[table_name] != {}:
288                     self.change_seqno += 1
289                     break
290
291         self.data = {}
292         for table_name in self.schema.tables:
293             self.data[table_name] = {}
294
295     def __create_row(self, table, uuid):
296         row = self.data[table.name][uuid] = Row()
297         for column in table.columns.itervalues():
298             setattr(row, column.name, ovs.db.data.Datum.default(column.type))
299         return row
300
301     def force_reconnect(self):
302         """Forces the IDL to drop its connection to the database and reconnect.
303         In the meantime, the contents of the IDL will not change."""
304         self.session.force_reconnect()
305
306 class Row(object):
307     """A row within an Idl.  Data for each column is stored as an attribute
308     with the same name as the column and using an ovs.db.data.Datum as the
309     value."""
310     pass
311