1 # Copyright (c) 2010, 2011, 2012 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 vlog = ovs.vlog.Vlog("jsonrpc")
30 class Message(object):
31 T_REQUEST = 0 # Request.
32 T_NOTIFY = 1 # Notification.
33 T_REPLY = 2 # Successful reply.
34 T_ERROR = 3 # Error reply.
36 __types = {T_REQUEST: "request",
37 T_NOTIFY: "notification",
41 def __init__(self, type_, method, params, result, error, id):
53 this_id = Message._next_id
58 def create_request(method, params):
59 return Message(Message.T_REQUEST, method, params, None, None,
63 def create_notify(method, params):
64 return Message(Message.T_NOTIFY, method, params, None, None,
def create_reply(result, id):
    """Creates and returns a successful-reply Message.

    'result' becomes the message's result and 'id' its id; the method,
    params and error members are all passed as None.
    """
    return Message(type_=Message.T_REPLY, method=None, params=None,
                   result=result, error=None, id=id)
def create_error(error, id):
    """Creates and returns an error-reply Message.

    'error' becomes the message's error and 'id' its id; the method,
    params and result members are all passed as None.
    """
    return Message(type_=Message.T_ERROR, method=None, params=None,
                   result=None, error=error, id=id)
def type_to_string(type_):
    """Returns the human-readable name for message type 'type_'.

    The name is looked up in the class's type-name table, so a 'type_'
    with no entry there raises KeyError.
    """
    names = Message.__types
    return names[type_]
79 def __validate_arg(self, value, name, must_have):
80 if (value is not None) == (must_have != 0):
83 type_name = Message.type_to_string(self.type)
88 return "%s %s have \"%s\"" % (type_name, verb, name)
91 if self.params is not None and type(self.params) != list:
92 return "\"params\" must be JSON array"
94 pattern = {Message.T_REQUEST: 0x11001,
95 Message.T_NOTIFY: 0x11000,
96 Message.T_REPLY: 0x00101,
97 Message.T_ERROR: 0x00011}.get(self.type)
99 return "invalid JSON-RPC message type %s" % self.type
102 self.__validate_arg(self.method, "method", pattern & 0x10000) or
103 self.__validate_arg(self.params, "params", pattern & 0x1000) or
104 self.__validate_arg(self.result, "result", pattern & 0x100) or
105 self.__validate_arg(self.error, "error", pattern & 0x10) or
106 self.__validate_arg(self.id, "id", pattern & 0x1))
110 if type(json) != dict:
111 return "message is not a JSON object"
113 # Make a copy to avoid modifying the caller's dict.
117 method = json.pop("method")
118 if type(method) not in [str, unicode]:
119 return "method is not a JSON string"
123 params = json.pop("params", None)
124 result = json.pop("result", None)
125 error = json.pop("error", None)
126 id_ = json.pop("id", None)
128 return "message has unexpected member \"%s\"" % json.popitem()[0]
130 if result is not None:
131 msg_type = Message.T_REPLY
132 elif error is not None:
133 msg_type = Message.T_ERROR
134 elif id_ is not None:
135 msg_type = Message.T_REQUEST
137 msg_type = Message.T_NOTIFY
139 msg = Message(msg_type, method, params, result, error, id_)
140 validation_error = msg.is_valid()
141 if validation_error is not None:
142 return validation_error
149 if self.method is not None:
150 json["method"] = self.method
152 if self.params is not None:
153 json["params"] = self.params
155 if self.result is not None or self.type == Message.T_ERROR:
156 json["result"] = self.result
158 if self.error is not None or self.type == Message.T_REPLY:
159 json["error"] = self.error
161 if self.id is not None or self.type == Message.T_NOTIFY:
167 s = [Message.type_to_string(self.type)]
168 if self.method is not None:
169 s.append("method=\"%s\"" % self.method)
170 if self.params is not None:
171 s.append("params=" + ovs.json.to_string(self.params))
172 if self.result is not None:
173 s.append("result=" + ovs.json.to_string(self.result))
174 if self.error is not None:
175 s.append("error=" + ovs.json.to_string(self.error))
176 if self.id is not None:
177 s.append("id=" + ovs.json.to_string(self.id))
181 class Connection(object):
182 def __init__(self, stream):
183 self.name = stream.name
198 while len(self.output):
199 retval = self.stream.send(self.output)
201 self.output = self.output[retval:]
203 if retval != -errno.EAGAIN:
204 vlog.warn("%s: send error: %s" %
205 (self.name, os.strerror(-retval)))
def wait(self, poller):
    """Registers with 'poller' the events this connection is waiting on.

    'poller' is handed to the underlying stream's wait methods.  Nothing is
    registered once 'self.status' is set, and a wait for sending is only
    registered while there is queued output.
    """
    if not self.status:
        self.stream.run_wait(poller)
        # Waiting for writability with nothing queued would only cause
        # needless wakeups, so ask for it just while output is pending.
        if len(self.output):
            # Pass the poller, as every other *_wait() call on the stream
            # does; without it the wakeup is never registered with
            # 'poller'.
            self.stream.send_wait(poller)
215 def get_status(self):
218 def get_backlog(self):
222 return len(self.output)
def __log_msg(self, title, msg):
    """Logs 'msg' at debug level, prefixed by this connection's name and
    'title', in the form "<name>: <title> <msg>"."""
    line = "%s: %s %s" % (self.name, title, msg)
    vlog.dbg(line)
231 self.__log_msg("send", msg)
233 was_empty = len(self.output) == 0
234 self.output += ovs.json.to_string(msg.to_json())
239 def send_block(self, msg):
240 error = self.send(msg)
246 if not self.get_backlog() or self.get_status():
249 poller = ovs.poller.Poller()
255 return self.status, None
259 error, data = self.stream.recv(4096)
261 if error == errno.EAGAIN:
265 vlog.warn("%s: receive error: %s"
266 % (self.name, os.strerror(error)))
268 return self.status, None
275 if self.parser is None:
276 self.parser = ovs.json.Parser()
277 self.input = self.input[self.parser.feed(self.input):]
278 if self.parser.is_done():
279 msg = self.__process_msg()
283 return self.status, None
285 def recv_block(self):
287 error, msg = self.recv()
288 if error != errno.EAGAIN:
293 poller = ovs.poller.Poller()
295 self.recv_wait(poller)
298 def transact_block(self, request):
301 error = self.send(request)
304 error, reply = self.recv_block()
306 and (reply.type == Message.T_REPLY
307 or reply.type == Message.T_ERROR)
308 and reply.id == id_):
312 def __process_msg(self):
313 json = self.parser.finish()
315 if type(json) in [str, unicode]:
317 vlog.warn("%s: error parsing stream: %s" % (self.name, json))
318 self.error(errno.EPROTO)
321 msg = Message.from_json(json)
322 if not isinstance(msg, Message):
324 vlog.warn("%s: received bad JSON-RPC message: %s"
326 self.error(errno.EPROTO)
329 self.__log_msg("received", msg)
332 def recv_wait(self, poller):
333 if self.status or self.input:
334 poller.immediate_wake()
336 self.stream.recv_wait(poller)
338 def error(self, error):
345 class Session(object):
346 """A JSON-RPC session with reconnection."""
348 def __init__(self, reconnect, rpc):
349 self.reconnect = reconnect
357 """Creates and returns a Session that maintains a JSON-RPC session to
358 'name', which should be a string acceptable to ovs.stream.Stream or
359 ovs.stream.PassiveStream's initializer.
361 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
362 session connects and reconnects, with back-off, to 'name'.
364 If 'name' is a passive connection method, e.g. "ptcp:", the new session
365 listens for connections to 'name'. It maintains at most one connection
366 at any given time. Any new connection causes the previous one (if any)
368 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
369 reconnect.set_name(name)
370 reconnect.enable(ovs.timeval.msec())
372 if ovs.stream.PassiveStream.is_valid_name(name):
373 reconnect.set_passive(True, ovs.timeval.msec())
375 if ovs.stream.stream_or_pstream_needs_probes(name):
376 reconnect.set_probe_interval(0)
378 return Session(reconnect, None)
def open_unreliably(jsonrpc):
    """Creates and returns a Session built around 'jsonrpc'.

    The session's reconnect state machine is set quiet, named after
    'jsonrpc.name', given a max-tries value of 0, and marked as connected as
    of the current time.
    """
    fsm = ovs.reconnect.Reconnect(ovs.timeval.msec())
    fsm.set_quiet(True)
    fsm.set_name(jsonrpc.name)
    fsm.set_max_tries(0)
    fsm.connected(ovs.timeval.msec())
    return Session(reconnect=fsm, rpc=jsonrpc)
390 if self.rpc is not None:
393 if self.stream is not None:
396 if self.pstream is not None:
400 def __disconnect(self):
401 if self.rpc is not None:
406 elif self.stream is not None:
414 name = self.reconnect.get_name()
415 if not self.reconnect.is_passive():
416 error, self.stream = ovs.stream.Stream.open(name)
418 self.reconnect.connecting(ovs.timeval.msec())
420 self.reconnect.connect_failed(ovs.timeval.msec(), error)
421 elif self.pstream is not None:
422 error, self.pstream = ovs.stream.PassiveStream.open(name)
424 self.reconnect.listening(ovs.timeval.msec())
426 self.reconnect.connect_failed(ovs.timeval.msec(), error)
431 if self.pstream is not None:
432 error, stream = self.pstream.accept()
434 if self.rpc or self.stream:
436 vlog.info("%s: new connection replacing active "
437 "connection" % self.reconnect.get_name())
439 self.reconnect.connected(ovs.timeval.msec())
440 self.rpc = Connection(stream)
441 elif error != errno.EAGAIN:
442 self.reconnect.listen_error(ovs.timeval.msec(), error)
448 error = self.rpc.get_status()
450 self.reconnect.disconnected(ovs.timeval.msec(), error)
452 elif self.stream is not None:
454 error = self.stream.connect()
456 self.reconnect.connected(ovs.timeval.msec())
457 self.rpc = Connection(self.stream)
459 elif error != errno.EAGAIN:
460 self.reconnect.connect_failed(ovs.timeval.msec(), error)
464 action = self.reconnect.run(ovs.timeval.msec())
465 if action == ovs.reconnect.CONNECT:
467 elif action == ovs.reconnect.DISCONNECT:
468 self.reconnect.disconnected(ovs.timeval.msec(), 0)
470 elif action == ovs.reconnect.PROBE:
472 request = Message.create_request("echo", [])
474 self.rpc.send(request)
476 assert action == None
def wait(self, poller):
    """Registers with 'poller' everything this session is waiting on.

    That is the JSON-RPC connection if there is one, otherwise the stream
    that is still connecting (if any); additionally the passive stream if
    there is one; and always the reconnect state machine's timer.
    """
    conn = self.rpc
    if conn is None:
        pending = self.stream
        if pending is not None:
            pending.run_wait(poller)
            pending.connect_wait(poller)
    else:
        conn.wait(poller)

    listener = self.pstream
    if listener is not None:
        listener.wait(poller)

    now = ovs.timeval.msec()
    self.reconnect.wait(poller, now)
488 def get_backlog(self):
489 if self.rpc is not None:
490 return self.rpc.get_backlog()
495 return self.reconnect.get_name()
498 if self.rpc is not None:
499 return self.rpc.send(msg)
501 return errno.ENOTCONN
504 if self.rpc is not None:
505 error, msg = self.rpc.recv()
507 self.reconnect.received(ovs.timeval.msec())
508 if msg.type == Message.T_REQUEST and msg.method == "echo":
509 # Echo request. Send reply.
510 self.send(Message.create_reply(msg.params, msg.id))
511 elif msg.type == Message.T_REPLY and msg.id == "echo":
512 # It's a reply to our echo request. Suppress it.
def recv_wait(self, poller):
    """Registers with 'poller' a wait for incoming data on the session's
    JSON-RPC connection.  Does nothing when there is no connection."""
    conn = self.rpc
    if conn is None:
        return
    conn.recv_wait(poller)
523 if self.rpc is not None or self.stream is not None:
526 max_tries = self.reconnect.get_max_tries()
527 return max_tries is None or max_tries > 0
def is_connected(self):
    """Returns True if the session currently has a JSON-RPC connection
    ('self.rpc' is set), False otherwise."""
    return not (self.rpc is None)
def force_reconnect(self):
    """Tells the reconnect state machine to force a reconnection, using the
    current time as the reference point."""
    now = ovs.timeval.msec()
    self.reconnect.force_reconnect(now)