secchan: Aggregate multiple NetFlow messages into a single packet.
authorBen Pfaff <blp@nicira.com>
Thu, 19 Mar 2009 00:12:25 +0000 (17:12 -0700)
committerBen Pfaff <blp@nicira.com>
Thu, 19 Mar 2009 00:12:25 +0000 (17:12 -0700)
Completes feature #1012.

secchan/netflow.c
secchan/netflow.h
secchan/ofproto.c

index b8e4d4379a5b87d1f53f76490c8e484d3d2e332e..f2edd58b95320c74b63539409fead3bc73b2c5f0 100644 (file)
@@ -40,6 +40,7 @@
 #include "cfg.h"
 #include "flow.h"
 #include "netflow.h"
+#include "ofpbuf.h"
 #include "ofproto.h"
 #include "packets.h"
 #include "socket-util.h"
@@ -110,6 +111,7 @@ struct netflow {
     int *fds;                     /* Sockets for NetFlow collectors. */
     size_t n_fds;                 /* Number of Netflow collectors. */
     uint32_t netflow_cnt;         /* Flow sequence number for NetFlow. */
+    struct ofpbuf packet;         /* NetFlow packet being accumulated. */
 };
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -172,78 +174,75 @@ open_collector(char *dst)
 void
 netflow_expire(struct netflow *nf, const struct ofexpired *expired)
 {
-    struct netflow_v5_header nf_hdr;
-    struct netflow_v5_record nf_rec;
+    struct netflow_v5_header *nf_hdr;
+    struct netflow_v5_record *nf_rec;
     struct timeval now;
-    int i;
 
     time_timeval(&now);
 
-    memset(&nf_hdr, 0, sizeof nf_hdr);
-    nf_hdr.version = htons(NETFLOW_V5_VERSION);
-    nf_hdr.count = htons(1);
-    nf_hdr.sysuptime = htonl(time_msec() - nf->boot_time);
-    nf_hdr.unix_secs = htonl(now.tv_sec);
-    nf_hdr.unix_nsecs = htonl(now.tv_usec * 1000);
-    nf_hdr.flow_seq = htonl(nf->netflow_cnt);
-    nf_hdr.engine_type = 0;
-    nf_hdr.engine_id = 0;
-    nf_hdr.sampling_interval = htons(0);
-
-    memset(&nf_rec, 0, sizeof nf_rec);
-    nf_rec.src_addr = expired->flow.nw_src;
-    nf_rec.dst_addr = expired->flow.nw_dst;
-    nf_rec.nexthop = htons(0);
-    nf_rec.input = htons(expired->flow.in_port);
-    nf_rec.output = htons(0);
-    nf_rec.packet_count = htonl(expired->packet_count);
-    nf_rec.byte_count = htonl(expired->byte_count);
-    nf_rec.init_time = htonl(expired->created - nf->boot_time);
-    nf_rec.used_time = htonl(MAX(expired->created, expired->used)
-                             - nf->boot_time);
+    if (!nf->packet.size) {
+        nf_hdr = ofpbuf_put_zeros(&nf->packet, sizeof *nf_hdr);
+        nf_hdr->version = htons(NETFLOW_V5_VERSION);
+        nf_hdr->count = htons(0);
+        nf_hdr->sysuptime = htonl(time_msec() - nf->boot_time);
+        nf_hdr->unix_secs = htonl(now.tv_sec);
+        nf_hdr->unix_nsecs = htonl(now.tv_usec * 1000);
+        nf_hdr->flow_seq = htonl(nf->netflow_cnt++);
+        nf_hdr->engine_type = 0;
+        nf_hdr->engine_id = 0;
+        nf_hdr->sampling_interval = htons(0);
+    }
 
+    nf_hdr = nf->packet.data;
+    nf_hdr->count = htons(ntohs(nf_hdr->count) + 1);
+
+    nf_rec = ofpbuf_put_zeros(&nf->packet, sizeof *nf_rec);
+    nf_rec->src_addr = expired->flow.nw_src;
+    nf_rec->dst_addr = expired->flow.nw_dst;
+    nf_rec->nexthop = htons(0);
+    nf_rec->input = htons(expired->flow.in_port);
+    nf_rec->output = htons(0);
+    nf_rec->packet_count = htonl(expired->packet_count);
+    nf_rec->byte_count = htonl(expired->byte_count);
+    nf_rec->init_time = htonl(expired->created - nf->boot_time);
+    nf_rec->used_time = htonl(MAX(expired->created, expired->used)
+                             - nf->boot_time);
     if (expired->flow.nw_proto == IP_TYPE_ICMP) {
         /* In NetFlow, the ICMP type and code are concatenated and
          * placed in the 'dst_port' field. */
         uint8_t type = ntohs(expired->flow.tp_src);
         uint8_t code = ntohs(expired->flow.tp_dst);
-        nf_rec.src_port = htons(0);
-        nf_rec.dst_port = htons((type << 8) | code);
+        nf_rec->src_port = htons(0);
+        nf_rec->dst_port = htons((type << 8) | code);
     } else {
-        nf_rec.src_port = expired->flow.tp_src;
-        nf_rec.dst_port = expired->flow.tp_dst;
+        nf_rec->src_port = expired->flow.tp_src;
+        nf_rec->dst_port = expired->flow.tp_dst;
+    }
+    nf_rec->tcp_flags = expired->tcp_flags;
+    nf_rec->ip_proto = expired->flow.nw_proto;
+    nf_rec->ip_tos = expired->ip_tos;
+
+    if (nf->packet.size >= 1400) {
+        netflow_run(nf);
     }
+}
 
