ofproto-dpif: Factor NetFlow active timeouts out of flow expiration.
authorBen Pfaff <blp@nicira.com>
Wed, 23 Nov 2011 00:46:05 +0000 (16:46 -0800)
committerBen Pfaff <blp@nicira.com>
Wed, 23 Nov 2011 21:19:53 +0000 (13:19 -0800)
NetFlow active timeouts were only mixed in with flow expiration for
convenience: both processes need to iterate all the facets.  But
an upcoming commit will change flow expiration to work in terms of
a new "subfacet" entity, so they will no longer fit together well.

This change could be seen as an optimization, since NetFlow active
timeouts don't ordinarily have to run as often as flow expiration,
especially when the flow expiration rate is stepped up due to a
large volume of flows.

ofproto/netflow.c
ofproto/netflow.h
ofproto/ofproto-dpif.c

index bf2e6287197034904875c82fa0826e81ee80329c..6e2ddb848c59f543537315dd6e2cfa0f2481e36c 100644 (file)
@@ -27,6 +27,7 @@
 #include "ofpbuf.h"
 #include "ofproto.h"
 #include "packets.h"
+#include "poll-loop.h"
 #include "socket-util.h"
 #include "timeval.h"
 #include "util.h"
@@ -99,6 +100,7 @@ struct netflow {
     uint32_t netflow_cnt;         /* Flow sequence number for NetFlow. */
     struct ofpbuf packet;         /* NetFlow packet being accumulated. */
     long long int active_timeout; /* Timeout for flows that are still active. */
+    long long int next_timeout;   /* Next scheduled active timeout. */
     long long int reconfig_time;  /* When we reconfigured the timeouts. */
 };
 
@@ -221,13 +223,33 @@ netflow_expire(struct netflow *nf, struct netflow_flow *nf_flow,
     nf_flow->tcp_flags = 0;
 }
 
