1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 class Message(object):
28 T_REQUEST = 0 # Request.
29 T_NOTIFY = 1 # Notification.
30 T_REPLY = 2 # Successful reply.
31 T_ERROR = 3 # Error reply.
33 __types = {T_REQUEST: "request",
34 T_NOTIFY: "notification",
38 def __init__(self, type_, method, params, result, error, id):
49 this_id = Message._next_id
54 def create_request(method, params):
55 return Message(Message.T_REQUEST, method, params, None, None,
59 def create_notify(method, params):
60 return Message(Message.T_NOTIFY, method, params, None, None,
def create_reply(result, id):
    """Returns a new Message of type T_REPLY (a successful reply) that
    carries 'result' and is tagged with 'id'.

    The 'method', 'params', and 'error' members of the new message are all
    None."""
    return Message(Message.T_REPLY, method=None, params=None,
                   result=result, error=None, id=id)
def create_error(error, id):
    """Returns a new Message of type T_ERROR (an error reply) that carries
    'error' and is tagged with 'id'.

    The 'method', 'params', and 'result' members of the new message are all
    None."""
    return Message(Message.T_ERROR, method=None, params=None,
                   result=None, error=error, id=id)
def type_to_string(type_):
    """Returns the human-readable name of message type 'type_', which
    should be one of the Message.T_* constants, e.g. "request" for
    Message.T_REQUEST.

    Raises KeyError if 'type_' has no entry in the name table."""
    names = Message.__types
    return names[type_]
75 def __validate_arg(self, value, name, must_have):
76 if (value is not None) == (must_have != 0):
79 type_name = Message.type_to_string(self.type)
84 return "%s %s have \"%s\"" % (type_name, verb, name)
87 if self.params is not None and type(self.params) != list:
88 return "\"params\" must be JSON array"
90 pattern = {Message.T_REQUEST: 0x11001,
91 Message.T_NOTIFY: 0x11000,
92 Message.T_REPLY: 0x00101,
93 Message.T_ERROR: 0x00011}.get(self.type)
95 return "invalid JSON-RPC message type %s" % self.type
98 self.__validate_arg(self.method, "method", pattern & 0x10000) or
99 self.__validate_arg(self.params, "params", pattern & 0x1000) or
100 self.__validate_arg(self.result, "result", pattern & 0x100) or
101 self.__validate_arg(self.error, "error", pattern & 0x10) or
102 self.__validate_arg(self.id, "id", pattern & 0x1))
106 if type(json) != dict:
107 return "message is not a JSON object"
109 # Make a copy to avoid modifying the caller's dict.
113 method = json.pop("method")
114 if type(method) not in [str, unicode]:
115 return "method is not a JSON string"
119 params = json.pop("params", None)
120 result = json.pop("result", None)
121 error = json.pop("error", None)
122 id = json.pop("id", None)
124 return "message has unexpected member \"%s\"" % json.popitem()[0]
126 if result is not None:
127 msg_type = Message.T_REPLY
128 elif error is not None:
129 msg_type = Message.T_ERROR
131 msg_type = Message.T_REQUEST
133 msg_type = Message.T_NOTIFY
135 msg = Message(msg_type, method, params, result, error, id)
136 validation_error = msg.is_valid()
137 if validation_error is not None:
138 return validation_error
145 if self.method is not None:
146 json["method"] = self.method
148 if self.params is not None:
149 json["params"] = self.params
151 if self.result is not None or self.type == Message.T_ERROR:
152 json["result"] = self.result
154 if self.error is not None or self.type == Message.T_REPLY:
155 json["error"] = self.error
157 if self.id is not None or self.type == Message.T_NOTIFY:
163 s = [Message.type_to_string(self.type)]
164 if self.method is not None:
165 s.append("method=\"%s\"" % self.method)
166 if self.params is not None:
167 s.append("params=" + ovs.json.to_string(self.params))
168 if self.error is not None:
169 s.append("error=" + ovs.json.to_string(self.error))
170 if self.id is not None:
171 s.append("id=" + ovs.json.to_string(self.id))
174 class Connection(object):
175 def __init__(self, stream):
176 self.name = stream.name
191 while len(self.output):
192 retval = self.stream.send(self.output)
194 self.output = self.output[retval:]
196 if retval != -errno.EAGAIN:
197 logging.warn("%s: send error: %s" % (self.name,
198 os.strerror(-retval)))
202 def wait(self, poller):
204 self.stream.run_wait(poller)
206 self.stream.send_wait()
208 def get_status(self):
211 def get_backlog(self):
215 return len(self.output)
def __log_msg(self, title, msg):
    """Logs 'msg' at debug level, prefixed by this connection's name and by
    'title', a short word that says what is happening to the message (the
    callers in this class pass "send" and "received")."""
    # Hand the pieces to logging as arguments instead of formatting them up
    # front, so that 'msg' is converted to a string only if a debug record
    # is actually emitted.  The text that gets logged is unchanged.
    logging.debug("%s: %s %s", self.name, title, msg)
224 self.__log_msg("send", msg)
226 was_empty = len(self.output) == 0
227 self.output += ovs.json.to_string(msg.to_json())
232 def send_block(self, msg):
233 error = self.send(msg)
239 if not self.get_backlog() or self.get_status():
242 poller = ovs.poller.Poller()
248 return self.status, None
252 error, data = self.stream.recv(4096)
254 if error == errno.EAGAIN:
258 logging.warning("%s: receive error: %s"
259 % (self.name, os.strerror(error)))
261 return self.status, None
268 if self.parser is None:
269 self.parser = ovs.json.Parser()
270 self.input = self.input[self.parser.feed(self.input):]
271 if self.parser.is_done():
272 msg = self.__process_msg()
276 return self.status, None
278 def recv_block(self):
280 error, msg = self.recv()
281 if error != errno.EAGAIN:
286 poller = ovs.poller.Poller()
288 self.recv_wait(poller)
291 def transact_block(self, request):
294 error = self.send(request)
297 error, reply = self.recv_block()
298 if reply and reply.type == Message.T_REPLY and reply.id == id:
302 def __process_msg(self):
303 json = self.parser.finish()
305 if type(json) in [str, unicode]:
307 logging.warning("%s: error parsing stream: %s" % (self.name, json))
308 self.error(errno.EPROTO)
311 msg = Message.from_json(json)
312 if not isinstance(msg, Message):
314 logging.warning("%s: received bad JSON-RPC message: %s"
316 self.error(errno.EPROTO)
319 self.__log_msg("received", msg)
322 def recv_wait(self, poller):
323 if self.status or self.input:
324 poller.immediate_wake()
326 self.stream.recv_wait(poller)
328 def error(self, error):
334 class Session(object):
335 """A JSON-RPC session with reconnection."""
337 def __init__(self, reconnect, rpc):
338 self.reconnect = reconnect
346 """Creates and returns a Session that maintains a JSON-RPC session to
347 'name', which should be a string acceptable to ovs.stream.Stream or
348 ovs.stream.PassiveStream's initializer.
350 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
351 session connects and reconnects, with back-off, to 'name'.
353 If 'name' is a passive connection method, e.g. "ptcp:", the new session
354 listens for connections to 'name'. It maintains at most one connection
355 at any given time. Any new connection causes the previous one (if any)
357 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
358 reconnect.set_name(name)
359 reconnect.enable(ovs.timeval.msec())
361 if ovs.stream.PassiveStream.is_valid_name(name):
362 reconnect.set_passive(True, ovs.timeval.msec())
364 return Session(reconnect, None)
def open_unreliably(jsonrpc):
    """Creates and returns a Session that wraps 'jsonrpc', an existing
    connection, instead of opening one of its own.

    The session's reconnect object is named after 'jsonrpc', made quiet,
    given a maximum of 0 connection tries, and told that it is already
    connected as of the current time.  (With a retry limit of 0 the session
    is presumably not re-established once 'jsonrpc' drops; confirm against
    ovs.reconnect.)"""
    fsm = ovs.reconnect.Reconnect(ovs.timeval.msec())
    fsm.set_quiet(True)
    fsm.set_name(jsonrpc.name)
    fsm.set_max_tries(0)
    fsm.connected(ovs.timeval.msec())

    session = Session(fsm, jsonrpc)
    return session
376 if self.rpc is not None:
379 if self.stream is not None:
382 if self.pstream is not None:
386 def __disconnect(self):
387 if self.rpc is not None:
392 elif self.stream is not None:
400 name = self.reconnect.get_name()
401 if not self.reconnect.is_passive():
402 error, self.stream = ovs.stream.Stream.open(name)
404 self.reconnect.connecting(ovs.timeval.msec())
406 self.reconnect.connect_failed(ovs.timeval.msec(), error)
407 elif self.pstream is not None:
408 error, self.pstream = ovs.stream.PassiveStream.open(name)
410 self.reconnect.listening(ovs.timeval.msec())
412 self.reconnect.connect_failed(ovs.timeval.msec(), error)
417 if self.pstream is not None:
418 error, stream = self.pstream.accept()
420 if self.rpc or self.stream:
422 logging.info("%s: new connection replacing active "
423 "connection" % self.reconnect.get_name())
425 self.reconnect.connected(ovs.timeval.msec())
426 self.rpc = Connection(stream)
427 elif error != errno.EAGAIN:
428 self.reconnect.listen_error(ovs.timeval.msec(), error)
434 error = self.rpc.get_status()
436 self.reconnect.disconnected(ovs.timeval.msec(), error)
438 elif self.stream is not None:
440 error = self.stream.connect()
442 self.reconnect.connected(ovs.timeval.msec())
443 self.rpc = Connection(self.stream)
445 elif error != errno.EAGAIN:
446 self.reconnect.connect_failed(ovs.timeval.msec(), error)
450 action = self.reconnect.run(ovs.timeval.msec())
451 if action == ovs.reconnect.CONNECT:
453 elif action == ovs.reconnect.DISCONNECT:
454 self.reconnect.disconnected(ovs.timeval.msec(), 0)
456 elif action == ovs.reconnect.PROBE:
458 request = Message.create_request("echo", [])
460 self.rpc.send(request)
462 assert action == None
def wait(self, poller):
    """Registers with 'poller' everything this session is waiting on.

    If a JSON-RPC connection is established, its own wait() is used;
    otherwise, if a stream is still being set up, that stream's run and
    connect waits are registered.  Independently of that, the passive
    listener (if any) and the reconnect object, given the current time,
    register their waits too."""
    rpc = self.rpc
    if rpc is None:
        stream = self.stream
        if stream is not None:
            stream.run_wait(poller)
            stream.connect_wait(poller)
    else:
        rpc.wait(poller)

    listener = self.pstream
    if listener is not None:
        listener.wait(poller)

    self.reconnect.wait(poller, ovs.timeval.msec())
474 def get_backlog(self):
475 if self.rpc is not None:
476 return self.rpc.get_backlog()
481 return self.reconnect.get_name()
484 if self.rpc is not None:
485 return self.rpc.send(msg)
487 return errno.ENOTCONN
490 if self.rpc is not None:
491 error, msg = self.rpc.recv()
493 self.reconnect.received(ovs.timeval.msec())
494 if msg.type == Message.T_REQUEST and msg.method == "echo":
495 # Echo request. Send reply.
496 self.send(Message.create_reply(msg.params, msg.id))
497 elif msg.type == Message.T_REPLY and msg.id == "echo":
498 # It's a reply to our echo request. Suppress it.
def recv_wait(self, poller):
    """Delegates to the established connection's recv_wait() so that
    'poller' is set up for receiving on it.  Does nothing if there is
    currently no connection."""
    rpc = self.rpc
    if rpc is None:
        return
    rpc.recv_wait(poller)
509 if self.rpc is not None or self.stream is not None:
512 max_tries = self.reconnect.get_max_tries()
513 return max_tries is None or max_tries > 0
def is_connected(self):
    """Returns True if this session currently holds a JSON-RPC connection
    (that is, 'self.rpc' is set), False otherwise."""
    connection = self.rpc
    return connection is not None
def force_reconnect(self):
    """Asks the session's reconnect object to force a reconnection,
    passing it the current time from ovs.timeval.msec()."""
    now = ovs.timeval.msec()
    self.reconnect.force_reconnect(now)