rconn: Add byte counting feature to rconn_packet_counter.
[openvswitch] / lib / rconn.c
index 6a86de6c9173320e5db2b47ebe6e2bf0316ede0f..1129a3b7a69207e6da6a873284a68ef57fd72904 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010, 2011 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -104,16 +104,6 @@ struct rconn {
     time_t creation_time;
     unsigned long int total_time_connected;
 
-    /* If we can't connect to the peer, it could be for any number of reasons.
-     * Usually, one would assume it is because the peer is not running or
-     * because the network is partitioned.  But it could also be because the
-     * network topology has changed, in which case the upper layer will need to
-     * reassess it (in particular, obtain a new IP address via DHCP and find
-     * the new location of the controller).  We set this flag when we suspect
-     * that this could be the case. */
-    bool questionable_connectivity;
-    time_t last_questioned;
-
     /* Throughout this file, "probe" is shorthand for "inactivity probe".
      * When nothing has been received from the peer for a while, we send out
      * an echo request as an inactivity probe packet.  We should receive back
@@ -129,8 +119,9 @@ struct rconn {
      *
      * We don't cache the local port, because that changes from one connection
      * attempt to the next. */
-    uint32_t local_ip, remote_ip;
-    uint16_t remote_port;
+    ovs_be32 local_ip, remote_ip;
+    ovs_be16 remote_port;
+    uint8_t dscp;
 
     /* Messages sent or received are copied to the monitor connections. */
 #define MAX_MONITORS 8
@@ -149,7 +140,6 @@ static void reconnect(struct rconn *);
 static void report_error(struct rconn *, int error);
 static void disconnect(struct rconn *, int error);
 static void flush_queue(struct rconn *);
-static void question_connectivity(struct rconn *);
 static void copy_to_monitor(struct rconn *, const struct ofpbuf *);
 static bool is_connected_state(enum state);
 static bool is_admitted_msg(const struct ofpbuf *);
@@ -171,7 +161,7 @@ static bool rconn_logging_connection_attempts__(const struct rconn *);
  * The new rconn is initially unconnected.  Use rconn_connect() or
  * rconn_connect_unreliably() to connect it. */
 struct rconn *
-rconn_create(int probe_interval, int max_backoff)
+rconn_create(int probe_interval, int max_backoff, uint8_t dscp)
 {
     struct rconn *rc = xzalloc(sizeof *rc);
 
@@ -204,10 +194,8 @@ rconn_create(int probe_interval, int max_backoff)
     rc->creation_time = time_now();
     rc->total_time_connected = 0;
 
-    rc->questionable_connectivity = false;
-    rc->last_questioned = time_now();
-
     rconn_set_probe_interval(rc, probe_interval);
+    rconn_set_dscp(rc, dscp);
 
     rc->n_monitors = 0;
 
@@ -232,6 +220,18 @@ rconn_get_max_backoff(const struct rconn *rc)
     return rc->max_backoff;
 }
 
+void
+rconn_set_dscp(struct rconn *rc, uint8_t dscp)
+{
+    rc->dscp = dscp;
+}
+
+uint8_t
+rconn_get_dscp(const struct rconn *rc)
+{
+    return rc->dscp;
+}
+
 void
 rconn_set_probe_interval(struct rconn *rc, int probe_interval)
 {
@@ -349,7 +349,7 @@ reconnect(struct rconn *rc)
         VLOG_INFO("%s: connecting...", rc->name);
     }
     rc->n_attempted_connections++;
-    retval = vconn_open(rc->target, OFP_VERSION, &rc->vconn);
+    retval = vconn_open(rc->target, OFP10_VERSION, &rc->vconn, rc->dscp);
     if (!retval) {
         rc->remote_ip = vconn_get_remote_ip(rc->vconn);
         rc->local_ip = vconn_get_local_ip(rc->vconn);
@@ -464,7 +464,6 @@ static void
 run_IDLE(struct rconn *rc)
 {
     if (timed_out(rc)) {
-        question_connectivity(rc);
         VLOG_ERR("%s: no response to inactivity probe after %u "
                  "seconds, disconnecting",
                  rc->name, elapsed_in_this_state(rc));
@@ -567,9 +566,8 @@ rconn_recv_wait(struct rconn *rc)
     }
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful (in which case 'b' is
- * destroyed), or ENOTCONN if 'rc' is not currently connected (in which case
- * the caller retains ownership of 'b').
+/* Sends 'b' on 'rc'.  Returns 0 if successful, or ENOTCONN if 'rc' is not
+ * currently connected.  Takes ownership of 'b'.
  *
  * If 'counter' is non-null, then 'counter' will be incremented while the
  * packet is in flight, then decremented when it has been sent (or discarded
@@ -589,7 +587,7 @@ rconn_send(struct rconn *rc, struct ofpbuf *b,
         copy_to_monitor(rc, b);
         b->private_p = counter;
         if (counter) {
-            rconn_packet_counter_inc(counter);
+            rconn_packet_counter_inc(counter, b->size);
         }
         list_push_back(&rc->txq, &b->list_node);
 
@@ -602,6 +600,7 @@ rconn_send(struct rconn *rc, struct ofpbuf *b,
         }
         return 0;
     } else {
+        ofpbuf_delete(b);
         return ENOTCONN;
     }
 }
@@ -623,10 +622,11 @@ rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b,
                       struct rconn_packet_counter *counter, int queue_limit)
 {
     int retval;
-    retval = counter->n >= queue_limit ? EAGAIN : rconn_send(rc, b, counter);
+    retval = (counter->n_packets >= queue_limit
+              ? EAGAIN
+              : rconn_send(rc, b, counter));
     if (retval) {
         COVERAGE_INC(rconn_overflow);
-        ofpbuf_delete(b);
     }
     return retval;
 }
@@ -715,7 +715,7 @@ rconn_failure_duration(const struct rconn *rconn)
 
 /* Returns the IP address of the peer, or 0 if the peer's IP address is not
  * known. */
-uint32_t
+ovs_be32
 rconn_get_remote_ip(const struct rconn *rconn)
 {
     return rconn->remote_ip;
@@ -723,7 +723,7 @@ rconn_get_remote_ip(const struct rconn *rconn)
 
 /* Returns the transport port of the peer, or 0 if the peer's port is not
  * known. */
-uint16_t
+ovs_be16
 rconn_get_remote_port(const struct rconn *rconn)
 {
     return rconn->remote_port;
@@ -732,7 +732,7 @@ rconn_get_remote_port(const struct rconn *rconn)
 /* Returns the IP address used to connect to the peer, or 0 if the
  * connection is not an IP-based protocol or if its IP address is not
  * known. */
-uint32_t
+ovs_be32
 rconn_get_local_ip(const struct rconn *rconn)
 {
     return rconn->local_ip;
@@ -740,26 +740,18 @@ rconn_get_local_ip(const struct rconn *rconn)
 
 /* Returns the transport port used to connect to the peer, or 0 if the
  * connection does not contain a port or if the port is not known. */
-uint16_t
+ovs_be16
 rconn_get_local_port(const struct rconn *rconn)
 {
     return rconn->vconn ? vconn_get_local_port(rconn->vconn) : 0;
 }
 
-/* If 'rconn' can't connect to the peer, it could be for any number of reasons.
- * Usually, one would assume it is because the peer is not running or because
- * the network is partitioned.  But it could also be because the network
- * topology has changed, in which case the upper layer will need to reassess it
- * (in particular, obtain a new IP address via DHCP and find the new location
- * of the controller).  When this appears that this might be the case, this
- * function returns true.  It also clears the questionability flag and prevents
- * it from being set again for some time. */
-bool
-rconn_is_connectivity_questionable(struct rconn *rconn)
+/* Returns the OpenFlow version negotiated with the peer, or -1 if there is
+ * currently no connection or if version negotiation is not yet complete. */
+int
+rconn_get_version(const struct rconn *rconn)
 {
-    bool questionable = rconn->questionable_connectivity;
-    rconn->questionable_connectivity = false;
-    return questionable;
+    return rconn->vconn ? vconn_get_version(rconn->vconn) : -1;
 }
 
 /* Returns the total number of packets successfully received by the underlying
@@ -871,12 +863,18 @@ rconn_get_last_error(const struct rconn *rc)
 {
     return rc->last_error;
 }
+
+/* Returns the number of messages queued for transmission on 'rc'. */
+unsigned int
+rconn_count_txqlen(const struct rconn *rc)
+{
+    return list_size(&rc->txq);
+}
 \f
 struct rconn_packet_counter *
 rconn_packet_counter_create(void)
 {
-    struct rconn_packet_counter *c = xmalloc(sizeof *c);
-    c->n = 0;
+    struct rconn_packet_counter *c = xzalloc(sizeof *c);
     c->ref_cnt = 1;
     return c;
 }
@@ -886,24 +884,32 @@ rconn_packet_counter_destroy(struct rconn_packet_counter *c)
 {
     if (c) {
         assert(c->ref_cnt > 0);
-        if (!--c->ref_cnt && !c->n) {
+        if (!--c->ref_cnt && !c->n_packets) {
             free(c);
         }
     }
 }
 
 void
-rconn_packet_counter_inc(struct rconn_packet_counter *c)
+rconn_packet_counter_inc(struct rconn_packet_counter *c, unsigned int n_bytes)
 {
-    c->n++;
+    c->n_packets++;
+    c->n_bytes += n_bytes;
 }
 
 void
-rconn_packet_counter_dec(struct rconn_packet_counter *c)
+rconn_packet_counter_dec(struct rconn_packet_counter *c, unsigned int n_bytes)
 {
-    assert(c->n > 0);
-    if (!--c->n && !c->ref_cnt) {
-        free(c);
+    assert(c->n_packets > 0);
+    assert(c->n_bytes >= n_bytes);
+
+    c->n_bytes -= n_bytes;
+    c->n_packets--;
+    if (!c->n_packets) {
+        assert(!c->n_bytes);
+        if (!c->ref_cnt) {
+            free(c);
+        }
     }
 }
 \f
@@ -930,6 +936,7 @@ static int
 try_send(struct rconn *rc)
 {
     struct ofpbuf *msg = ofpbuf_from_list(rc->txq.next);
+    unsigned int n_bytes = msg->size;
     struct rconn_packet_counter *counter = msg->private_p;
     int retval;
 
@@ -950,7 +957,7 @@ try_send(struct rconn *rc)
     COVERAGE_INC(rconn_sent);
     rc->packets_sent++;
     if (counter) {
-        rconn_packet_counter_dec(counter);
+        rconn_packet_counter_dec(counter, n_bytes);
     }
     return 0;
 }
@@ -1012,9 +1019,6 @@ disconnect(struct rconn *rc, int error)
         }
         rc->backoff_deadline = now + rc->backoff;
         state_transition(rc, S_BACKOFF);
-        if (now - rc->last_connected > 60) {
-            question_connectivity(rc);
-        }
     } else {
         rc->last_disconnected = time_now();
         rconn_disconnect(rc);
@@ -1033,7 +1037,7 @@ flush_queue(struct rconn *rc)
         struct ofpbuf *b = ofpbuf_from_list(list_pop_front(&rc->txq));
         struct rconn_packet_counter *counter = b->private_p;
         if (counter) {
-            rconn_packet_counter_dec(counter);
+            rconn_packet_counter_dec(counter, b->size);
         }
         COVERAGE_INC(rconn_discarded);
         ofpbuf_delete(b);
@@ -1080,16 +1084,6 @@ state_transition(struct rconn *rc, enum state state)
     rc->state_entered = time_now();
 }
 
-static void
-question_connectivity(struct rconn *rc)
-{
-    time_t now = time_now();
-    if (now - rc->last_questioned > 60) {
-        rc->questionable_connectivity = true;
-        rc->last_questioned = now;
-    }
-}
-
 static void
 copy_to_monitor(struct rconn *rc, const struct ofpbuf *b)
 {