1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
26 vlog = ovs.vlog.Vlog("jsonrpc")
29 class Message(object):
30 T_REQUEST = 0 # Request.
31 T_NOTIFY = 1 # Notification.
32 T_REPLY = 2 # Successful reply.
33 T_ERROR = 3 # Error reply.
35 __types = {T_REQUEST: "request",
36 T_NOTIFY: "notification",
40 def __init__(self, type_, method, params, result, error, id):
52 this_id = Message._next_id
57 def create_request(method, params):
58 return Message(Message.T_REQUEST, method, params, None, None,
62 def create_notify(method, params):
63 return Message(Message.T_NOTIFY, method, params, None, None,
67 def create_reply(result, id):
68 return Message(Message.T_REPLY, None, None, result, None, id)
71 def create_error(error, id):
72 return Message(Message.T_ERROR, None, None, None, error, id)
75 def type_to_string(type_):
76 return Message.__types[type_]
78 def __validate_arg(self, value, name, must_have):
79 if (value is not None) == (must_have != 0):
82 type_name = Message.type_to_string(self.type)
87 return "%s %s have \"%s\"" % (type_name, verb, name)
90 if self.params is not None and type(self.params) != list:
91 return "\"params\" must be JSON array"
93 pattern = {Message.T_REQUEST: 0x11001,
94 Message.T_NOTIFY: 0x11000,
95 Message.T_REPLY: 0x00101,
96 Message.T_ERROR: 0x00011}.get(self.type)
98 return "invalid JSON-RPC message type %s" % self.type
101 self.__validate_arg(self.method, "method", pattern & 0x10000) or
102 self.__validate_arg(self.params, "params", pattern & 0x1000) or
103 self.__validate_arg(self.result, "result", pattern & 0x100) or
104 self.__validate_arg(self.error, "error", pattern & 0x10) or
105 self.__validate_arg(self.id, "id", pattern & 0x1))
109 if type(json) != dict:
110 return "message is not a JSON object"
112 # Make a copy to avoid modifying the caller's dict.
116 method = json.pop("method")
117 if type(method) not in [str, unicode]:
118 return "method is not a JSON string"
122 params = json.pop("params", None)
123 result = json.pop("result", None)
124 error = json.pop("error", None)
125 id_ = json.pop("id", None)
127 return "message has unexpected member \"%s\"" % json.popitem()[0]
129 if result is not None:
130 msg_type = Message.T_REPLY
131 elif error is not None:
132 msg_type = Message.T_ERROR
133 elif id_ is not None:
134 msg_type = Message.T_REQUEST
136 msg_type = Message.T_NOTIFY
138 msg = Message(msg_type, method, params, result, error, id_)
139 validation_error = msg.is_valid()
140 if validation_error is not None:
141 return validation_error
148 if self.method is not None:
149 json["method"] = self.method
151 if self.params is not None:
152 json["params"] = self.params
154 if self.result is not None or self.type == Message.T_ERROR:
155 json["result"] = self.result
157 if self.error is not None or self.type == Message.T_REPLY:
158 json["error"] = self.error
160 if self.id is not None or self.type == Message.T_NOTIFY:
166 s = [Message.type_to_string(self.type)]
167 if self.method is not None:
168 s.append("method=\"%s\"" % self.method)
169 if self.params is not None:
170 s.append("params=" + ovs.json.to_string(self.params))
171 if self.result is not None:
172 s.append("result=" + ovs.json.to_string(self.result))
173 if self.error is not None:
174 s.append("error=" + ovs.json.to_string(self.error))
175 if self.id is not None:
176 s.append("id=" + ovs.json.to_string(self.id))
180 class Connection(object):
181 def __init__(self, stream):
182 self.name = stream.name
197 while len(self.output):
198 retval = self.stream.send(self.output)
200 self.output = self.output[retval:]
202 if retval != -errno.EAGAIN:
203 vlog.warn("%s: send error: %s" %
204 (self.name, os.strerror(-retval)))
208 def wait(self, poller):
210 self.stream.run_wait(poller)
212 self.stream.send_wait()
214 def get_status(self):
217 def get_backlog(self):
221 return len(self.output)
def __log_msg(self, title, msg):
    """Logs 'msg' at debug level, as "<connection name>: <title> <msg>"."""
    line = "%s: %s %s" % (self.name, title, msg)
    vlog.dbg(line)
230 self.__log_msg("send", msg)
232 was_empty = len(self.output) == 0
233 self.output += ovs.json.to_string(msg.to_json())
238 def send_block(self, msg):
239 error = self.send(msg)
245 if not self.get_backlog() or self.get_status():
248 poller = ovs.poller.Poller()
254 return self.status, None
258 error, data = self.stream.recv(4096)
260 if error == errno.EAGAIN:
264 vlog.warn("%s: receive error: %s"
265 % (self.name, os.strerror(error)))
267 return self.status, None
274 if self.parser is None:
275 self.parser = ovs.json.Parser()
276 self.input = self.input[self.parser.feed(self.input):]
277 if self.parser.is_done():
278 msg = self.__process_msg()
282 return self.status, None
284 def recv_block(self):
286 error, msg = self.recv()
287 if error != errno.EAGAIN:
292 poller = ovs.poller.Poller()
294 self.recv_wait(poller)
297 def transact_block(self, request):
300 error = self.send(request)
303 error, reply = self.recv_block()
304 if reply and reply.type == Message.T_REPLY and reply.id == id_:
308 def __process_msg(self):
309 json = self.parser.finish()
311 if type(json) in [str, unicode]:
313 vlog.warn("%s: error parsing stream: %s" % (self.name, json))
314 self.error(errno.EPROTO)
317 msg = Message.from_json(json)
318 if not isinstance(msg, Message):
320 vlog.warn("%s: received bad JSON-RPC message: %s"
322 self.error(errno.EPROTO)
325 self.__log_msg("received", msg)
328 def recv_wait(self, poller):
329 if self.status or self.input:
330 poller.immediate_wake()
332 self.stream.recv_wait(poller)
334 def error(self, error):
341 class Session(object):
342 """A JSON-RPC session with reconnection."""
344 def __init__(self, reconnect, rpc):
345 self.reconnect = reconnect
353 """Creates and returns a Session that maintains a JSON-RPC session to
354 'name', which should be a string acceptable to ovs.stream.Stream or
355 ovs.stream.PassiveStream's initializer.
357 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
358 session connects and reconnects, with back-off, to 'name'.
360 If 'name' is a passive connection method, e.g. "ptcp:", the new session
361 listens for connections to 'name'. It maintains at most one connection
362 at any given time. Any new connection causes the previous one (if any)
364 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
365 reconnect.set_name(name)
366 reconnect.enable(ovs.timeval.msec())
368 if ovs.stream.PassiveStream.is_valid_name(name):
369 reconnect.set_passive(True, ovs.timeval.msec())
371 return Session(reconnect, None)
def open_unreliably(jsonrpc):
    """Creates and returns a Session wrapped around the existing connection
    'jsonrpc'.

    The session's reconnect state machine is put in quiet mode, named after
    'jsonrpc', given a maximum of zero tries, and marked as already
    connected before the Session is built around it.
    """
    fsm = ovs.reconnect.Reconnect(ovs.timeval.msec())
    fsm.set_quiet(True)
    fsm.set_name(jsonrpc.name)
    # Zero max tries: the state machine is configured with no connection
    # attempts of its own.
    fsm.set_max_tries(0)
    # 'jsonrpc' is handed in ready to use, so report the connected state
    # straight away.
    fsm.connected(ovs.timeval.msec())
    return Session(reconnect=fsm, rpc=jsonrpc)
383 if self.rpc is not None:
386 if self.stream is not None:
389 if self.pstream is not None:
393 def __disconnect(self):
394 if self.rpc is not None:
399 elif self.stream is not None:
407 name = self.reconnect.get_name()
408 if not self.reconnect.is_passive():
409 error, self.stream = ovs.stream.Stream.open(name)
411 self.reconnect.connecting(ovs.timeval.msec())
413 self.reconnect.connect_failed(ovs.timeval.msec(), error)
414 elif self.pstream is not None:
415 error, self.pstream = ovs.stream.PassiveStream.open(name)
417 self.reconnect.listening(ovs.timeval.msec())
419 self.reconnect.connect_failed(ovs.timeval.msec(), error)
424 if self.pstream is not None:
425 error, stream = self.pstream.accept()
427 if self.rpc or self.stream:
429 vlog.info("%s: new connection replacing active "
430 "connection" % self.reconnect.get_name())
432 self.reconnect.connected(ovs.timeval.msec())
433 self.rpc = Connection(stream)
434 elif error != errno.EAGAIN:
435 self.reconnect.listen_error(ovs.timeval.msec(), error)
441 error = self.rpc.get_status()
443 self.reconnect.disconnected(ovs.timeval.msec(), error)
445 elif self.stream is not None:
447 error = self.stream.connect()
449 self.reconnect.connected(ovs.timeval.msec())
450 self.rpc = Connection(self.stream)
452 elif error != errno.EAGAIN:
453 self.reconnect.connect_failed(ovs.timeval.msec(), error)
457 action = self.reconnect.run(ovs.timeval.msec())
458 if action == ovs.reconnect.CONNECT:
460 elif action == ovs.reconnect.DISCONNECT:
461 self.reconnect.disconnected(ovs.timeval.msec(), 0)
463 elif action == ovs.reconnect.PROBE:
465 request = Message.create_request("echo", [])
467 self.rpc.send(request)
469 assert action == None
def wait(self, poller):
    """Registers with 'poller' the events this session is waiting on.

    With an established connection (self.rpc), that connection's wait() is
    used.  Otherwise, if a stream is still being set up (self.stream), its
    run and connect waits are registered.  Independently of either, a
    passive listener (self.pstream) and the reconnect state machine are
    always given the chance to register.
    """
    if self.rpc is None:
        # No connection yet; maybe a stream is in the middle of connecting.
        if self.stream is not None:
            self.stream.run_wait(poller)
            self.stream.connect_wait(poller)
    else:
        self.rpc.wait(poller)

    if self.pstream is not None:
        self.pstream.wait(poller)

    self.reconnect.wait(poller, ovs.timeval.msec())
481 def get_backlog(self):
482 if self.rpc is not None:
483 return self.rpc.get_backlog()
488 return self.reconnect.get_name()
491 if self.rpc is not None:
492 return self.rpc.send(msg)
494 return errno.ENOTCONN
497 if self.rpc is not None:
498 error, msg = self.rpc.recv()
500 self.reconnect.received(ovs.timeval.msec())
501 if msg.type == Message.T_REQUEST and msg.method == "echo":
502 # Echo request. Send reply.
503 self.send(Message.create_reply(msg.params, msg.id))
504 elif msg.type == Message.T_REPLY and msg.id == "echo":
505 # It's a reply to our echo request. Suppress it.
511 def recv_wait(self, poller):
512 if self.rpc is not None:
513 self.rpc.recv_wait(poller)
516 if self.rpc is not None or self.stream is not None:
519 max_tries = self.reconnect.get_max_tries()
520 return max_tries is None or max_tries > 0
522 def is_connected(self):
523 return self.rpc is not None
def force_reconnect(self):
    """Tells the reconnect state machine to force a reconnection, passing
    it the current ovs.timeval.msec() reading."""
    now = ovs.timeval.msec()
    self.reconnect.force_reconnect(now)