From 52bffe69ecdd59b4d596a0b3cae64b03b4d76e07 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Wed, 18 Mar 2009 17:12:25 -0700 Subject: [PATCH] secchan: Aggregate multiple NetFlow messages into a single packet. Completes feature #1012. --- secchan/netflow.c | 105 +++++++++++++++++++++++----------------------- secchan/netflow.h | 1 + secchan/ofproto.c | 4 ++ 3 files changed, 58 insertions(+), 52 deletions(-) diff --git a/secchan/netflow.c b/secchan/netflow.c index b8e4d437..f2edd58b 100644 --- a/secchan/netflow.c +++ b/secchan/netflow.c @@ -40,6 +40,7 @@ #include "cfg.h" #include "flow.h" #include "netflow.h" +#include "ofpbuf.h" #include "ofproto.h" #include "packets.h" #include "socket-util.h" @@ -110,6 +111,7 @@ struct netflow { int *fds; /* Sockets for NetFlow collectors. */ size_t n_fds; /* Number of Netflow collectors. */ uint32_t netflow_cnt; /* Flow sequence number for NetFlow. */ + struct ofpbuf packet; /* NetFlow packet being accumulated. */ }; static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); @@ -172,78 +174,75 @@ open_collector(char *dst) void netflow_expire(struct netflow *nf, const struct ofexpired *expired) { - struct netflow_v5_header nf_hdr; - struct netflow_v5_record nf_rec; + struct netflow_v5_header *nf_hdr; + struct netflow_v5_record *nf_rec; struct timeval now; - int i; time_timeval(&now); - memset(&nf_hdr, 0, sizeof nf_hdr); - nf_hdr.version = htons(NETFLOW_V5_VERSION); - nf_hdr.count = htons(1); - nf_hdr.sysuptime = htonl(time_msec() - nf->boot_time); - nf_hdr.unix_secs = htonl(now.tv_sec); - nf_hdr.unix_nsecs = htonl(now.tv_usec * 1000); - nf_hdr.flow_seq = htonl(nf->netflow_cnt); - nf_hdr.engine_type = 0; - nf_hdr.engine_id = 0; - nf_hdr.sampling_interval = htons(0); - - memset(&nf_rec, 0, sizeof nf_rec); - nf_rec.src_addr = expired->flow.nw_src; - nf_rec.dst_addr = expired->flow.nw_dst; - nf_rec.nexthop = htons(0); - nf_rec.input = htons(expired->flow.in_port); - nf_rec.output = htons(0); - nf_rec.packet_count = htonl(expired->packet_count); - nf_rec.byte_count = htonl(expired->byte_count); - nf_rec.init_time = htonl(expired->created - nf->boot_time); - nf_rec.used_time = htonl(MAX(expired->created, expired->used) - - nf->boot_time); + if (!nf->packet.size) { + nf_hdr = ofpbuf_put_zeros(&nf->packet, sizeof *nf_hdr); + nf_hdr->version = htons(NETFLOW_V5_VERSION); + nf_hdr->count = htons(0); + nf_hdr->sysuptime = htonl(time_msec() - nf->boot_time); + nf_hdr->unix_secs = htonl(now.tv_sec); + nf_hdr->unix_nsecs = htonl(now.tv_usec * 1000); + nf_hdr->flow_seq = htonl(nf->netflow_cnt++); + nf_hdr->engine_type = 0; + nf_hdr->engine_id = 0; + nf_hdr->sampling_interval = htons(0); + } + nf_hdr = nf->packet.data; + nf_hdr->count = htons(ntohs(nf_hdr->count) + 1); + + nf_rec = ofpbuf_put_zeros(&nf->packet, sizeof *nf_rec); + nf_rec->src_addr = expired->flow.nw_src; + nf_rec->dst_addr = expired->flow.nw_dst; + nf_rec->nexthop = htons(0); + nf_rec->input = htons(expired->flow.in_port); + nf_rec->output = htons(0); + nf_rec->packet_count = htonl(expired->packet_count); + nf_rec->byte_count = htonl(expired->byte_count); + nf_rec->init_time = htonl(expired->created - nf->boot_time); + nf_rec->used_time = htonl(MAX(expired->created, expired->used) + - nf->boot_time); if (expired->flow.nw_proto == IP_TYPE_ICMP) { /* In NetFlow, the ICMP type and code are concatenated and * placed in the 'dst_port' field. */ uint8_t type = ntohs(expired->flow.tp_src); uint8_t code = ntohs(expired->flow.tp_dst); - nf_rec.src_port = htons(0); - nf_rec.dst_port = htons((type << 8) | code); + nf_rec->src_port = htons(0); + nf_rec->dst_port = htons((type << 8) | code); } else { - nf_rec.src_port = expired->flow.tp_src; - nf_rec.dst_port = expired->flow.tp_dst; + nf_rec->src_port = expired->flow.tp_src; + nf_rec->dst_port = expired->flow.tp_dst; + } + nf_rec->tcp_flags = expired->tcp_flags; + nf_rec->ip_proto = expired->flow.nw_proto; + nf_rec->ip_tos = expired->ip_tos; + + if (nf->packet.size >= 1400) { + netflow_run(nf); } +} - nf_rec.tcp_flags = expired->tcp_flags; - nf_rec.ip_proto = expired->flow.nw_proto; - nf_rec.ip_tos = expired->ip_tos; +void +netflow_run(struct netflow *nf) +{ + size_t i; - nf_rec.src_as = htons(0); - nf_rec.dst_as = htons(0); - nf_rec.src_mask = 0; - nf_rec.dst_mask = 0; + if (!nf->packet.size) { + return; + } for (i = 0; i < nf->n_fds; i++) { - struct msghdr msghdr; - struct iovec iov[2]; - - iov[0].iov_base = &nf_hdr; - iov[0].iov_len = sizeof nf_hdr; - iov[1].iov_base = &nf_rec; - iov[1].iov_len = sizeof nf_rec; - msghdr.msg_name = NULL; - msghdr.msg_namelen = 0; - msghdr.msg_iov = iov; - msghdr.msg_iovlen = 2; - msghdr.msg_control = NULL; - msghdr.msg_controllen = 0; - msghdr.msg_flags = 0; - if (sendmsg(nf->fds[i], &msghdr, 0) < 0) { + if (send(nf->fds[i], nf->packet.data, nf->packet.size, 0)) { VLOG_WARN_RL(&rl, "netflow message send failed: %s", strerror(errno)); } } - nf->netflow_cnt++; + nf->packet.size = 0; } static void @@ -300,6 +299,7 @@ netflow_create(void) nf->fds = NULL; nf->n_fds = 0; nf->netflow_cnt = 0; + ofpbuf_init(&nf->packet, 1500); return nf; } @@ -307,6 +307,7 @@ void netflow_destroy(struct netflow *nf) { if (nf) { + ofpbuf_uninit(&nf->packet); clear_collectors(nf); free(nf); } diff --git a/secchan/netflow.h b/secchan/netflow.h index a8b7076e..3e7f51c6 100644 --- a/secchan/netflow.h +++ b/secchan/netflow.h @@ -43,5 +43,6 @@ struct netflow *netflow_create(void); void netflow_destroy(struct netflow *); int netflow_set_collectors(struct netflow *, const struct svec *collectors); void netflow_expire(struct netflow *, const struct ofexpired *); +void netflow_run(struct netflow *); #endif /* netflow.h */ diff --git a/secchan/ofproto.c b/secchan/ofproto.c index 979dfeaa..cf82953b 100644 --- a/secchan/ofproto.c +++ b/secchan/ofproto.c @@ -843,6 +843,10 @@ ofproto_run1(struct ofproto *p) classifier_for_each(&p->cls, CLS_INC_EXACT, expire_rule, p); } + if (p->netflow) { + netflow_run(p->netflow); + } + return 0; } -- 2.30.2