queue: Get rid of ovs_queue data structure.
authorBen Pfaff <blp@nicira.com>
Mon, 6 Dec 2010 18:03:31 +0000 (10:03 -0800)
committerBen Pfaff <blp@nicira.com>
Mon, 6 Dec 2010 18:03:31 +0000 (10:03 -0800)
ovs_queue doesn't seem very useful; it's just a singly-linked list.  It's
more generally useful to use a general-purpose "struct list" for lists of
packets, so this commit adds such a member to "struct ofpbuf" and shifts
the existing users to use it.

15 files changed:
lib/automake.mk
lib/dpif-netdev.c
lib/jsonrpc.c
lib/learning-switch.c
lib/learning-switch.h
lib/list.c
lib/list.h
lib/ofpbuf.c
lib/ofpbuf.h
lib/queue.c [deleted file]
lib/queue.h [deleted file]
lib/rconn.c
lib/rconn.h
ofproto/pinsched.c
utilities/ovs-controller.c

index 6d84a92121533f74f07166303a1c55017171d7d1..8fd47c05d00a5927099a74640d605c9478120b0f 100644 (file)
@@ -106,8 +106,6 @@ lib_libopenvswitch_a_SOURCES = \
        lib/port-array.h \
        lib/process.c \
        lib/process.h \
-       lib/queue.c \
-       lib/queue.h \
        lib/random.c \
        lib/random.h \
        lib/rconn.c \
index 9f78be44ab1cc5a8de5f93db4b005dc962a024b0..435f90900190fdaed73ef73f8187d5f8741dd3f9 100644 (file)
@@ -44,7 +44,6 @@
 #include "ofpbuf.h"
 #include "packets.h"
 #include "poll-loop.h"
-#include "queue.h"
 #include "shash.h"
 #include "timeval.h"
 #include "util.h"
@@ -70,7 +69,8 @@ struct dp_netdev {
     bool destroyed;
 
     bool drop_frags;            /* Drop all IP fragments, if true. */
-    struct ovs_queue queues[N_QUEUES]; /* Messages queued for dpif_recv(). */
+    struct list queues[N_QUEUES]; /* Contain ofpbufs queued for dpif_recv(). */
+    size_t queue_len[N_QUEUES]; /* Number of packets in each queue. */
     struct hmap flow_table;     /* Flow table. */
 
     /* Statistics. */
@@ -187,7 +187,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->open_cnt = 0;
     dp->drop_frags = false;
     for (i = 0; i < N_QUEUES; i++) {
-        queue_init(&dp->queues[i]);
+        list_init(&dp->queues[i]);
     }
     hmap_init(&dp->flow_table);
     list_init(&dp->port_list);
@@ -244,7 +244,7 @@ dp_netdev_free(struct dp_netdev *dp)
         do_del_port(dp, port->port_no);
     }
     for (i = 0; i < N_QUEUES; i++) {
-        queue_destroy(&dp->queues[i]);
+        ofpbuf_list_delete(&dp->queues[i]);
     }
     hmap_destroy(&dp->flow_table);
     free(dp->name);
@@ -852,7 +852,7 @@ dpif_netdev_recv_set_mask(struct dpif *dpif, int listen_mask)
     }
 }
 
