vswitchd: Work on flow statistics gathering.
authorBen Pfaff <blp@nicira.com>
Sat, 27 Dec 2008 00:48:27 +0000 (16:48 -0800)
committerBen Pfaff <blp@nicira.com>
Sat, 27 Dec 2008 00:48:27 +0000 (16:48 -0800)
lib/vconn.c
vswitchd/bridge.c
vswitchd/flowtrack.c
vswitchd/flowtrack.h

index 4361734ce1edff21cb2ed02b3a25bcd377faa91a..745cd7619564a33db4d45d4978898dd8ce3c076c 100644 (file)
@@ -1052,7 +1052,8 @@ flow_stats_first(struct flow_stats_iterator *iter,
                  const struct ofp_stats_reply *osr)
 {
     iter->pos = osr->body;
-    iter->end = osr->body + (ntohs(osr->header.length) + sizeof *osr);
+    iter->end = osr->body + (ntohs(osr->header.length)
+                             - offsetof(struct ofp_stats_reply, body));
     return flow_stats_next(iter);
 }
 
index 2dc6bb9a1257d3b4b3ad1ee39ed0a166eac9cddd..998208b0692d8daaa35f9c4b85f5a0f6970d583e 100644 (file)
 #include "netdev.h"
 #include "ofp-print.h"
 #include "ofpbuf.h"
+#include "poll-loop.h"
 #include "process.h"
 #include "rconn.h"
 #include "socket-util.h"
+#include "stats.h"
 #include "svec.h"
+#include "timeval.h"
 #include "util.h"
 #include "vconn.h"
+#include "xtoxll.h"
 
 #define THIS_MODULE VLM_bridge
 #include "vlog.h"
@@ -126,6 +130,11 @@ struct bridge {
     /* Flow tracking. */
     struct ft *ft;
     struct tag_set revalidate_set;
+
+    /* Flow statistics gathering. */
+    struct stats_mgr *stats_mgr;
+    struct stats_request *flow_rq; /* Current flow statistics request. */
+    time_t next_stats_request;
 };
 
 /* List of all bridges. */
@@ -149,7 +158,10 @@ static bool bridge_is_backlogged(const struct bridge *);
 static int bridge_fetch_dp_ifaces(struct bridge *, struct svec *iface_names);
 
 static void bridge_process_msg(struct bridge *, struct ofpbuf *);
-static void revalidate_flow(struct bridge *br, struct ft_flow *f);
+static void revalidate_flow(struct bridge *, struct ft_flow *);
+
+static void flowstats_run(struct bridge *);
+static void flowstats_wait(struct bridge *);
 
 static void port_create(struct bridge *, const char *name);
 static void port_reconfigure(struct port *);
@@ -317,6 +329,8 @@ bridge_wait(void)
         if (br->ml) {
             mac_learning_wait(br->ml);
         }
+        stats_mgr_wait(br->stats_mgr);
+        flowstats_wait(br);
     }
 }
 \f
@@ -349,6 +363,8 @@ bridge_create(const char *name)
 
     br->ft = ft_create();
 
