1 # Copyright (c) 2010 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
class Message(object):
    """A JSON-RPC message: a request, notification, reply, or error reply.

    Every message carries six attributes: 'type' (one of the T_* constants
    below), 'method', 'params', 'result', 'error', and 'id'.  Which of them
    must be None and which must not depends on 'type'; is_valid() checks
    that.
    """

    T_REQUEST = 0               # Request.
    T_NOTIFY = 1                # Notification.
    T_REPLY = 2                 # Successful reply.
    T_ERROR = 3                 # Error reply.

    # Human-readable name of each message type, as used in error strings and
    # in __str__().
    __types = {T_REQUEST: "request",
               T_NOTIFY: "notification",
               T_REPLY: "reply",
               T_ERROR: "error"}

    # For each message type, a mask with one hex digit per member, in the
    # order method, params, result, error, id.  A nonzero digit means the
    # member must be present (not None); a zero digit means it must be
    # absent (None).
    __patterns = {T_REQUEST: 0x11001,
                  T_NOTIFY: 0x11000,
                  T_REPLY: 0x00101,
                  T_ERROR: 0x00011}

    # Next id to be handed out by _create_id().
    _next_id = 0

    def __init__(self, type, method, params, result, error, id):
        """Stores the arguments as attributes without validating them; use
        is_valid() to check whether the combination makes sense."""
        self.type = type
        self.method = method
        self.params = params
        self.result = result
        self.error = error
        self.id = id

    @staticmethod
    def _create_id():
        """Returns a fresh request id.  Ids are consecutive integers drawn
        from a counter shared by all messages."""
        this_id = Message._next_id
        Message._next_id += 1
        return this_id

    @staticmethod
    def create_request(method, params):
        """Returns a new request for 'method' with 'params' and a newly
        allocated id."""
        return Message(Message.T_REQUEST, method, params, None, None,
                       Message._create_id())

    @staticmethod
    def create_notify(method, params):
        """Returns a new notification for 'method' with 'params'.
        Notifications carry no id."""
        return Message(Message.T_NOTIFY, method, params, None, None,
                       None)

    @staticmethod
    def create_reply(result, id):
        """Returns a new successful reply carrying 'result' for the request
        whose id is 'id'."""
        return Message(Message.T_REPLY, None, None, result, None, id)

    @staticmethod
    def create_error(error, id):
        """Returns a new error reply carrying 'error' for the request whose
        id is 'id'."""
        return Message(Message.T_ERROR, None, None, None, error, id)

    @staticmethod
    def type_to_string(type):
        """Returns the human-readable name of message type 'type'.  Raises
        KeyError if 'type' is not one of the T_* constants."""
        return Message.__types[type]

    def __validate_arg(self, value, name, must_have):
        """Checks that member 'name', whose value is 'value', is present
        (not None) exactly when 'must_have' is nonzero.

        Returns None if so, otherwise an error string that names this
        message's type.  This has to be an instance method because the
        error string is built from self.type."""
        if (value is not None) == (must_have != 0):
            return None

        type_name = Message.type_to_string(self.type)
        verb = "must" if must_have else "must not"
        return "%s %s have \"%s\"" % (type_name, verb, name)

    def is_valid(self):
        """Returns None if this message is well-formed for its type,
        otherwise a string that describes the first problem found."""
        if self.params is not None and not isinstance(self.params, list):
            return "\"params\" must be JSON array"

        pattern = Message.__patterns.get(self.type)
        if pattern is None:
            return "invalid JSON-RPC message type %s" % self.type

        # Each check yields None on success, so the chain evaluates to the
        # first error string, or to None if every member is acceptable.
        return (
            self.__validate_arg(self.method, "method", pattern & 0x10000) or
            self.__validate_arg(self.params, "params", pattern & 0x1000) or
            self.__validate_arg(self.result, "result", pattern & 0x100) or
            self.__validate_arg(self.error, "error", pattern & 0x10) or
            self.__validate_arg(self.id, "id", pattern & 0x1))

    @staticmethod
    def from_json(json):
        """Converts 'json', a dict decoded from a JSON object, into a
        Message.

        Returns the new Message on success.  On failure returns a string
        that describes the problem, so callers must check the type of the
        return value.  'json' itself is not modified."""
        if not isinstance(json, dict):
            return "message is not a JSON object"

        # Make a copy to avoid modifying the caller's dict.
        members = dict(json)

        if "method" in members:
            method = members.pop("method")
            if not isinstance(method, (str, unicode)):
                return "method is not a JSON string"
        else:
            method = None

        params = members.pop("params", None)
        result = members.pop("result", None)
        error = members.pop("error", None)
        msg_id = members.pop("id", None)
        if members:
            # Anything still left is a member that JSON-RPC does not define.
            return ("message has unexpected member \"%s\""
                    % members.popitem()[0])

        # The message type is not transmitted; infer it from which members
        # are present.
        if result is not None:
            msg_type = Message.T_REPLY
        elif error is not None:
            msg_type = Message.T_ERROR
        elif msg_id is not None:
            msg_type = Message.T_REQUEST
        else:
            msg_type = Message.T_NOTIFY

        msg = Message(msg_type, method, params, result, error, msg_id)
        validation_error = msg.is_valid()
        if validation_error is not None:
            return validation_error
        return msg

    def to_json(self):
        """Returns this message as a dict suitable for JSON serialization.

        "result", "error", and "id" are emitted with a None value on error
        replies, successful replies, and notifications respectively, even
        though the corresponding attribute is unset."""
        json = {}

        if self.method is not None:
            json["method"] = self.method

        if self.params is not None:
            json["params"] = self.params

        if self.result is not None or self.type == Message.T_ERROR:
            json["result"] = self.result

        if self.error is not None or self.type == Message.T_REPLY:
            json["error"] = self.error

        if self.id is not None or self.type == Message.T_NOTIFY:
            json["id"] = self.id

        return json

    def __str__(self):
        """Returns a one-line description for logging: the type name
        followed by each member that is set."""
        s = [Message.type_to_string(self.type)]
        if self.method is not None:
            s.append("method=\"%s\"" % self.method)
        if self.params is not None:
            s.append("params=" + ovs.json.to_string(self.params))
        if self.result is not None:
            # Without this, successful replies would log with no payload.
            s.append("result=" + ovs.json.to_string(self.result))
        if self.error is not None:
            s.append("error=" + ovs.json.to_string(self.error))
        if self.id is not None:
            s.append("id=" + ovs.json.to_string(self.id))
        return ", ".join(s)