-    nf_rec.tcp_flags = expired->tcp_flags;
-    nf_rec.ip_proto = expired->flow.nw_proto;
-    nf_rec.ip_tos = expired->ip_tos;
+void
+netflow_run(struct netflow *nf)
+{
+    size_t i;
 
-    nf_rec.src_as = htons(0);
-    nf_rec.dst_as = htons(0);
-    nf_rec.src_mask = 0;
-    nf_rec.dst_mask = 0;
+    if (!nf->packet.size) {
+        return;
+    }
 
     for (i = 0; i < nf->n_fds; i++) {
-        struct msghdr msghdr;
-        struct iovec iov[2];
-
-        iov[0].iov_base = &nf_hdr;
-        iov[0].iov_len = sizeof nf_hdr;
-        iov[1].iov_base = &nf_rec;
-        iov[1].iov_len = sizeof nf_rec;
-        msghdr.msg_name = NULL;
-        msghdr.msg_namelen = 0;
-        msghdr.msg_iov = iov;
-        msghdr.msg_iovlen = 2;
-        msghdr.msg_control = NULL;
-        msghdr.msg_controllen = 0;
-        msghdr.msg_flags = 0;
-        if (sendmsg(nf->fds[i], &msghdr, 0) < 0) {
+        if (send(nf->fds[i], nf->packet.data, nf->packet.size, 0)) {
             VLOG_WARN_RL(&rl, "netflow message send failed: %s",
                          strerror(errno));
         }
     }
-    nf->netflow_cnt++;
+    nf->packet.size = 0;
 }
 
 static void
@@ -300,6 +299,7 @@ netflow_create(void)
     nf->fds = NULL;
     nf->n_fds = 0;
     nf->netflow_cnt = 0;
+    ofpbuf_init(&nf->packet, 1500);
     return nf;
 }
 
@@ -307,6 +307,7 @@ void
 netflow_destroy(struct netflow *nf)
 {
     if (nf) {
+        ofpbuf_uninit(&nf->packet);
         clear_collectors(nf);
         free(nf);
     }
index a8b7076e2afc846f189b0effb05bf39cb28da62c..3e7f51c6e1eee0ef1ef96ab04d0a3fd199c76b59 100644 (file)
@@ -43,5 +43,6 @@ struct netflow *netflow_create(void);
 void netflow_destroy(struct netflow *);
 int netflow_set_collectors(struct netflow *, const struct svec *collectors);
 void netflow_expire(struct netflow *, const struct ofexpired *);
+void netflow_run(struct netflow *);
 
 #endif /* netflow.h */
index 979dfeaaa602719787dd48f6ee2db0c167b6bde0..cf82953b1689b61085d3255d8dba429e42a4c878 100644 (file)
@@ -843,6 +843,10 @@ ofproto_run1(struct ofproto *p)
         classifier_for_each(&p->cls, CLS_INC_EXACT, expire_rule, p);
     }
 
+    if (p->netflow) {
+        netflow_run(p->netflow);
+    }
+
     return 0;
 }