X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;ds=sidebyside;f=python%2Fovs%2Fjsonrpc.py;h=c1540eb78e9c59d3af53ee1856e417ce4145d1c5;hb=9387b97098afe90403cc139ad2b55ce7e02bc4a2;hp=a7b69c59039e87ccd8661e6f272012ec7759abfe;hpb=26bb0f31299d3f8eb06551d6a219846929c27149;p=openvswitch diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index a7b69c59..c1540eb7 100644 --- a/python/ovs/jsonrpc.py +++ b/python/ovs/jsonrpc.py @@ -1,4 +1,4 @@ -# Copyright (c) 2010, 2011 Nicira Networks +# Copyright (c) 2010, 2011, 2012 Nicira, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,6 @@ # limitations under the License. import errno -import logging import os import ovs.json @@ -21,8 +20,11 @@ import ovs.poller import ovs.reconnect import ovs.stream import ovs.timeval +import ovs.util +import ovs.vlog -EOF = -1 +EOF = ovs.util.EOF +vlog = ovs.vlog.Vlog("jsonrpc") class Message(object): @@ -184,6 +186,7 @@ class Connection(object): self.input = "" self.output = "" self.parser = None + self.received_bytes = 0 def close(self): self.stream.close() @@ -199,8 +202,8 @@ class Connection(object): self.output = self.output[retval:] else: if retval != -errno.EAGAIN: - logging.warn("%s: send error: %s" % (self.name, - os.strerror(-retval))) + vlog.warn("%s: send error: %s" % + (self.name, os.strerror(-retval))) self.error(-retval) break @@ -208,7 +211,7 @@ class Connection(object): if not self.status: self.stream.run_wait(poller) if len(self.output): - self.stream.send_wait() + self.stream.send_wait(poller) def get_status(self): return self.status @@ -219,8 +222,11 @@ class Connection(object): else: return len(self.output) + def get_received_bytes(self): + return self.received_bytes + def __log_msg(self, title, msg): - logging.debug("%s: %s %s" % (self.name, title, msg)) + vlog.dbg("%s: %s %s" % (self.name, title, msg)) def send(self, msg): if self.status: @@ -260,8 +266,8 @@ class Connection(object): return error, None else: # XXX rate-limit - logging.warning("%s: receive error: %s" - % (self.name, os.strerror(error))) + vlog.warn("%s: receive error: %s" + % (self.name, os.strerror(error))) self.error(error) return self.status, None elif not data: @@ -269,6 +275,7 @@ class Connection(object): return EOF, None else: self.input += data + self.received_bytes += len(data) else: if self.parser is None: self.parser = ovs.json.Parser() @@ -300,7 +307,10 @@ class Connection(object): reply = None while not error: error, reply = self.recv_block() - if reply and reply.type == Message.T_REPLY and reply.id == id_: + if (reply + and (reply.type == Message.T_REPLY + or reply.type == Message.T_ERROR) + and reply.id == id_): break return error, reply @@ -309,15 +319,15 @@ class Connection(object): self.parser = None if type(json) in [str, unicode]: # XXX rate-limit - logging.warning("%s: error parsing stream: %s" % (self.name, json)) + vlog.warn("%s: error parsing stream: %s" % (self.name, json)) self.error(errno.EPROTO) return msg = Message.from_json(json) if not isinstance(msg, Message): # XXX rate-limit - logging.warning("%s: received bad JSON-RPC message: %s" - % (self.name, msg)) + vlog.warn("%s: received bad JSON-RPC message: %s" + % (self.name, msg)) self.error(errno.EPROTO) return @@ -367,6 +377,9 @@ class Session(object): if ovs.stream.PassiveStream.is_valid_name(name): reconnect.set_passive(True, ovs.timeval.msec()) + if ovs.stream.stream_or_pstream_needs_probes(name): + reconnect.set_probe_interval(0) + return Session(reconnect, None) @staticmethod @@ -425,8 +438,8 @@ class Session(object): if error == 0: if self.rpc or self.stream: # XXX rate-limit - logging.info("%s: new connection replacing active " - "connection" % self.reconnect.get_name()) + vlog.info("%s: new connection replacing active " + "connection" % self.reconnect.get_name()) self.__disconnect() self.reconnect.connected(ovs.timeval.msec()) self.rpc = Connection(stream) @@ -436,7 +449,18 @@ class Session(object): self.pstream = None if self.rpc: + backlog = self.rpc.get_backlog() self.rpc.run() + if self.rpc.get_backlog() < backlog: + # Data previously caught in a queue was successfully sent (or + # there's an error, which we'll catch below). + # + # We don't count data that is successfully sent immediately as + # activity, because there's a lot of queuing downstream from + # us, which means that we can push a lot of data into a + # connection that has stalled and won't ever recover. + self.reconnect.activity(ovs.timeval.msec()) + error = self.rpc.get_status() if error != 0: self.reconnect.disconnected(ovs.timeval.msec(), error) @@ -494,9 +518,17 @@ class Session(object): def recv(self): if self.rpc is not None: + received_bytes = self.rpc.get_received_bytes() error, msg = self.rpc.recv() + if received_bytes != self.rpc.get_received_bytes(): + # Data was successfully received. + # + # Previously we only counted receiving a full message as + # activity, but with large messages or a slow connection that + # policy could time out the session mid-message. + self.reconnect.activity(ovs.timeval.msec()) + if not error: - self.reconnect.received(ovs.timeval.msec()) if msg.type == Message.T_REQUEST and msg.method == "echo": # Echo request. Send reply. self.send(Message.create_reply(msg.params, msg.id))