1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 class Message(object):
28 T_REQUEST = 0 # Request.
29 T_NOTIFY = 1 # Notification.
30 T_REPLY = 2 # Successful reply.
31 T_ERROR = 3 # Error reply.
33 __types = {T_REQUEST: "request",
34 T_NOTIFY: "notification",
38 def __init__(self, type_, method, params, result, error, id):
49 this_id = Message._next_id
54 def create_request(method, params):
55 return Message(Message.T_REQUEST, method, params, None, None,
59 def create_notify(method, params):
60 return Message(Message.T_NOTIFY, method, params, None, None,
def create_reply(result, id):
    """Build a successful-reply message.

    'result' becomes the reply payload and 'id' is stored as the message
    id.  The method, params and error slots are all left as None.
    """
    return Message(Message.T_REPLY, method=None, params=None,
                   result=result, error=None, id=id)
def create_error(error, id):
    """Build an error-reply message.

    Only the 'error' and 'id' slots are filled in; method, params and
    result are None.
    """
    return Message(Message.T_ERROR, method=None, params=None,
                   result=None, error=error, id=id)
def type_to_string(type_):
    """Return the human-readable name of message type 'type_'.

    The name is looked up in the class-level __types table, so a value
    that is not a key of that table raises KeyError.
    """
    names = Message.__types
    return names[type_]
76 def __validate_arg(value, name, must_have):
77 if (value is not None) == (must_have != 0):
80 type_name = Message.type_to_string(self.type)
85 return "%s %s have \"%s\"" % (type_name, verb, name)
88 if self.params is not None and type(self.params) != list:
89 return "\"params\" must be JSON array"
91 pattern = {Message.T_REQUEST: 0x11001,
92 Message.T_NOTIFY: 0x11000,
93 Message.T_REPLY: 0x00101,
94 Message.T_ERROR: 0x00011}.get(self.type)
96 return "invalid JSON-RPC message type %s" % self.type
99 Message.__validate_arg(self.method, "method", pattern & 0x10000) or
100 Message.__validate_arg(self.params, "params", pattern & 0x1000) or
101 Message.__validate_arg(self.result, "result", pattern & 0x100) or
102 Message.__validate_arg(self.error, "error", pattern & 0x10) or
103 Message.__validate_arg(self.id, "id", pattern & 0x1))
107 if type(json) != dict:
108 return "message is not a JSON object"
110 # Make a copy to avoid modifying the caller's dict.
114 method = json.pop("method")
115 if type(method) not in [str, unicode]:
116 return "method is not a JSON string"
120 params = json.pop("params", None)
121 result = json.pop("result", None)
122 error = json.pop("error", None)
123 id = json.pop("id", None)
125 return "message has unexpected member \"%s\"" % json.popitem()[0]
127 if result is not None:
128 msg_type = Message.T_REPLY
129 elif error is not None:
130 msg_type = Message.T_ERROR
132 msg_type = Message.T_REQUEST
134 msg_type = Message.T_NOTIFY
136 msg = Message(msg_type, method, params, result, error, id)
137 validation_error = msg.is_valid()
138 if validation_error is not None:
139 return validation_error
146 if self.method is not None:
147 json["method"] = self.method
149 if self.params is not None:
150 json["params"] = self.params
152 if self.result is not None or self.type == Message.T_ERROR:
153 json["result"] = self.result
155 if self.error is not None or self.type == Message.T_REPLY:
156 json["error"] = self.error
158 if self.id is not None or self.type == Message.T_NOTIFY:
164 s = [Message.type_to_string(self.type)]
165 if self.method is not None:
166 s.append("method=\"%s\"" % self.method)
167 if self.params is not None:
168 s.append("params=" + ovs.json.to_string(self.params))
169 if self.error is not None:
170 s.append("error=" + ovs.json.to_string(self.error))
171 if self.id is not None:
172 s.append("id=" + ovs.json.to_string(self.id))
175 class Connection(object):
176 def __init__(self, stream):
177 self.name = stream.name
192 while len(self.output):
193 retval = self.stream.send(self.output)
195 self.output = self.output[retval:]
197 if retval != -errno.EAGAIN:
198 logging.warn("%s: send error: %s" % (self.name,
199 os.strerror(-retval)))
203 def wait(self, poller):
205 self.stream.run_wait(poller)
207 self.stream.send_wait()
209 def get_status(self):
212 def get_backlog(self):
216 return len(self.output)
def __log_msg(self, title, msg):
    """Write a debug-level trace line for 'msg'.

    The line has the form "<self.name>: <title> <msg>".  The pieces are
    handed to the logging module as arguments instead of being
    interpolated up front, so 'msg' is only converted to a string when a
    debug record is actually going to be emitted.
    """
    logging.debug("%s: %s %s", self.name, title, msg)
225 self.__log_msg("send", msg)
227 was_empty = len(self.output) == 0
228 self.output += ovs.json.to_string(msg.to_json())
233 def send_block(self, msg):
234 error = self.send(msg)
240 if not self.get_backlog() or self.get_status():
243 poller = ovs.poller.Poller()
249 return self.status, None
253 error, data = self.stream.recv(4096)
255 if error == errno.EAGAIN:
259 logging.warning("%s: receive error: %s"
260 % (self.name, os.strerror(error)))
262 return self.status, None
269 if self.parser is None:
270 self.parser = ovs.json.Parser()
271 self.input = self.input[self.parser.feed(self.input):]
272 if self.parser.is_done():
273 msg = self.__process_msg()
277 return self.status, None
279 def recv_block(self):
281 error, msg = self.recv()
282 if error != errno.EAGAIN:
287 poller = ovs.poller.Poller()
289 self.recv_wait(poller)
292 def transact_block(self, request):
295 error = self.send(request)
298 error, reply = self.recv_block()
299 if reply and reply.type == Message.T_REPLY and reply.id == id:
303 def __process_msg(self):
304 json = self.parser.finish()
306 if type(json) in [str, unicode]:
308 logging.warning("%s: error parsing stream: %s" % (self.name, json))
309 self.error(errno.EPROTO)
312 msg = Message.from_json(json)
313 if not isinstance(msg, Message):
315 logging.warning("%s: received bad JSON-RPC message: %s"
317 self.error(errno.EPROTO)
320 self.__log_msg("received", msg)
323 def recv_wait(self, poller):
324 if self.status or self.input:
325 poller.immediate_wake()
327 self.stream.recv_wait(poller)
329 def error(self, error):
335 class Session(object):
336 """A JSON-RPC session with reconnection."""
338 def __init__(self, reconnect, rpc):
339 self.reconnect = reconnect
347 """Creates and returns a Session that maintains a JSON-RPC session to
348 'name', which should be a string acceptable to ovs.stream.Stream or
349 ovs.stream.PassiveStream's initializer.
351 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
352 session connects and reconnects, with back-off, to 'name'.
354 If 'name' is a passive connection method, e.g. "ptcp:", the new session
355 listens for connections to 'name'. It maintains at most one connection
356 at any given time. Any new connection causes the previous one (if any)
358 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
359 reconnect.set_name(name)
360 reconnect.enable(ovs.timeval.msec())
362 if ovs.stream.PassiveStream.is_valid_name(name):
363 self.reconnect.set_passive(True, ovs.timeval.msec())
365 return Session(reconnect, None)
def open_unreliably(jsonrpc):
    """Wrap the existing connection 'jsonrpc' in a Session.

    The Reconnect object backing the session is put in quiet mode, named
    after 'jsonrpc', limited to zero tries and marked as already
    connected before the Session is built around it.
    """
    fsm = ovs.reconnect.Reconnect(ovs.timeval.msec())
    fsm.set_quiet(True)
    fsm.set_name(jsonrpc.name)
    fsm.set_max_tries(0)
    fsm.connected(ovs.timeval.msec())
    return Session(fsm, jsonrpc)
377 if self.rpc is not None:
380 if self.stream is not None:
383 if self.pstream is not None:
387 def __disconnect(self):
388 if self.rpc is not None:
393 elif self.stream is not None:
401 name = self.reconnect.get_name()
402 if not self.reconnect.is_passive():
403 error, self.stream = ovs.stream.Stream.open(name)
405 self.reconnect.connecting(ovs.timeval.msec())
407 self.reconnect.connect_failed(ovs.timeval.msec(), error)
408 elif self.pstream is not None:
409 error, self.pstream = ovs.stream.PassiveStream.open(name)
411 self.reconnect.listening(ovs.timeval.msec())
413 self.reconnect.connect_failed(ovs.timeval.msec(), error)
418 if self.pstream is not None:
419 error, stream = self.pstream.accept()
421 if self.rpc or self.stream:
423 logging.info("%s: new connection replacing active "
424 "connection" % self.reconnect.get_name())
426 self.reconnect.connected(ovs.timeval.msec())
427 self.rpc = Connection(stream)
428 elif error != errno.EAGAIN:
429 self.reconnect.listen_error(ovs.timeval.msec(), error)
435 error = self.rpc.get_status()
437 self.reconnect.disconnected(ovs.timeval.msec(), error)
439 elif self.stream is not None:
441 error = self.stream.connect()
443 self.reconnect.connected(ovs.timeval.msec())
444 self.rpc = Connection(self.stream)
446 elif error != errno.EAGAIN:
447 self.reconnect.connect_failed(ovs.timeval.msec(), error)
451 action = self.reconnect.run(ovs.timeval.msec())
452 if action == ovs.reconnect.CONNECT:
454 elif action == ovs.reconnect.DISCONNECT:
455 self.reconnect.disconnected(ovs.timeval.msec(), 0)
457 elif action == ovs.reconnect.PROBE:
459 request = Message.create_request("echo", [])
461 self.rpc.send(request)
463 assert action == None
def wait(self, poller):
    """Register with 'poller' everything this session is waiting on.

    An existing RPC connection takes precedence over a stream; the
    passive stream (if any) and the reconnect object are registered in
    every case.
    """
    connection = self.rpc
    if connection is None:
        pending = self.stream
        if pending is not None:
            pending.run_wait(poller)
            pending.connect_wait(poller)
    else:
        connection.wait(poller)

    listener = self.pstream
    if listener is not None:
        listener.wait(poller)

    self.reconnect.wait(poller, ovs.timeval.msec())
475 def get_backlog(self):
476 if self.rpc is not None:
477 return self.rpc.get_backlog()
482 return self.reconnect.get_name()
485 if self.rpc is not None:
486 return self.rpc.send(msg)
488 return errno.ENOTCONN
491 if self.rpc is not None:
492 error, msg = self.rpc.recv()
494 self.reconnect.received(ovs.timeval.msec())
495 if msg.type == Message.T_REQUEST and msg.method == "echo":
496 # Echo request. Send reply.
497 self.send(Message.create_reply(msg.params, msg.id))
498 elif msg.type == Message.T_REPLY and msg.id == "echo":
499 # It's a reply to our echo request. Suppress it.
def recv_wait(self, poller):
    """Forward the receive-wait registration to the RPC connection.

    Does nothing when the session has no RPC connection.
    """
    if self.rpc is None:
        return
    self.rpc.recv_wait(poller)
510 if self.rpc is not None or self.stream is not None:
513 max_tries = self.reconnect.get_max_tries()
514 return max_tries is None or max_tries > 0
def is_connected(self):
    """Return True if the session currently holds an RPC connection."""
    connection = self.rpc
    return connection is not None
def force_reconnect(self):
    """Tell the reconnect object to force a reconnection as of now."""
    now = ovs.timeval.msec()
    self.reconnect.force_reconnect(now)