-void
+/* Returns true if it's time to send out a round of NetFlow active timeouts,
+ * false otherwise. */
+bool
 netflow_run(struct netflow *nf)
 {
     if (nf->packet.size) {
         collectors_send(nf->collectors, nf->packet.data, nf->packet.size);
         nf->packet.size = 0;
     }
+
+    if (nf->active_timeout && time_msec() >= nf->next_timeout) {
+        nf->next_timeout = time_msec() + 1000;
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void
+netflow_wait(struct netflow *nf)
+{
+    if (nf->active_timeout) {
+        poll_timer_wait_until(nf->next_timeout);
+    }
+    if (nf->packet.size) {
+        poll_immediate_wake();
+    }
 }
 
 int
@@ -253,6 +275,7 @@ netflow_set_options(struct netflow *nf,
     nf->active_timeout *= 1000;
     if (old_timeout != nf->active_timeout) {
         nf->reconfig_time = time_msec();
+        nf->next_timeout = time_msec();
     }
 
     return error;
@@ -261,7 +284,7 @@ netflow_set_options(struct netflow *nf,
 struct netflow *
 netflow_create(void)
 {
-    struct netflow *nf = xmalloc(sizeof *nf);
+    struct netflow *nf = xzalloc(sizeof *nf);
     nf->engine_type = 0;
     nf->engine_id = 0;
     nf->boot_time = time_msec();
index bf5bf45bf69006184b3d6107ec233504e654d662..daabbace5fe7898d0e9892c89a2eb692c2a13eed 100644 (file)
@@ -60,7 +60,9 @@ void netflow_destroy(struct netflow *);
 int netflow_set_options(struct netflow *, const struct netflow_options *);
 void netflow_expire(struct netflow *, struct netflow_flow *,
                     struct ofexpired *);
-void netflow_run(struct netflow *);
+
+bool netflow_run(struct netflow *);
+void netflow_wait(struct netflow *);
 
 void netflow_flow_init(struct netflow_flow *);
 void netflow_flow_clear(struct netflow_flow *);
index 9324de4062bd4b98911108d6b0106f828d2db72a..84f6abf26ecf813eb9163df1b4118b96d3cd2a09 100644 (file)
@@ -436,6 +436,9 @@ static void handle_miss_upcalls(struct ofproto_dpif *,
 /* Flow expiration. */
 static int expire(struct ofproto_dpif *);
 
+/* NetFlow. */
+static void send_netflow_active_timeouts(struct ofproto_dpif *);
+
 /* Utilities. */
 static int send_packet(struct ofproto_dpif *, uint32_t odp_port,
                        const struct ofpbuf *packet);
@@ -641,7 +644,9 @@ run(struct ofproto *ofproto_)
     }
 
     if (ofproto->netflow) {
-        netflow_run(ofproto->netflow);
+        if (netflow_run(ofproto->netflow)) {
+            send_netflow_active_timeouts(ofproto);
+        }
     }
     if (ofproto->sflow) {
         dpif_sflow_run(ofproto->sflow);
@@ -704,6 +709,9 @@ wait(struct ofproto *ofproto_)
     HMAP_FOR_EACH (bundle, hmap_node, &ofproto->bundles) {
         bundle_wait(bundle);
     }
+    if (ofproto->netflow) {
+        netflow_wait(ofproto->netflow);
+    }
     mac_learning_wait(ofproto->ml);
     stp_wait(ofproto);
     if (ofproto->need_revalidate) {
@@ -767,24 +775,6 @@ get_tables(struct ofproto *ofproto_, struct ofp_table_stats *ots)
                        htonll(s.n_hit + ofproto->n_matches));
 }
 
-static int
-set_netflow(struct ofproto *ofproto_,
-            const struct netflow_options *netflow_options)
-{
-    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-
-    if (netflow_options) {
-        if (!ofproto->netflow) {
-            ofproto->netflow = netflow_create();
-        }
-        return netflow_set_options(ofproto->netflow, netflow_options);
-    } else {
-        netflow_destroy(ofproto->netflow);
-        ofproto->netflow = NULL;
-        return 0;
-    }
-}
-
 static struct ofport *
 port_alloc(void)
 {
@@ -2698,29 +2688,6 @@ facet_max_idle(const struct ofproto_dpif *ofproto)
     return bucket * BUCKET_WIDTH;
 }
 
-static void
-facet_active_timeout(struct ofproto_dpif *ofproto, struct facet *facet)
-{
-    if (ofproto->netflow && !facet_is_controller_flow(facet) &&
-        netflow_active_timeout_expired(ofproto->netflow, &facet->nf_flow)) {
-        struct ofexpired expired;
-
-        if (facet->installed) {
-            struct dpif_flow_stats stats;
-
-            facet_put__(ofproto, facet, facet->actions, facet->actions_len,
-                        &stats);
-            facet_update_stats(ofproto, facet, &stats);
-        }
-
-        expired.flow = facet->flow;
-        expired.packet_count = facet->packet_count;
-        expired.byte_count = facet->byte_count;
-        expired.used = facet->used;
-        netflow_expire(ofproto->netflow, &facet->nf_flow, &expired);
-    }
-}
-
 static void
 expire_facets(struct ofproto_dpif *ofproto, int dp_max_idle)
 {
@@ -2728,7 +2695,6 @@ expire_facets(struct ofproto_dpif *ofproto, int dp_max_idle)
     struct facet *facet, *next_facet;
 
     HMAP_FOR_EACH_SAFE (facet, next_facet, hmap_node, &ofproto->facets) {
-        facet_active_timeout(ofproto, facet);
         if (facet->used < cutoff) {
             facet_remove(ofproto, facet);
         }
@@ -5139,6 +5105,26 @@ packet_out(struct ofproto *ofproto_, struct ofpbuf *packet,
     }
     return error;
 }
+\f
+/* NetFlow. */
+
+static int
+set_netflow(struct ofproto *ofproto_,
+            const struct netflow_options *netflow_options)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+
+    if (netflow_options) {
+        if (!ofproto->netflow) {
+            ofproto->netflow = netflow_create();
+        }
+        return netflow_set_options(ofproto->netflow, netflow_options);
+    } else {
+        netflow_destroy(ofproto->netflow);
+        ofproto->netflow = NULL;
+        return 0;
+    }
+}
 
 static void
 get_netflow_ids(const struct ofproto *ofproto_,
@@ -5148,6 +5134,39 @@ get_netflow_ids(const struct ofproto *ofproto_,
 
     dpif_get_netflow_ids(ofproto->dpif, engine_type, engine_id);
 }
+
+static void
+send_active_timeout(struct ofproto_dpif *ofproto, struct facet *facet)
+{
+    if (!facet_is_controller_flow(facet) &&
+        netflow_active_timeout_expired(ofproto->netflow, &facet->nf_flow)) {
+        struct ofexpired expired;
+
+        if (facet->installed) {
+            struct dpif_flow_stats stats;
+
+            facet_put__(ofproto, facet, facet->actions, facet->actions_len,
+                        &stats);
+            facet_update_stats(ofproto, facet, &stats);
+        }
+
+        expired.flow = facet->flow;
+        expired.packet_count = facet->packet_count;
+        expired.byte_count = facet->byte_count;
+        expired.used = facet->used;
+        netflow_expire(ofproto->netflow, &facet->nf_flow, &expired);
+    }
+}
+
+static void
+send_netflow_active_timeouts(struct ofproto_dpif *ofproto)
+{
+    struct facet *facet;
+
+    HMAP_FOR_EACH (facet, hmap_node, &ofproto->facets) {
+        send_active_timeout(ofproto, facet);
+    }
+}
 \f
 static struct ofproto_dpif *
 ofproto_dpif_lookup(const char *name)