#include "ofpbuf.h"
#include "ofp-print.h"
#include "openflow.h"
+#include "poll-loop.h"
#include "queue.h"
#include "rconn.h"
#include "stp.h"
#define THIS_MODULE VLM_learning_switch
#include "vlog.h"
+enum port_state {
+ P_DISABLED = 1 << 0,
+ P_LISTENING = 1 << 1,
+ P_LEARNING = 1 << 2,
+ P_FORWARDING = 1 << 3,
+ P_BLOCKING = 1 << 4
+};
+
struct lswitch {
/* If nonnegative, the switch sets up flows that expire after the given
* number of seconds (or never expire, if the value is OFP_FLOW_PERMANENT).
* Otherwise, the switch processes every packet. */
int max_idle;
- uint64_t datapath_id;
+ unsigned long long int datapath_id;
uint32_t capabilities;
time_t last_features_request;
struct mac_learning *ml; /* NULL to act as hub instead of switch. */
/* Number of outgoing queued packets on the rconn. */
int n_queued;
+
+ /* Spanning tree protocol implementation.
+ *
+ * We implement STP states by, whenever a port's STP state changes,
+ * querying all the flows on the switch and then deleting any of them that
+ * are inappropriate for a port's STP state. */
+ long long int next_query; /* Next time at which to query all flows. */
+ long long int last_query; /* Last time we sent a query. */
+ long long int last_reply; /* Last time we received a query reply. */
+ unsigned int port_states[STP_MAX_PORTS];
+ uint32_t query_xid; /* XID used for query. */
+ int n_flows, n_no_recv, n_no_send;
};
/* The log messages here could actually be useful in debugging, so keep the
static void queue_tx(struct lswitch *, struct rconn *, struct ofpbuf *);
static void send_features_request(struct lswitch *, struct rconn *);
-static void process_switch_features(struct lswitch *, struct rconn *,
- struct ofp_switch_features *);
-static void process_packet_in(struct lswitch *, struct rconn *,
- struct ofp_packet_in *);
-static void process_echo_request(struct lswitch *, struct rconn *,
- struct ofp_header *);
-static void process_port_status(struct lswitch *, struct rconn *,
- struct ofp_port_status *);
-static void process_phy_port(struct lswitch *, struct rconn *,
- const struct ofp_phy_port *);
+static void schedule_query(struct lswitch *, long long int delay);
+static bool may_learn(const struct lswitch *, uint16_t port_no);
+static bool may_recv(const struct lswitch *, uint16_t port_no,
+ bool any_actions);
+static bool may_send(const struct lswitch *, uint16_t port_no);
+
+typedef void packet_handler_func(struct lswitch *, struct rconn *, void *);
+static packet_handler_func process_switch_features;
+static packet_handler_func process_packet_in;
+static packet_handler_func process_echo_request;
+static packet_handler_func process_port_status;
+static packet_handler_func process_phy_port;
+static packet_handler_func process_stats_reply;
/* Creates and returns a new learning switch.
*
struct lswitch *
lswitch_create(struct rconn *rconn, bool learn_macs, int max_idle)
{
- struct lswitch *sw = xcalloc(1, sizeof *sw);
+ struct lswitch *sw;
+ size_t i;
+
+ sw = xcalloc(1, sizeof *sw);
sw->max_idle = max_idle;
sw->datapath_id = 0;
sw->last_features_request = time_now() - 1;
sw->ml = learn_macs ? mac_learning_create() : NULL;
+ sw->next_query = LLONG_MIN;
+ sw->last_query = LLONG_MIN;
+ sw->last_reply = LLONG_MIN;
+ for (i = 0; i < STP_MAX_PORTS; i++) {
+ sw->port_states[i] = P_DISABLED;
+ }
send_features_request(sw, rconn);
return sw;
}
}
}
-static size_t
-min_size(uint8_t type)
+/* Takes care of necessary 'sw' activity, except for receiving packets (which
+ * the caller must do). */
+void
+lswitch_run(struct lswitch *sw, struct rconn *rconn)
+{
+ long long int now = time_msec();
+
+ /* If we're waiting for more replies, keeping waiting for up to 10 s. */
+ if (sw->last_reply != LLONG_MIN) {
+ if (now - sw->last_reply > 10000) {
+ VLOG_ERR_RL(&rl, "%012llx: No more flow stat replies last 10 s",
+ sw->datapath_id);
+ sw->last_reply = LLONG_MIN;
+ sw->last_query = LLONG_MIN;
+ schedule_query(sw, 0);
+ } else {
+ return;
+ }
+ }
+
+ /* If we're waiting for any reply at all, keep waiting for up to 10 s. */
+ if (sw->last_query != LLONG_MIN) {
+ if (now - sw->last_query > 10000) {
+ VLOG_ERR_RL(&rl, "%012llx: No flow stat replies in last 10 s",
+ sw->datapath_id);
+ sw->last_query = LLONG_MIN;
+ schedule_query(sw, 0);
+ } else {
+ return;
+ }
+ }
+
+ /* If it's time to send another query, do so. */
+ if (sw->next_query != LLONG_MIN && now >= sw->next_query) {
+ sw->next_query = LLONG_MIN;
+ if (!rconn_is_connected(rconn)) {
+ schedule_query(sw, 1000);
+ } else {
+ struct ofp_stats_request *osr;
+ struct ofp_flow_stats_request *ofsr;
+ struct ofpbuf *b;
+ int error;
+
+ VLOG_DBG("%012llx: Sending flow stats request to implement STP",
+ sw->datapath_id);
+
+ sw->last_query = now;
+ sw->query_xid = random_uint32();
+ sw->n_flows = 0;
+ sw->n_no_recv = 0;
+ sw->n_no_send = 0;
+ osr = make_openflow_xid(sizeof *osr + sizeof *ofsr,
+ OFPT_STATS_REQUEST, sw->query_xid, &b);
+ osr->type = htons(OFPST_FLOW);
+ osr->flags = htons(0);
+ ofsr = (struct ofp_flow_stats_request *) osr->body;
+ ofsr->match.wildcards = htonl(OFPFW_ALL);
+ ofsr->table_id = 0xff;
+
+ error = rconn_send(rconn, b, NULL);
+ if (error) {
+ VLOG_WARN_RL(&rl, "%012llx: sending flow stats request "
+ "failed: %s", sw->datapath_id, strerror(error));
+ ofpbuf_delete(b);
+ schedule_query(sw, 1000);
+ }
+ }
+ }
+}
+
+static void
+wait_timeout(long long int started)
+{
+ long long int now = time_msec();
+ long long int timeout = 10000 - (now - started);
+ if (timeout <= 0) {
+ poll_immediate_wake();
+ } else {
+ poll_timer_wait(timeout);
+ }
+}
+
+void
+lswitch_wait(struct lswitch *sw)
{
- return (type == OFPT_FEATURES_REPLY ? sizeof(struct ofp_switch_features)
- : type == OFPT_PACKET_IN ? offsetof (struct ofp_packet_in, data)
- : type == OFPT_PORT_STATUS ? sizeof(struct ofp_port_status)
- : sizeof(struct ofp_header));
+ if (sw->last_reply != LLONG_MIN) {
+ wait_timeout(sw->last_reply);
+ } else if (sw->last_query != LLONG_MIN) {
+ wait_timeout(sw->last_query);
+ }
}
/* Processes 'msg', which should be an OpenFlow received on 'rconn', according
lswitch_process_packet(struct lswitch *sw, struct rconn *rconn,
const struct ofpbuf *msg)
{
+ struct processor {
+ uint8_t type;
+ size_t min_size;
+ packet_handler_func *handler;
+ };
+ static const struct processor processors[] = {
+ {
+ OFPT_ECHO_REQUEST,
+ sizeof(struct ofp_header),
+ process_echo_request
+ },
+ {
+ OFPT_FEATURES_REPLY,
+ sizeof(struct ofp_switch_features),
+ process_switch_features
+ },
+ {
+ OFPT_PACKET_IN,
+ offsetof(struct ofp_packet_in, data),
+ process_packet_in
+ },
+ {
+ OFPT_PORT_STATUS,
+ sizeof(struct ofp_port_status),
+ process_port_status
+ },
+ {
+ OFPT_STATS_REPLY,
+ offsetof(struct ofp_stats_reply, body),
+ process_stats_reply
+ },
+ {
+ OFPT_FLOW_EXPIRED,
+ sizeof(struct ofp_flow_expired),
+ NULL
+ },
+ };
+ const size_t n_processors = ARRAY_SIZE(processors);
+ const struct processor *p;
struct ofp_header *oh;
oh = msg->data;
- if (msg->size < min_size(oh->type)) {
- VLOG_WARN_RL(&rl,
- "%s: too short (%zu bytes) for type %"PRIu8" (min %zu)",
- rconn_get_name(rconn),
- msg->size, oh->type, min_size(oh->type));
+ if (sw->datapath_id == 0
+ && oh->type != OFPT_ECHO_REQUEST
+ && oh->type != OFPT_FEATURES_REPLY) {
+ send_features_request(sw, rconn);
return;
}
- if (oh->type == OFPT_ECHO_REQUEST) {
- process_echo_request(sw, rconn, msg->data);
- } else if (oh->type == OFPT_FEATURES_REPLY) {
- process_switch_features(sw, rconn, msg->data);
- } else if (sw->datapath_id == 0) {
- send_features_request(sw, rconn);
- } else if (oh->type == OFPT_PACKET_IN) {
- process_packet_in(sw, rconn, msg->data);
- } else if (oh->type == OFPT_PORT_STATUS) {
- process_port_status(sw, rconn, msg->data);
- } else {
- if (VLOG_IS_DBG_ENABLED()) {
- char *p = ofp_to_string(msg->data, msg->size, 2);
- VLOG_DBG_RL(&rl, "OpenFlow packet ignored: %s", p);
- free(p);
+ for (p = processors; p < &processors[n_processors]; p++) {
+ if (oh->type == p->type) {
+ if (msg->size < p->min_size) {
+ VLOG_WARN_RL(&rl, "%012llx: %s: too short (%zu bytes) for "
+ "type %"PRIu8" (min %zu)", sw->datapath_id,
+ rconn_get_name(rconn), msg->size, oh->type,
+ p->min_size);
+ return;
+ }
+ if (p->handler) {
+ (p->handler)(sw, rconn, msg->data);
+ }
+ return;
}
}
+ if (VLOG_IS_DBG_ENABLED()) {
+ char *p = ofp_to_string(msg->data, msg->size, 2);
+ VLOG_DBG_RL(&rl, "%012llx: OpenFlow packet ignored: %s",
+ sw->datapath_id, p);
+ free(p);
+ }
}
\f
static void
int retval = rconn_send_with_limit(rconn, b, &sw->n_queued, 10);
if (retval && retval != ENOTCONN) {
if (retval == EAGAIN) {
- VLOG_WARN_RL(&rl, "%s: tx queue overflow", rconn_get_name(rconn));
+ VLOG_WARN_RL(&rl, "%012llx: %s: tx queue overflow",
+ sw->datapath_id, rconn_get_name(rconn));
} else {
- VLOG_WARN_RL(&rl, "%s: send: %s",
- rconn_get_name(rconn), strerror(retval));
+ VLOG_WARN_RL(&rl, "%012llx: %s: send: %s",
+ sw->datapath_id, rconn_get_name(rconn),
+ strerror(retval));
}
}
}
static void
-process_switch_features(struct lswitch *sw, struct rconn *rconn,
- struct ofp_switch_features *osf)
+schedule_query(struct lswitch *sw, long long int delay)
+{
+ long long int now = time_msec();
+ if (sw->next_query == LLONG_MIN || sw->next_query > now + delay) {
+ sw->next_query = now + delay;
+ }
+}
+
+static void
+process_switch_features(struct lswitch *sw, struct rconn *rconn, void *osf_)
{
+ struct ofp_switch_features *osf = osf_;
size_t n_ports = ((ntohs(osf->header.length)
- offsetof(struct ofp_switch_features, ports))
/ sizeof *osf->ports);
size_t i;
- sw->datapath_id = osf->datapath_id;
+ sw->datapath_id = ntohll(osf->datapath_id);
sw->capabilities = ntohl(osf->capabilities);
for (i = 0; i < n_ports; i++) {
process_phy_port(sw, rconn, &osf->ports[i]);
}
+ if (sw->capabilities & OFPC_STP) {
+ schedule_query(sw, 1000);
+ }
}
static void
-process_packet_in(struct lswitch *sw, struct rconn *rconn,
- struct ofp_packet_in *opi)
+process_packet_in(struct lswitch *sw, struct rconn *rconn, void *opi_)
{
+ struct ofp_packet_in *opi = opi_;
uint16_t in_port = ntohs(opi->in_port);
uint16_t out_port = OFPP_FLOOD;
pkt.size = pkt_len;
flow_extract(&pkt, in_port, &flow);
- if (sw->ml) {
+ if (may_learn(sw, in_port) && sw->ml) {
if (mac_learning_learn(sw->ml, flow.dl_src, in_port)) {
- VLOG_DBG_RL(&rl, "learned that "ETH_ADDR_FMT" is on datapath %"
- PRIx64" port %"PRIu16, ETH_ADDR_ARGS(flow.dl_src),
- ntohll(sw->datapath_id), in_port);
+ VLOG_DBG_RL(&rl, "%012llx: learned that "ETH_ADDR_FMT" is on "
+ "port %"PRIu16, sw->datapath_id,
+ ETH_ADDR_ARGS(flow.dl_src), in_port);
+ }
+ }
+
+ if (!may_recv(sw, in_port, false)) {
+ /* STP prevents receiving anything on this port. */
+ goto drop_it;
+ }
+
+ if (sw->ml) {
+ uint16_t learned_port = mac_learning_lookup(sw->ml, flow.dl_dst);
+ if (may_send(sw, learned_port)) {
+ out_port = learned_port;
}
- out_port = mac_learning_lookup(sw->ml, flow.dl_dst);
}
if (in_port == out_port) {
- /* The input and output port match. Set up a flow to drop packets. */
- queue_tx(sw, rconn, make_add_flow(&flow, ntohl(opi->buffer_id),
- sw->max_idle, 0));
+ /* Don't send out packets on their input ports. */
+ goto drop_it;
} else if (sw->max_idle >= 0 && (!sw->ml || out_port != OFPP_FLOOD)) {
/* The output port is known, or we always flood everything, so add a
* new flow. */
}
queue_tx(sw, rconn, b);
}
+ return;
+
+drop_it:
+ /* Set up a flow to drop packets, or just drop the packet if we don't set
+ * up flows at all. */
+ if (sw->max_idle >= 0) {
+ queue_tx(sw, rconn, make_add_flow(&flow, ntohl(opi->buffer_id),
+ sw->max_idle, 0));
+ }
+ return;
}
static void
-process_echo_request(struct lswitch *sw, struct rconn *rconn,
- struct ofp_header *rq)
+process_echo_request(struct lswitch *sw, struct rconn *rconn, void *rq_)
{
+ struct ofp_header *rq = rq_;
queue_tx(sw, rconn, make_echo_reply(rq));
}
static void
-process_port_status(struct lswitch *sw, struct rconn *rconn,
- struct ofp_port_status *ops)
+process_port_status(struct lswitch *sw, struct rconn *rconn, void *ops_)
{
+ struct ofp_port_status *ops = ops_;
process_phy_port(sw, rconn, &ops->desc);
}
static void
-process_phy_port(struct lswitch *sw, struct rconn *rconn,
- const struct ofp_phy_port *opp)
+process_phy_port(struct lswitch *sw, struct rconn *rconn, void *opp_)
{
- if (sw->capabilities & OFPC_STP && ntohs(opp->port_no) < STP_MAX_PORTS) {
+ const struct ofp_phy_port *opp = opp_;
+ uint16_t port_no = ntohs(opp->port_no);
+ if (sw->capabilities & OFPC_STP && port_no < STP_MAX_PORTS) {
uint32_t config = ntohl(opp->config);
uint32_t state = ntohl(opp->state);
- uint32_t new_config = config & ~(OFPPC_NO_RECV | OFPPC_NO_RECV_STP
- | OFPPC_NO_FWD | OFPPC_NO_PACKET_IN);
+ unsigned int *port_state = &sw->port_states[port_no];
+ unsigned int new_port_state;
+
if (!(config & (OFPPC_NO_STP | OFPPC_PORT_DOWN))
- && !(state & OFPPS_LINK_DOWN)) {
- bool forward = false;
- bool learn = false;
+ && !(state & OFPPS_LINK_DOWN))
+ {
switch (state & OFPPS_STP_MASK) {
case OFPPS_STP_LISTEN:
- case OFPPS_STP_BLOCK:
+ new_port_state = P_LISTENING;
break;
case OFPPS_STP_LEARN:
- learn = true;
+ new_port_state = P_LEARNING;
break;
case OFPPS_STP_FORWARD:
- forward = learn = true;
+ new_port_state = P_FORWARDING;
+ break;
+ case OFPPS_STP_BLOCK:
+ new_port_state = P_BLOCKING;
+ break;
+ default:
+ new_port_state = P_DISABLED;
break;
}
- if (!forward) {
- new_config |= OFPPC_NO_RECV | OFPPC_NO_FWD;
+ } else {
+ new_port_state = P_FORWARDING;
+ }
+ if (*port_state != new_port_state) {
+ *port_state = new_port_state;
+ schedule_query(sw, 1000);
+ }
+ }
+}
+
+static unsigned int
+get_port_state(const struct lswitch *sw, uint16_t port_no)
+{
+ return (port_no >= STP_MAX_PORTS || !(sw->capabilities & OFPC_STP)
+ ? P_FORWARDING
+ : sw->port_states[port_no]);
+}
+
+static bool
+may_learn(const struct lswitch *sw, uint16_t port_no)
+{
+ return get_port_state(sw, port_no) & (P_LEARNING | P_FORWARDING);
+}
+
+static bool
+may_recv(const struct lswitch *sw, uint16_t port_no, bool any_actions)
+{
+ unsigned int state = get_port_state(sw, port_no);
+ return !(any_actions
+ ? state & (P_DISABLED | P_LISTENING | P_BLOCKING)
+ : state & (P_DISABLED | P_LISTENING | P_BLOCKING | P_LEARNING));
+}
+
+static bool
+may_send(const struct lswitch *sw, uint16_t port_no)
+{
+ return get_port_state(sw, port_no) & P_FORWARDING;
+}
+
+static void
+process_flow_stats(struct lswitch *sw, struct rconn *rconn,
+ const struct ofp_flow_stats *ofs)
+{
+ const char *end = (char *) ofs + ntohs(ofs->length);
+ bool delete = false;
+
+ /* Decide to delete the flow if it matches on an STP-disabled physical
+ * port. But don't delete it if the flow just drops all received packets,
+ * because that's a perfectly reasonable thing to do for disabled physical
+ * ports. */
+ if (!(ofs->match.wildcards & htonl(OFPFW_IN_PORT))) {
+ if (!may_recv(sw, ntohs(ofs->match.in_port),
+ end > (char *) ofs->actions)) {
+ delete = true;
+ sw->n_no_recv++;
+ }
+ }
+
+ /* Decide to delete the flow if it forwards to an STP-disabled physical
+ * port. */
+ if (!delete) {
+ const struct ofp_action_header *a;
+ size_t len;
+
+ for (a = ofs->actions; (char *) a < end; a += len / 8) {
+ uint16_t type;
+
+ len = ntohs(a->len);
+ if (len > end - (char *) a) {
+ VLOG_DBG_RL(&rl, "%012llx: action exceeds available space "
+ "(%zu > %td)",
+ sw->datapath_id, len, end - (char *) a);
+ break;
+ } else if (len % 8) {
+ VLOG_DBG_RL(&rl, "%012llx: action length (%zu) not multiple "
+ "of 8 bytes", sw->datapath_id, len);
+ break;
}
- if (!learn) {
- new_config |= OFPPC_NO_PACKET_IN;
+
+ type = ntohs(a->type);
+ if (a->type == htons(OFPAT_OUTPUT)) {
+ struct ofp_action_output *oao = (struct ofp_action_output *) a;
+ if (!may_send(sw, ntohs(oao->port))) {
+ delete = true;
+ sw->n_no_send++;
+ break;
+ }
}
}
- if (config != new_config) {
- struct ofp_port_mod *opm;
- struct ofpbuf *b;
- int retval;
-
- VLOG_WARN("port %d: config=%x new_config=%x",
- ntohs(opp->port_no), config, new_config);
- opm = make_openflow(sizeof *opm, OFPT_PORT_MOD, &b);
- opm->port_no = opp->port_no;
- memcpy(opm->hw_addr, opp->hw_addr, OFP_ETH_ALEN);
- opm->config = htonl(new_config);
- opm->mask = htonl(config ^ new_config);
- opm->advertise = htonl(0);
- retval = rconn_send(rconn, b, NULL);
- if (retval) {
- if (retval != ENOTCONN) {
- VLOG_WARN_RL(&rl, "%s: send: %s",
- rconn_get_name(rconn), strerror(retval));
- }
- ofpbuf_delete(b);
+ }
+
+ /* Delete the flow. */
+ if (delete) {
+ struct ofp_flow_mod *ofm;
+ struct ofpbuf *b;
+
+ ofm = make_openflow(offsetof(struct ofp_flow_mod, actions),
+ OFPT_FLOW_MOD, &b);
+ ofm->match = ofs->match;
+ ofm->command = OFPFC_DELETE_STRICT;
+ rconn_send(rconn, b, NULL);
+ }
+}
+
+static void
+process_stats_reply(struct lswitch *sw, struct rconn *rconn, void *osr_)
+{
+ struct ofp_stats_reply *osr = osr_;
+ const uint8_t *body = osr->body;
+ const uint8_t *pos = body;
+ size_t body_len;
+
+ if (sw->last_query == LLONG_MIN
+ || osr->type != htons(OFPST_FLOW)
+ || osr->header.xid != sw->query_xid) {
+ return;
+ }
+ body_len = (ntohs(osr->header.length)
+ - offsetof(struct ofp_stats_reply, body));
+ for (;;) {
+ const struct ofp_flow_stats *fs;
+ ptrdiff_t bytes_left = body + body_len - pos;
+ size_t length;
+
+ if (bytes_left < sizeof *fs) {
+ if (bytes_left != 0) {
+ VLOG_WARN_RL(&rl, "%012llx: %td leftover bytes in flow "
+ "stats reply", sw->datapath_id, bytes_left);
}
+ break;
+ }
+
+ fs = (const void *) pos;
+ length = ntohs(fs->length);
+ if (length < sizeof *fs) {
+ VLOG_WARN_RL(&rl, "%012llx: flow stats length %zu is shorter than "
+ "min %zu", sw->datapath_id, length, sizeof *fs);
+ break;
+ } else if (length > bytes_left) {
+ VLOG_WARN_RL(&rl, "%012llx: flow stats length %zu but only %td "
+ "bytes left", sw->datapath_id, length, bytes_left);
+ break;
+ } else if ((length - sizeof *fs) % sizeof fs->actions[0]) {
+ VLOG_WARN_RL(&rl, "%012llx: flow stats length %zu has %zu bytes "
+ "left over in final action", sw->datapath_id, length,
+ (length - sizeof *fs) % sizeof fs->actions[0]);
+ break;
}
+
+ sw->n_flows++;
+ process_flow_stats(sw, rconn, fs);
+
+ pos += length;
+ }
+ if (!(osr->flags & htons(OFPSF_REPLY_MORE))) {
+ VLOG_DBG("%012llx: Deleted %d of %d received flows to "
+ "implement STP, %d because of no-recv, %d because of "
+ "no-send", sw->datapath_id,
+ sw->n_no_recv + sw->n_no_send, sw->n_flows,
+ sw->n_no_recv, sw->n_no_send);
+ sw->last_query = LLONG_MIN;
+ sw->last_reply = LLONG_MIN;
+ } else {
+ sw->last_reply = time_msec();
}
}
+