1 # Copyright (c) 2010 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 class Message(object):
28 T_REQUEST = 0 # Request.
29 T_NOTIFY = 1 # Notification.
30 T_REPLY = 2 # Successful reply.
31 T_ERROR = 3 # Error reply.
33 __types = {T_REQUEST: "request",
34 T_NOTIFY: "notification",
39 def __init__(self, type, method, params, result, error, id):
50 this_id = Message._next_id
55 def create_request(method, params):
56 return Message(Message.T_REQUEST, method, params, None, None,
60 def create_notify(method, params):
61 return Message(Message.T_NOTIFY, method, params, None, None,
def create_reply(result, id):
    """Returns a new Message of type T_REPLY that carries 'result' and
    uses 'id' as its "id" member.

    The "method", "params" and "error" members of the new message are all
    None."""
    return Message(type=Message.T_REPLY, method=None, params=None,
                   result=result, error=None, id=id)
def create_error(error, id):
    """Returns a new Message of type T_ERROR that carries 'error' and
    uses 'id' as its "id" member.

    The "method", "params" and "result" members of the new message are all
    None."""
    return Message(type=Message.T_ERROR, method=None, params=None,
                   result=None, error=error, id=id)
def type_to_string(type):
    """Returns the human-readable name that the class's type-name table
    associates with message type 'type' (e.g. "request" for
    Message.T_REQUEST).

    Raises KeyError if 'type' has no entry in that table."""
    names = Message.__types
    return names[type]
77 def __validate_arg(value, name, must_have):
78 if (value is not None) == (must_have != 0):
81 type_name = Message.type_to_string(self.type)
86 return "%s %s have \"%s\"" % (type_name, verb, name)
89 if self.params is not None and type(self.params) != list:
90 return "\"params\" must be JSON array"
92 pattern = {Message.T_REQUEST: 0x11001,
93 Message.T_NOTIFY: 0x11000,
94 Message.T_REPLY: 0x00101,
95 Message.T_ERROR: 0x00011}.get(self.type)
97 return "invalid JSON-RPC message type %s" % self.type
100 Message.__validate_arg(self.method, "method", pattern & 0x10000) or
101 Message.__validate_arg(self.params, "params", pattern & 0x1000) or
102 Message.__validate_arg(self.result, "result", pattern & 0x100) or
103 Message.__validate_arg(self.error, "error", pattern & 0x10) or
104 Message.__validate_arg(self.id, "id", pattern & 0x1))
108 if type(json) != dict:
109 return "message is not a JSON object"
111 # Make a copy to avoid modifying the caller's dict.
115 method = json.pop("method")
116 if type(method) not in [str, unicode]:
117 return "method is not a JSON string"
121 params = json.pop("params", None)
122 result = json.pop("result", None)
123 error = json.pop("error", None)
124 id = json.pop("id", None)
126 return "message has unexpected member \"%s\"" % json.popitem()[0]
128 if result is not None:
129 msg_type = Message.T_REPLY
130 elif error is not None:
131 msg_type = Message.T_ERROR
133 msg_type = Message.T_REQUEST
135 msg_type = Message.T_NOTIFY
137 msg = Message(msg_type, method, params, result, error, id)
138 validation_error = msg.is_valid()
139 if validation_error is not None:
140 return validation_error
147 if self.method is not None:
148 json["method"] = self.method
150 if self.params is not None:
151 json["params"] = self.params
153 if self.result is not None or self.type == Message.T_ERROR:
154 json["result"] = self.result
156 if self.error is not None or self.type == Message.T_REPLY:
157 json["error"] = self.error
159 if self.id is not None or self.type == Message.T_NOTIFY:
165 s = [Message.type_to_string(self.type)]
166 if self.method is not None:
167 s.append("method=\"%s\"" % self.method)
168 if self.params is not None:
169 s.append("params=" + ovs.json.to_string(self.params))
170 if self.error is not None:
171 s.append("error=" + ovs.json.to_string(self.error))
172 if self.id is not None:
173 s.append("id=" + ovs.json.to_string(self.id))
176 class Connection(object):
177 def __init__(self, stream):
178 self.name = stream.get_name()
193 while len(self.output):
194 retval = self.stream.send(self.output)
196 self.output = self.output[retval:]
198 if retval != -errno.EAGAIN:
199 logging.warn("%s: send error: %s" % (self.name,
200 os.strerror(-retval)))
204 def wait(self, poller):
206 self.stream.run_wait(poller)
208 self.stream.send_wait()
210 def get_status(self):
213 def get_backlog(self):
217 return len(self.output)
def __log_msg(self, title, msg):
    """Logs 'msg' at debug level, prefixed by this connection's name and by
    'title' (callers in this file pass "send" or "received").

    The values are passed to the logging module as separate arguments
    instead of being %-formatted here, so that 'msg' is only converted to
    a string when a debug record is actually emitted.  The text of the
    logged line is unchanged."""
    logging.debug("%s: %s %s", self.name, title, msg)
229 self.__log_msg("send", msg)
231 was_empty = len(self.output) == 0
232 self.output += ovs.json.to_string(msg.to_json())
237 def send_block(self, msg):
238 error = self.send(msg)
244 if not self.get_backlog() or self.get_status():
247 poller = ovs.poller.Poller()
253 return self.status, None
256 if len(self.input) == 0:
257 error, data = self.stream.recv(4096)
259 if error == errno.EAGAIN:
263 logging.warning("%s: receive error: %s"
264 % (self.name, os.strerror(error)))
266 return self.status, None
273 if self.parser is None:
274 self.parser = ovs.json.Parser()
275 self.input = self.input[self.parser.feed(self.input):]
276 if self.parser.is_done():
277 msg = self.__process_msg()
281 return self.status, None
283 def recv_block(self):
285 error, msg = self.recv()
286 if error != errno.EAGAIN:
291 poller = ovs.poller.Poller()
293 self.recv_wait(poller)
296 def transact_block(self, request):
299 error = self.send(request)
302 error, reply = self.recv_block()
303 if reply and reply.type == Message.T_REPLY and reply.id == id:
307 def __process_msg(self):
308 json = self.parser.finish()
310 if type(json) in [str, unicode]:
312 logging.warning("%s: error parsing stream: %s" % (self.name, json))
313 self.error(errno.EPROTO)
316 msg = Message.from_json(json)
317 if not isinstance(msg, Message):
319 logging.warning("%s: received bad JSON-RPC message: %s"
321 self.error(errno.EPROTO)
324 self.__log_msg("received", msg)
327 def recv_wait(self, poller):
328 if self.status or len(self.input) > 0:
329 poller.immediate_wake()
331 self.stream.recv_wait(poller)
333 def error(self, error):
339 class Session(object):
340 """A JSON-RPC session with reconnection."""
342 def __init__(self, reconnect, rpc):
343 self.reconnect = reconnect
351 """Creates and returns a Session that maintains a JSON-RPC session to
352 'name', which should be a string acceptable to ovs.stream.Stream or
353 ovs.stream.PassiveStream's initializer.
355 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
356 session connects and reconnects, with back-off, to 'name'.
358 If 'name' is a passive connection method, e.g. "ptcp:", the new session
359 listens for connections to 'name'. It maintains at most one connection
360 at any given time. Any new connection causes the previous one (if any)
362 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
363 reconnect.set_name(name)
364 reconnect.enable(ovs.timeval.msec())
366 if ovs.stream.PassiveStream.is_valid_name(name):
367 self.reconnect.set_passive(True, ovs.timeval.msec())
369 return Session(reconnect, None)
def open_unreliably(jsonrpc):
    """Wraps the existing connection 'jsonrpc' in a new Session and returns
    that session.

    The session's Reconnect object is made quiet, named after 'jsonrpc',
    limited to a maximum of 0 tries and immediately told that it is
    connected.  (Presumably the 0-try limit means the session never tries
    to reconnect once 'jsonrpc' fails; confirm against ovs.reconnect.)"""
    fsm = ovs.reconnect.Reconnect(ovs.timeval.msec())
    fsm.set_quiet(True)
    fsm.set_name(jsonrpc.get_name())
    fsm.set_max_tries(0)
    fsm.connected(ovs.timeval.msec())
    return Session(reconnect=fsm, rpc=jsonrpc)
381 if self.rpc is not None:
384 if self.stream is not None:
387 if self.pstream is not None:
391 def __disconnect(self):
392 if self.rpc is not None:
397 elif self.stream is not None:
405 name = self.reconnect.get_name()
406 if not self.reconnect.is_passive():
407 error, self.stream = ovs.stream.Stream.open(name)
409 self.reconnect.connecting(ovs.timeval.msec())
411 self.reconnect.connect_failed(ovs.timeval.msec(), error)
412 elif self.pstream is not None:
413 error, self.pstream = ovs.stream.PassiveStream.open(name)
415 self.reconnect.listening(ovs.timeval.msec())
417 self.reconnect.connect_failed(ovs.timeval.msec(), error)
422 if self.pstream is not None:
423 error, stream = self.pstream.accept()
425 if self.rpc or self.stream:
427 logging.info("%s: new connection replacing active "
428 "connection" % self.reconnect.get_name())
430 self.reconnect.connected(ovs.timeval.msec())
431 self.rpc = Connection(stream)
432 elif error != errno.EAGAIN:
433 self.reconnect.listen_error(ovs.timeval.msec(), error)
439 error = self.rpc.get_status()
441 self.reconnect.disconnected(ovs.timeval.msec(), error)
443 elif self.stream is not None:
445 error = self.stream.connect()
447 self.reconnect.connected(ovs.timeval.msec())
448 self.rpc = Connection(self.stream)
450 elif error != errno.EAGAIN:
451 self.reconnect.connect_failed(ovs.timeval.msec(), error)
455 action = self.reconnect.run(ovs.timeval.msec())
456 if action == ovs.reconnect.CONNECT:
458 elif action == ovs.reconnect.DISCONNECT:
459 self.reconnect.disconnected(ovs.timeval.msec(), 0)
461 elif action == ovs.reconnect.PROBE:
463 request = Message.create_request("echo", [])
465 self.rpc.send(request)
467 assert action == None
def wait(self, poller):
    """Registers with 'poller' the events this session is waiting for.

    If the session has a connection, its wait() is used; otherwise, if it
    has a stream, that stream's run_wait() and connect_wait() are used.
    The passive listener, when there is one, and the reconnect object
    (which is also given the current time) are always registered."""
    conn = self.rpc
    if conn is not None:
        conn.wait(poller)
    else:
        pending = self.stream
        if pending is not None:
            pending.run_wait(poller)
            pending.connect_wait(poller)

    listener = self.pstream
    if listener is not None:
        listener.wait(poller)

    self.reconnect.wait(poller, ovs.timeval.msec())
479 def get_backlog(self):
480 if self.rpc is not None:
481 return self.rpc.get_backlog()
486 return self.reconnect.get_name()
489 if self.rpc is not None:
490 return self.rpc.send(msg)
492 return errno.ENOTCONN
495 if self.rpc is not None:
496 error, msg = self.rpc.recv()
498 self.reconnect.received(ovs.timeval.msec())
499 if msg.type == Message.T_REQUEST and msg.method == "echo":
500 # Echo request. Send reply.
501 self.send(Message.create_reply(msg.params, msg.id))
502 elif msg.type == Message.T_REPLY and msg.id == "echo":
503 # It's a reply to our echo request. Suppress it.
def recv_wait(self, poller):
    """Asks the session's current connection, if there is one, to register
    its receive wait with 'poller'.

    Does nothing while the session has no connection."""
    conn = self.rpc
    if conn is None:
        return
    conn.recv_wait(poller)
514 if self.rpc is not None or self.stream is not None:
517 max_tries = self.reconnect.get_max_tries()
518 return max_tries is None or max_tries > 0
def is_connected(self):
    """Returns True if the session currently holds a JSON-RPC connection
    (that is, self.rpc is not None), False otherwise."""
    has_connection = self.rpc is not None
    return has_connection
526 def force_reconnect(self):
527 self.reconnect.force_reconnect(ovs.timeval.msec())