class Connection(object):
    """A JSON-RPC connection on top of a stream object.

    Outgoing messages are serialized into the 'output' buffer and flushed
    by run().  Incoming bytes accumulate in 'input' and are fed to a JSON
    parser until a complete message is available.  'status' is 0 while the
    connection is healthy; once error() records a nonzero value the
    connection stays failed.
    """

    def __init__(self, stream):
        """Takes over 'stream' as the transport for this connection."""
        self.name = stream.get_name()
        self.stream = stream
        self.status = 0         # 0 if OK, otherwise the recorded error.
        self.input = ""         # Received data not yet given to the parser.
        self.output = ""        # Serialized messages not yet sent.
        self.parser = None      # Parser for a partially received message.

    def close(self):
        """Closes the underlying stream."""
        self.stream.close()
        self.stream = None

    def run(self):
        """Sends as much queued output as the stream accepts right now.

        Does nothing once the connection has failed.  A send error other
        than EAGAIN is logged and recorded with error()."""
        if self.status:
            return

        while len(self.output):
            # A negative return from send() is a negated errno value.
            retval = self.stream.send(self.output)
            if retval >= 0:
                self.output = self.output[retval:]
            else:
                if retval != -errno.EAGAIN:
                    logging.warning("%s: send error: %s",
                                    self.name, os.strerror(-retval))
                    self.error(-retval)
                break

    def wait(self, poller):
        """Arranges for 'poller' to wake up when run() has work to do."""
        if not self.status:
            self.stream.run_wait(poller)
            if len(self.output):
                # Like the stream's other *_wait() calls, send_wait() must
                # be told which poller to register with.
                self.stream.send_wait(poller)

    def get_status(self):
        """Returns 0 if the connection is healthy, otherwise the value that
        was recorded by error()."""
        return self.status

    def get_backlog(self):
        """Returns the number of characters queued but not yet sent, or 0
        if the connection has failed."""
        if self.status != 0:
            return 0
        return len(self.output)

    def get_name(self):
        """Returns the name that the stream reported at construction."""
        return self.name

    def __log_msg(self, title, msg):
        """Logs 'msg' at debug level, labelled with 'title'."""
        # Arguments are passed lazily so that 'msg' is only converted to a
        # string when debug logging is actually enabled.
        logging.debug("%s: %s %s", self.name, title, msg)

    def send(self, msg):
        """Queues 'msg' for transmission and returns the connection status.

        If nothing was queued before, an immediate send is attempted; any
        remainder goes out on later calls to run()."""
        if self.status:
            return self.status

        self.__log_msg("send", msg)

        was_empty = len(self.output) == 0
        self.output += ovs.json.to_string(msg.to_json())
        if was_empty:
            self.run()
        return self.status

    def send_block(self, msg):
        """Sends 'msg', blocking until it has been passed to the stream in
        full or the connection fails.  Returns the connection status."""
        error = self.send(msg)
        if error:
            return error

        while True:
            self.run()
            if not self.get_backlog() or self.get_status():
                return self.status

            poller = ovs.poller.Poller()
            self.wait(poller)
            poller.block()

    def recv(self):
        """Tries to receive one message without blocking.

        Returns a 2-tuple (error, msg):
          - (0, msg) when a complete, valid message was received;
          - (errno.EAGAIN, None) when more data is needed;
          - (EOF, None) when the stream returned no data;
          - (status, None) when the connection has failed."""
        if self.status:
            return self.status, None

        while True:
            if len(self.input) == 0:
                error, data = self.stream.recv(4096)
                if error:
                    if error == errno.EAGAIN:
                        return error, None

                    # XXX rate-limit
                    logging.warning("%s: receive error: %s",
                                    self.name, os.strerror(error))
                    self.error(error)
                    return self.status, None
                elif len(data) == 0:
                    # NOTE(review): EOF is expected to be this module's
                    # end-of-stream status constant -- confirm it is
                    # defined at module level.
                    self.error(EOF)
                    return EOF, None
                else:
                    self.input += data
            else:
                if self.parser is None:
                    self.parser = ovs.json.Parser()
                # feed() reports how much of the input it consumed; keep
                # the rest for the next message.
                self.input = self.input[self.parser.feed(self.input):]
                if self.parser.is_done():
                    msg = self.__process_msg()
                    if msg:
                        return 0, msg
                    return self.status, None

    def recv_block(self):
        """Like recv(), but blocks until the result is something other than
        errno.EAGAIN.  Queued output keeps being flushed while waiting."""
        while True:
            error, msg = self.recv()
            if error != errno.EAGAIN:
                return error, msg

            self.run()

            poller = ovs.poller.Poller()
            self.wait(poller)
            self.recv_wait(poller)
            poller.block()

    def transact_block(self, request):
        """Sends 'request' and blocks until the matching reply arrives or
        the connection fails.

        Returns (error, reply).  Messages received in the meantime that are
        not the successful reply to 'request' are discarded."""
        request_id = request.id

        error = self.send(request)
        reply = None
        while not error:
            error, reply = self.recv_block()
            if (reply and reply.type == Message.T_REPLY
                    and reply.id == request_id):
                break
        return error, reply

    def __process_msg(self):
        """Turns the parser's finished output into a Message.

        Returns the Message, or None after logging the problem and failing
        the connection with EPROTO if the input was not valid JSON or not a
        valid JSON-RPC message."""
        json = self.parser.finish()
        self.parser = None
        if type(json) in [str, unicode]:
            # A string here is treated as the parser's error report.
            # XXX rate-limit
            logging.warning("%s: error parsing stream: %s", self.name, json)
            self.error(errno.EPROTO)
            return

        msg = Message.from_json(json)
        if not isinstance(msg, Message):
            # from_json() returns an error string on failure.
            # XXX rate-limit
            logging.warning("%s: received bad JSON-RPC message: %s",
                            self.name, msg)
            self.error(errno.EPROTO)
            return

        self.__log_msg("received", msg)
        return msg

    def recv_wait(self, poller):
        """Arranges for 'poller' to wake up when recv() may make progress."""
        if self.status or len(self.input) > 0:
            # There is already something to report; don't sleep at all.
            poller.immediate_wake()
        else:
            self.stream.recv_wait(poller)

    def error(self, error):
        """Fails the connection with status 'error': closes the stream and
        drops queued output.  Only the first error is recorded."""
        if self.status == 0:
            self.status = error
            self.stream.close()
            self.output = ""
