1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 class Message(object):
28 T_REQUEST = 0 # Request.
29 T_NOTIFY = 1 # Notification.
30 T_REPLY = 2 # Successful reply.
31 T_ERROR = 3 # Error reply.
33 __types = {T_REQUEST: "request",
34 T_NOTIFY: "notification",
38 def __init__(self, type_, method, params, result, error, id):
49 this_id = Message._next_id
54 def create_request(method, params):
55 return Message(Message.T_REQUEST, method, params, None, None,
59 def create_notify(method, params):
60 return Message(Message.T_NOTIFY, method, params, None, None,
64 def create_reply(result, id):
65 return Message(Message.T_REPLY, None, None, result, None, id)
68 def create_error(error, id):
69 return Message(Message.T_ERROR, None, None, None, error, id)
72 def type_to_string(type_):
73 return Message.__types[type_]
75 def __validate_arg(self, value, name, must_have):
76 if (value is not None) == (must_have != 0):
79 type_name = Message.type_to_string(self.type)
84 return "%s %s have \"%s\"" % (type_name, verb, name)
87 if self.params is not None and type(self.params) != list:
88 return "\"params\" must be JSON array"
90 pattern = {Message.T_REQUEST: 0x11001,
91 Message.T_NOTIFY: 0x11000,
92 Message.T_REPLY: 0x00101,
93 Message.T_ERROR: 0x00011}.get(self.type)
95 return "invalid JSON-RPC message type %s" % self.type
98 self.__validate_arg(self.method, "method", pattern & 0x10000) or
99 self.__validate_arg(self.params, "params", pattern & 0x1000) or
100 self.__validate_arg(self.result, "result", pattern & 0x100) or
101 self.__validate_arg(self.error, "error", pattern & 0x10) or
102 self.__validate_arg(self.id, "id", pattern & 0x1))
106 if type(json) != dict:
107 return "message is not a JSON object"
109 # Make a copy to avoid modifying the caller's dict.
113 method = json.pop("method")
114 if type(method) not in [str, unicode]:
115 return "method is not a JSON string"
119 params = json.pop("params", None)
120 result = json.pop("result", None)
121 error = json.pop("error", None)
122 id_ = json.pop("id", None)
124 return "message has unexpected member \"%s\"" % json.popitem()[0]
126 if result is not None:
127 msg_type = Message.T_REPLY
128 elif error is not None:
129 msg_type = Message.T_ERROR
130 elif id_ is not None:
131 msg_type = Message.T_REQUEST
133 msg_type = Message.T_NOTIFY
135 msg = Message(msg_type, method, params, result, error, id_)
136 validation_error = msg.is_valid()
137 if validation_error is not None:
138 return validation_error
145 if self.method is not None:
146 json["method"] = self.method
148 if self.params is not None:
149 json["params"] = self.params
151 if self.result is not None or self.type == Message.T_ERROR:
152 json["result"] = self.result
154 if self.error is not None or self.type == Message.T_REPLY:
155 json["error"] = self.error
157 if self.id is not None or self.type == Message.T_NOTIFY:
163 s = [Message.type_to_string(self.type)]
164 if self.method is not None:
165 s.append("method=\"%s\"" % self.method)
166 if self.params is not None:
167 s.append("params=" + ovs.json.to_string(self.params))
168 if self.result is not None:
169 s.append("result=" + ovs.json.to_string(self.result))
170 if self.error is not None:
171 s.append("error=" + ovs.json.to_string(self.error))
172 if self.id is not None:
173 s.append("id=" + ovs.json.to_string(self.id))
176 class Connection(object):
177 def __init__(self, stream):
178 self.name = stream.name
193 while len(self.output):
194 retval = self.stream.send(self.output)
196 self.output = self.output[retval:]
198 if retval != -errno.EAGAIN:
199 logging.warn("%s: send error: %s" % (self.name,
200 os.strerror(-retval)))
204 def wait(self, poller):
206 self.stream.run_wait(poller)
208 self.stream.send_wait()
210 def get_status(self):
213 def get_backlog(self):
217 return len(self.output)
219 def __log_msg(self, title, msg):
220 logging.debug("%s: %s %s" % (self.name, title, msg))
226 self.__log_msg("send", msg)
228 was_empty = len(self.output) == 0
229 self.output += ovs.json.to_string(msg.to_json())
234 def send_block(self, msg):
235 error = self.send(msg)
241 if not self.get_backlog() or self.get_status():
244 poller = ovs.poller.Poller()
250 return self.status, None
254 error, data = self.stream.recv(4096)
256 if error == errno.EAGAIN:
260 logging.warning("%s: receive error: %s"
261 % (self.name, os.strerror(error)))
263 return self.status, None
270 if self.parser is None:
271 self.parser = ovs.json.Parser()
272 self.input = self.input[self.parser.feed(self.input):]
273 if self.parser.is_done():
274 msg = self.__process_msg()
278 return self.status, None
280 def recv_block(self):
282 error, msg = self.recv()
283 if error != errno.EAGAIN:
288 poller = ovs.poller.Poller()
290 self.recv_wait(poller)
293 def transact_block(self, request):
296 error = self.send(request)
299 error, reply = self.recv_block()
300 if reply and reply.type == Message.T_REPLY and reply.id == id_:
304 def __process_msg(self):
305 json = self.parser.finish()
307 if type(json) in [str, unicode]:
309 logging.warning("%s: error parsing stream: %s" % (self.name, json))
310 self.error(errno.EPROTO)
313 msg = Message.from_json(json)
314 if not isinstance(msg, Message):
316 logging.warning("%s: received bad JSON-RPC message: %s"
318 self.error(errno.EPROTO)
321 self.__log_msg("received", msg)
324 def recv_wait(self, poller):
325 if self.status or self.input:
326 poller.immediate_wake()
328 self.stream.recv_wait(poller)
330 def error(self, error):
336 class Session(object):
337 """A JSON-RPC session with reconnection."""
339 def __init__(self, reconnect, rpc):
340 self.reconnect = reconnect
348 """Creates and returns a Session that maintains a JSON-RPC session to
349 'name', which should be a string acceptable to ovs.stream.Stream or
350 ovs.stream.PassiveStream's initializer.
352 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
353 session connects and reconnects, with back-off, to 'name'.
355 If 'name' is a passive connection method, e.g. "ptcp:", the new session
356 listens for connections to 'name'. It maintains at most one connection
357 at any given time. Any new connection causes the previous one (if any)
359 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
360 reconnect.set_name(name)
361 reconnect.enable(ovs.timeval.msec())
363 if ovs.stream.PassiveStream.is_valid_name(name):
364 reconnect.set_passive(True, ovs.timeval.msec())
366 return Session(reconnect, None)
369 def open_unreliably(jsonrpc):
370 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
371 reconnect.set_quiet(True)
372 reconnect.set_name(jsonrpc.name)
373 reconnect.set_max_tries(0)
374 reconnect.connected(ovs.timeval.msec())
375 return Session(reconnect, jsonrpc)
378 if self.rpc is not None:
381 if self.stream is not None:
384 if self.pstream is not None:
388 def __disconnect(self):
389 if self.rpc is not None:
394 elif self.stream is not None:
402 name = self.reconnect.get_name()
403 if not self.reconnect.is_passive():
404 error, self.stream = ovs.stream.Stream.open(name)
406 self.reconnect.connecting(ovs.timeval.msec())
408 self.reconnect.connect_failed(ovs.timeval.msec(), error)
409 elif self.pstream is not None:
410 error, self.pstream = ovs.stream.PassiveStream.open(name)
412 self.reconnect.listening(ovs.timeval.msec())
414 self.reconnect.connect_failed(ovs.timeval.msec(), error)
419 if self.pstream is not None:
420 error, stream = self.pstream.accept()
422 if self.rpc or self.stream:
424 logging.info("%s: new connection replacing active "
425 "connection" % self.reconnect.get_name())
427 self.reconnect.connected(ovs.timeval.msec())
428 self.rpc = Connection(stream)
429 elif error != errno.EAGAIN:
430 self.reconnect.listen_error(ovs.timeval.msec(), error)
436 error = self.rpc.get_status()
438 self.reconnect.disconnected(ovs.timeval.msec(), error)
440 elif self.stream is not None:
442 error = self.stream.connect()
444 self.reconnect.connected(ovs.timeval.msec())
445 self.rpc = Connection(self.stream)
447 elif error != errno.EAGAIN:
448 self.reconnect.connect_failed(ovs.timeval.msec(), error)
452 action = self.reconnect.run(ovs.timeval.msec())
453 if action == ovs.reconnect.CONNECT:
455 elif action == ovs.reconnect.DISCONNECT:
456 self.reconnect.disconnected(ovs.timeval.msec(), 0)
458 elif action == ovs.reconnect.PROBE:
460 request = Message.create_request("echo", [])
462 self.rpc.send(request)
464 assert action == None
466 def wait(self, poller):
467 if self.rpc is not None:
468 self.rpc.wait(poller)
469 elif self.stream is not None:
470 self.stream.run_wait(poller)
471 self.stream.connect_wait(poller)
472 if self.pstream is not None:
473 self.pstream.wait(poller)
474 self.reconnect.wait(poller, ovs.timeval.msec())
476 def get_backlog(self):
477 if self.rpc is not None:
478 return self.rpc.get_backlog()
483 return self.reconnect.get_name()
486 if self.rpc is not None:
487 return self.rpc.send(msg)
489 return errno.ENOTCONN
492 if self.rpc is not None:
493 error, msg = self.rpc.recv()
495 self.reconnect.received(ovs.timeval.msec())
496 if msg.type == Message.T_REQUEST and msg.method == "echo":
497 # Echo request. Send reply.
498 self.send(Message.create_reply(msg.params, msg.id))
499 elif msg.type == Message.T_REPLY and msg.id == "echo":
500 # It's a reply to our echo request. Suppress it.
506 def recv_wait(self, poller):
507 if self.rpc is not None:
508 self.rpc.recv_wait(poller)
511 if self.rpc is not None or self.stream is not None:
514 max_tries = self.reconnect.get_max_tries()
515 return max_tries is None or max_tries > 0
517 def is_connected(self):
518 return self.rpc is not None
523 def force_reconnect(self):
524 self.reconnect.force_reconnect(ovs.timeval.msec())