1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
26 vlog = ovs.vlog.Vlog("jsonrpc")
29 class Message(object):
30 T_REQUEST = 0 # Request.
31 T_NOTIFY = 1 # Notification.
32 T_REPLY = 2 # Successful reply.
33 T_ERROR = 3 # Error reply.
35 __types = {T_REQUEST: "request",
36 T_NOTIFY: "notification",
40 def __init__(self, type_, method, params, result, error, id):
52 this_id = Message._next_id
57 def create_request(method, params):
58 return Message(Message.T_REQUEST, method, params, None, None,
62 def create_notify(method, params):
63 return Message(Message.T_NOTIFY, method, params, None, None,
67 def create_reply(result, id):
68 return Message(Message.T_REPLY, None, None, result, None, id)
71 def create_error(error, id):
72 return Message(Message.T_ERROR, None, None, None, error, id)
75 def type_to_string(type_):
76 return Message.__types[type_]
78 def __validate_arg(self, value, name, must_have):
79 if (value is not None) == (must_have != 0):
82 type_name = Message.type_to_string(self.type)
87 return "%s %s have \"%s\"" % (type_name, verb, name)
90 if self.params is not None and type(self.params) != list:
91 return "\"params\" must be JSON array"
93 pattern = {Message.T_REQUEST: 0x11001,
94 Message.T_NOTIFY: 0x11000,
95 Message.T_REPLY: 0x00101,
96 Message.T_ERROR: 0x00011}.get(self.type)
98 return "invalid JSON-RPC message type %s" % self.type
101 self.__validate_arg(self.method, "method", pattern & 0x10000) or
102 self.__validate_arg(self.params, "params", pattern & 0x1000) or
103 self.__validate_arg(self.result, "result", pattern & 0x100) or
104 self.__validate_arg(self.error, "error", pattern & 0x10) or
105 self.__validate_arg(self.id, "id", pattern & 0x1))
109 if type(json) != dict:
110 return "message is not a JSON object"
112 # Make a copy to avoid modifying the caller's dict.
116 method = json.pop("method")
117 if type(method) not in [str, unicode]:
118 return "method is not a JSON string"
122 params = json.pop("params", None)
123 result = json.pop("result", None)
124 error = json.pop("error", None)
125 id_ = json.pop("id", None)
127 return "message has unexpected member \"%s\"" % json.popitem()[0]
129 if result is not None:
130 msg_type = Message.T_REPLY
131 elif error is not None:
132 msg_type = Message.T_ERROR
133 elif id_ is not None:
134 msg_type = Message.T_REQUEST
136 msg_type = Message.T_NOTIFY
138 msg = Message(msg_type, method, params, result, error, id_)
139 validation_error = msg.is_valid()
140 if validation_error is not None:
141 return validation_error
148 if self.method is not None:
149 json["method"] = self.method
151 if self.params is not None:
152 json["params"] = self.params
154 if self.result is not None or self.type == Message.T_ERROR:
155 json["result"] = self.result
157 if self.error is not None or self.type == Message.T_REPLY:
158 json["error"] = self.error
160 if self.id is not None or self.type == Message.T_NOTIFY:
166 s = [Message.type_to_string(self.type)]
167 if self.method is not None:
168 s.append("method=\"%s\"" % self.method)
169 if self.params is not None:
170 s.append("params=" + ovs.json.to_string(self.params))
171 if self.result is not None:
172 s.append("result=" + ovs.json.to_string(self.result))
173 if self.error is not None:
174 s.append("error=" + ovs.json.to_string(self.error))
175 if self.id is not None:
176 s.append("id=" + ovs.json.to_string(self.id))
180 class Connection(object):
181 def __init__(self, stream):
182 self.name = stream.name
197 while len(self.output):
198 retval = self.stream.send(self.output)
200 self.output = self.output[retval:]
202 if retval != -errno.EAGAIN:
203 vlog.warn("%s: send error: %s" %
204 (self.name, os.strerror(-retval)))
208 def wait(self, poller):
210 self.stream.run_wait(poller)
212 self.stream.send_wait()
214 def get_status(self):
217 def get_backlog(self):
221 return len(self.output)
223 def __log_msg(self, title, msg):
224 vlog.dbg("%s: %s %s" % (self.name, title, msg))
230 self.__log_msg("send", msg)
232 was_empty = len(self.output) == 0
233 self.output += ovs.json.to_string(msg.to_json())
238 def send_block(self, msg):
239 error = self.send(msg)
245 if not self.get_backlog() or self.get_status():
248 poller = ovs.poller.Poller()
254 return self.status, None
258 error, data = self.stream.recv(4096)
260 if error == errno.EAGAIN:
264 vlog.warn("%s: receive error: %s"
265 % (self.name, os.strerror(error)))
267 return self.status, None
274 if self.parser is None:
275 self.parser = ovs.json.Parser()
276 self.input = self.input[self.parser.feed(self.input):]
277 if self.parser.is_done():
278 msg = self.__process_msg()
282 return self.status, None
284 def recv_block(self):
286 error, msg = self.recv()
287 if error != errno.EAGAIN:
292 poller = ovs.poller.Poller()
294 self.recv_wait(poller)
297 def transact_block(self, request):
300 error = self.send(request)
303 error, reply = self.recv_block()
305 and (reply.type == Message.T_REPLY
306 or reply.type == Message.T_ERROR)
307 and reply.id == id_):
311 def __process_msg(self):
312 json = self.parser.finish()
314 if type(json) in [str, unicode]:
316 vlog.warn("%s: error parsing stream: %s" % (self.name, json))
317 self.error(errno.EPROTO)
320 msg = Message.from_json(json)
321 if not isinstance(msg, Message):
323 vlog.warn("%s: received bad JSON-RPC message: %s"
325 self.error(errno.EPROTO)
328 self.__log_msg("received", msg)
331 def recv_wait(self, poller):
332 if self.status or self.input:
333 poller.immediate_wake()
335 self.stream.recv_wait(poller)
337 def error(self, error):
344 class Session(object):
345 """A JSON-RPC session with reconnection."""
347 def __init__(self, reconnect, rpc):
348 self.reconnect = reconnect
356 """Creates and returns a Session that maintains a JSON-RPC session to
357 'name', which should be a string acceptable to ovs.stream.Stream or
358 ovs.stream.PassiveStream's initializer.
360 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
361 session connects and reconnects, with back-off, to 'name'.
363 If 'name' is a passive connection method, e.g. "ptcp:", the new session
364 listens for connections to 'name'. It maintains at most one connection
365 at any given time. Any new connection causes the previous one (if any)
367 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
368 reconnect.set_name(name)
369 reconnect.enable(ovs.timeval.msec())
371 if ovs.stream.PassiveStream.is_valid_name(name):
372 reconnect.set_passive(True, ovs.timeval.msec())
374 return Session(reconnect, None)
377 def open_unreliably(jsonrpc):
378 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
379 reconnect.set_quiet(True)
380 reconnect.set_name(jsonrpc.name)
381 reconnect.set_max_tries(0)
382 reconnect.connected(ovs.timeval.msec())
383 return Session(reconnect, jsonrpc)
386 if self.rpc is not None:
389 if self.stream is not None:
392 if self.pstream is not None:
396 def __disconnect(self):
397 if self.rpc is not None:
402 elif self.stream is not None:
410 name = self.reconnect.get_name()
411 if not self.reconnect.is_passive():
412 error, self.stream = ovs.stream.Stream.open(name)
414 self.reconnect.connecting(ovs.timeval.msec())
416 self.reconnect.connect_failed(ovs.timeval.msec(), error)
417 elif self.pstream is not None:
418 error, self.pstream = ovs.stream.PassiveStream.open(name)
420 self.reconnect.listening(ovs.timeval.msec())
422 self.reconnect.connect_failed(ovs.timeval.msec(), error)
427 if self.pstream is not None:
428 error, stream = self.pstream.accept()
430 if self.rpc or self.stream:
432 vlog.info("%s: new connection replacing active "
433 "connection" % self.reconnect.get_name())
435 self.reconnect.connected(ovs.timeval.msec())
436 self.rpc = Connection(stream)
437 elif error != errno.EAGAIN:
438 self.reconnect.listen_error(ovs.timeval.msec(), error)
444 error = self.rpc.get_status()
446 self.reconnect.disconnected(ovs.timeval.msec(), error)
448 elif self.stream is not None:
450 error = self.stream.connect()
452 self.reconnect.connected(ovs.timeval.msec())
453 self.rpc = Connection(self.stream)
455 elif error != errno.EAGAIN:
456 self.reconnect.connect_failed(ovs.timeval.msec(), error)
460 action = self.reconnect.run(ovs.timeval.msec())
461 if action == ovs.reconnect.CONNECT:
463 elif action == ovs.reconnect.DISCONNECT:
464 self.reconnect.disconnected(ovs.timeval.msec(), 0)
466 elif action == ovs.reconnect.PROBE:
468 request = Message.create_request("echo", [])
470 self.rpc.send(request)
472 assert action == None
474 def wait(self, poller):
475 if self.rpc is not None:
476 self.rpc.wait(poller)
477 elif self.stream is not None:
478 self.stream.run_wait(poller)
479 self.stream.connect_wait(poller)
480 if self.pstream is not None:
481 self.pstream.wait(poller)
482 self.reconnect.wait(poller, ovs.timeval.msec())
484 def get_backlog(self):
485 if self.rpc is not None:
486 return self.rpc.get_backlog()
491 return self.reconnect.get_name()
494 if self.rpc is not None:
495 return self.rpc.send(msg)
497 return errno.ENOTCONN
500 if self.rpc is not None:
501 error, msg = self.rpc.recv()
503 self.reconnect.received(ovs.timeval.msec())
504 if msg.type == Message.T_REQUEST and msg.method == "echo":
505 # Echo request. Send reply.
506 self.send(Message.create_reply(msg.params, msg.id))
507 elif msg.type == Message.T_REPLY and msg.id == "echo":
508 # It's a reply to our echo request. Suppress it.
514 def recv_wait(self, poller):
515 if self.rpc is not None:
516 self.rpc.recv_wait(poller)
519 if self.rpc is not None or self.stream is not None:
522 max_tries = self.reconnect.get_max_tries()
523 return max_tries is None or max_tries > 0
525 def is_connected(self):
526 return self.rpc is not None
531 def force_reconnect(self):
532 self.reconnect.force_reconnect(ovs.timeval.msec())