struct pktbuf *pktbuf;
bool send_flow_exp;
int miss_send_len;
+
+ struct rconn_packet_counter *packet_in_counter;
+
+ /* Number of OpenFlow messages queued as replies to OpenFlow requests, and
+ * the maximum number before we stop reading OpenFlow requests. */
+#define OFCONN_REPLY_MAX 100
+ struct rconn_packet_counter *reply_counter;
};
static struct ofconn *ofconn_create(struct ofproto *, struct rconn *);
static void ofconn_destroy(struct ofconn *, struct ofproto *);
static void ofconn_run(struct ofconn *, struct ofproto *);
static void ofconn_wait(struct ofconn *);
-static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn);
+static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
+ struct rconn_packet_counter *counter);
struct ofproto {
/* Settings. */
send_port_status(struct ofproto *p, const struct ofport *ofport,
uint8_t reason)
{
+ /* XXX Should limit the number of queued port status change messages. */
struct ofconn *ofconn;
LIST_FOR_EACH (ofconn, struct ofconn, node, &p->all_conns) {
struct ofp_port_status *ops;
ops->reason = reason;
ops->desc = ofport->opp;
hton_ofp_phy_port(&ops->desc);
- queue_tx(b, ofconn);
+ queue_tx(b, ofconn, NULL);
}
if (p->ofhooks->port_changed_cb) {
p->ofhooks->port_changed_cb(reason, &ofport->opp, p->aux);
ofconn->pktbuf = NULL;
ofconn->send_flow_exp = false;
ofconn->miss_send_len = 0;
+ ofconn->packet_in_counter = rconn_packet_counter_create ();
+ ofconn->reply_counter = rconn_packet_counter_create ();
return ofconn;
}
list_remove(&ofconn->node);
rconn_destroy(ofconn->rconn);
+ rconn_packet_counter_destroy(ofconn->packet_in_counter);
+ rconn_packet_counter_destroy(ofconn->reply_counter);
pktbuf_destroy(ofconn->pktbuf);
free(ofconn);
}
rconn_run(ofconn->rconn);
- /* Limit the number of iterations to prevent other tasks from starving. */
- for (iteration = 0; iteration < 50; iteration++) {
- struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
- if (!of_msg) {
- break;
+ if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
+ /* Limit the number of iterations to prevent other tasks from
+ * starving. */
+ for (iteration = 0; iteration < 50; iteration++) {
+ struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
+ if (!of_msg) {
+ break;
+ }
+ handle_openflow(ofconn, p, of_msg);
+ ofpbuf_delete(of_msg);
}
- handle_openflow(ofconn, p, of_msg);
- ofpbuf_delete(of_msg);
}
if (ofconn != p->controller && !rconn_is_alive(ofconn->rconn)) {
ofconn_wait(struct ofconn *ofconn)
{
rconn_run_wait(ofconn->rconn);
- rconn_recv_wait(ofconn->rconn);
+ if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
+ rconn_recv_wait(ofconn->rconn);
+ }
}
\f
/* Caller is responsible for initializing the 'cr' member of the returned
}
\f
static void
-queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn)
+queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
+ struct rconn_packet_counter *counter)
{
update_openflow_length(msg);
- if (rconn_send(ofconn->rconn, msg, NULL)) { /* XXX */
+ if (rconn_send(ofconn->rconn, msg, counter)) {
ofpbuf_delete(msg);
}
}
oem->type = htons((unsigned int) error >> 16);
oem->code = htons(error & 0xffff);
memcpy(oem->data, data, len);
- queue_tx(buf, ofconn);
+ queue_tx(buf, ofconn, ofconn->reply_counter);
}
static void
handle_echo_request(struct ofconn *ofconn, struct ofp_header *oh)
{
struct ofp_header *rq = oh;
- queue_tx(make_echo_reply(rq), ofconn);
+ queue_tx(make_echo_reply(rq), ofconn, ofconn->reply_counter);
return 0;
}
hton_ofp_phy_port(ofpbuf_put(buf, &port->opp, sizeof port->opp));
}
- queue_tx(buf, ofconn);
+ queue_tx(buf, ofconn, ofconn->reply_counter);
return 0;
}
osc = make_openflow_xid(sizeof *osc, OFPT_GET_CONFIG_REPLY, oh->xid, &buf);
osc->flags = htons(flags);
osc->miss_send_len = htons(ofconn->miss_send_len);
- queue_tx(buf, ofconn);
+ queue_tx(buf, ofconn, ofconn->reply_counter);
return 0;
}
struct ofp_stats_reply *reply = msg->data;
reply->flags = htons(OFPSF_REPLY_MORE);
*msgp = make_stats_reply(reply->header.xid, reply->type, nbytes);
- queue_tx(msg, ofconn);
+ queue_tx(msg, ofconn, ofconn->reply_counter);
}
return ofpbuf_put_uninit(*msgp, nbytes);
}
strncpy(ods->hw_desc, p->hardware, sizeof ods->hw_desc);
strncpy(ods->sw_desc, p->software, sizeof ods->sw_desc);
strncpy(ods->serial_num, p->serial, sizeof ods->serial_num);
- queue_tx(msg, ofconn);
+ queue_tx(msg, ofconn, ofconn->reply_counter);
return 0;
}
ots->lookup_count = htonll(0); /* XXX */
ots->matched_count = htonll(0); /* XXX */
- queue_tx(msg, ofconn);
+ queue_tx(msg, ofconn, ofconn->reply_counter);
return 0;
}
ops->collisions = htonll(stats.collisions);
}
- queue_tx(msg, ofconn);
+ queue_tx(msg, ofconn, ofconn->reply_counter);
return 0;
}
classifier_for_each_match(&p->cls, &target,
table_id_to_include(fsr->table_id),
flow_stats_cb, &cbdata);
- queue_tx(cbdata.msg, ofconn);
+ queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
return 0;
}
reply->flow_count = htonl(cbdata.n_flows);
reply->packet_count = htonll(cbdata.packet_count);
reply->byte_count = htonll(cbdata.byte_count);
- queue_tx(msg, ofconn);
+ queue_tx(msg, ofconn, ofconn->reply_counter);
return 0;
}
ofpbuf_put(b, capabilities, strlen(capabilities));
- queue_tx(b, ofconn);
+ queue_tx(b, ofconn, ofconn->reply_counter);
}
static int
struct ofconn *prev;
struct ofpbuf *buf;
+ /* We limit the maximum number of queued flow expirations it by accounting
+ * them under the counter for replies. That works because preventing
+ * OpenFlow requests from being processed also prevents new flows from
+ * being added (and expiring). (It also prevents processing OpenFlow
+ * requests that would not add new flows, so it is imperfect.) */
+
prev = NULL;
LIST_FOR_EACH (ofconn, struct ofconn, node, &p->all_conns) {
if (ofconn->send_flow_exp && rconn_is_connected(ofconn->rconn)) {
if (prev) {
- queue_tx(ofpbuf_clone(buf), prev);
+ queue_tx(ofpbuf_clone(buf), prev, ofconn->reply_counter);
} else {
buf = compose_flow_exp(rule, now, reason);
}
}
}
if (prev) {
- queue_tx(buf, prev);
+ queue_tx(buf, prev, ofconn->reply_counter);
}
}
opi->in_port = htons(odp_port_to_ofp_port(msg->port));
opi->reason = msg->type == _ODPL_ACTION_NR ? OFPR_ACTION : OFPR_NO_MATCH;
ofpbuf_put(buf, payload.data, MIN(send_len, payload.size));
- queue_tx(buf, ofconn);
+ update_openflow_length(buf);
+ rconn_send_with_limit(ofconn->rconn, buf, ofconn->packet_in_counter, 100);
}
static void