+    br->stats_mgr = stats_mgr_create(br->rconn);
+
     /* Create kernel datapath. */
     for (;;) {
         /* Pick a datapath index.
@@ -489,6 +505,7 @@ start_secchan(struct bridge *br)
     }
     br->sc_state = SC_RUNNING;
     br->sent_config = false;
+    br->next_stats_request = time_now();
     return;
 
 error:
@@ -525,6 +542,8 @@ bridge_destroy(struct bridge *br)
             port_destroy(br->ports[i]);
         }
         ft_destroy(br->ft);
+        stats_request_destroy(br->flow_rq);
+        stats_mgr_destroy(br->stats_mgr);
         mac_learning_destroy(br->ml);
         free(br->ports);
         free(br);
@@ -571,29 +590,37 @@ if_up(const char *netdev_name)
     return retval;
 }
 
+static void
+send_set_config(struct bridge *br)
+{
+    struct ofp_switch_config *osc;
+    struct ofpbuf *msg;
+    int retval;
+
+    osc = make_openflow(sizeof *osc, OFPT_SET_CONFIG, &msg);
+    osc->flags = htons(OFPC_SEND_FLOW_EXP | OFPC_FRAG_NORMAL);
+    osc->miss_send_len = htons(OFP_DEFAULT_MISS_SEND_LEN);
+    retval = rconn_send(br->rconn, msg, &br->txqlen);
+    if (retval) {
+        ofpbuf_delete(msg);
+    }
+}
+
 static void
 bridge_run_one(struct bridge *br)
 {
     int iteration;
 
     rconn_run(br->rconn);
+
     if (rconn_is_connected(br->rconn) && !br->sent_config) {
-        struct ofp_switch_config *osc;
-        struct ofpbuf *msg;
-        int retval;
-
-        osc = make_openflow(sizeof *osc, OFPT_SET_CONFIG, &msg);
-        osc->flags = htons(OFPC_SEND_FLOW_EXP | OFPC_FRAG_NORMAL);
-        osc->miss_send_len = htons(OFP_DEFAULT_MISS_SEND_LEN);
-        retval = rconn_send(br->rconn, msg, &br->txqlen);
-        if (retval) {
-            ofpbuf_delete(msg);
-        } else {
-            br->sent_config = true;
-        }
+        send_set_config(br);
+        br->sent_config = true;
     }
 
     tag_set_init(&br->revalidate_set);
+
+    /* Now do the things that may want to revalidate flows. */
     for (iteration = 0; iteration < 50 && !bridge_is_backlogged(br);
          iteration++) {
         struct ofpbuf *msg = rconn_recv(br->rconn);
@@ -608,6 +635,9 @@ bridge_run_one(struct bridge *br)
     if (br->ml) {
         mac_learning_run(br->ml, &br->revalidate_set);
     }
+    flowstats_run(br);
+
+    /* Now revalidate any flows that need it. */
     if (!tag_set_is_empty(&br->revalidate_set)) {
         struct ft_flow *f, *next;
 
@@ -617,6 +647,8 @@ bridge_run_one(struct bridge *br)
             }
         }
     }
+
+    /* Start or restart secchan if necessary. */
     run_secchan(br);
 }
 
@@ -807,6 +839,10 @@ typedef void packet_handler_func(struct bridge *, void *);
 static packet_handler_func process_echo_request;
 static packet_handler_func process_packet_in;
 static packet_handler_func process_flow_expired;
+static packet_handler_func process_stats_reply;
+static packet_handler_func process_error_msg;
+
+static void flow_from_match(struct flow *, const struct ofp_match *);
 
 static void
 bridge_process_msg(struct bridge *br, struct ofpbuf *msg)
@@ -832,6 +868,14 @@ bridge_process_msg(struct bridge *br, struct ofpbuf *msg)
             OFPT_FLOW_EXPIRED,
             process_flow_expired
         },
+        {
+            OFPT_STATS_REPLY,
+            process_stats_reply
+        },
+        {
+            OFPT_ERROR,
+            process_error_msg
+        },
     };
     const size_t n_processors = ARRAY_SIZE(processors);
     const struct processor *p;
