jsonrpc: Treat receiving part of a message as activity.
authorBen Pfaff <blp@nicira.com>
Wed, 5 Sep 2012 20:34:35 +0000 (13:34 -0700)
committerBen Pfaff <blp@nicira.com>
Fri, 7 Sep 2012 17:50:21 +0000 (10:50 -0700)
Until now, the jsonrpc code has only counted receiving a full JSON-RPC
messages as activity.  This could theoretically time out, then, while a
very long message is in transit or if a slow link is involved.  This commit
changes this code to count receiving any part of a message as activity.

This isn't a problem for OpenFlow connections because OpenFlow messages are
at most 64 kB in size.

This problem hasn't actually been observed in practice.

Bug #12789.
Signed-off-by: Ben Pfaff <blp@nicira.com>
lib/jsonrpc.c
lib/jsonrpc.h
python/ovs/jsonrpc.py

index 27b46c6dcf7054e093e3496abe2993477228a926..0e1788cae2c37b28d29447af4d8e39a906d4a404 100644 (file)
@@ -178,6 +178,14 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
     return rpc->status ? 0 : rpc->backlog;
 }
 
+/* Returns the number of bytes that have been received on 'rpc''s underlying
+ * stream.  (The value wraps around if it exceeds UINT_MAX.) */
+unsigned int
+jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
+{
+    return rpc->input.head;
+}
+
 /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
  * the stream underlying 'rpc' when 'rpc' was created. */
 const char *
@@ -988,10 +996,21 @@ struct jsonrpc_msg *
 jsonrpc_session_recv(struct jsonrpc_session *s)
 {
     if (s->rpc) {
+        unsigned int received_bytes;
         struct jsonrpc_msg *msg;
+
+        received_bytes = jsonrpc_get_received_bytes(s->rpc);
         jsonrpc_recv(s->rpc, &msg);
-        if (msg) {
+        if (received_bytes != jsonrpc_get_received_bytes(s->rpc)) {
+            /* Data was successfully received.
+             *
+             * Previously we only counted receiving a full message as activity,
+             * but with large messages or a slow connection that policy could
+             * time out the session mid-message. */
             reconnect_activity(s->reconnect, time_msec());
+        }
+
+        if (msg) {
             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
                 /* Echo request.  Send reply. */
                 struct jsonrpc_msg *reply;
index cd78170cf9e3cb31f02b36a7e5eebca8249868b3..44ae06f2e444d63d89be989f61daab6305740e8a 100644 (file)
@@ -50,6 +50,7 @@ void jsonrpc_wait(struct jsonrpc *);
 
 int jsonrpc_get_status(const struct jsonrpc *);
 size_t jsonrpc_get_backlog(const struct jsonrpc *);
+unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
 const char *jsonrpc_get_name(const struct jsonrpc *);
 
 int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *);
index 0eda32d91676fde2e71f0c2510458adc822fdfdc..fa66aabac18bfabd8de8dedec1e4e292780e7cb2 100644 (file)
@@ -186,6 +186,7 @@ class Connection(object):
         self.input = ""
         self.output = ""
         self.parser = None
+        self.received_bytes = 0
 
     def close(self):
         self.stream.close()
@@ -221,6 +222,9 @@ class Connection(object):
         else:
             return len(self.output)
 
+    def get_received_bytes(self):
+        return self.received_bytes
+
     def __log_msg(self, title, msg):
         vlog.dbg("%s: %s %s" % (self.name, title, msg))
 
@@ -271,6 +275,7 @@ class Connection(object):
                     return EOF, None
                 else:
                     self.input += data
+                    self.received_bytes += len(data)
             else:
                 if self.parser is None:
                     self.parser = ovs.json.Parser()
@@ -444,7 +449,16 @@ class Session(object):
                 self.pstream = None
 
         if self.rpc:
+            received_bytes = self.rpc.get_received_bytes()
             self.rpc.run()
+            if received_bytes != self.rpc.get_received_bytes():
+                # Data was successfully received.
+                #
+                # Previously we only counted receiving a full message as
+                # activity, but with large messages or a slow connection that
+                # policy could time out the session mid-message.
+                self.reconnect.activity(ovs.timeval.msec())
+
             error = self.rpc.get_status()
             if error != 0:
                 self.reconnect.disconnected(ovs.timeval.msec(), error)