From: Ben Pfaff Date: Fri, 1 May 2009 20:20:07 +0000 (-0700) Subject: secchan: Don't let queued packets exhaust memory. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e40e39d67f8e6ff49c46ecdd91cebbb94be353c8;p=openvswitch secchan: Don't let queued packets exhaust memory. The ofproto code was queuing OpenFlow messages to connections without limiting the maximum number that could be queued at a time. Thus, the backlog could grow without bound and exhaust all system memory. This commit introduces a cap on the maximum number of queued messages in two different categories: packet-in messages and replies to OpenFlow requests. --- diff --git a/lib/rconn.h b/lib/rconn.h index 93ccec05..82aefe52 100644 --- a/lib/rconn.h +++ b/lib/rconn.h @@ -112,4 +112,10 @@ void rconn_packet_counter_destroy(struct rconn_packet_counter *); void rconn_packet_counter_inc(struct rconn_packet_counter *); void rconn_packet_counter_dec(struct rconn_packet_counter *); +static inline int +rconn_packet_counter_read(const struct rconn_packet_counter *counter) +{ + return counter->n; +} + #endif /* rconn.h */ diff --git a/secchan/ofproto.c b/secchan/ofproto.c index bed9e511..71eaca4d 100644 --- a/secchan/ofproto.c +++ b/secchan/ofproto.c @@ -179,13 +179,21 @@ struct ofconn { struct pktbuf *pktbuf; bool send_flow_exp; int miss_send_len; + + struct rconn_packet_counter *packet_in_counter; + + /* Number of OpenFlow messages queued as replies to OpenFlow requests, and + * the maximum number before we stop reading OpenFlow requests. */ +#define OFCONN_REPLY_MAX 100 + struct rconn_packet_counter *reply_counter; }; static struct ofconn *ofconn_create(struct ofproto *, struct rconn *); static void ofconn_destroy(struct ofconn *, struct ofproto *); static void ofconn_run(struct ofconn *, struct ofproto *); static void ofconn_wait(struct ofconn *); -static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn); +static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn, + struct rconn_packet_counter *counter); struct ofproto { /* Settings. */ @@ -1140,6 +1148,7 @@ static void send_port_status(struct ofproto *p, const struct ofport *ofport, uint8_t reason) { + /* XXX Should limit the number of queued port status change messages. */ struct ofconn *ofconn; LIST_FOR_EACH (ofconn, struct ofconn, node, &p->all_conns) { struct ofp_port_status *ops; @@ -1149,7 +1158,7 @@ send_port_status(struct ofproto *p, const struct ofport *ofport, ops->reason = reason; ops->desc = ofport->opp; hton_ofp_phy_port(&ops->desc); - queue_tx(b, ofconn); + queue_tx(b, ofconn, NULL); } if (p->ofhooks->port_changed_cb) { p->ofhooks->port_changed_cb(reason, &ofport->opp, p->aux); @@ -1270,6 +1279,8 @@ ofconn_create(struct ofproto *p, struct rconn *rconn) ofconn->pktbuf = NULL; ofconn->send_flow_exp = false; ofconn->miss_send_len = 0; + ofconn->packet_in_counter = rconn_packet_counter_create (); + ofconn->reply_counter = rconn_packet_counter_create (); return ofconn; } @@ -1282,6 +1293,8 @@ ofconn_destroy(struct ofconn *ofconn, struct ofproto *p) list_remove(&ofconn->node); rconn_destroy(ofconn->rconn); + rconn_packet_counter_destroy(ofconn->packet_in_counter); + rconn_packet_counter_destroy(ofconn->reply_counter); pktbuf_destroy(ofconn->pktbuf); free(ofconn); } @@ -1293,14 +1306,17 @@ ofconn_run(struct ofconn *ofconn, struct ofproto *p) rconn_run(ofconn->rconn); - /* Limit the number of iterations to prevent other tasks from starving. */ - for (iteration = 0; iteration < 50; iteration++) { - struct ofpbuf *of_msg = rconn_recv(ofconn->rconn); - if (!of_msg) { - break; + if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) { + /* Limit the number of iterations to prevent other tasks from + * starving. */ + for (iteration = 0; iteration < 50; iteration++) { + struct ofpbuf *of_msg = rconn_recv(ofconn->rconn); + if (!of_msg) { + break; + } + handle_openflow(ofconn, p, of_msg); + ofpbuf_delete(of_msg); } - handle_openflow(ofconn, p, of_msg); - ofpbuf_delete(of_msg); } if (ofconn != p->controller && !rconn_is_alive(ofconn->rconn)) { @@ -1312,7 +1328,9 @@ static void ofconn_wait(struct ofconn *ofconn) { rconn_run_wait(ofconn->rconn); - rconn_recv_wait(ofconn->rconn); + if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) { + rconn_recv_wait(ofconn->rconn); + } } /* Caller is responsible for initializing the 'cr' member of the returned @@ -1626,10 +1644,11 @@ rule_post_uninstall(struct ofproto *ofproto, struct rule *rule) } static void -queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn) +queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn, + struct rconn_packet_counter *counter) { update_openflow_length(msg); - if (rconn_send(ofconn->rconn, msg, NULL)) { /* XXX */ + if (rconn_send(ofconn->rconn, msg, counter)) { ofpbuf_delete(msg); } } @@ -1652,7 +1671,7 @@ send_error(const struct ofconn *ofconn, const struct ofp_header *oh, oem->type = htons((unsigned int) error >> 16); oem->code = htons(error & 0xffff); memcpy(oem->data, data, len); - queue_tx(buf, ofconn); + queue_tx(buf, ofconn, ofconn->reply_counter); } static void @@ -1679,7 +1698,7 @@ static int handle_echo_request(struct ofconn *ofconn, struct ofp_header *oh) { struct ofp_header *rq = oh; - queue_tx(make_echo_reply(rq), ofconn); + queue_tx(make_echo_reply(rq), ofconn, ofconn->reply_counter); return 0; } @@ -1713,7 +1732,7 @@ handle_features_request(struct ofproto *p, struct ofconn *ofconn, hton_ofp_phy_port(ofpbuf_put(buf, &port->opp, sizeof port->opp)); } - queue_tx(buf, ofconn); + queue_tx(buf, ofconn, ofconn->reply_counter); return 0; } @@ -1737,7 +1756,7 @@ handle_get_config_request(struct ofproto *p, struct ofconn *ofconn, osc = make_openflow_xid(sizeof *osc, OFPT_GET_CONFIG_REPLY, oh->xid, &buf); osc->flags = htons(flags); osc->miss_send_len = htons(ofconn->miss_send_len); - queue_tx(buf, ofconn); + queue_tx(buf, ofconn, ofconn->reply_counter); return 0; } @@ -2156,7 +2175,7 @@ append_stats_reply(size_t nbytes, struct ofconn *ofconn, struct ofpbuf **msgp) struct ofp_stats_reply *reply = msg->data; reply->flags = htons(OFPSF_REPLY_MORE); *msgp = make_stats_reply(reply->header.xid, reply->type, nbytes); - queue_tx(msg, ofconn); + queue_tx(msg, ofconn, ofconn->reply_counter); } return ofpbuf_put_uninit(*msgp, nbytes); } @@ -2174,7 +2193,7 @@ handle_desc_stats_request(struct ofproto *p, struct ofconn *ofconn, strncpy(ods->hw_desc, p->hardware, sizeof ods->hw_desc); strncpy(ods->sw_desc, p->software, sizeof ods->sw_desc); strncpy(ods->serial_num, p->serial, sizeof ods->serial_num); - queue_tx(msg, ofconn); + queue_tx(msg, ofconn, ofconn->reply_counter); return 0; } @@ -2231,7 +2250,7 @@ handle_table_stats_request(struct ofproto *p, struct ofconn *ofconn, ots->lookup_count = htonll(0); /* XXX */ ots->matched_count = htonll(0); /* XXX */ - queue_tx(msg, ofconn); + queue_tx(msg, ofconn, ofconn->reply_counter); return 0; } @@ -2270,7 +2289,7 @@ handle_port_stats_request(struct ofproto *p, struct ofconn *ofconn, ops->collisions = htonll(stats.collisions); } - queue_tx(msg, ofconn); + queue_tx(msg, ofconn, ofconn->reply_counter); return 0; } @@ -2381,7 +2400,7 @@ handle_flow_stats_request(struct ofproto *p, struct ofconn *ofconn, classifier_for_each_match(&p->cls, &target, table_id_to_include(fsr->table_id), flow_stats_cb, &cbdata); - queue_tx(cbdata.msg, ofconn); + queue_tx(cbdata.msg, ofconn, ofconn->reply_counter); return 0; } @@ -2442,7 +2461,7 @@ handle_aggregate_stats_request(struct ofproto *p, struct ofconn *ofconn, reply->flow_count = htonl(cbdata.n_flows); reply->packet_count = htonll(cbdata.packet_count); reply->byte_count = htonll(cbdata.byte_count); - queue_tx(msg, ofconn); + queue_tx(msg, ofconn, ofconn->reply_counter); return 0; } @@ -2702,7 +2721,7 @@ send_capability_reply(struct ofproto *p, struct ofconn *ofconn, uint32_t xid) ofpbuf_put(b, capabilities, strlen(capabilities)); - queue_tx(b, ofconn); + queue_tx(b, ofconn, ofconn->reply_counter); } static int @@ -2970,11 +2989,17 @@ send_flow_exp(struct ofproto *p, struct rule *rule, struct ofconn *prev; struct ofpbuf *buf; + /* We limit the maximum number of queued flow expirations it by accounting + * them under the counter for replies. That works because preventing + * OpenFlow requests from being processed also prevents new flows from + * being added (and expiring). (It also prevents processing OpenFlow + * requests that would not add new flows, so it is imperfect.) */ + prev = NULL; LIST_FOR_EACH (ofconn, struct ofconn, node, &p->all_conns) { if (ofconn->send_flow_exp && rconn_is_connected(ofconn->rconn)) { if (prev) { - queue_tx(ofpbuf_clone(buf), prev); + queue_tx(ofpbuf_clone(buf), prev, ofconn->reply_counter); } else { buf = compose_flow_exp(rule, now, reason); } @@ -2982,7 +3007,7 @@ send_flow_exp(struct ofproto *p, struct rule *rule, } } if (prev) { - queue_tx(buf, prev); + queue_tx(buf, prev, ofconn->reply_counter); } } @@ -3095,7 +3120,8 @@ do_send_packet_in(struct ofconn *ofconn, uint32_t buffer_id, opi->in_port = htons(odp_port_to_ofp_port(msg->port)); opi->reason = msg->type == _ODPL_ACTION_NR ? OFPR_ACTION : OFPR_NO_MATCH; ofpbuf_put(buf, payload.data, MIN(send_len, payload.size)); - queue_tx(buf, ofconn); + update_openflow_length(buf); + rconn_send_with_limit(ofconn->rconn, buf, ofconn->packet_in_counter, 100); } static void