-static struct ovs_queue *
+static int
 find_nonempty_queue(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
@@ -861,20 +861,24 @@ find_nonempty_queue(struct dpif *dpif)
     int i;
 
     for (i = 0; i < N_QUEUES; i++) {
-        struct ovs_queue *q = &dp->queues[i];
-        if (q->n && mask & (1u << i)) {
-            return q;
+        struct list *queue = &dp->queues[i];
+        if (!list_is_empty(queue) && mask & (1u << i)) {
+            return i;
         }
     }
-    return NULL;
+    return -1;
 }
 
 static int
 dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
 {
-    struct ovs_queue *q = find_nonempty_queue(dpif);
-    if (q) {
-        *bufp = queue_pop_head(q);
+    int queue_idx = find_nonempty_queue(dpif);
+    if (queue_idx >= 0) {
+        struct dp_netdev *dp = get_dp_netdev(dpif);
+
+        *bufp = ofpbuf_from_list(list_pop_front(&dp->queues[queue_idx]));
+        dp->queue_len[queue_idx]--;
+
         return 0;
     } else {
         return EAGAIN;
@@ -884,8 +888,7 @@ dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
 static void
 dpif_netdev_recv_wait(struct dpif *dpif)
 {
-    struct ovs_queue *q = find_nonempty_queue(dpif);
-    if (q) {
+    if (find_nonempty_queue(dpif) >= 0) {
         poll_immediate_wake();
     } else {
         /* No messages ready to be received, and dp_wait() will ensure that we
@@ -1128,12 +1131,11 @@ static int
 dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
                          int queue_no, int port_no, uint32_t arg)
 {
-    struct ovs_queue *q = &dp->queues[queue_no];
     struct odp_msg *header;
     struct ofpbuf *msg;
     size_t msg_size;
 
-    if (q->n >= MAX_QUEUE_LEN) {
+    if (dp->queue_len[queue_no] >= MAX_QUEUE_LEN) {
         dp->n_lost++;
         return ENOBUFS;
     }
@@ -1146,7 +1148,8 @@ dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
     header->port = port_no;
     header->arg = arg;
     ofpbuf_put(msg, packet->data, packet->size);
-    queue_push_tail(q, msg);
+    list_push_back(&dp->queues[queue_no], &msg->list_node);
+    dp->queue_len[queue_no]++;
 
     return 0;
 }
index a83dde9b5ae2b35f9ebc8b2a4bfa6f1940cf1f10..7c761ea0dcf5a736e59cb60ce0a4e436bc8e1be6 100644 (file)
@@ -28,7 +28,6 @@
 #include "list.h"
 #include "ofpbuf.h"
 #include "poll-loop.h"
-#include "queue.h"
 #include "reconnect.h"
 #include "stream.h"
 #include "timeval.h"
@@ -47,7 +46,7 @@ struct jsonrpc {
     struct jsonrpc_msg *received;
 
     /* Output. */
-    struct ovs_queue output;
+    struct list output;         /* Contains "struct ofpbuf"s. */
     size_t backlog;
 };
 
@@ -86,7 +85,7 @@ jsonrpc_open(struct stream *stream)
     rpc->name = xstrdup(stream_get_name(stream));
     rpc->stream = stream;
     byteq_init(&rpc->input);
-    queue_init(&rpc->output);
+    list_init(&rpc->output);
 
     return rpc;
 }
@@ -109,8 +108,8 @@ jsonrpc_run(struct jsonrpc *rpc)
     }
 
     stream_run(rpc->stream);
-    while (!queue_is_empty(&rpc->output)) {
-        struct ofpbuf *buf = rpc->output.head;
+    while (!list_is_empty(&rpc->output)) {
+        struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
         int retval;
 
         retval = stream_send(rpc->stream, buf->data, buf->size);
@@ -118,7 +117,8 @@ jsonrpc_run(struct jsonrpc *rpc)
             rpc->backlog -= retval;
             ofpbuf_pull(buf, retval);
             if (!buf->size) {
-                ofpbuf_delete(queue_pop_head(&rpc->output));
+                list_remove(&buf->list_node);
+                ofpbuf_delete(buf);
             }
         } else {
             if (retval != -EAGAIN) {
@@ -136,7 +136,7 @@ jsonrpc_wait(struct jsonrpc *rpc)
 {
     if (!rpc->status) {
         stream_run_wait(rpc->stream);
-        if (!queue_is_empty(&rpc->output)) {
+        if (!list_is_empty(&rpc->output)) {
             stream_send_wait(rpc->stream);
         }
     }
@@ -215,10 +215,10 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
     buf = xmalloc(sizeof *buf);
     ofpbuf_use(buf, s, length);
     buf->size = length;
-    queue_push_tail(&rpc->output, buf);
+    list_push_back(&rpc->output, &buf->list_node);
     rpc->backlog += length;
 
-    if (rpc->output.n == 1) {
+    if (rpc->backlog == length) {
         jsonrpc_run(rpc);
     }
     return rpc->status;
@@ -308,7 +308,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 
     for (;;) {
         jsonrpc_run(rpc);
-        if (queue_is_empty(&rpc->output) || rpc->status) {
+        if (list_is_empty(&rpc->output) || rpc->status) {
             return rpc->status;
         }
         jsonrpc_wait(rpc);
@@ -412,7 +412,7 @@ jsonrpc_cleanup(struct jsonrpc *rpc)
     jsonrpc_msg_destroy(rpc->received);
     rpc->received = NULL;
 
-    queue_clear(&rpc->output);
+    ofpbuf_list_delete(&rpc->output);
     rpc->backlog = 0;
 }
 \f
index 511096ab8e798639eecd996fe4ca305e3bb44e15..7b99a93385c39e4599931c716a019b55c650ba64 100644 (file)
@@ -34,7 +34,6 @@
 #include "ofp-util.h"
 #include "openflow/openflow.h"
 #include "poll-loop.h"
-#include "queue.h"
 #include "rconn.h"
 #include "shash.h"
 #include "timeval.h"
@@ -89,7 +88,6 @@ static packet_handler_func process_echo_request;
 struct lswitch *
 lswitch_create(struct rconn *rconn, const struct lswitch_config *cfg)
 {
-    const struct ofpbuf *b;
     struct lswitch *sw;
 
     sw = xzalloc(sizeof *sw);
@@ -127,10 +125,14 @@ lswitch_create(struct rconn *rconn, const struct lswitch_config *cfg)
     sw->queued = rconn_packet_counter_create();
     send_features_request(sw, rconn);
 
-    for (b = cfg->default_flows; b; b = b->next) {
-        queue_tx(sw, rconn, ofpbuf_clone(b));
-    }
+    if (cfg->default_flows) {
+        const struct ofpbuf *b;
 
+        LIST_FOR_EACH (b, list_node, cfg->default_flows) {
+            queue_tx(sw, rconn, ofpbuf_clone(b));
+        }
+    }
+    
     return sw;
 }
 
index d0892576adc15180b8cd6dad28620b7e8665960d..e5036906fdb32c7d83aeef36bf4a7606af7392da 100644 (file)
@@ -41,10 +41,10 @@ struct lswitch_config {
      * OFP_FLOW_PERMANENT: Set up permanent flows. */
     int max_idle;
 
-    /* Optionally, a chain of one or more OpenFlow messages to send to the
-     * switch at time of connection.  Presumably these will be OFPT_FLOW_MOD
-     * requests to set up the flow table. */
-    const struct ofpbuf *default_flows;
+    /* Optionally, a list of one or more "struct ofpbuf"s containing OpenFlow
+     * messages to send to the switch at time of connection.  Presumably these
+     * will be OFPT_FLOW_MOD requests to set up the flow table. */
+    const struct list *default_flows;
 
     /* The OpenFlow queue to use by default.  Use UINT32_MAX to avoid
      * specifying a particular queue. */
index 8aedd7573656cbc4beedd514019c42a855bdb686..4ffa8837756c36264094bf4729eea26faedc0eaf 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010 Nicira Networks.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,6 +24,14 @@ list_init(struct list *list)
     list->next = list->prev = list;
 }
 
+/* Initializes 'list' with pointers that will (probably) cause segfaults if
+ * dereferenced and, better yet, show up clearly in a debugger. */
+void
+list_poison(struct list *list)
+{
+    memset(list, 0xcc, sizeof *list);
+}
+
 /* Inserts 'elem' just before 'before'. */
 void
 list_insert(struct list *before, struct list *elem)
index 013f0488240ad7c9b6fa47ef795ae2351ce35a1f..ddb0e6592babd20e9108e6d22b7ace2dc34af7f4 100644 (file)
@@ -31,6 +31,7 @@ struct list {
 #define LIST_INITIALIZER(LIST) { LIST, LIST }
 
 void list_init(struct list *);
+void list_poison(struct list *);
 
 /* List insertion. */
 void list_insert(struct list *, struct list *);
index bf5567251066a12bb7f05e9c45339617b48e5fe3..3f18d29d0f83616d6ce06583effc5e94904c9616 100644 (file)
@@ -36,7 +36,7 @@ ofpbuf_use(struct ofpbuf *b, void *base, size_t allocated)
     b->allocated = allocated;
     b->size = 0;
     b->l2 = b->l3 = b->l4 = b->l7 = NULL;
-    b->next = NULL;
+    list_poison(&b->list_node);
     b->private_p = NULL;
 }
 
@@ -344,3 +344,16 @@ ofpbuf_to_string(const struct ofpbuf *b, size_t maxbytes)
     ds_put_hex_dump(&s, b->data, MIN(b->size, maxbytes), 0, false);
     return ds_cstr(&s);
 }
+
+/* Removes each of the "struct ofpbuf"s on 'list' from the list and frees
+ * them.  */
+void
+ofpbuf_list_delete(struct list *list)
+{
+    struct ofpbuf *b, *next;
+
+    LIST_FOR_EACH_SAFE (b, next, list_node, list) {
+        list_remove(&b->list_node);
+        ofpbuf_delete(b);
+    }
+}
index 5e20aab0b62f861fba49a0243ae3ff880d745385..7d106d888d551e38c26dead9e84019e3cf61d213 100644 (file)
@@ -18,6 +18,7 @@
 #define OFPBUF_H 1
 
 #include <stddef.h>
+#include "list.h"
 
 #ifdef  __cplusplus
 extern "C" {
@@ -37,7 +38,7 @@ struct ofpbuf {
     void *l4;                   /* Transport-level header. */
     void *l7;                   /* Application data. */
 
-    struct ofpbuf *next;        /* Next in a list of ofpbufs. */
+    struct list list_node;      /* Private list element for use by owner. */
     void *private_p;            /* Private pointer for use by owner. */
 };
 
@@ -80,6 +81,12 @@ void *ofpbuf_try_pull(struct ofpbuf *, size_t);
 
 char *ofpbuf_to_string(const struct ofpbuf *, size_t maxbytes);
 
+static inline struct ofpbuf *ofpbuf_from_list(const struct list *list)
+{
+    return CONTAINER_OF(list, struct ofpbuf, list_node);
+}
+void ofpbuf_list_delete(struct list *);
+
 #ifdef  __cplusplus
 }
 #endif
diff --git a/lib/queue.c b/lib/queue.c
deleted file mode 100644 (file)
index d204a46..0000000
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright (c) 2008, 2009, 2010 Nicira Networks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <config.h>
-#include "queue.h"
-#include <assert.h>
-#include "compiler.h"
-#include "leak-checker.h"
-#include "ofpbuf.h"
-
-static void check_queue(struct ovs_queue *q);
-
-/* Initializes 'q' as an empty packet queue. */
-void
-queue_init(struct ovs_queue *q)
-{
-    q->n = 0;
-    q->head = NULL;
-    q->tail = NULL;
-}
-
-/* Destroys 'q' and all of the packets that it contains. */
-void
-queue_destroy(struct ovs_queue *q)
-{
-    struct ofpbuf *cur, *next;
-    for (cur = q->head; cur != NULL; cur = next) {
-        next = cur->next;
-        ofpbuf_delete(cur);
-    }
-}
-
-/* Removes and destroys all of the packets in 'q', rendering it empty. */
-void
-queue_clear(struct ovs_queue *q)
-{
-    queue_destroy(q);
-    queue_init(q);
-}
-
-/* Advances the first packet in 'q' from 'q->head' to 'next', which should be
- * the second packet in the queue.
- *
- * The odd, unsafe interface here allows the first packet in the queue to be
- * passed to a function for possible consumption (and destruction) and only
- * dropped from the queue if that function actually accepts it. */
-void
-queue_advance_head(struct ovs_queue *q, struct ofpbuf *next)
-{
-    assert(q->n);
-    assert(q->head);
-    q->head = next;
-    if (q->head == NULL) {
-        q->tail = NULL;
-    }
-    q->n--;
-}
-
-/* Appends 'b' to the tail of 'q'. */
-void
-queue_push_tail(struct ovs_queue *q, struct ofpbuf *b)
-{
-    check_queue(q);
-    leak_checker_claim(b);
-
-    b->next = NULL;
-    if (q->n++) {
-        q->tail->next = b;
-    } else {
-        q->head = b;
-    }
-    q->tail = b;
-
-    check_queue(q);
-}
-
-/* Removes the first buffer from 'q', which must not be empty, and returns
- * it.  The caller must free the buffer (with ofpbuf_delete()) when it is no
- * longer needed. */
-struct ofpbuf *
-queue_pop_head(struct ovs_queue *q)
-{
-    struct ofpbuf *head = q->head;
-    queue_advance_head(q, head->next);
-    return head;
-}
-
-/* Checks the internal integrity of 'q'.  For use in debugging. */
-static void
-check_queue(struct ovs_queue *q OVS_UNUSED)
-{
-#if 0
-    struct ofpbuf *iter;
-    size_t n;
-
-    assert(q->n == 0
-           ? q->head == NULL && q->tail == NULL
-           : q->head != NULL && q->tail != NULL);
-
-    n = 0;
-    for (iter = q->head; iter != NULL; iter = iter->next) {
-        n++;
-        assert((iter->next != NULL) == (iter != q->tail));
-    }
-    assert(n == q->n);
-#endif
-}
diff --git a/lib/queue.h b/lib/queue.h
deleted file mode 100644 (file)
index e30b84c..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2008, 2009, 2010 Nicira Networks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef QUEUE_H
-#define QUEUE_H 1
-
-#include <stdbool.h>
-#include <stddef.h>
-
-/* Packet queue. */
-struct ovs_queue {
-    int n;                      /* Number of queued packets. */
-    struct ofpbuf *head;        /* First queued packet, null if n == 0. */
-    struct ofpbuf *tail;        /* Last queued packet, null if n == 0. */
-};
-
-#define OVS_QUEUE_INITIALIZER { 0, NULL, NULL }
-
-void queue_init(struct ovs_queue *);
-void queue_destroy(struct ovs_queue *);
-void queue_clear(struct ovs_queue *);
-void queue_advance_head(struct ovs_queue *, struct ofpbuf *next);
-void queue_push_tail(struct ovs_queue *, struct ofpbuf *);
-struct ofpbuf *queue_pop_head(struct ovs_queue *);
-
-static inline bool queue_is_empty(const struct ovs_queue *q)
-{
-    return q->n == 0;
-}
-
-#endif /* queue.h */
index c21b7e849210e0189bf43b3d0010aa1d9e9337f2..61875764b8a3260ed1a7a8966765e5de48081cd6 100644 (file)
@@ -74,7 +74,7 @@ struct rconn {
     char *target;               /* vconn name, passed to vconn_open(). */
     bool reliable;
 
-    struct ovs_queue txq;
+    struct list txq;            /* Contains "struct ofpbuf"s. */
 
     int backoff;
     int max_backoff;
@@ -182,7 +182,7 @@ rconn_create(int probe_interval, int max_backoff)
     rc->target = xstrdup("void");
     rc->reliable = false;
 
-    queue_init(&rc->txq);
+    list_init(&rc->txq);
 
     rc->backoff = 0;
     rc->max_backoff = max_backoff ? max_backoff : 8;
@@ -318,7 +318,7 @@ rconn_destroy(struct rconn *rc)
         free(rc->target);
         vconn_close(rc->vconn);
         flush_queue(rc);
-        queue_destroy(&rc->txq);
+        ofpbuf_list_delete(&rc->txq);
         for (i = 0; i < rc->n_monitors; i++) {
             vconn_close(rc->monitors[i]);
         }
@@ -408,16 +408,16 @@ run_CONNECTING(struct rconn *rc)
 static void
 do_tx_work(struct rconn *rc)
 {
-    if (!rc->txq.n) {
+    if (list_is_empty(&rc->txq)) {
         return;
     }
-    while (rc->txq.n > 0) {
+    while (!list_is_empty(&rc->txq)) {
         int error = try_send(rc);
         if (error) {
             break;
         }
     }
-    if (!rc->txq.n) {
+    if (list_is_empty(&rc->txq)) {
         poll_immediate_wake();
     }
 }
@@ -521,7 +521,7 @@ rconn_run_wait(struct rconn *rc)
         poll_timer_wait_until(expires * 1000);
     }
 
-    if ((rc->state & (S_ACTIVE | S_IDLE)) && rc->txq.n) {
+    if ((rc->state & (S_ACTIVE | S_IDLE)) && !list_is_empty(&rc->txq)) {
         vconn_wait(rc->vconn, WAIT_SEND);
     }
 }
@@ -590,13 +590,13 @@ rconn_send(struct rconn *rc, struct ofpbuf *b,
         if (counter) {
             rconn_packet_counter_inc(counter);
         }
-        queue_push_tail(&rc->txq, b);
+        list_push_back(&rc->txq, &b->list_node);
 
         /* If the queue was empty before we added 'b', try to send some
          * packets.  (But if the queue had packets in it, it's because the
          * vconn is backlogged and there's no point in stuffing more into it
          * now.  We'll get back to that in rconn_run().) */
-        if (rc->txq.n == 1) {
+        if (rc->txq.next == &b->list_node) {
             try_send(rc);
         }
         return 0;
@@ -920,11 +920,18 @@ rconn_set_target__(struct rconn *rc, const char *target, const char *name)
 static int
 try_send(struct rconn *rc)
 {
-    int retval = 0;
-    struct ofpbuf *next = rc->txq.head->next;
-    struct rconn_packet_counter *counter = rc->txq.head->private_p;
-    retval = vconn_send(rc->vconn, rc->txq.head);
+    struct ofpbuf *msg = ofpbuf_from_list(rc->txq.next);
+    struct rconn_packet_counter *counter = msg->private_p;
+    int retval;
+
+    /* Eagerly remove 'msg' from the txq.  We can't remove it from the list
+     * after sending, if sending is successful, because it is then owned by the
+     * vconn, which might have freed it already. */
+    list_remove(&msg->list_node);
+
+    retval = vconn_send(rc->vconn, msg);
     if (retval) {
+        list_push_front(&rc->txq, &msg->list_node);
         if (retval != EAGAIN) {
             report_error(rc, retval);
             disconnect(rc, retval);
@@ -936,7 +943,6 @@ try_send(struct rconn *rc)
     if (counter) {
         rconn_packet_counter_dec(counter);
     }
-    queue_advance_head(&rc->txq, next);
     return 0;
 }
 
@@ -1009,11 +1015,11 @@ disconnect(struct rconn *rc, int error)
 static void
 flush_queue(struct rconn *rc)
 {
-    if (!rc->txq.n) {
+    if (list_is_empty(&rc->txq)) {
         return;
     }
-    while (rc->txq.n > 0) {
-        struct ofpbuf *b = queue_pop_head(&rc->txq);
+    while (!list_is_empty(&rc->txq)) {
+        struct ofpbuf *b = ofpbuf_from_list(list_pop_front(&rc->txq));
         struct rconn_packet_counter *counter = b->private_p;
         if (counter) {
             rconn_packet_counter_dec(counter);
index 4df2f34f9a94c9695f0f3f970233cb19ec2ac45d..47b211b29d41adb3094a14b7cdb6da768a769f49 100644 (file)
@@ -17,7 +17,6 @@
 #ifndef RCONN_H
 #define RCONN_H 1
 
-#include "queue.h"
 #include <stdbool.h>
 #include <stdint.h>
 #include <time.h>
index d749ee4e6f912cd71f1d75873cc45e94626c7533..29bd65e5c05b3baf8287de8a079095f8bf9d9b3f 100644 (file)
 #include "openflow/openflow.h"
 #include "poll-loop.h"
 #include "port-array.h"
-#include "queue.h"
 #include "random.h"
 #include "rconn.h"
 #include "status.h"
 #include "timeval.h"
 #include "vconn.h"
 
+struct pinqueue {
+    struct list packets;        /* Contains "struct ofpbuf"s. */
+    int n;                      /* Number of packets in 'packets'. */
+};
+
 struct pinsched {
     /* Client-supplied parameters. */
     int rate_limit;           /* Packets added to bucket per second. */
     int burst_limit;          /* Maximum token bucket size, in packets. */
 
     /* One queue per physical port. */
-    struct port_array queues;   /* Array of "struct ovs_queue *". */
+    struct port_array queues;   /* Array of "struct pinqueue *"s. */
     int n_queued;               /* Sum over queues[*].n. */
     unsigned int last_tx_port;  /* Last port checked in round-robin. */
 
@@ -64,11 +68,10 @@ struct pinsched {
 };
 
 static struct ofpbuf *
-dequeue_packet(struct pinsched *ps, struct ovs_queue *q,
-               unsigned int port_no)
+dequeue_packet(struct pinsched *ps, struct pinqueue *q, unsigned int port_no)
 {
-    struct ofpbuf *packet = queue_pop_head(q);
-    if (!q->n) {
+    struct ofpbuf *packet = ofpbuf_from_list(list_pop_front(&q->packets));
+    if (--q->n == 0) {
         free(q);
         port_array_delete(&ps->queues, port_no);
     }
@@ -80,11 +83,11 @@ dequeue_packet(struct pinsched *ps, struct ovs_queue *q,
 static void
 drop_packet(struct pinsched *ps)
 {
-    struct ovs_queue *longest;  /* Queue currently selected as longest. */
+    struct pinqueue *longest;   /* Queue currently selected as longest. */
     int n_longest;              /* # of queues of same length as 'longest'. */
     unsigned int longest_port_no;
     unsigned int port_no;
-    struct ovs_queue *q;
+    struct pinqueue *q;
 
     ps->n_queue_dropped++;
 
@@ -115,7 +118,7 @@ drop_packet(struct pinsched *ps)
 static struct ofpbuf *
 get_tx_packet(struct pinsched *ps)
 {
-    struct ovs_queue *q = port_array_next(&ps->queues, &ps->last_tx_port);
+    struct pinqueue *q = port_array_next(&ps->queues, &ps->last_tx_port);
     if (!q) {
         q = port_array_first(&ps->queues, &ps->last_tx_port);
     }
@@ -161,7 +164,7 @@ pinsched_send(struct pinsched *ps, uint16_t port_no,
         cb(packet, aux);
     } else {
         /* Otherwise queue it up for the periodic callback to drain out. */
-        struct ovs_queue *q;
+        struct pinqueue *q;
 
         /* We are called with a buffer obtained from dpif_recv() that has much
          * more allocated space than actual content most of the time.  Since
@@ -175,10 +178,12 @@ pinsched_send(struct pinsched *ps, uint16_t port_no,
         q = port_array_get(&ps->queues, port_no);
         if (!q) {
             q = xmalloc(sizeof *q);
-            queue_init(q);
+            list_init(&q->packets);
+            q->n = 0;
             port_array_set(&ps->queues, port_no, q);
         }
-        queue_push_tail(q, packet);
+        list_push_back(&q->packets, &packet->list_node);
+        q->n++;
         ps->n_queued++;
         ps->n_limited++;
     }
@@ -254,11 +259,11 @@ void
 pinsched_destroy(struct pinsched *ps)
 {
     if (ps) {
-        struct ovs_queue *queue;
+        struct pinqueue *queue;
         unsigned int port_no;
 
         PORT_ARRAY_FOR_EACH (queue, &ps->queues, port_no) {
-            queue_destroy(queue);
+            ofpbuf_list_delete(&queue->packets);
             free(queue);
         }
         port_array_destroy(&ps->queues);
index 1be84f78706ffd688d16180b9cf7e556f1793466..5675f8974738f674249dba772b2a3784e80068c9 100644 (file)
@@ -77,9 +77,9 @@ static uint32_t default_queue = UINT32_MAX;
 /* -Q, --port-queue: map from port name to port number (cast to void *). */
 static struct shash port_queues = SHASH_INITIALIZER(&port_queues);
 
-/* --with-flows: File with flows to send to switch, or null to not load
- * any default flows. */
-static struct ovs_queue default_flows = OVS_QUEUE_INITIALIZER;
+/* --with-flows: Flows to send to switch, or an empty list not to send any
+ * default flows. */
+static struct list default_flows = LIST_INITIALIZER(&default_flows);
 
 /* --unixctl: Name of unixctl socket, or null to use the default. */
 static char *unixctl_path = NULL;
@@ -229,7 +229,7 @@ new_switch(struct switch_ *sw, struct vconn *vconn)
                 : learn_macs ? LSW_LEARN
                 : LSW_FLOOD);
     cfg.max_idle = set_up_flows ? max_idle : -1;
-    cfg.default_flows = default_flows.head;
+    cfg.default_flows = &default_flows;
     cfg.default_queue = default_queue;
     cfg.port_queues = &port_queues;
     sw->lswitch = lswitch_create(sw->rconn, &cfg);
@@ -269,7 +269,7 @@ read_flow_file(const char *name)
     }
 
     while ((b = parse_ofp_add_flow_file(stream)) != NULL) {
-        queue_push_tail(&default_flows, b);
+        list_push_back(&default_flows, &b->list_node);
     }
 
     fclose(stream);