stream: Add stream_run(), stream_run_wait() functions.
authorBen Pfaff <blp@nicira.com>
Wed, 6 Jan 2010 22:26:48 +0000 (14:26 -0800)
committerBen Pfaff <blp@nicira.com>
Wed, 6 Jan 2010 22:26:48 +0000 (14:26 -0800)
SSL, which will be added in an upcoming commit, requires some background
processing, which is best done in a "run" function in our architecture.
This commit adds stream_run() and stream_run_wait() and calls to them from
the places where they will be required.

lib/jsonrpc.c
lib/stream-fd.c
lib/stream-provider.h
lib/stream-tcp.c
lib/stream-unix.c
lib/stream.c
lib/stream.h

index 0f535155ea62ff74039bb8a8b12f0e3c427c7567..ec9108f85d5692ec42311d6fe182e584f62727f7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009 Nicira Networks.
+ * Copyright (c) 2009, 2010 Nicira Networks.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -88,6 +88,7 @@ jsonrpc_run(struct jsonrpc *rpc)
         return;
     }
 
+    stream_run(rpc->stream);
     while (!queue_is_empty(&rpc->output)) {
         struct ofpbuf *buf = rpc->output.head;
         int retval;
@@ -113,8 +114,11 @@ jsonrpc_run(struct jsonrpc *rpc)
 void
 jsonrpc_wait(struct jsonrpc *rpc)
 {
-    if (!rpc->status && !queue_is_empty(&rpc->output)) {
-        stream_send_wait(rpc->stream);
+    if (!rpc->status) {
+        stream_run_wait(rpc->stream);
+        if (!queue_is_empty(&rpc->output)) {
+            stream_send_wait(rpc->stream);
+        }
     }
 }
 
@@ -721,7 +725,10 @@ jsonrpc_session_run(struct jsonrpc_session *s)
             jsonrpc_session_disconnect(s);
         }
     } else if (s->stream) {
-        int error = stream_connect(s->stream);
+        int error;
+
+        stream_run(s->stream);
+        error = stream_connect(s->stream);
         if (!error) {
             reconnect_connected(s->reconnect, time_msec());
             s->rpc = jsonrpc_open(s->stream);
@@ -763,6 +770,7 @@ jsonrpc_session_wait(struct jsonrpc_session *s)
     if (s->rpc) {
         jsonrpc_wait(s->rpc);
     } else if (s->stream) {
+        stream_run_wait(s->stream);
         stream_connect_wait(s->stream);
     }
     reconnect_wait(s->reconnect, time_msec());
index 46aa8e7386cf5cdd934a237a01c859cc3572c578..94c84340966e67994af4c7b831395fb870483aba 100644 (file)
@@ -139,6 +139,8 @@ static struct stream_class stream_fd_class = {
     fd_connect,                 /* connect */
     fd_recv,                    /* recv */
     fd_send,                    /* send */
+    NULL,                       /* run */
+    NULL,                       /* run_wait */
     fd_wait,                    /* wait */
 };
 \f
index 6beaab75e9e3bcd9c9fdf39af8a9fcabb0fb189b..872da3c7d5ad2018a14a8c8ce9549bab95cbebe6 100644 (file)
@@ -109,6 +109,18 @@ struct stream_class {
      * accepted for transmission, it should return -EAGAIN immediately. */
     ssize_t (*send)(struct stream *stream, const void *buffer, size_t n);
 
+    /* Allows 'stream' to perform maintenance activities, such as flushing
+     * output buffers.
+     *
+     * May be null if 'stream' doesn't have anything to do here. */
+    void (*run)(struct stream *stream);
+
+    /* Arranges for the poll loop to wake up when 'stream' needs to perform
+     * maintenance activities.
+     *
+     * May be null if 'stream' doesn't have anything to do here. */
+    void (*run_wait)(struct stream *stream);
+
     /* Arranges for the poll loop to wake up when 'stream' is ready to take an
      * action of the given 'type'. */
     void (*wait)(struct stream *stream, enum stream_wait_type type);
index 947be9f19828a07c7e8593a8995dd8713d37bdcf..bfcf35c7402eefc8e91bc3cb3281ad8c337eb810 100644 (file)
@@ -90,6 +90,8 @@ struct stream_class tcp_stream_class = {
     NULL,                       /* connect */
     NULL,                       /* recv */
     NULL,                       /* send */
+    NULL,                       /* run */
+    NULL,                       /* run_wait */
     NULL,                       /* wait */
 };
 \f
index 9046da154828852e5d529ede214324dfb760c32c..6ce7790b541dae2979b97670011b7d0a4bf4921a 100644 (file)
@@ -69,6 +69,8 @@ struct stream_class unix_stream_class = {
     NULL,                       /* connect */
     NULL,                       /* recv */
     NULL,                       /* send */
+    NULL,                       /* run */
+    NULL,                       /* run_wait */
     NULL,                       /* wait */
 };
 \f
index 337fb5c754977b356f0d96a5b3178d5caf8ee5f7..d4f5de29ee3586a3e3ba6cf0290fd85f0ff2b1d6 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010 Nicira Networks.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -66,7 +66,8 @@ check_stream_classes(void)
         struct stream_class *class = stream_classes[i];
         assert(class->name != NULL);
         assert(class->open != NULL);
-        if (class->close || class->recv || class->send || class->wait) {
+        if (class->close || class->recv || class->send || class->run
+            || class->run_wait || class->wait) {
             assert(class->close != NULL);
             assert(class->recv != NULL);
             assert(class->send != NULL);
@@ -166,6 +167,8 @@ stream_open_block(const char *name, struct stream **streamp)
 
     error = stream_open(name, &stream);
     while (error == EAGAIN) {
+        stream_run(stream);
+        stream_run_wait(stream);
         stream_connect_wait(stream);
         poll_block();
         error = stream_connect(stream);
@@ -312,6 +315,28 @@ stream_send(struct stream *stream, const void *buffer, size_t n)
             : (stream->class->send)(stream, buffer, n));
 }
 
+/* Allows 'stream' to perform maintenance activities, such as flushing
+ * output buffers. */
+void
+stream_run(struct stream *stream)
+{
+    if (stream->class->run) {
+        (stream->class->run)(stream);
+    }
+}
+
+/* Arranges for the poll loop to wake up when 'stream' needs to perform
+ * maintenance activities. */
+void
+stream_run_wait(struct stream *stream)
+{
+    if (stream->class->run_wait) {
+        (stream->class->run_wait)(stream);
+    }
+}
+
+/* Arranges for the poll loop to wake up when 'stream' is ready to take an
+ * action of the given 'type'. */
 void
 stream_wait(struct stream *stream, enum stream_wait_type wait)
 {
index 7a62a5a374d0284ac0142832b35941e160c92353..ae30b1031c127bec23a0e06668f9778409bc98a9 100644 (file)
@@ -42,6 +42,9 @@ int stream_connect(struct stream *);
 int stream_recv(struct stream *, void *buffer, size_t n);
 int stream_send(struct stream *, const void *buffer, size_t n);
 
+void stream_run(struct stream *);
+void stream_run_wait(struct stream *);
+
 enum stream_wait_type {
     STREAM_CONNECT,
     STREAM_RECV,