#include "cfg.h"
#include "flow.h"
#include "netflow.h"
+#include "ofpbuf.h"
#include "ofproto.h"
#include "packets.h"
#include "socket-util.h"
int *fds; /* Sockets for NetFlow collectors. */
size_t n_fds; /* Number of Netflow collectors. */
uint32_t netflow_cnt; /* Flow sequence number for NetFlow. */
+ struct ofpbuf packet; /* NetFlow packet being accumulated. */
};
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
void
netflow_expire(struct netflow *nf, const struct ofexpired *expired)
{
- struct netflow_v5_header nf_hdr;
- struct netflow_v5_record nf_rec;
+ struct netflow_v5_header *nf_hdr;
+ struct netflow_v5_record *nf_rec;
struct timeval now;
- int i;
time_timeval(&now);
- memset(&nf_hdr, 0, sizeof nf_hdr);
- nf_hdr.version = htons(NETFLOW_V5_VERSION);
- nf_hdr.count = htons(1);
- nf_hdr.sysuptime = htonl(time_msec() - nf->boot_time);
- nf_hdr.unix_secs = htonl(now.tv_sec);
- nf_hdr.unix_nsecs = htonl(now.tv_usec * 1000);
- nf_hdr.flow_seq = htonl(nf->netflow_cnt);
- nf_hdr.engine_type = 0;
- nf_hdr.engine_id = 0;
- nf_hdr.sampling_interval = htons(0);
-
- memset(&nf_rec, 0, sizeof nf_rec);
- nf_rec.src_addr = expired->flow.nw_src;
- nf_rec.dst_addr = expired->flow.nw_dst;
- nf_rec.nexthop = htons(0);
- nf_rec.input = htons(expired->flow.in_port);
- nf_rec.output = htons(0);
- nf_rec.packet_count = htonl(expired->packet_count);
- nf_rec.byte_count = htonl(expired->byte_count);
- nf_rec.init_time = htonl(expired->created - nf->boot_time);
- nf_rec.used_time = htonl(MAX(expired->created, expired->used)
- - nf->boot_time);
+ if (!nf->packet.size) {
+ nf_hdr = ofpbuf_put_zeros(&nf->packet, sizeof *nf_hdr);
+ nf_hdr->version = htons(NETFLOW_V5_VERSION);
+ nf_hdr->count = htons(0);
+ nf_hdr->sysuptime = htonl(time_msec() - nf->boot_time);
+ nf_hdr->unix_secs = htonl(now.tv_sec);
+ nf_hdr->unix_nsecs = htonl(now.tv_usec * 1000);
+ nf_hdr->flow_seq = htonl(nf->netflow_cnt++);
+ nf_hdr->engine_type = 0;
+ nf_hdr->engine_id = 0;
+ nf_hdr->sampling_interval = htons(0);
+ }
+ nf_hdr = nf->packet.data;
+ nf_hdr->count = htons(ntohs(nf_hdr->count) + 1);
+
+ nf_rec = ofpbuf_put_zeros(&nf->packet, sizeof *nf_rec);
+ nf_rec->src_addr = expired->flow.nw_src;
+ nf_rec->dst_addr = expired->flow.nw_dst;
+ nf_rec->nexthop = htons(0);
+ nf_rec->input = htons(expired->flow.in_port);
+ nf_rec->output = htons(0);
+ nf_rec->packet_count = htonl(expired->packet_count);
+ nf_rec->byte_count = htonl(expired->byte_count);
+ nf_rec->init_time = htonl(expired->created - nf->boot_time);
+ nf_rec->used_time = htonl(MAX(expired->created, expired->used)
+ - nf->boot_time);
if (expired->flow.nw_proto == IP_TYPE_ICMP) {
/* In NetFlow, the ICMP type and code are concatenated and
* placed in the 'dst_port' field. */
uint8_t type = ntohs(expired->flow.tp_src);
uint8_t code = ntohs(expired->flow.tp_dst);
- nf_rec.src_port = htons(0);
- nf_rec.dst_port = htons((type << 8) | code);
+ nf_rec->src_port = htons(0);
+ nf_rec->dst_port = htons((type << 8) | code);
} else {
- nf_rec.src_port = expired->flow.tp_src;
- nf_rec.dst_port = expired->flow.tp_dst;
+ nf_rec->src_port = expired->flow.tp_src;
+ nf_rec->dst_port = expired->flow.tp_dst;
+ }
+ nf_rec->tcp_flags = expired->tcp_flags;
+ nf_rec->ip_proto = expired->flow.nw_proto;
+ nf_rec->ip_tos = expired->ip_tos;
+
+ if (nf->packet.size >= 1400) {
+ netflow_run(nf);
}
+}
- nf_rec.tcp_flags = expired->tcp_flags;
- nf_rec.ip_proto = expired->flow.nw_proto;
- nf_rec.ip_tos = expired->ip_tos;
+void
+netflow_run(struct netflow *nf)
+{
+ size_t i;
- nf_rec.src_as = htons(0);
- nf_rec.dst_as = htons(0);
- nf_rec.src_mask = 0;
- nf_rec.dst_mask = 0;
+ if (!nf->packet.size) {
+ return;
+ }
for (i = 0; i < nf->n_fds; i++) {
- struct msghdr msghdr;
- struct iovec iov[2];
-
- iov[0].iov_base = &nf_hdr;
- iov[0].iov_len = sizeof nf_hdr;
- iov[1].iov_base = &nf_rec;
- iov[1].iov_len = sizeof nf_rec;
- msghdr.msg_name = NULL;
- msghdr.msg_namelen = 0;
- msghdr.msg_iov = iov;
- msghdr.msg_iovlen = 2;
- msghdr.msg_control = NULL;
- msghdr.msg_controllen = 0;
- msghdr.msg_flags = 0;
- if (sendmsg(nf->fds[i], &msghdr, 0) < 0) {
+ if (send(nf->fds[i], nf->packet.data, nf->packet.size, 0)) {
VLOG_WARN_RL(&rl, "netflow message send failed: %s",
strerror(errno));
}
}
- nf->netflow_cnt++;
+ nf->packet.size = 0;
}
static void
nf->fds = NULL;
nf->n_fds = 0;
nf->netflow_cnt = 0;
+ ofpbuf_init(&nf->packet, 1500);
return nf;
}
netflow_destroy(struct netflow *nf)
{
if (nf) {
+ ofpbuf_uninit(&nf->packet);
clear_collectors(nf);
free(nf);
}