1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 vlog = ovs.vlog.Vlog("jsonrpc")
30 class Message(object):
31 T_REQUEST = 0 # Request.
32 T_NOTIFY = 1 # Notification.
33 T_REPLY = 2 # Successful reply.
34 T_ERROR = 3 # Error reply.
36 __types = {T_REQUEST: "request",
37 T_NOTIFY: "notification",
41 def __init__(self, type_, method, params, result, error, id):
53 this_id = Message._next_id
58 def create_request(method, params):
59 return Message(Message.T_REQUEST, method, params, None, None,
63 def create_notify(method, params):
64 return Message(Message.T_NOTIFY, method, params, None, None,
68 def create_reply(result, id):
69 return Message(Message.T_REPLY, None, None, result, None, id)
72 def create_error(error, id):
73 return Message(Message.T_ERROR, None, None, None, error, id)
76 def type_to_string(type_):
77 return Message.__types[type_]
79 def __validate_arg(self, value, name, must_have):
80 if (value is not None) == (must_have != 0):
83 type_name = Message.type_to_string(self.type)
88 return "%s %s have \"%s\"" % (type_name, verb, name)
91 if self.params is not None and type(self.params) != list:
92 return "\"params\" must be JSON array"
94 pattern = {Message.T_REQUEST: 0x11001,
95 Message.T_NOTIFY: 0x11000,
96 Message.T_REPLY: 0x00101,
97 Message.T_ERROR: 0x00011}.get(self.type)
99 return "invalid JSON-RPC message type %s" % self.type
102 self.__validate_arg(self.method, "method", pattern & 0x10000) or
103 self.__validate_arg(self.params, "params", pattern & 0x1000) or
104 self.__validate_arg(self.result, "result", pattern & 0x100) or
105 self.__validate_arg(self.error, "error", pattern & 0x10) or
106 self.__validate_arg(self.id, "id", pattern & 0x1))
110 if type(json) != dict:
111 return "message is not a JSON object"
113 # Make a copy to avoid modifying the caller's dict.
117 method = json.pop("method")
118 if type(method) not in [str, unicode]:
119 return "method is not a JSON string"
123 params = json.pop("params", None)
124 result = json.pop("result", None)
125 error = json.pop("error", None)
126 id_ = json.pop("id", None)
128 return "message has unexpected member \"%s\"" % json.popitem()[0]
130 if result is not None:
131 msg_type = Message.T_REPLY
132 elif error is not None:
133 msg_type = Message.T_ERROR
134 elif id_ is not None:
135 msg_type = Message.T_REQUEST
137 msg_type = Message.T_NOTIFY
139 msg = Message(msg_type, method, params, result, error, id_)
140 validation_error = msg.is_valid()
141 if validation_error is not None:
142 return validation_error
149 if self.method is not None:
150 json["method"] = self.method
152 if self.params is not None:
153 json["params"] = self.params
155 if self.result is not None or self.type == Message.T_ERROR:
156 json["result"] = self.result
158 if self.error is not None or self.type == Message.T_REPLY:
159 json["error"] = self.error
161 if self.id is not None or self.type == Message.T_NOTIFY:
167 s = [Message.type_to_string(self.type)]
168 if self.method is not None:
169 s.append("method=\"%s\"" % self.method)
170 if self.params is not None:
171 s.append("params=" + ovs.json.to_string(self.params))
172 if self.result is not None:
173 s.append("result=" + ovs.json.to_string(self.result))
174 if self.error is not None:
175 s.append("error=" + ovs.json.to_string(self.error))
176 if self.id is not None:
177 s.append("id=" + ovs.json.to_string(self.id))
181 class Connection(object):
182 def __init__(self, stream):
183 self.name = stream.name
189 self.received_bytes = 0
199 while len(self.output):
200 retval = self.stream.send(self.output)
202 self.output = self.output[retval:]
204 if retval != -errno.EAGAIN:
205 vlog.warn("%s: send error: %s" %
206 (self.name, os.strerror(-retval)))
210 def wait(self, poller):
212 self.stream.run_wait(poller)
214 self.stream.send_wait(poller)
216 def get_status(self):
219 def get_backlog(self):
223 return len(self.output)
225 def get_received_bytes(self):
226 return self.received_bytes
228 def __log_msg(self, title, msg):
229 vlog.dbg("%s: %s %s" % (self.name, title, msg))
235 self.__log_msg("send", msg)
237 was_empty = len(self.output) == 0
238 self.output += ovs.json.to_string(msg.to_json())
243 def send_block(self, msg):
244 error = self.send(msg)
250 if not self.get_backlog() or self.get_status():
253 poller = ovs.poller.Poller()
259 return self.status, None
263 error, data = self.stream.recv(4096)
265 if error == errno.EAGAIN:
269 vlog.warn("%s: receive error: %s"
270 % (self.name, os.strerror(error)))
272 return self.status, None
278 self.received_bytes += len(data)
280 if self.parser is None:
281 self.parser = ovs.json.Parser()
282 self.input = self.input[self.parser.feed(self.input):]
283 if self.parser.is_done():
284 msg = self.__process_msg()
288 return self.status, None
290 def recv_block(self):
292 error, msg = self.recv()
293 if error != errno.EAGAIN:
298 poller = ovs.poller.Poller()
300 self.recv_wait(poller)
303 def transact_block(self, request):
306 error = self.send(request)
309 error, reply = self.recv_block()
311 and (reply.type == Message.T_REPLY
312 or reply.type == Message.T_ERROR)
313 and reply.id == id_):
317 def __process_msg(self):
318 json = self.parser.finish()
320 if type(json) in [str, unicode]:
322 vlog.warn("%s: error parsing stream: %s" % (self.name, json))
323 self.error(errno.EPROTO)
326 msg = Message.from_json(json)
327 if not isinstance(msg, Message):
329 vlog.warn("%s: received bad JSON-RPC message: %s"
331 self.error(errno.EPROTO)
334 self.__log_msg("received", msg)
337 def recv_wait(self, poller):
338 if self.status or self.input:
339 poller.immediate_wake()
341 self.stream.recv_wait(poller)
343 def error(self, error):
350 class Session(object):
351 """A JSON-RPC session with reconnection."""
353 def __init__(self, reconnect, rpc):
354 self.reconnect = reconnect
362 """Creates and returns a Session that maintains a JSON-RPC session to
363 'name', which should be a string acceptable to ovs.stream.Stream or
364 ovs.stream.PassiveStream's initializer.
366 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
367 session connects and reconnects, with back-off, to 'name'.
369 If 'name' is a passive connection method, e.g. "ptcp:", the new session
370 listens for connections to 'name'. It maintains at most one connection
371 at any given time. Any new connection causes the previous one (if any)
373 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
374 reconnect.set_name(name)
375 reconnect.enable(ovs.timeval.msec())
377 if ovs.stream.PassiveStream.is_valid_name(name):
378 reconnect.set_passive(True, ovs.timeval.msec())
380 if ovs.stream.stream_or_pstream_needs_probes(name):
381 reconnect.set_probe_interval(0)
383 return Session(reconnect, None)
386 def open_unreliably(jsonrpc):
387 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
388 reconnect.set_quiet(True)
389 reconnect.set_name(jsonrpc.name)
390 reconnect.set_max_tries(0)
391 reconnect.connected(ovs.timeval.msec())
392 return Session(reconnect, jsonrpc)
395 if self.rpc is not None:
398 if self.stream is not None:
401 if self.pstream is not None:
405 def __disconnect(self):
406 if self.rpc is not None:
411 elif self.stream is not None:
419 name = self.reconnect.get_name()
420 if not self.reconnect.is_passive():
421 error, self.stream = ovs.stream.Stream.open(name)
423 self.reconnect.connecting(ovs.timeval.msec())
425 self.reconnect.connect_failed(ovs.timeval.msec(), error)
426 elif self.pstream is not None:
427 error, self.pstream = ovs.stream.PassiveStream.open(name)
429 self.reconnect.listening(ovs.timeval.msec())
431 self.reconnect.connect_failed(ovs.timeval.msec(), error)
436 if self.pstream is not None:
437 error, stream = self.pstream.accept()
439 if self.rpc or self.stream:
441 vlog.info("%s: new connection replacing active "
442 "connection" % self.reconnect.get_name())
444 self.reconnect.connected(ovs.timeval.msec())
445 self.rpc = Connection(stream)
446 elif error != errno.EAGAIN:
447 self.reconnect.listen_error(ovs.timeval.msec(), error)
452 received_bytes = self.rpc.get_received_bytes()
454 if received_bytes != self.rpc.get_received_bytes():
455 # Data was successfully received.
457 # Previously we only counted receiving a full message as
458 # activity, but with large messages or a slow connection that
459 # policy could time out the session mid-message.
460 self.reconnect.activity(ovs.timeval.msec())
462 error = self.rpc.get_status()
464 self.reconnect.disconnected(ovs.timeval.msec(), error)
466 elif self.stream is not None:
468 error = self.stream.connect()
470 self.reconnect.connected(ovs.timeval.msec())
471 self.rpc = Connection(self.stream)
473 elif error != errno.EAGAIN:
474 self.reconnect.connect_failed(ovs.timeval.msec(), error)
478 action = self.reconnect.run(ovs.timeval.msec())
479 if action == ovs.reconnect.CONNECT:
481 elif action == ovs.reconnect.DISCONNECT:
482 self.reconnect.disconnected(ovs.timeval.msec(), 0)
484 elif action == ovs.reconnect.PROBE:
486 request = Message.create_request("echo", [])
488 self.rpc.send(request)
490 assert action == None
492 def wait(self, poller):
493 if self.rpc is not None:
494 self.rpc.wait(poller)
495 elif self.stream is not None:
496 self.stream.run_wait(poller)
497 self.stream.connect_wait(poller)
498 if self.pstream is not None:
499 self.pstream.wait(poller)
500 self.reconnect.wait(poller, ovs.timeval.msec())
502 def get_backlog(self):
503 if self.rpc is not None:
504 return self.rpc.get_backlog()
509 return self.reconnect.get_name()
512 if self.rpc is not None:
513 return self.rpc.send(msg)
515 return errno.ENOTCONN
518 if self.rpc is not None:
519 backlog = self.rpc.get_backlog()
520 error, msg = self.rpc.recv()
521 if self.rpc.get_backlog() < backlog:
522 # Data previously caught in a queue was successfully sent (or
523 # there's an error, which we'll catch below).
525 # We don't count data that is successfully sent immediately as
526 # activity, because there's a lot of queuing downstream from
527 # us, which means that we can push a lot of data into a
528 # connection that has stalled and won't ever recover.
529 self.reconnect.activity(ovs.timeval.msec())
532 if msg.type == Message.T_REQUEST and msg.method == "echo":
533 # Echo request. Send reply.
534 self.send(Message.create_reply(msg.params, msg.id))
535 elif msg.type == Message.T_REPLY and msg.id == "echo":
536 # It's a reply to our echo request. Suppress it.
542 def recv_wait(self, poller):
543 if self.rpc is not None:
544 self.rpc.recv_wait(poller)
547 if self.rpc is not None or self.stream is not None:
550 max_tries = self.reconnect.get_max_tries()
551 return max_tries is None or max_tries > 0
553 def is_connected(self):
554 return self.rpc is not None
559 def force_reconnect(self):
560 self.reconnect.force_reconnect(ovs.timeval.msec())