ovs.jsonrpc: Include result in Message.__str__() output.
[openvswitch] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18
19 import ovs.json
20 import ovs.poller
21 import ovs.reconnect
22 import ovs.stream
23 import ovs.timeval
24
25 EOF = -1
26
27 class Message(object):
28     T_REQUEST = 0               # Request.
29     T_NOTIFY = 1                # Notification.
30     T_REPLY = 2                 # Successful reply.
31     T_ERROR = 3                 # Error reply.
32
33     __types = {T_REQUEST: "request",
34                T_NOTIFY: "notification",
35                T_REPLY: "reply",
36                T_ERROR: "error"}
37
38     def __init__(self, type_, method, params, result, error, id):
39         self.type = type_
40         self.method = method
41         self.params = params
42         self.result = result
43         self.error = error
44         self.id = id
45
46     _next_id = 0
47     @staticmethod
48     def _create_id():
49         this_id = Message._next_id
50         Message._next_id += 1
51         return this_id
52
53     @staticmethod
54     def create_request(method, params):
55         return Message(Message.T_REQUEST, method, params, None, None,
56                        Message._create_id())
57
58     @staticmethod
59     def create_notify(method, params):
60         return Message(Message.T_NOTIFY, method, params, None, None,
61                        None)
62
63     @staticmethod
64     def create_reply(result, id):
65         return Message(Message.T_REPLY, None, None, result, None, id)
66
67     @staticmethod
68     def create_error(error, id):
69         return Message(Message.T_ERROR, None, None, None, error, id)
70
71     @staticmethod
72     def type_to_string(type_):
73         return Message.__types[type_]
74
75     def __validate_arg(self, value, name, must_have):
76         if (value is not None) == (must_have != 0):
77             return None
78         else:
79             type_name = Message.type_to_string(self.type)
80             if must_have:
81                 verb = "must"
82             else:
83                 verb = "must not"
84             return "%s %s have \"%s\"" % (type_name, verb, name)
85
86     def is_valid(self):
87         if self.params is not None and type(self.params) != list:
88             return "\"params\" must be JSON array"
89
90         pattern = {Message.T_REQUEST: 0x11001,
91                    Message.T_NOTIFY:  0x11000,
92                    Message.T_REPLY:   0x00101,
93                    Message.T_ERROR:   0x00011}.get(self.type)
94         if pattern is None:
95             return "invalid JSON-RPC message type %s" % self.type
96
97         return (
98             self.__validate_arg(self.method, "method", pattern & 0x10000) or
99             self.__validate_arg(self.params, "params", pattern & 0x1000) or
100             self.__validate_arg(self.result, "result", pattern & 0x100) or
101             self.__validate_arg(self.error, "error", pattern & 0x10) or
102             self.__validate_arg(self.id, "id", pattern & 0x1))
103
104     @staticmethod
105     def from_json(json):
106         if type(json) != dict:
107             return "message is not a JSON object"
108
109         # Make a copy to avoid modifying the caller's dict.
110         json = dict(json)
111
112         if "method" in json:
113             method = json.pop("method")
114             if type(method) not in [str, unicode]:
115                 return "method is not a JSON string"
116         else:
117             method = None
118
119         params = json.pop("params", None)
120         result = json.pop("result", None)
121         error = json.pop("error", None)
122         id_ = json.pop("id", None)
123         if len(json):
124             return "message has unexpected member \"%s\"" % json.popitem()[0]
125
126         if result is not None:
127             msg_type = Message.T_REPLY
128         elif error is not None:
129             msg_type = Message.T_ERROR
130         elif id_ is not None:
131             msg_type = Message.T_REQUEST
132         else:
133             msg_type = Message.T_NOTIFY
134         
135         msg = Message(msg_type, method, params, result, error, id_)
136         validation_error = msg.is_valid()
137         if validation_error is not None:
138             return validation_error
139         else:
140             return msg
141
142     def to_json(self):
143         json = {}
144
145         if self.method is not None:
146             json["method"] = self.method
147
148         if self.params is not None:
149             json["params"] = self.params
150
151         if self.result is not None or self.type == Message.T_ERROR:
152             json["result"] = self.result
153
154         if self.error is not None or self.type == Message.T_REPLY:
155             json["error"] = self.error
156
157         if self.id is not None or self.type == Message.T_NOTIFY:
158             json["id"] = self.id
159
160         return json
161
162     def __str__(self):
163         s = [Message.type_to_string(self.type)]
164         if self.method is not None:
165             s.append("method=\"%s\"" % self.method)
166         if self.params is not None:
167             s.append("params=" + ovs.json.to_string(self.params))
168         if self.result is not None:
169             s.append("result=" + ovs.json.to_string(self.result))
170         if self.error is not None:
171             s.append("error=" + ovs.json.to_string(self.error))
172         if self.id is not None:
173             s.append("id=" + ovs.json.to_string(self.id))
174         return ", ".join(s)
175
176 class Connection(object):
177     def __init__(self, stream):
178         self.name = stream.name
179         self.stream = stream
180         self.status = 0
181         self.input = ""
182         self.output = ""
183         self.parser = None
184
185     def close(self):
186         self.stream.close()
187         self.stream = None
188
189     def run(self):
190         if self.status:
191             return
192
193         while len(self.output):
194             retval = self.stream.send(self.output)
195             if retval >= 0:
196                 self.output = self.output[retval:]
197             else:
198                 if retval != -errno.EAGAIN:
199                     logging.warn("%s: send error: %s" % (self.name,
200                                                          os.strerror(-retval)))
201                     self.error(-retval)
202                 break
203
204     def wait(self, poller):
205         if not self.status:
206             self.stream.run_wait(poller)
207             if len(self.output):
208                 self.stream.send_wait()
209
210     def get_status(self):
211         return self.status
212
213     def get_backlog(self):
214         if self.status != 0:
215             return 0
216         else:
217             return len(self.output)
218
219     def __log_msg(self, title, msg):
220         logging.debug("%s: %s %s" % (self.name, title, msg))
221
222     def send(self, msg):
223         if self.status:
224             return self.status
225
226         self.__log_msg("send", msg)
227
228         was_empty = len(self.output) == 0
229         self.output += ovs.json.to_string(msg.to_json())
230         if was_empty:
231             self.run()
232         return self.status
233
234     def send_block(self, msg):
235         error = self.send(msg)
236         if error:
237             return error
238
239         while True:
240             self.run()
241             if not self.get_backlog() or self.get_status():
242                 return self.status
243
244             poller = ovs.poller.Poller()
245             self.wait(poller)
246             poller.block()
247
248     def recv(self):
249         if self.status:
250             return self.status, None
251
252         while True:
253             if not self.input:
254                 error, data = self.stream.recv(4096)
255                 if error:
256                     if error == errno.EAGAIN:
257                         return error, None
258                     else:
259                         # XXX rate-limit
260                         logging.warning("%s: receive error: %s"
261                                         % (self.name, os.strerror(error)))
262                         self.error(error)
263                         return self.status, None
264                 elif not data:
265                     self.error(EOF)
266                     return EOF, None
267                 else:
268                     self.input += data
269             else:
270                 if self.parser is None:
271                     self.parser = ovs.json.Parser()
272                 self.input = self.input[self.parser.feed(self.input):]
273                 if self.parser.is_done():
274                     msg = self.__process_msg()
275                     if msg:
276                         return 0, msg
277                     else:
278                         return self.status, None
279
280     def recv_block(self):
281         while True:
282             error, msg = self.recv()
283             if error != errno.EAGAIN:
284                 return error, msg
285
286             self.run()
287
288             poller = ovs.poller.Poller()
289             self.wait(poller)
290             self.recv_wait(poller)
291             poller.block()
292     
293     def transact_block(self, request):
294         id_ = request.id
295
296         error = self.send(request)
297         reply = None
298         while not error:
299             error, reply = self.recv_block()
300             if reply and reply.type == Message.T_REPLY and reply.id == id_:
301                 break
302         return error, reply
303
304     def __process_msg(self):
305         json = self.parser.finish()
306         self.parser = None
307         if type(json) in [str, unicode]:
308             # XXX rate-limit
309             logging.warning("%s: error parsing stream: %s" % (self.name, json))
310             self.error(errno.EPROTO)
311             return
312
313         msg = Message.from_json(json)
314         if not isinstance(msg, Message):
315             # XXX rate-limit
316             logging.warning("%s: received bad JSON-RPC message: %s"
317                             % (self.name, msg))
318             self.error(errno.EPROTO)
319             return
320
321         self.__log_msg("received", msg)
322         return msg
323         
324     def recv_wait(self, poller):
325         if self.status or self.input:
326             poller.immediate_wake()
327         else:
328             self.stream.recv_wait(poller)
329
330     def error(self, error):
331         if self.status == 0:
332             self.status = error
333             self.stream.close()
334             self.output = ""
335             
336 class Session(object):
337     """A JSON-RPC session with reconnection."""
338
339     def __init__(self, reconnect, rpc):
340         self.reconnect = reconnect
341         self.rpc = rpc
342         self.stream = None
343         self.pstream = None
344         self.seqno = 0
345
346     @staticmethod
347     def open(name):
348         """Creates and returns a Session that maintains a JSON-RPC session to
349         'name', which should be a string acceptable to ovs.stream.Stream or
350         ovs.stream.PassiveStream's initializer.
351         
352         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
353         session connects and reconnects, with back-off, to 'name'.
354         
355         If 'name' is a passive connection method, e.g. "ptcp:", the new session
356         listens for connections to 'name'.  It maintains at most one connection
357         at any given time.  Any new connection causes the previous one (if any)
358         to be dropped."""
359         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
360         reconnect.set_name(name)
361         reconnect.enable(ovs.timeval.msec())
362
363         if ovs.stream.PassiveStream.is_valid_name(name):
364             reconnect.set_passive(True, ovs.timeval.msec())
365
366         return Session(reconnect, None)
367
368     @staticmethod
369     def open_unreliably(jsonrpc):
370         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
371         reconnect.set_quiet(True)
372         reconnect.set_name(jsonrpc.name)
373         reconnect.set_max_tries(0)
374         reconnect.connected(ovs.timeval.msec())
375         return Session(reconnect, jsonrpc)
376
377     def close(self):
378         if self.rpc is not None:
379             self.rpc.close()
380             self.rpc = None
381         if self.stream is not None:
382             self.stream.close()
383             self.stream = None
384         if self.pstream is not None:
385             self.pstream.close()
386             self.pstream = None
387
388     def __disconnect(self):
389         if self.rpc is not None:
390             self.rpc.error(EOF)
391             self.rpc.close()
392             self.rpc = None
393             self.seqno += 1
394         elif self.stream is not None:
395             self.stream.close()
396             self.stream = None
397             self.seqno += 1
398     
399     def __connect(self):
400         self.__disconnect()
401
402         name = self.reconnect.get_name()
403         if not self.reconnect.is_passive():
404             error, self.stream = ovs.stream.Stream.open(name)
405             if not error:
406                 self.reconnect.connecting(ovs.timeval.msec())
407             else:
408                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
409         elif self.pstream is not None:
410             error, self.pstream = ovs.stream.PassiveStream.open(name)
411             if not error:
412                 self.reconnect.listening(ovs.timeval.msec())
413             else:
414                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
415
416         self.seqno += 1
417
418     def run(self):
419         if self.pstream is not None:
420             error, stream = self.pstream.accept()
421             if error == 0:
422                 if self.rpc or self.stream:
423                     # XXX rate-limit
424                     logging.info("%s: new connection replacing active "
425                                  "connection" % self.reconnect.get_name())
426                     self.__disconnect()
427                 self.reconnect.connected(ovs.timeval.msec())
428                 self.rpc = Connection(stream)
429             elif error != errno.EAGAIN:
430                 self.reconnect.listen_error(ovs.timeval.msec(), error)
431                 self.pstream.close()
432                 self.pstream = None
433
434         if self.rpc:
435             self.rpc.run()
436             error = self.rpc.get_status()
437             if error != 0:
438                 self.reconnect.disconnected(ovs.timeval.msec(), error)
439                 self.__disconnect()
440         elif self.stream is not None:
441             self.stream.run()
442             error = self.stream.connect()
443             if error == 0:
444                 self.reconnect.connected(ovs.timeval.msec())
445                 self.rpc = Connection(self.stream)
446                 self.stream = None
447             elif error != errno.EAGAIN:
448                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
449                 self.stream.close()
450                 self.stream = None
451
452         action = self.reconnect.run(ovs.timeval.msec())
453         if action == ovs.reconnect.CONNECT:
454             self.__connect()
455         elif action == ovs.reconnect.DISCONNECT:
456             self.reconnect.disconnected(ovs.timeval.msec(), 0)
457             self.__disconnect()
458         elif action == ovs.reconnect.PROBE:
459             if self.rpc:
460                 request = Message.create_request("echo", [])
461                 request.id = "echo"
462                 self.rpc.send(request)
463         else:
464             assert action == None
465
466     def wait(self, poller):
467         if self.rpc is not None:
468             self.rpc.wait(poller)
469         elif self.stream is not None:
470             self.stream.run_wait(poller)
471             self.stream.connect_wait(poller)
472         if self.pstream is not None:
473             self.pstream.wait(poller)
474         self.reconnect.wait(poller, ovs.timeval.msec())
475
476     def get_backlog(self):
477         if self.rpc is not None:
478             return self.rpc.get_backlog()
479         else:
480             return 0
481
482     def get_name(self):
483         return self.reconnect.get_name()
484
485     def send(self, msg):
486         if self.rpc is not None:
487             return self.rpc.send(msg)
488         else:
489             return errno.ENOTCONN
490
491     def recv(self):
492         if self.rpc is not None:
493             error, msg = self.rpc.recv()
494             if not error:
495                 self.reconnect.received(ovs.timeval.msec())
496                 if msg.type == Message.T_REQUEST and msg.method == "echo":
497                     # Echo request.  Send reply.
498                     self.send(Message.create_reply(msg.params, msg.id))
499                 elif msg.type == Message.T_REPLY and msg.id == "echo":
500                     # It's a reply to our echo request.  Suppress it.
501                     pass
502                 else:
503                     return msg
504         return None
505
506     def recv_wait(self, poller):
507         if self.rpc is not None:
508             self.rpc.recv_wait(poller)
509
510     def is_alive(self):
511         if self.rpc is not None or self.stream is not None:
512             return True
513         else:
514             max_tries = self.reconnect.get_max_tries()
515             return max_tries is None or max_tries > 0
516     
517     def is_connected(self):
518         return self.rpc is not None
519
520     def get_seqno(self):
521         return self.seqno
522
523     def force_reconnect(self):
524         self.reconnect.force_reconnect(ovs.timeval.msec())