1 # Copyright (c) 2010, 2011, 2012 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 vlog = ovs.vlog.Vlog("jsonrpc")
30 class Message(object):
31 T_REQUEST = 0 # Request.
32 T_NOTIFY = 1 # Notification.
33 T_REPLY = 2 # Successful reply.
34 T_ERROR = 3 # Error reply.
36 __types = {T_REQUEST: "request",
37 T_NOTIFY: "notification",
41 def __init__(self, type_, method, params, result, error, id):
53 this_id = Message._next_id
58 def create_request(method, params):
59 return Message(Message.T_REQUEST, method, params, None, None,
63 def create_notify(method, params):
64 return Message(Message.T_NOTIFY, method, params, None, None,
68 def create_reply(result, id):
69 return Message(Message.T_REPLY, None, None, result, None, id)
72 def create_error(error, id):
73 return Message(Message.T_ERROR, None, None, None, error, id)
76 def type_to_string(type_):
77 return Message.__types[type_]
79 def __validate_arg(self, value, name, must_have):
80 if (value is not None) == (must_have != 0):
83 type_name = Message.type_to_string(self.type)
88 return "%s %s have \"%s\"" % (type_name, verb, name)
91 if self.params is not None and type(self.params) != list:
92 return "\"params\" must be JSON array"
94 pattern = {Message.T_REQUEST: 0x11001,
95 Message.T_NOTIFY: 0x11000,
96 Message.T_REPLY: 0x00101,
97 Message.T_ERROR: 0x00011}.get(self.type)
99 return "invalid JSON-RPC message type %s" % self.type
102 self.__validate_arg(self.method, "method", pattern & 0x10000) or
103 self.__validate_arg(self.params, "params", pattern & 0x1000) or
104 self.__validate_arg(self.result, "result", pattern & 0x100) or
105 self.__validate_arg(self.error, "error", pattern & 0x10) or
106 self.__validate_arg(self.id, "id", pattern & 0x1))
110 if type(json) != dict:
111 return "message is not a JSON object"
113 # Make a copy to avoid modifying the caller's dict.
117 method = json.pop("method")
118 if type(method) not in [str, unicode]:
119 return "method is not a JSON string"
123 params = json.pop("params", None)
124 result = json.pop("result", None)
125 error = json.pop("error", None)
126 id_ = json.pop("id", None)
128 return "message has unexpected member \"%s\"" % json.popitem()[0]
130 if result is not None:
131 msg_type = Message.T_REPLY
132 elif error is not None:
133 msg_type = Message.T_ERROR
134 elif id_ is not None:
135 msg_type = Message.T_REQUEST
137 msg_type = Message.T_NOTIFY
139 msg = Message(msg_type, method, params, result, error, id_)
140 validation_error = msg.is_valid()
141 if validation_error is not None:
142 return validation_error
149 if self.method is not None:
150 json["method"] = self.method
152 if self.params is not None:
153 json["params"] = self.params
155 if self.result is not None or self.type == Message.T_ERROR:
156 json["result"] = self.result
158 if self.error is not None or self.type == Message.T_REPLY:
159 json["error"] = self.error
161 if self.id is not None or self.type == Message.T_NOTIFY:
167 s = [Message.type_to_string(self.type)]
168 if self.method is not None:
169 s.append("method=\"%s\"" % self.method)
170 if self.params is not None:
171 s.append("params=" + ovs.json.to_string(self.params))
172 if self.result is not None:
173 s.append("result=" + ovs.json.to_string(self.result))
174 if self.error is not None:
175 s.append("error=" + ovs.json.to_string(self.error))
176 if self.id is not None:
177 s.append("id=" + ovs.json.to_string(self.id))
181 class Connection(object):
182 def __init__(self, stream):
183 self.name = stream.name
198 while len(self.output):
199 retval = self.stream.send(self.output)
201 self.output = self.output[retval:]
203 if retval != -errno.EAGAIN:
204 vlog.warn("%s: send error: %s" %
205 (self.name, os.strerror(-retval)))
209 def wait(self, poller):
211 self.stream.run_wait(poller)
213 self.stream.send_wait()
215 def get_status(self):
218 def get_backlog(self):
222 return len(self.output)
224 def __log_msg(self, title, msg):
225 vlog.dbg("%s: %s %s" % (self.name, title, msg))
231 self.__log_msg("send", msg)
233 was_empty = len(self.output) == 0
234 self.output += ovs.json.to_string(msg.to_json())
239 def send_block(self, msg):
240 error = self.send(msg)
246 if not self.get_backlog() or self.get_status():
249 poller = ovs.poller.Poller()
255 return self.status, None
259 error, data = self.stream.recv(4096)
261 if error == errno.EAGAIN:
265 vlog.warn("%s: receive error: %s"
266 % (self.name, os.strerror(error)))
268 return self.status, None
275 if self.parser is None:
276 self.parser = ovs.json.Parser()
277 self.input = self.input[self.parser.feed(self.input):]
278 if self.parser.is_done():
279 msg = self.__process_msg()
283 return self.status, None
285 def recv_block(self):
287 error, msg = self.recv()
288 if error != errno.EAGAIN:
293 poller = ovs.poller.Poller()
295 self.recv_wait(poller)
298 def transact_block(self, request):
301 error = self.send(request)
304 error, reply = self.recv_block()
306 and (reply.type == Message.T_REPLY
307 or reply.type == Message.T_ERROR)
308 and reply.id == id_):
312 def __process_msg(self):
313 json = self.parser.finish()
315 if type(json) in [str, unicode]:
317 vlog.warn("%s: error parsing stream: %s" % (self.name, json))
318 self.error(errno.EPROTO)
321 msg = Message.from_json(json)
322 if not isinstance(msg, Message):
324 vlog.warn("%s: received bad JSON-RPC message: %s"
326 self.error(errno.EPROTO)
329 self.__log_msg("received", msg)
332 def recv_wait(self, poller):
333 if self.status or self.input:
334 poller.immediate_wake()
336 self.stream.recv_wait(poller)
338 def error(self, error):
345 class Session(object):
346 """A JSON-RPC session with reconnection."""
348 def __init__(self, reconnect, rpc):
349 self.reconnect = reconnect
357 """Creates and returns a Session that maintains a JSON-RPC session to
358 'name', which should be a string acceptable to ovs.stream.Stream or
359 ovs.stream.PassiveStream's initializer.
361 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
362 session connects and reconnects, with back-off, to 'name'.
364 If 'name' is a passive connection method, e.g. "ptcp:", the new session
365 listens for connections to 'name'. It maintains at most one connection
366 at any given time. Any new connection causes the previous one (if any)
368 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
369 reconnect.set_name(name)
370 reconnect.enable(ovs.timeval.msec())
372 if ovs.stream.PassiveStream.is_valid_name(name):
373 reconnect.set_passive(True, ovs.timeval.msec())
375 return Session(reconnect, None)
378 def open_unreliably(jsonrpc):
379 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
380 reconnect.set_quiet(True)
381 reconnect.set_name(jsonrpc.name)
382 reconnect.set_max_tries(0)
383 reconnect.connected(ovs.timeval.msec())
384 return Session(reconnect, jsonrpc)
387 if self.rpc is not None:
390 if self.stream is not None:
393 if self.pstream is not None:
397 def __disconnect(self):
398 if self.rpc is not None:
403 elif self.stream is not None:
411 name = self.reconnect.get_name()
412 if not self.reconnect.is_passive():
413 error, self.stream = ovs.stream.Stream.open(name)
415 self.reconnect.connecting(ovs.timeval.msec())
417 self.reconnect.connect_failed(ovs.timeval.msec(), error)
418 elif self.pstream is not None:
419 error, self.pstream = ovs.stream.PassiveStream.open(name)
421 self.reconnect.listening(ovs.timeval.msec())
423 self.reconnect.connect_failed(ovs.timeval.msec(), error)
428 if self.pstream is not None:
429 error, stream = self.pstream.accept()
431 if self.rpc or self.stream:
433 vlog.info("%s: new connection replacing active "
434 "connection" % self.reconnect.get_name())
436 self.reconnect.connected(ovs.timeval.msec())
437 self.rpc = Connection(stream)
438 elif error != errno.EAGAIN:
439 self.reconnect.listen_error(ovs.timeval.msec(), error)
445 error = self.rpc.get_status()
447 self.reconnect.disconnected(ovs.timeval.msec(), error)
449 elif self.stream is not None:
451 error = self.stream.connect()
453 self.reconnect.connected(ovs.timeval.msec())
454 self.rpc = Connection(self.stream)
456 elif error != errno.EAGAIN:
457 self.reconnect.connect_failed(ovs.timeval.msec(), error)
461 action = self.reconnect.run(ovs.timeval.msec())
462 if action == ovs.reconnect.CONNECT:
464 elif action == ovs.reconnect.DISCONNECT:
465 self.reconnect.disconnected(ovs.timeval.msec(), 0)
467 elif action == ovs.reconnect.PROBE:
469 request = Message.create_request("echo", [])
471 self.rpc.send(request)
473 assert action == None
475 def wait(self, poller):
476 if self.rpc is not None:
477 self.rpc.wait(poller)
478 elif self.stream is not None:
479 self.stream.run_wait(poller)
480 self.stream.connect_wait(poller)
481 if self.pstream is not None:
482 self.pstream.wait(poller)
483 self.reconnect.wait(poller, ovs.timeval.msec())
485 def get_backlog(self):
486 if self.rpc is not None:
487 return self.rpc.get_backlog()
492 return self.reconnect.get_name()
495 if self.rpc is not None:
496 return self.rpc.send(msg)
498 return errno.ENOTCONN
501 if self.rpc is not None:
502 error, msg = self.rpc.recv()
504 self.reconnect.received(ovs.timeval.msec())
505 if msg.type == Message.T_REQUEST and msg.method == "echo":
506 # Echo request. Send reply.
507 self.send(Message.create_reply(msg.params, msg.id))
508 elif msg.type == Message.T_REPLY and msg.id == "echo":
509 # It's a reply to our echo request. Suppress it.
515 def recv_wait(self, poller):
516 if self.rpc is not None:
517 self.rpc.recv_wait(poller)
520 if self.rpc is not None or self.stream is not None:
523 max_tries = self.reconnect.get_max_tries()
524 return max_tries is None or max_tries > 0
526 def is_connected(self):
527 return self.rpc is not None
532 def force_reconnect(self):
533 self.reconnect.force_reconnect(ovs.timeval.msec())