class Session(object):
    """A JSON-RPC session with reconnection."""

    def __init__(self, reconnect, rpc):
        """Creates a session driven by the 'reconnect' state machine.  'rpc'
        is an already established Connection, or None if there is none
        yet."""
        self.reconnect = reconnect
        self.rpc = rpc          # Established Connection, if any.
        self.stream = None      # Outgoing stream that is still connecting.
        self.pstream = None     # Listener, for passive sessions.
        self.seqno = 0          # Bumped on each connect and disconnect.

    @staticmethod
    def open(name):
        """Creates and returns a Session that maintains a JSON-RPC session to
        'name', which should be a string acceptable to ovs.stream.Stream or
        ovs.stream.PassiveStream's initializer.

        If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
        session connects and reconnects, with back-off, to 'name'.

        If 'name' is a passive connection method, e.g. "ptcp:", the new session
        listens for connections to 'name'.  It maintains at most one connection
        at any given time.  Any new connection causes the previous one (if any)
        to be dropped."""
        reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
        reconnect.set_name(name)
        reconnect.enable(ovs.timeval.msec())

        if ovs.stream.PassiveStream.is_valid_name(name):
            # This is a static factory, so there is no 'self' yet; configure
            # the local state machine that the new Session will own.
            reconnect.set_passive(True, ovs.timeval.msec())

        return Session(reconnect, None)

    @staticmethod
    def open_unreliably(jsonrpc):
        """Creates and returns a Session wrapped around 'jsonrpc', an
        existing Connection.  The session is set up with a maximum of 0
        connection attempts and is marked as connected right away."""
        reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
        reconnect.set_quiet(True)
        reconnect.set_name(jsonrpc.get_name())
        reconnect.set_max_tries(0)
        reconnect.connected(ovs.timeval.msec())
        return Session(reconnect, jsonrpc)

    def close(self):
        """Closes the connection, the pending stream, and the listener,
        whichever of them exist."""
        if self.rpc is not None:
            self.rpc.close()
            self.rpc = None
        if self.stream is not None:
            self.stream.close()
            self.stream = None
        if self.pstream is not None:
            self.pstream.close()
            self.pstream = None

    def __disconnect(self):
        """Drops the current connection or in-progress connection attempt,
        if there is one.  The listener is left alone."""
        if self.rpc is not None:
            # NOTE(review): EOF is expected to be this module's
            # end-of-stream status constant -- confirm it is defined at
            # module level.
            self.rpc.error(EOF)
            self.rpc.close()
            self.rpc = None
            self.seqno += 1
        elif self.stream is not None:
            self.stream.close()
            self.stream = None
            self.seqno += 1

    def __connect(self):
        """Starts a new connection attempt, or starts listening if the
        session is passive, and reports the outcome to the reconnect state
        machine."""
        self.__disconnect()

        name = self.reconnect.get_name()
        if not self.reconnect.is_passive():
            error, self.stream = ovs.stream.Stream.open(name)
            if not error:
                self.reconnect.connecting(ovs.timeval.msec())
            else:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)
        elif self.pstream is None:
            # Open the listener only if there is none yet.  Opening it when
            # one already exists would never create the first listener and
            # would replace (and leak) an existing one.
            error, self.pstream = ovs.stream.PassiveStream.open(name)
            if not error:
                self.reconnect.listening(ovs.timeval.msec())
            else:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)

        self.seqno += 1

    def run(self):
        """Performs periodic maintenance: accepts incoming connections,
        advances the current connection or connection attempt, and carries
        out whatever the reconnect state machine asks for."""
        if self.pstream is not None:
            error, stream = self.pstream.accept()
            if error == 0:
                if self.rpc or self.stream:
                    # XXX rate-limit
                    logging.info("%s: new connection replacing active "
                                 "connection", self.reconnect.get_name())
                    self.__disconnect()
                self.reconnect.connected(ovs.timeval.msec())
                self.rpc = Connection(stream)
            elif error != errno.EAGAIN:
                self.reconnect.listen_error(ovs.timeval.msec(), error)
                self.pstream.close()
                self.pstream = None

        if self.rpc:
            self.rpc.run()
            error = self.rpc.get_status()
            if error != 0:
                self.reconnect.disconnected(ovs.timeval.msec(), error)
                self.__disconnect()
        elif self.stream is not None:
            self.stream.run()
            error = self.stream.connect()
            if error == 0:
                # The connection attempt completed: promote the stream to a
                # full JSON-RPC connection.
                self.reconnect.connected(ovs.timeval.msec())
                self.rpc = Connection(self.stream)
                self.stream = None
            elif error != errno.EAGAIN:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)
                self.stream.close()
                self.stream = None

        action = self.reconnect.run(ovs.timeval.msec())
        if action == ovs.reconnect.CONNECT:
            self.__connect()
        elif action == ovs.reconnect.DISCONNECT:
            self.reconnect.disconnected(ovs.timeval.msec(), 0)
            self.__disconnect()
        elif action == ovs.reconnect.PROBE:
            if self.rpc:
                # The fixed id lets recv() recognize and drop the reply.
                request = Message.create_request("echo", [])
                request.id = "echo"
                self.rpc.send(request)
        else:
            assert action is None

    def wait(self, poller):
        """Arranges for 'poller' to wake up when run() has work to do."""
        if self.rpc is not None:
            self.rpc.wait(poller)
        elif self.stream is not None:
            self.stream.run_wait(poller)
            self.stream.connect_wait(poller)
        if self.pstream is not None:
            self.pstream.wait(poller)
        self.reconnect.wait(poller, ovs.timeval.msec())

    def get_backlog(self):
        """Returns the backlog of the current connection, or 0 if there is
        no connection."""
        if self.rpc is not None:
            return self.rpc.get_backlog()
        return 0

    def get_name(self):
        """Returns the name that the session was configured with."""
        return self.reconnect.get_name()

    def send(self, msg):
        """Sends 'msg' on the current connection and returns its status, or
        returns errno.ENOTCONN if there is no connection."""
        if self.rpc is not None:
            return self.rpc.send(msg)
        return errno.ENOTCONN

    def recv(self):
        """Tries to receive a message without blocking.

        Returns the message, or None if none is available.  Echo requests
        are answered here and replies to this session's own echo probes are
        dropped; neither is returned to the caller."""
        if self.rpc is not None:
            error, msg = self.rpc.recv()
            if not error:
                self.reconnect.received(ovs.timeval.msec())
                if msg.type == Message.T_REQUEST and msg.method == "echo":
                    # Echo request.  Send reply.
                    self.send(Message.create_reply(msg.params, msg.id))
                elif msg.type == Message.T_REPLY and msg.id == "echo":
                    # It's a reply to our echo request.  Suppress it.
                    pass
                else:
                    return msg
        return None

    def recv_wait(self, poller):
        """Arranges for 'poller' to wake up when recv() may return a
        message."""
        if self.rpc is not None:
            self.rpc.recv_wait(poller)

    def is_alive(self):
        """Returns True if the session is connected or connecting, or if
        the reconnect state machine still has connection attempts left."""
        if self.rpc is not None or self.stream is not None:
            return True
        max_tries = self.reconnect.get_max_tries()
        return max_tries is None or max_tries > 0

    def is_connected(self):
        """Returns True if there is an established connection."""
        return self.rpc is not None

    def get_seqno(self):
        """Returns a counter that changes whenever the session connects or
        disconnects."""
        return self.seqno

    def force_reconnect(self):
        """Asks the reconnect state machine to drop and re-establish the
        connection."""
        self.reconnect.force_reconnect(ovs.timeval.msec())