lib/port-array.h \
lib/process.c \
lib/process.h \
- lib/queue.c \
- lib/queue.h \
lib/random.c \
lib/random.h \
lib/rconn.c \
#include "ofpbuf.h"
#include "packets.h"
#include "poll-loop.h"
-#include "queue.h"
#include "shash.h"
#include "timeval.h"
#include "util.h"
bool destroyed;
bool drop_frags; /* Drop all IP fragments, if true. */
- struct ovs_queue queues[N_QUEUES]; /* Messages queued for dpif_recv(). */
+ struct list queues[N_QUEUES]; /* Contain ofpbufs queued for dpif_recv(). */
+ size_t queue_len[N_QUEUES]; /* Number of packets in each queue. */
struct hmap flow_table; /* Flow table. */
/* Statistics. */
dp->open_cnt = 0;
dp->drop_frags = false;
for (i = 0; i < N_QUEUES; i++) {
- queue_init(&dp->queues[i]);
+ list_init(&dp->queues[i]);
}
hmap_init(&dp->flow_table);
list_init(&dp->port_list);
do_del_port(dp, port->port_no);
}
for (i = 0; i < N_QUEUES; i++) {
- queue_destroy(&dp->queues[i]);
+ ofpbuf_list_delete(&dp->queues[i]);
}
hmap_destroy(&dp->flow_table);
free(dp->name);
}
}
-static struct ovs_queue *
+static int
find_nonempty_queue(struct dpif *dpif)
{
struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
int i;
for (i = 0; i < N_QUEUES; i++) {
- struct ovs_queue *q = &dp->queues[i];
- if (q->n && mask & (1u << i)) {
- return q;
+ struct list *queue = &dp->queues[i];
+ if (!list_is_empty(queue) && mask & (1u << i)) {
+ return i;
}
}
- return NULL;
+ return -1;
}
static int
dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
{
- struct ovs_queue *q = find_nonempty_queue(dpif);
- if (q) {
- *bufp = queue_pop_head(q);
+ int queue_idx = find_nonempty_queue(dpif);
+ if (queue_idx >= 0) {
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+
+ *bufp = ofpbuf_from_list(list_pop_front(&dp->queues[queue_idx]));
+ dp->queue_len[queue_idx]--;
+
return 0;
} else {
return EAGAIN;
static void
dpif_netdev_recv_wait(struct dpif *dpif)
{
- struct ovs_queue *q = find_nonempty_queue(dpif);
- if (q) {
+ if (find_nonempty_queue(dpif) >= 0) {
poll_immediate_wake();
} else {
/* No messages ready to be received, and dp_wait() will ensure that we
dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
int queue_no, int port_no, uint32_t arg)
{
- struct ovs_queue *q = &dp->queues[queue_no];
struct odp_msg *header;
struct ofpbuf *msg;
size_t msg_size;
- if (q->n >= MAX_QUEUE_LEN) {
+ if (dp->queue_len[queue_no] >= MAX_QUEUE_LEN) {
dp->n_lost++;
return ENOBUFS;
}
header->port = port_no;
header->arg = arg;
ofpbuf_put(msg, packet->data, packet->size);
- queue_push_tail(q, msg);
+ list_push_back(&dp->queues[queue_no], &msg->list_node);
+ dp->queue_len[queue_no]++;
return 0;
}
#include "list.h"
#include "ofpbuf.h"
#include "poll-loop.h"
-#include "queue.h"
#include "reconnect.h"
#include "stream.h"
#include "timeval.h"
struct jsonrpc_msg *received;
/* Output. */
- struct ovs_queue output;
+ struct list output; /* Contains "struct ofpbuf"s. */
size_t backlog;
};
rpc->name = xstrdup(stream_get_name(stream));
rpc->stream = stream;
byteq_init(&rpc->input);
- queue_init(&rpc->output);
+ list_init(&rpc->output);
return rpc;
}
}
stream_run(rpc->stream);
- while (!queue_is_empty(&rpc->output)) {
- struct ofpbuf *buf = rpc->output.head;
+ while (!list_is_empty(&rpc->output)) {
+ struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
int retval;
retval = stream_send(rpc->stream, buf->data, buf->size);
rpc->backlog -= retval;
ofpbuf_pull(buf, retval);
if (!buf->size) {
- ofpbuf_delete(queue_pop_head(&rpc->output));
+ list_remove(&buf->list_node);
+ ofpbuf_delete(buf);
}
} else {
if (retval != -EAGAIN) {
{
if (!rpc->status) {
stream_run_wait(rpc->stream);
- if (!queue_is_empty(&rpc->output)) {
+ if (!list_is_empty(&rpc->output)) {
stream_send_wait(rpc->stream);
}
}
buf = xmalloc(sizeof *buf);
ofpbuf_use(buf, s, length);
buf->size = length;
- queue_push_tail(&rpc->output, buf);
+ list_push_back(&rpc->output, &buf->list_node);
rpc->backlog += length;
- if (rpc->output.n == 1) {
+ if (rpc->backlog == length) {
jsonrpc_run(rpc);
}
return rpc->status;
for (;;) {
jsonrpc_run(rpc);
- if (queue_is_empty(&rpc->output) || rpc->status) {
+ if (list_is_empty(&rpc->output) || rpc->status) {
return rpc->status;
}
jsonrpc_wait(rpc);
jsonrpc_msg_destroy(rpc->received);
rpc->received = NULL;
- queue_clear(&rpc->output);
+ ofpbuf_list_delete(&rpc->output);
rpc->backlog = 0;
}
\f
#include "ofp-util.h"
#include "openflow/openflow.h"
#include "poll-loop.h"
-#include "queue.h"
#include "rconn.h"
#include "shash.h"
#include "timeval.h"
struct lswitch *
lswitch_create(struct rconn *rconn, const struct lswitch_config *cfg)
{
- const struct ofpbuf *b;
struct lswitch *sw;
sw = xzalloc(sizeof *sw);
sw->queued = rconn_packet_counter_create();
send_features_request(sw, rconn);
- for (b = cfg->default_flows; b; b = b->next) {
- queue_tx(sw, rconn, ofpbuf_clone(b));
- }
+ if (cfg->default_flows) {
+ const struct ofpbuf *b;
+ LIST_FOR_EACH (b, list_node, cfg->default_flows) {
+ queue_tx(sw, rconn, ofpbuf_clone(b));
+ }
+ }
+
return sw;
}
* OFP_FLOW_PERMANENT: Set up permanent flows. */
int max_idle;
- /* Optionally, a chain of one or more OpenFlow messages to send to the
- * switch at time of connection. Presumably these will be OFPT_FLOW_MOD
- * requests to set up the flow table. */
- const struct ofpbuf *default_flows;
+ /* Optionally, a list of one or more "struct ofpbuf"s containing OpenFlow
+ * messages to send to the switch at time of connection. Presumably these
+ * will be OFPT_FLOW_MOD requests to set up the flow table. */
+ const struct list *default_flows;
/* The OpenFlow queue to use by default. Use UINT32_MAX to avoid
* specifying a particular queue. */
/*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010 Nicira Networks.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
list->next = list->prev = list;
}
+/* Initializes 'list' with pointers that will (probably) cause segfaults if
+ * dereferenced and, better yet, show up clearly in a debugger. */
+void
+list_poison(struct list *list)
+{
+ memset(list, 0xcc, sizeof *list);
+}
+
/* Inserts 'elem' just before 'before'. */
void
list_insert(struct list *before, struct list *elem)
#define LIST_INITIALIZER(LIST) { LIST, LIST }
void list_init(struct list *);
+void list_poison(struct list *);
/* List insertion. */
void list_insert(struct list *, struct list *);
b->allocated = allocated;
b->size = 0;
b->l2 = b->l3 = b->l4 = b->l7 = NULL;
- b->next = NULL;
+ list_poison(&b->list_node);
b->private_p = NULL;
}
ds_put_hex_dump(&s, b->data, MIN(b->size, maxbytes), 0, false);
return ds_cstr(&s);
}
+
+/* Removes each of the "struct ofpbuf"s on 'list' from the list and frees
+ * them. */
+void
+ofpbuf_list_delete(struct list *list)
+{
+ struct ofpbuf *b, *next;
+
+ LIST_FOR_EACH_SAFE (b, next, list_node, list) {
+ list_remove(&b->list_node);
+ ofpbuf_delete(b);
+ }
+}
#define OFPBUF_H 1
#include <stddef.h>
+#include "list.h"
#ifdef __cplusplus
extern "C" {
void *l4; /* Transport-level header. */
void *l7; /* Application data. */
- struct ofpbuf *next; /* Next in a list of ofpbufs. */
+ struct list list_node; /* Private list element for use by owner. */
void *private_p; /* Private pointer for use by owner. */
};
char *ofpbuf_to_string(const struct ofpbuf *, size_t maxbytes);
+static inline struct ofpbuf *ofpbuf_from_list(const struct list *list)
+{
+ return CONTAINER_OF(list, struct ofpbuf, list_node);
+}
+void ofpbuf_list_delete(struct list *);
+
#ifdef __cplusplus
}
#endif
+++ /dev/null
-/*
- * Copyright (c) 2008, 2009, 2010 Nicira Networks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <config.h>
-#include "queue.h"
-#include <assert.h>
-#include "compiler.h"
-#include "leak-checker.h"
-#include "ofpbuf.h"
-
-static void check_queue(struct ovs_queue *q);
-
-/* Initializes 'q' as an empty packet queue. */
-void
-queue_init(struct ovs_queue *q)
-{
- q->n = 0;
- q->head = NULL;
- q->tail = NULL;
-}
-
-/* Destroys 'q' and all of the packets that it contains. */
-void
-queue_destroy(struct ovs_queue *q)
-{
- struct ofpbuf *cur, *next;
- for (cur = q->head; cur != NULL; cur = next) {
- next = cur->next;
- ofpbuf_delete(cur);
- }
-}
-
-/* Removes and destroys all of the packets in 'q', rendering it empty. */
-void
-queue_clear(struct ovs_queue *q)
-{
- queue_destroy(q);
- queue_init(q);
-}
-
-/* Advances the first packet in 'q' from 'q->head' to 'next', which should be
- * the second packet in the queue.
- *
- * The odd, unsafe interface here allows the first packet in the queue to be
- * passed to a function for possible consumption (and destruction) and only
- * dropped from the queue if that function actually accepts it. */
-void
-queue_advance_head(struct ovs_queue *q, struct ofpbuf *next)
-{
- assert(q->n);
- assert(q->head);
- q->head = next;
- if (q->head == NULL) {
- q->tail = NULL;
- }
- q->n--;
-}
-
-/* Appends 'b' to the tail of 'q'. */
-void
-queue_push_tail(struct ovs_queue *q, struct ofpbuf *b)
-{
- check_queue(q);
- leak_checker_claim(b);
-
- b->next = NULL;
- if (q->n++) {
- q->tail->next = b;
- } else {
- q->head = b;
- }
- q->tail = b;
-
- check_queue(q);
-}
-
-/* Removes the first buffer from 'q', which must not be empty, and returns
- * it. The caller must free the buffer (with ofpbuf_delete()) when it is no
- * longer needed. */
-struct ofpbuf *
-queue_pop_head(struct ovs_queue *q)
-{
- struct ofpbuf *head = q->head;
- queue_advance_head(q, head->next);
- return head;
-}
-
-/* Checks the internal integrity of 'q'. For use in debugging. */
-static void
-check_queue(struct ovs_queue *q OVS_UNUSED)
-{
-#if 0
- struct ofpbuf *iter;
- size_t n;
-
- assert(q->n == 0
- ? q->head == NULL && q->tail == NULL
- : q->head != NULL && q->tail != NULL);
-
- n = 0;
- for (iter = q->head; iter != NULL; iter = iter->next) {
- n++;
- assert((iter->next != NULL) == (iter != q->tail));
- }
- assert(n == q->n);
-#endif
-}
+++ /dev/null
-/*
- * Copyright (c) 2008, 2009, 2010 Nicira Networks.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef QUEUE_H
-#define QUEUE_H 1
-
-#include <stdbool.h>
-#include <stddef.h>
-
-/* Packet queue. */
-struct ovs_queue {
- int n; /* Number of queued packets. */
- struct ofpbuf *head; /* First queued packet, null if n == 0. */
- struct ofpbuf *tail; /* Last queued packet, null if n == 0. */
-};
-
-#define OVS_QUEUE_INITIALIZER { 0, NULL, NULL }
-
-void queue_init(struct ovs_queue *);
-void queue_destroy(struct ovs_queue *);
-void queue_clear(struct ovs_queue *);
-void queue_advance_head(struct ovs_queue *, struct ofpbuf *next);
-void queue_push_tail(struct ovs_queue *, struct ofpbuf *);
-struct ofpbuf *queue_pop_head(struct ovs_queue *);
-
-static inline bool queue_is_empty(const struct ovs_queue *q)
-{
- return q->n == 0;
-}
-
-#endif /* queue.h */
char *target; /* vconn name, passed to vconn_open(). */
bool reliable;
- struct ovs_queue txq;
+ struct list txq; /* Contains "struct ofpbuf"s. */
int backoff;
int max_backoff;
rc->target = xstrdup("void");
rc->reliable = false;
- queue_init(&rc->txq);
+ list_init(&rc->txq);
rc->backoff = 0;
rc->max_backoff = max_backoff ? max_backoff : 8;
free(rc->target);
vconn_close(rc->vconn);
flush_queue(rc);
- queue_destroy(&rc->txq);
+ ofpbuf_list_delete(&rc->txq);
for (i = 0; i < rc->n_monitors; i++) {
vconn_close(rc->monitors[i]);
}
static void
do_tx_work(struct rconn *rc)
{
- if (!rc->txq.n) {
+ if (list_is_empty(&rc->txq)) {
return;
}
- while (rc->txq.n > 0) {
+ while (!list_is_empty(&rc->txq)) {
int error = try_send(rc);
if (error) {
break;
}
}
- if (!rc->txq.n) {
+ if (list_is_empty(&rc->txq)) {
poll_immediate_wake();
}
}
poll_timer_wait_until(expires * 1000);
}
- if ((rc->state & (S_ACTIVE | S_IDLE)) && rc->txq.n) {
+ if ((rc->state & (S_ACTIVE | S_IDLE)) && !list_is_empty(&rc->txq)) {
vconn_wait(rc->vconn, WAIT_SEND);
}
}
if (counter) {
rconn_packet_counter_inc(counter);
}
- queue_push_tail(&rc->txq, b);
+ list_push_back(&rc->txq, &b->list_node);
/* If the queue was empty before we added 'b', try to send some
* packets. (But if the queue had packets in it, it's because the
* vconn is backlogged and there's no point in stuffing more into it
* now. We'll get back to that in rconn_run().) */
- if (rc->txq.n == 1) {
+ if (rc->txq.next == &b->list_node) {
try_send(rc);
}
return 0;
static int
try_send(struct rconn *rc)
{
- int retval = 0;
- struct ofpbuf *next = rc->txq.head->next;
- struct rconn_packet_counter *counter = rc->txq.head->private_p;
- retval = vconn_send(rc->vconn, rc->txq.head);
+ struct ofpbuf *msg = ofpbuf_from_list(rc->txq.next);
+ struct rconn_packet_counter *counter = msg->private_p;
+ int retval;
+
+ /* Eagerly remove 'msg' from the txq. We can't remove it from the list
+ * after sending, if sending is successful, because it is then owned by the
+ * vconn, which might have freed it already. */
+ list_remove(&msg->list_node);
+
+ retval = vconn_send(rc->vconn, msg);
if (retval) {
+ list_push_front(&rc->txq, &msg->list_node);
if (retval != EAGAIN) {
report_error(rc, retval);
disconnect(rc, retval);
if (counter) {
rconn_packet_counter_dec(counter);
}
- queue_advance_head(&rc->txq, next);
return 0;
}
static void
flush_queue(struct rconn *rc)
{
- if (!rc->txq.n) {
+ if (list_is_empty(&rc->txq)) {
return;
}
- while (rc->txq.n > 0) {
- struct ofpbuf *b = queue_pop_head(&rc->txq);
+ while (!list_is_empty(&rc->txq)) {
+ struct ofpbuf *b = ofpbuf_from_list(list_pop_front(&rc->txq));
struct rconn_packet_counter *counter = b->private_p;
if (counter) {
rconn_packet_counter_dec(counter);
#ifndef RCONN_H
#define RCONN_H 1
-#include "queue.h"
#include <stdbool.h>
#include <stdint.h>
#include <time.h>
#include "openflow/openflow.h"
#include "poll-loop.h"
#include "port-array.h"
-#include "queue.h"
#include "random.h"
#include "rconn.h"
#include "status.h"
#include "timeval.h"
#include "vconn.h"
+struct pinqueue {
+ struct list packets; /* Contains "struct ofpbuf"s. */
+ int n; /* Number of packets in 'packets'. */
+};
+
struct pinsched {
/* Client-supplied parameters. */
int rate_limit; /* Packets added to bucket per second. */
int burst_limit; /* Maximum token bucket size, in packets. */
/* One queue per physical port. */
- struct port_array queues; /* Array of "struct ovs_queue *". */
+ struct port_array queues; /* Array of "struct pinqueue *"s. */
int n_queued; /* Sum over queues[*].n. */
unsigned int last_tx_port; /* Last port checked in round-robin. */
};
static struct ofpbuf *
-dequeue_packet(struct pinsched *ps, struct ovs_queue *q,
- unsigned int port_no)
+dequeue_packet(struct pinsched *ps, struct pinqueue *q, unsigned int port_no)
{
- struct ofpbuf *packet = queue_pop_head(q);
- if (!q->n) {
+ struct ofpbuf *packet = ofpbuf_from_list(list_pop_front(&q->packets));
+ if (--q->n == 0) {
free(q);
port_array_delete(&ps->queues, port_no);
}
static void
drop_packet(struct pinsched *ps)
{
- struct ovs_queue *longest; /* Queue currently selected as longest. */
+ struct pinqueue *longest; /* Queue currently selected as longest. */
int n_longest; /* # of queues of same length as 'longest'. */
unsigned int longest_port_no;
unsigned int port_no;
- struct ovs_queue *q;
+ struct pinqueue *q;
ps->n_queue_dropped++;
static struct ofpbuf *
get_tx_packet(struct pinsched *ps)
{
- struct ovs_queue *q = port_array_next(&ps->queues, &ps->last_tx_port);
+ struct pinqueue *q = port_array_next(&ps->queues, &ps->last_tx_port);
if (!q) {
q = port_array_first(&ps->queues, &ps->last_tx_port);
}
cb(packet, aux);
} else {
/* Otherwise queue it up for the periodic callback to drain out. */
- struct ovs_queue *q;
+ struct pinqueue *q;
/* We are called with a buffer obtained from dpif_recv() that has much
* more allocated space than actual content most of the time. Since
q = port_array_get(&ps->queues, port_no);
if (!q) {
q = xmalloc(sizeof *q);
- queue_init(q);
+ list_init(&q->packets);
+ q->n = 0;
port_array_set(&ps->queues, port_no, q);
}
- queue_push_tail(q, packet);
+ list_push_back(&q->packets, &packet->list_node);
+ q->n++;
ps->n_queued++;
ps->n_limited++;
}
pinsched_destroy(struct pinsched *ps)
{
if (ps) {
- struct ovs_queue *queue;
+ struct pinqueue *queue;
unsigned int port_no;
PORT_ARRAY_FOR_EACH (queue, &ps->queues, port_no) {
- queue_destroy(queue);
+ ofpbuf_list_delete(&queue->packets);
free(queue);
}
port_array_destroy(&ps->queues);
/* -Q, --port-queue: map from port name to port number (cast to void *). */
static struct shash port_queues = SHASH_INITIALIZER(&port_queues);
-/* --with-flows: File with flows to send to switch, or null to not load
- * any default flows. */
-static struct ovs_queue default_flows = OVS_QUEUE_INITIALIZER;
+/* --with-flows: Flows to send to switch, or an empty list not to send any
+ * default flows. */
+static struct list default_flows = LIST_INITIALIZER(&default_flows);
/* --unixctl: Name of unixctl socket, or null to use the default. */
static char *unixctl_path = NULL;
: learn_macs ? LSW_LEARN
: LSW_FLOOD);
cfg.max_idle = set_up_flows ? max_idle : -1;
- cfg.default_flows = default_flows.head;
+ cfg.default_flows = &default_flows;
cfg.default_queue = default_queue;
cfg.port_queues = &port_queues;
sw->lswitch = lswitch_create(sw->rconn, &cfg);
}
while ((b = parse_ofp_add_flow_file(stream)) != NULL) {
- queue_push_tail(&default_flows, b);
+ list_push_back(&default_flows, &b->list_node);
}
fclose(stream);