Drop rconn's responsibility for limiting the tx queue.
[openvswitch] / secchan / secchan.c
index 6c0a70d25695da7d6ed3592a9c402a801416e506..e4f283d1ad17b822b0f5986df083c31acde3efec 100644 (file)
@@ -103,6 +103,7 @@ struct settings {
 struct half {
     struct rconn *rconn;
     struct buffer *rxbuf;
+    int n_txq;                  /* No. of packets queued for tx on 'rconn'. */
 };
 
 struct relay {
@@ -195,11 +196,11 @@ main(int argc, char *argv[])
     daemonize();
 
     /* Connect to datapath. */
-    local_rconn = rconn_create(1, 0, s.max_backoff);
+    local_rconn = rconn_create(0, s.max_backoff);
     rconn_connect(local_rconn, s.nl_name);
 
     /* Connect to controller. */
-    remote_rconn = rconn_create(1, s.probe_interval, s.max_backoff);
+    remote_rconn = rconn_create(s.probe_interval, s.max_backoff);
     if (s.controller_name) {
         retval = rconn_connect(remote_rconn, s.controller_name);
         if (retval == EAFNOSUPPORT) {
@@ -321,11 +322,11 @@ relay_accept(const struct settings *s, struct vconn *listen_vconn)
     }
 
     /* Create and return relay. */
-    r1 = rconn_create(1, 0, 0);
+    r1 = rconn_create(0, 0);
     rconn_connect_unreliably(r1, nl_name_without_subscription, new_local);
     free(nl_name_without_subscription);
 
-    r2 = rconn_create(1, 0, 0);
+    r2 = rconn_create(0, 0);
     rconn_connect_unreliably(r2, "passive", new_remote);
 
     return relay_create(r1, r2, true);
@@ -334,15 +335,9 @@ relay_accept(const struct settings *s, struct vconn *listen_vconn)
 static struct relay *
 relay_create(struct rconn *local, struct rconn *remote, bool is_mgmt_conn)
 {
-    struct relay *r;
-    int i;
-
-    r = xmalloc(sizeof *r);
+    struct relay *r = xcalloc(1, sizeof *r);
     r->halves[HALF_LOCAL].rconn = local;
     r->halves[HALF_REMOTE].rconn = remote;
-    for (i = 0; i < 2; i++) {
-        r->halves[i].rxbuf = NULL;
-    }
     r->is_mgmt_conn = is_mgmt_conn;
     return r;
 }
@@ -379,8 +374,9 @@ relay_run(struct relay *r, const struct hook hooks[], size_t n_hooks)
                 }
             }
 
-            if (this->rxbuf) {
-                int retval = rconn_send(peer->rconn, this->rxbuf);
+            if (this->rxbuf && !this->n_txq) {
+                int retval = rconn_send(peer->rconn, this->rxbuf,
+                                        &this->n_txq);
                 if (retval != EAGAIN) {
                     if (!retval) {
                         progress = true;
@@ -443,14 +439,13 @@ struct in_band_data {
     struct mac_learning *ml;
     struct netdev *of_device;
     uint8_t mac[ETH_ADDR_LEN];
+    int n_queued;
 };
 
 static void
-queue_tx(struct rconn *rc, struct buffer *b)
+queue_tx(struct rconn *rc, struct in_band_data *in_band, struct buffer *b)
 {
-    if (rconn_force_send(rc, b)) {
-        buffer_delete(b);
-    }
+    rconn_send_with_limit(rc, b, &in_band->n_queued, 10);
 }
 
 static const uint8_t *
@@ -462,7 +457,7 @@ get_controller_mac(struct netdev *netdev, struct rconn *controller)
 
     uint32_t last_ip = ip;
 
-    time_t now = time(0);
+    time_t now = time_now();
 
     ip = rconn_get_ip(controller);
     if (last_ip != ip || !next_refresh || now >= next_refresh) {
@@ -567,8 +562,8 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_)
                && in_port == mac_learning_lookup(in_band->ml,
                                                  controller_mac)) {
         /* Drop controller traffic that arrives on the controller port. */
-        queue_tx(rc, make_add_flow(&flow, ntohl(opi->buffer_id),
-                                   in_band->s->max_idle, 0));
+        queue_tx(rc, in_band, make_add_flow(&flow, ntohl(opi->buffer_id),
+                                            in_band->s->max_idle, 0));
         return true;
     } else {
         return false;
@@ -576,12 +571,14 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_)
 
     if (out_port != OFPP_FLOOD) {
         /* The output port is known, so add a new flow. */
-        queue_tx(rc, make_add_simple_flow(&flow, ntohl(opi->buffer_id),
-                                          out_port, in_band->s->max_idle));
+        queue_tx(rc, in_band,
+                 make_add_simple_flow(&flow, ntohl(opi->buffer_id),
+                                      out_port, in_band->s->max_idle));
 
         /* If the switch didn't buffer the packet, we need to send a copy. */
         if (ntohl(opi->buffer_id) == UINT32_MAX) {
-            queue_tx(rc, make_unbuffered_packet_out(&pkt, in_port, out_port));
+            queue_tx(rc, in_band,
+                     make_unbuffered_packet_out(&pkt, in_port, out_port));
         }
     } else {
         /* We don't know that MAC.  Send along the packet without setting up a
@@ -593,7 +590,7 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_)
             b = make_buffered_packet_out(ntohl(opi->buffer_id),
                                          in_port, out_port);
         }
-        queue_tx(rc, b);
+        queue_tx(rc, in_band, b);
     }
     return true;
 }
@@ -604,7 +601,7 @@ in_band_hook_create(const struct settings *s)
     struct in_band_data *in_band;
     int retval;
 
-    in_band = xmalloc(sizeof *in_band);
+    in_band = xcalloc(1, sizeof *in_band);
     in_band->s = s;
     in_band->ml = mac_learning_create();
     retval = netdev_open(s->of_name, NETDEV_ETH_TYPE_NONE,