rconn: Add byte counting feature to rconn_packet_counter.
[openvswitch] / lib / rconn.c
index 7d0f4ce65ceb8d01d00d10192f8154ddb73ec898..1129a3b7a69207e6da6a873284a68ef57fd72904 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010, 2011 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -119,8 +119,9 @@ struct rconn {
      *
      * We don't cache the local port, because that changes from one connection
      * attempt to the next. */
-    uint32_t local_ip, remote_ip;
-    uint16_t remote_port;
+    ovs_be32 local_ip, remote_ip;
+    ovs_be16 remote_port;
+    uint8_t dscp;
 
     /* Messages sent or received are copied to the monitor connections. */
 #define MAX_MONITORS 8
@@ -160,7 +161,7 @@ static bool rconn_logging_connection_attempts__(const struct rconn *);
  * The new rconn is initially unconnected.  Use rconn_connect() or
  * rconn_connect_unreliably() to connect it. */
 struct rconn *
-rconn_create(int probe_interval, int max_backoff)
+rconn_create(int probe_interval, int max_backoff, uint8_t dscp)
 {
     struct rconn *rc = xzalloc(sizeof *rc);
 
@@ -194,6 +195,7 @@ rconn_create(int probe_interval, int max_backoff)
     rc->total_time_connected = 0;
 
     rconn_set_probe_interval(rc, probe_interval);
+    rconn_set_dscp(rc, dscp);
 
     rc->n_monitors = 0;
 
@@ -218,6 +220,18 @@ rconn_get_max_backoff(const struct rconn *rc)
     return rc->max_backoff;
 }
 
+void
+rconn_set_dscp(struct rconn *rc, uint8_t dscp)
+{
+    rc->dscp = dscp;
+}
+
+uint8_t
+rconn_get_dscp(const struct rconn *rc)
+{
+    return rc->dscp;
+}
+
 void
 rconn_set_probe_interval(struct rconn *rc, int probe_interval)
 {
@@ -335,7 +349,7 @@ reconnect(struct rconn *rc)
         VLOG_INFO("%s: connecting...", rc->name);
     }
     rc->n_attempted_connections++;
-    retval = vconn_open(rc->target, OFP_VERSION, &rc->vconn);
+    retval = vconn_open(rc->target, OFP10_VERSION, &rc->vconn, rc->dscp);
     if (!retval) {
         rc->remote_ip = vconn_get_remote_ip(rc->vconn);
         rc->local_ip = vconn_get_local_ip(rc->vconn);
@@ -552,9 +566,8 @@ rconn_recv_wait(struct rconn *rc)
     }
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful (in which case 'b' is
- * destroyed), or ENOTCONN if 'rc' is not currently connected (in which case
- * the caller retains ownership of 'b').
+/* Sends 'b' on 'rc'.  Returns 0 if successful, or ENOTCONN if 'rc' is not
+ * currently connected.  Takes ownership of 'b'.
  *
  * If 'counter' is non-null, then 'counter' will be incremented while the
  * packet is in flight, then decremented when it has been sent (or discarded
@@ -574,7 +587,7 @@ rconn_send(struct rconn *rc, struct ofpbuf *b,
         copy_to_monitor(rc, b);
         b->private_p = counter;
         if (counter) {
-            rconn_packet_counter_inc(counter);
+            rconn_packet_counter_inc(counter, b->size);
         }
         list_push_back(&rc->txq, &b->list_node);
 
@@ -587,6 +600,7 @@ rconn_send(struct rconn *rc, struct ofpbuf *b,
         }
         return 0;
     } else {
+        ofpbuf_delete(b);
         return ENOTCONN;
     }
 }
@@ -608,10 +622,11 @@ rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b,
                       struct rconn_packet_counter *counter, int queue_limit)
 {
     int retval;
-    retval = counter->n >= queue_limit ? EAGAIN : rconn_send(rc, b, counter);
+    retval = (counter->n_packets >= queue_limit
+              ? EAGAIN
+              : rconn_send(rc, b, counter));
     if (retval) {
         COVERAGE_INC(rconn_overflow);
-        ofpbuf_delete(b);
     }
     return retval;
 }
@@ -731,6 +746,14 @@ rconn_get_local_port(const struct rconn *rconn)
     return rconn->vconn ? vconn_get_local_port(rconn->vconn) : 0;
 }
 
+/* Returns the OpenFlow version negotiated with the peer, or -1 if there is
+ * currently no connection or if version negotiation is not yet complete. */
+int
+rconn_get_version(const struct rconn *rconn)
+{
+    return rconn->vconn ? vconn_get_version(rconn->vconn) : -1;
+}
+
 /* Returns the total number of packets successfully received by the underlying
  * vconn.  */
 unsigned int
@@ -840,12 +863,18 @@ rconn_get_last_error(const struct rconn *rc)
 {
     return rc->last_error;
 }
+
+/* Returns the number of messages queued for transmission on 'rc'. */
+unsigned int
+rconn_count_txqlen(const struct rconn *rc)
+{
+    return list_size(&rc->txq);
+}
 \f
 struct rconn_packet_counter *
 rconn_packet_counter_create(void)
 {
-    struct rconn_packet_counter *c = xmalloc(sizeof *c);
-    c->n = 0;
+    struct rconn_packet_counter *c = xzalloc(sizeof *c);
     c->ref_cnt = 1;
     return c;
 }
@@ -855,24 +884,32 @@ rconn_packet_counter_destroy(struct rconn_packet_counter *c)
 {
     if (c) {
         assert(c->ref_cnt > 0);
-        if (!--c->ref_cnt && !c->n) {
+        if (!--c->ref_cnt && !c->n_packets) {
             free(c);
         }
     }
 }
 
 void
-rconn_packet_counter_inc(struct rconn_packet_counter *c)
+rconn_packet_counter_inc(struct rconn_packet_counter *c, unsigned int n_bytes)
 {
-    c->n++;
+    c->n_packets++;
+    c->n_bytes += n_bytes;
 }
 
 void
-rconn_packet_counter_dec(struct rconn_packet_counter *c)
+rconn_packet_counter_dec(struct rconn_packet_counter *c, unsigned int n_bytes)
 {
-    assert(c->n > 0);
-    if (!--c->n && !c->ref_cnt) {
-        free(c);
+    assert(c->n_packets > 0);
+    assert(c->n_bytes >= n_bytes);
+
+    c->n_bytes -= n_bytes;
+    c->n_packets--;
+    if (!c->n_packets) {
+        assert(!c->n_bytes);
+        if (!c->ref_cnt) {
+            free(c);
+        }
     }
 }
 \f
@@ -899,6 +936,7 @@ static int
 try_send(struct rconn *rc)
 {
     struct ofpbuf *msg = ofpbuf_from_list(rc->txq.next);
+    unsigned int n_bytes = msg->size;
     struct rconn_packet_counter *counter = msg->private_p;
     int retval;
 
@@ -919,7 +957,7 @@ try_send(struct rconn *rc)
     COVERAGE_INC(rconn_sent);
     rc->packets_sent++;
     if (counter) {
-        rconn_packet_counter_dec(counter);
+        rconn_packet_counter_dec(counter, n_bytes);
     }
     return 0;
 }
@@ -999,7 +1037,7 @@ flush_queue(struct rconn *rc)
         struct ofpbuf *b = ofpbuf_from_list(list_pop_front(&rc->txq));
         struct rconn_packet_counter *counter = b->private_p;
         if (counter) {
-            rconn_packet_counter_dec(counter);
+            rconn_packet_counter_dec(counter, b->size);
         }
         COVERAGE_INC(rconn_discarded);
         ofpbuf_delete(b);