1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 vlog = ovs.vlog.Vlog("jsonrpc")
30 class Message(object):
31 T_REQUEST = 0 # Request.
32 T_NOTIFY = 1 # Notification.
33 T_REPLY = 2 # Successful reply.
34 T_ERROR = 3 # Error reply.
36 __types = {T_REQUEST: "request",
37 T_NOTIFY: "notification",
41 def __init__(self, type_, method, params, result, error, id):
53 this_id = Message._next_id
58 def create_request(method, params):
59 return Message(Message.T_REQUEST, method, params, None, None,
63 def create_notify(method, params):
64 return Message(Message.T_NOTIFY, method, params, None, None,
68 def create_reply(result, id):
69 return Message(Message.T_REPLY, None, None, result, None, id)
72 def create_error(error, id):
73 return Message(Message.T_ERROR, None, None, None, error, id)
76 def type_to_string(type_):
77 return Message.__types[type_]
79 def __validate_arg(self, value, name, must_have):
80 if (value is not None) == (must_have != 0):
83 type_name = Message.type_to_string(self.type)
88 return "%s %s have \"%s\"" % (type_name, verb, name)
91 if self.params is not None and type(self.params) != list:
92 return "\"params\" must be JSON array"
94 pattern = {Message.T_REQUEST: 0x11001,
95 Message.T_NOTIFY: 0x11000,
96 Message.T_REPLY: 0x00101,
97 Message.T_ERROR: 0x00011}.get(self.type)
99 return "invalid JSON-RPC message type %s" % self.type
102 self.__validate_arg(self.method, "method", pattern & 0x10000) or
103 self.__validate_arg(self.params, "params", pattern & 0x1000) or
104 self.__validate_arg(self.result, "result", pattern & 0x100) or
105 self.__validate_arg(self.error, "error", pattern & 0x10) or
106 self.__validate_arg(self.id, "id", pattern & 0x1))
110 if type(json) != dict:
111 return "message is not a JSON object"
113 # Make a copy to avoid modifying the caller's dict.
117 method = json.pop("method")
118 if type(method) not in [str, unicode]:
119 return "method is not a JSON string"
123 params = json.pop("params", None)
124 result = json.pop("result", None)
125 error = json.pop("error", None)
126 id_ = json.pop("id", None)
128 return "message has unexpected member \"%s\"" % json.popitem()[0]
130 if result is not None:
131 msg_type = Message.T_REPLY
132 elif error is not None:
133 msg_type = Message.T_ERROR
134 elif id_ is not None:
135 msg_type = Message.T_REQUEST
137 msg_type = Message.T_NOTIFY
139 msg = Message(msg_type, method, params, result, error, id_)
140 validation_error = msg.is_valid()
141 if validation_error is not None:
142 return validation_error
149 if self.method is not None:
150 json["method"] = self.method
152 if self.params is not None:
153 json["params"] = self.params
155 if self.result is not None or self.type == Message.T_ERROR:
156 json["result"] = self.result
158 if self.error is not None or self.type == Message.T_REPLY:
159 json["error"] = self.error
161 if self.id is not None or self.type == Message.T_NOTIFY:
167 s = [Message.type_to_string(self.type)]
168 if self.method is not None:
169 s.append("method=\"%s\"" % self.method)
170 if self.params is not None:
171 s.append("params=" + ovs.json.to_string(self.params))
172 if self.result is not None:
173 s.append("result=" + ovs.json.to_string(self.result))
174 if self.error is not None:
175 s.append("error=" + ovs.json.to_string(self.error))
176 if self.id is not None:
177 s.append("id=" + ovs.json.to_string(self.id))
181 class Connection(object):
182 def __init__(self, stream):
183 self.name = stream.name
189 self.received_bytes = 0
199 while len(self.output):
200 retval = self.stream.send(self.output)
202 self.output = self.output[retval:]
204 if retval != -errno.EAGAIN:
205 vlog.warn("%s: send error: %s" %
206 (self.name, os.strerror(-retval)))
210 def wait(self, poller):
212 self.stream.run_wait(poller)
214 self.stream.send_wait(poller)
216 def get_status(self):
219 def get_backlog(self):
223 return len(self.output)
225 def get_received_bytes(self):
226 return self.received_bytes
228 def __log_msg(self, title, msg):
229 vlog.dbg("%s: %s %s" % (self.name, title, msg))
235 self.__log_msg("send", msg)
237 was_empty = len(self.output) == 0
238 self.output += ovs.json.to_string(msg.to_json())
243 def send_block(self, msg):
244 error = self.send(msg)
250 if not self.get_backlog() or self.get_status():
253 poller = ovs.poller.Poller()
259 return self.status, None
263 error, data = self.stream.recv(4096)
265 if error == errno.EAGAIN:
269 vlog.warn("%s: receive error: %s"
270 % (self.name, os.strerror(error)))
272 return self.status, None
278 self.received_bytes += len(data)
280 if self.parser is None:
281 self.parser = ovs.json.Parser()
282 self.input = self.input[self.parser.feed(self.input):]
283 if self.parser.is_done():
284 msg = self.__process_msg()
288 return self.status, None
290 def recv_block(self):
292 error, msg = self.recv()
293 if error != errno.EAGAIN:
298 poller = ovs.poller.Poller()
300 self.recv_wait(poller)
303 def transact_block(self, request):
306 error = self.send(request)
309 error, reply = self.recv_block()
311 and (reply.type == Message.T_REPLY
312 or reply.type == Message.T_ERROR)
313 and reply.id == id_):
317 def __process_msg(self):
318 json = self.parser.finish()
320 if type(json) in [str, unicode]:
322 vlog.warn("%s: error parsing stream: %s" % (self.name, json))
323 self.error(errno.EPROTO)
326 msg = Message.from_json(json)
327 if not isinstance(msg, Message):
329 vlog.warn("%s: received bad JSON-RPC message: %s"
331 self.error(errno.EPROTO)
334 self.__log_msg("received", msg)
337 def recv_wait(self, poller):
338 if self.status or self.input:
339 poller.immediate_wake()
341 self.stream.recv_wait(poller)
343 def error(self, error):
350 class Session(object):
351 """A JSON-RPC session with reconnection."""
353 def __init__(self, reconnect, rpc):
354 self.reconnect = reconnect
362 """Creates and returns a Session that maintains a JSON-RPC session to
363 'name', which should be a string acceptable to ovs.stream.Stream or
364 ovs.stream.PassiveStream's initializer.
366 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
367 session connects and reconnects, with back-off, to 'name'.
369 If 'name' is a passive connection method, e.g. "ptcp:", the new session
370 listens for connections to 'name'. It maintains at most one connection
371 at any given time. Any new connection causes the previous one (if any)
373 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
374 reconnect.set_name(name)
375 reconnect.enable(ovs.timeval.msec())
377 if ovs.stream.PassiveStream.is_valid_name(name):
378 reconnect.set_passive(True, ovs.timeval.msec())
380 if ovs.stream.stream_or_pstream_needs_probes(name):
381 reconnect.set_probe_interval(0)
383 return Session(reconnect, None)
386 def open_unreliably(jsonrpc):
387 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
388 reconnect.set_quiet(True)
389 reconnect.set_name(jsonrpc.name)
390 reconnect.set_max_tries(0)
391 reconnect.connected(ovs.timeval.msec())
392 return Session(reconnect, jsonrpc)
395 if self.rpc is not None:
398 if self.stream is not None:
401 if self.pstream is not None:
405 def __disconnect(self):
406 if self.rpc is not None:
411 elif self.stream is not None:
419 name = self.reconnect.get_name()
420 if not self.reconnect.is_passive():
421 error, self.stream = ovs.stream.Stream.open(name)
423 self.reconnect.connecting(ovs.timeval.msec())
425 self.reconnect.connect_failed(ovs.timeval.msec(), error)
426 elif self.pstream is not None:
427 error, self.pstream = ovs.stream.PassiveStream.open(name)
429 self.reconnect.listening(ovs.timeval.msec())
431 self.reconnect.connect_failed(ovs.timeval.msec(), error)
436 if self.pstream is not None:
437 error, stream = self.pstream.accept()
439 if self.rpc or self.stream:
441 vlog.info("%s: new connection replacing active "
442 "connection" % self.reconnect.get_name())
444 self.reconnect.connected(ovs.timeval.msec())
445 self.rpc = Connection(stream)
446 elif error != errno.EAGAIN:
447 self.reconnect.listen_error(ovs.timeval.msec(), error)
452 backlog = self.rpc.get_backlog()
454 if self.rpc.get_backlog() < backlog:
455 # Data previously caught in a queue was successfully sent (or
456 # there's an error, which we'll catch below).
458 # We don't count data that is successfully sent immediately as
459 # activity, because there's a lot of queuing downstream from
460 # us, which means that we can push a lot of data into a
461 # connection that has stalled and won't ever recover.
462 self.reconnect.activity(ovs.timeval.msec())
464 error = self.rpc.get_status()
466 self.reconnect.disconnected(ovs.timeval.msec(), error)
468 elif self.stream is not None:
470 error = self.stream.connect()
472 self.reconnect.connected(ovs.timeval.msec())
473 self.rpc = Connection(self.stream)
475 elif error != errno.EAGAIN:
476 self.reconnect.connect_failed(ovs.timeval.msec(), error)
480 action = self.reconnect.run(ovs.timeval.msec())
481 if action == ovs.reconnect.CONNECT:
483 elif action == ovs.reconnect.DISCONNECT:
484 self.reconnect.disconnected(ovs.timeval.msec(), 0)
486 elif action == ovs.reconnect.PROBE:
488 request = Message.create_request("echo", [])
490 self.rpc.send(request)
492 assert action == None
494 def wait(self, poller):
495 if self.rpc is not None:
496 self.rpc.wait(poller)
497 elif self.stream is not None:
498 self.stream.run_wait(poller)
499 self.stream.connect_wait(poller)
500 if self.pstream is not None:
501 self.pstream.wait(poller)
502 self.reconnect.wait(poller, ovs.timeval.msec())
504 def get_backlog(self):
505 if self.rpc is not None:
506 return self.rpc.get_backlog()
511 return self.reconnect.get_name()
514 if self.rpc is not None:
515 return self.rpc.send(msg)
517 return errno.ENOTCONN
520 if self.rpc is not None:
521 received_bytes = self.rpc.get_received_bytes()
522 error, msg = self.rpc.recv()
523 if received_bytes != self.rpc.get_received_bytes():
524 # Data was successfully received.
526 # Previously we only counted receiving a full message as
527 # activity, but with large messages or a slow connection that
528 # policy could time out the session mid-message.
529 self.reconnect.activity(ovs.timeval.msec())
532 if msg.type == Message.T_REQUEST and msg.method == "echo":
533 # Echo request. Send reply.
534 self.send(Message.create_reply(msg.params, msg.id))
535 elif msg.type == Message.T_REPLY and msg.id == "echo":
536 # It's a reply to our echo request. Suppress it.
542 def recv_wait(self, poller):
543 if self.rpc is not None:
544 self.rpc.recv_wait(poller)
547 if self.rpc is not None or self.stream is not None:
550 max_tries = self.reconnect.get_max_tries()
551 return max_tries is None or max_tries > 0
553 def is_connected(self):
554 return self.rpc is not None
559 def force_reconnect(self):
560 self.reconnect.force_reconnect(ovs.timeval.msec())