@@ -1010,7 +1054,7 @@ send_packets(struct bridge *br, const struct flow *flow,
         struct ft_flow *f;
         bool queue;
 
-        f = ft_lookup(br->ft, flow);
+        f = ft_lookup(br->ft, flow, flow_hash(flow, 0));
         if (f) {
             if (!ftd_equal(dsts, n_dsts, f->dsts, f->n_dsts)) {
                 ftf_set_dsts(f, dsts, n_dsts);
@@ -1036,9 +1080,12 @@ send_packets(struct bridge *br, const struct flow *flow,
                                                 actions_len);
             put_actions(dsts, n_dsts, ntohs(flow->dl_vlan), fbuf);
             queue_tx(br, fbuf);
+
+            /* The add_flow message will reset the byte counters. */
+            f->last_byte_count = f->byte_count = 0;
         }
     } else {
-        struct ft_flow *f = ft_lookup(br->ft, flow);
+        struct ft_flow *f = ft_lookup(br->ft, flow, flow_hash(flow, 0));
         if (f) {
             /* XXX delete flow from ft, queue delete-flow openflow message */
         }
@@ -1096,7 +1143,7 @@ process_flow(struct bridge *br, const struct flow *flow,
         queue_tx(br, make_add_flow(flow, pkt->buffer_id,
                                    br->flow_idle_time, 0));
 
-        f = ft_lookup(br->ft, flow);
+        f = ft_lookup(br->ft, flow, flow_hash(flow, 0));
         if (f) {
             ftf_set_dsts(f, NULL, 0);
             f->tags = tags;
@@ -1255,7 +1302,7 @@ process_packet_in(struct bridge *br, void *opi_)
     if (opi->reason == OFPR_NO_MATCH) {
         /* Delete any existing flow from the flow table.  It must not really be
          * there (otherwise we wouldn't be getting a packet-in). */
-        struct ft_flow *f = ft_lookup(br->ft, &flow);
+        struct ft_flow *f = ft_lookup(br->ft, &flow, flow_hash(&flow, 0));
         if (f) {
             ft_remove(br->ft, f);
             ftf_destroy(f);
@@ -1287,20 +1334,14 @@ process_flow_expired(struct bridge *br, void *ofe_)
         /* We don't use flows with wildcards, so there's nothing to do. */
         return;
     }
-    flow.nw_src = ofe->match.nw_src;
-    flow.nw_dst = ofe->match.nw_dst;
-    flow.in_port = ofe->match.in_port;
-    flow.dl_vlan = ofe->match.dl_vlan;
-    flow.dl_type = ofe->match.dl_type;
-    flow.tp_src = ofe->match.tp_src;
-    flow.tp_dst = ofe->match.tp_dst;
-    memcpy(flow.dl_src, ofe->match.dl_src, ETH_ADDR_LEN);
-    memcpy(flow.dl_dst, ofe->match.dl_dst, ETH_ADDR_LEN);
-    flow.nw_proto = ofe->match.nw_proto;
-    flow.reserved = 0;
-
-    f = ft_lookup(br->ft, &flow);
+    flow_from_match(&flow, &ofe->match);
+
+    f = ft_lookup(br->ft, &flow, flow_hash(&flow, 0));
     if (f) {
+        /* Update statistics. */
+        f->last_byte_count = f->byte_count;
+        f->byte_count = ntohll(ofe->byte_count);
+
         ft_remove(br->ft, f);
         ftf_destroy(f);
     } else if (VLOG_IS_DBG_ENABLED()) {
@@ -1308,6 +1349,175 @@ process_flow_expired(struct bridge *br, void *ofe_)
         VLOG_DBG_RL(&rl, "received flow expiration for flow not in table");
     }
 }
+
+static void
+process_stats_reply(struct bridge *br, void *osr)
+{
+    stats_mgr_receive_stats_reply(br->stats_mgr, osr);
+}
+
+static void
+process_error_msg(struct bridge *br, void *oem)
+{
+    stats_mgr_receive_error_msg(br->stats_mgr, oem);
+}
+
+static void
+flow_from_match(struct flow *flow, const struct ofp_match *match)
+{
+    flow->nw_src = match->nw_src;
+    flow->nw_dst = match->nw_dst;
+    flow->in_port = match->in_port;
+    flow->dl_vlan = match->dl_vlan;
+    flow->dl_type = match->dl_type;
+    flow->tp_src = match->tp_src;
+    flow->tp_dst = match->tp_dst;
+    memcpy(flow->dl_src, match->dl_src, ETH_ADDR_LEN);
+    memcpy(flow->dl_dst, match->dl_dst, ETH_ADDR_LEN);
+    flow->nw_proto = match->nw_proto;
+    flow->reserved = 0;
+}
+\f
+/* Flow statistics collection. */
+
+static void
+request_flow_stats(struct bridge *br)
+{
+    struct ofp_flow_stats_request *ofsr;
+    struct ofp_stats_request *osr;
+    struct ofpbuf *msg;
+
+    osr = make_openflow(sizeof *osr + sizeof *ofsr, OFPT_STATS_REQUEST, &msg);
+    osr->type = htons(OFPST_FLOW);
+    osr->flags = htons(0);
+    ofsr = (struct ofp_flow_stats_request *) osr->body;
+    ofsr->match.wildcards = htonl(OFPFW_ALL);
+    ofsr->table_id = 0xff;
+    ofsr->out_port = htons(OFPP_NONE);
+
+    stats_request_destroy(br->flow_rq);
+    br->flow_rq = stats_request_create(br->stats_mgr, msg);
+    br->next_stats_request = time_now() + 10;
+}
+
+static void
+flowstats_process(struct bridge *br)
+{
+    /* This ft_dst will never appear as a valid flow's action.  Thus, we can
+     * set it as a flow's action to force revalidation to update that flow. */
+    static const struct ft_dst invalid_dst = {0xffff, 0xffff};
+
+    tag_type flowstats_tag;
+    struct ofpbuf *msg;
+    struct ft *new_ft;
+    size_t n_tagged;
+
+    new_ft = ft_create();
+    flowstats_tag = tag_create_random();
+    n_tagged = 0;
+    while (stats_request_get_reply(br->flow_rq, &msg)) {
+        struct ofp_stats_reply *osr = msg->data;
+        const struct ofp_flow_stats *fs;
+        struct flow_stats_iterator i;
+
+        for (fs = flow_stats_first(&i, osr); fs; fs = flow_stats_next(&i)) {
+            size_t n_actions = ((ntohs(fs->length)
+                                 - offsetof(struct ofp_flow_stats, actions))
+                                / sizeof(struct ofp_action_header));
+            struct ft_flow *f;
+            struct flow flow;
+            size_t hash;
+
+            if (fs->match.wildcards != htonl(0)) {
+                /* XXX delete flow */
+                continue;
+            }
+            flow_from_match(&flow, &fs->match);
+            hash = flow_hash(&flow, 0);
+
+            f = ft_lookup(br->ft, &flow, hash);
+            if (f) {
+                /* Move from bt->ft to new_ft. */
+                ft_remove(br->ft, f);
+                hmap_insert_fast(&new_ft->flows, &f->node, f->node.hash);
+
+                /* Update statistics. */
+                f->last_byte_count = f->byte_count;
+                f->byte_count = ntohll(fs->byte_count);
+
+                /* Force the flow to be revalidated and replaced if its actions
+                 * aren't what we expect. */
+                if (!ftd_equal_actions(f->dsts, f->n_dsts,
+                                       fs->actions, n_actions,
+                                       ntohs(flow.dl_vlan))) {
+                    ftf_set_dsts(f, &invalid_dst, 1);
+                    f->tags |= flowstats_tag;
+                    n_tagged++;
+                }
+            } else {
+                static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+                f = ft_lookup(new_ft, &flow, hash);
+                if (!f) {
+                    VLOG_WARN_RL(&rl, "unexpected flow in flow table");
+                    f = ftf_create(&flow, &invalid_dst, 1, flowstats_tag);
+                    hmap_insert_fast(&new_ft->flows, &f->node, f->node.hash);
+                    f->tags |= flowstats_tag;
+                    n_tagged++;
+                    f->last_byte_count = f->byte_count = fs->byte_count;
+                } else {
+                    VLOG_WARN_RL(&rl, "duplicate flow in flow stats reply");
+                }
+            }
+        }
+        ofpbuf_delete(msg);
+    }
+    hmap_expand(&new_ft->flows);
+    hmap_swap(&br->ft->flows, &new_ft->flows);
+    ft_destroy(new_ft);
+    if (n_tagged) {
+        tag_set_add(&br->revalidate_set, flowstats_tag);
+    }
+}
+
+static void
+flowstats_run(struct bridge *br)
+{
+    stats_mgr_run(br->stats_mgr);
+    if (br->flow_rq) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+        switch (stats_request_get_status(br->flow_rq)) {
+        case EAGAIN:
+            return;
+
+        case 0:
+            flowstats_process(br);
+            break;
+
+        default:
+            VLOG_WARN_RL(&rl, "flow stats request returned error: %s",
+                         strerror(stats_request_get_status(br->flow_rq)));
+            br->next_stats_request = time_now() + 1;
+            break;
+        }
+        stats_request_destroy(br->flow_rq);
+        br->flow_rq = NULL;
+    } else {
+        if (rconn_is_connected(br->rconn)
+            && time_now() >= br->next_stats_request) {
+            request_flow_stats(br);
+            br->next_stats_request = time_now() + 60;
+        }
+    }
+}
+
+static void
+flowstats_wait(struct bridge *br)
+{
+    if (!br->flow_rq && rconn_is_connected(br->rconn)) {
+        poll_timer_wait((br->next_stats_request - time_now()) * 1000);
+    }
+}
 \f
 /* Port functions. */
 
index d475050a5b612bae6323cdd66e8f5403f653dc45..bfcf7a7a272dc073329484baf59f43a7340cb5af 100644 (file)
 #include <config.h>
 #include "flowtrack.h"
 #include <inttypes.h>
+#include <netinet/in.h>
 #include <stdlib.h>
 #include "flow.h"
 #include "openflow/openflow.h"
 #include "util.h"
 
+static bool
+is_vlan_action(const struct ofp_action_header *oah, uint16_t vlan)
+{
+    if (vlan == htons(OFP_VLAN_NONE)) {
+        return (oah->type == htons(OFPAT_STRIP_VLAN)
+                && oah->len != htons(sizeof *oah));
+    } else {
+        const struct ofp_action_vlan_vid *oavv
+            = (const struct ofp_action_vlan_vid *) oah;
+        return (oavv->type == htons(OFPAT_SET_VLAN_VID)
+                && oavv->len == htons(sizeof *oavv)
+                && oavv->vlan_vid == htons(vlan));
+    }
+}
+
+static bool
+is_output_action(const struct ofp_action_header *oah, uint16_t port)
+{
+    const struct ofp_action_output *oao
+        = (const struct ofp_action_output *) oah;
+    return (oao->type == htons(OFPAT_OUTPUT)
+            && oao->len == htons(sizeof *oao)
+            && oao->port == htons(port));
+}
+
+bool
+ftd_equal_actions(const struct ft_dst *dsts, size_t n_dsts,
+                  const struct ofp_action_header *oahs, size_t n_oahs,
+                  uint16_t flow_vlan)
+{
+    const struct ft_dst *dst;
+    uint16_t vlan = flow_vlan;
+    for (dst = dsts; dst < &dsts[n_dsts]; dst++) {
+        if (dst->vlan != vlan) {
+            vlan = dst->vlan;
+            if (!n_oahs-- || !is_vlan_action(oahs++, vlan)) {
+                return false;
+            }
+        }
+        if (!n_oahs-- || !is_output_action(oahs++, dst->dp_ifidx)) {
+            return false;
+        }
+    }
+    return !n_oahs;
+}
+
 void
 ftd_print(const struct ft_dst *dsts, size_t n)
 {
@@ -64,6 +111,7 @@ ftf_create(const struct flow *flow,
     f->node.hash = flow_hash(&f->flow, 0);
     f->dsts = &f->one_dst;
     ftf_set_dsts(f, dsts, n_dsts);
+    f->last_byte_count = f->byte_count = 0;
     return f;
 }
 
@@ -116,13 +164,11 @@ ft_destroy(struct ft *ft)
 }
 
 struct ft_flow *
-ft_lookup(const struct ft *ft, const struct flow *target)
+ft_lookup(const struct ft *ft, const struct flow *target, size_t hash)
 {
     struct ft_flow *f;
-    size_t hash;
 
     assert(!target->reserved);
-    hash = flow_hash(target, 0);
     HMAP_FOR_EACH_WITH_HASH (f, struct ft_flow, node, hash, &ft->flows) {
         if (flow_equal(&f->flow, target)) {
             return f;
index 4c12cb17e03d4b589427f2b3b3a487a99e06f08a..ae1f3b877edd3e642e9a1bd6b7ab443f753d0673 100644 (file)
@@ -41,6 +41,8 @@
 #include "hmap.h"
 #include "tag.h"
 
+struct ofp_action_header;
+
 struct ft_dst {
     uint16_t vlan;
     uint16_t dp_ifidx;
@@ -48,6 +50,9 @@ struct ft_dst {
 
 static inline bool ftd_equal(const struct ft_dst *, size_t,
                              const struct ft_dst *, size_t);
+bool ftd_equal_actions(const struct ft_dst *, size_t,
+                       const struct ofp_action_header *, size_t n_oah,
+                       uint16_t flow_vlan);
 void ftd_print(const struct ft_dst *, size_t);
 
 static inline bool
@@ -62,6 +67,12 @@ struct ft_flow {
     tag_type tags;
     struct hmap_node node;
     struct flow flow;
+
+    /* Statistics. */
+    uint64_t last_byte_count;
+    uint64_t byte_count;
+
+    /* Actions. */
     struct ft_dst *dsts;
     size_t n_dsts;
     struct ft_dst one_dst;
@@ -79,7 +90,8 @@ struct ft {
 
 struct ft *ft_create(void);
 void ft_destroy(struct ft *);
-struct ft_flow *ft_lookup(const struct ft *, const struct flow *);
+void ft_swap(struct ft *, struct ft *);
+struct ft_flow *ft_lookup(const struct ft *, const struct flow *, size_t hash);
 void ft_remove(struct ft *, struct ft_flow *);
 void ft_insert(struct ft *, struct ft_flow *);