secchan: Don't let queued packets exhaust memory.
authorBen Pfaff <blp@nicira.com>
Fri, 1 May 2009 20:20:07 +0000 (13:20 -0700)
committerBen Pfaff <blp@nicira.com>
Fri, 1 May 2009 20:20:34 +0000 (13:20 -0700)
The ofproto code was queuing OpenFlow messages to connections without
limiting the maximum number that could be queued at a time.  Thus, the
backlog could grow without bound and exhaust all system memory.

This commit introduces a cap on the maximum number of queued messages
in two different categories: packet-in messages and replies to OpenFlow
requests.

lib/rconn.h
secchan/ofproto.c

index 93ccec05b14a372c24e8dd77634596f96bfebb57..82aefe5232b48cc39da1ebd5b381866b9ea7336a 100644 (file)
@@ -112,4 +112,10 @@ void rconn_packet_counter_destroy(struct rconn_packet_counter *);
 void rconn_packet_counter_inc(struct rconn_packet_counter *);
 void rconn_packet_counter_dec(struct rconn_packet_counter *);
 
+static inline int
+rconn_packet_counter_read(const struct rconn_packet_counter *counter)
+{
+    return counter->n;
+}
+
 #endif /* rconn.h */
index bed9e511c9f1d93bb14a266314fd30d97f7f7197..71eaca4d7c4087f4d8591c06bb464d4afd085270 100644 (file)
@@ -179,13 +179,21 @@ struct ofconn {
     struct pktbuf *pktbuf;
     bool send_flow_exp;
     int miss_send_len;
+
+    struct rconn_packet_counter *packet_in_counter;
+
+    /* Number of OpenFlow messages queued as replies to OpenFlow requests, and
+     * the maximum number before we stop reading OpenFlow requests.  */
+#define OFCONN_REPLY_MAX 100
+    struct rconn_packet_counter *reply_counter;
 };
 
 static struct ofconn *ofconn_create(struct ofproto *, struct rconn *);
 static void ofconn_destroy(struct ofconn *, struct ofproto *);
 static void ofconn_run(struct ofconn *, struct ofproto *);
 static void ofconn_wait(struct ofconn *);
-static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn);
+static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
+                     struct rconn_packet_counter *counter);
 
 struct ofproto {
     /* Settings. */
@@ -1140,6 +1148,7 @@ static void
 send_port_status(struct ofproto *p, const struct ofport *ofport,
                  uint8_t reason)
 {
+    /* XXX Should limit the number of queued port status change messages. */
     struct ofconn *ofconn;
     LIST_FOR_EACH (ofconn, struct ofconn, node, &p->all_conns) {
         struct ofp_port_status *ops;
@@ -1149,7 +1158,7 @@ send_port_status(struct ofproto *p, const struct ofport *ofport,
         ops->reason = reason;
         ops->desc = ofport->opp;
         hton_ofp_phy_port(&ops->desc);
-        queue_tx(b, ofconn);
+        queue_tx(b, ofconn, NULL);
     }
     if (p->ofhooks->port_changed_cb) {
         p->ofhooks->port_changed_cb(reason, &ofport->opp, p->aux);
@@ -1270,6 +1279,8 @@ ofconn_create(struct ofproto *p, struct rconn *rconn)
     ofconn->pktbuf = NULL;
     ofconn->send_flow_exp = false;
     ofconn->miss_send_len = 0;
+    ofconn->packet_in_counter = rconn_packet_counter_create ();
+    ofconn->reply_counter = rconn_packet_counter_create ();
     return ofconn;
 }
 
@@ -1282,6 +1293,8 @@ ofconn_destroy(struct ofconn *ofconn, struct ofproto *p)
 
     list_remove(&ofconn->node);
     rconn_destroy(ofconn->rconn);
+    rconn_packet_counter_destroy(ofconn->packet_in_counter);
+    rconn_packet_counter_destroy(ofconn->reply_counter);
     pktbuf_destroy(ofconn->pktbuf);
     free(ofconn);
 }
@@ -1293,14 +1306,17 @@ ofconn_run(struct ofconn *ofconn, struct ofproto *p)
 
     rconn_run(ofconn->rconn);
 
-    /* Limit the number of iterations to prevent other tasks from starving. */
-    for (iteration = 0; iteration < 50; iteration++) {
-        struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
-        if (!of_msg) {
-            break;
+    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
+        /* Limit the number of iterations to prevent other tasks from
+         * starving. */
+        for (iteration = 0; iteration < 50; iteration++) {
+            struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
+            if (!of_msg) {
+                break;
+            }
+            handle_openflow(ofconn, p, of_msg);
+            ofpbuf_delete(of_msg);
         }
-        handle_openflow(ofconn, p, of_msg);
-        ofpbuf_delete(of_msg);
     }
 
     if (ofconn != p->controller && !rconn_is_alive(ofconn->rconn)) {
@@ -1312,7 +1328,9 @@ static void
 ofconn_wait(struct ofconn *ofconn)
 {
     rconn_run_wait(ofconn->rconn);
-    rconn_recv_wait(ofconn->rconn);
+    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
+        rconn_recv_wait(ofconn->rconn);
+    }
 }
 \f
 /* Caller is responsible for initializing the 'cr' member of the returned
@@ -1626,10 +1644,11 @@ rule_post_uninstall(struct ofproto *ofproto, struct rule *rule)
 }
 \f
 static void
-queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn)
+queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
+         struct rconn_packet_counter *counter)
 {
     update_openflow_length(msg);
-    if (rconn_send(ofconn->rconn, msg, NULL)) { /* XXX */
+    if (rconn_send(ofconn->rconn, msg, counter)) {
         ofpbuf_delete(msg);
     }
 }
@@ -1652,7 +1671,7 @@ send_error(const struct ofconn *ofconn, const struct ofp_header *oh,
     oem->type = htons((unsigned int) error >> 16);
     oem->code = htons(error & 0xffff);
     memcpy(oem->data, data, len);
-    queue_tx(buf, ofconn);
+    queue_tx(buf, ofconn, ofconn->reply_counter);
 }
 
 static void
@@ -1679,7 +1698,7 @@ static int
 handle_echo_request(struct ofconn *ofconn, struct ofp_header *oh)
 {
     struct ofp_header *rq = oh;
-    queue_tx(make_echo_reply(rq), ofconn);
+    queue_tx(make_echo_reply(rq), ofconn, ofconn->reply_counter);
     return 0;
 }
 
@@ -1713,7 +1732,7 @@ handle_features_request(struct ofproto *p, struct ofconn *ofconn,
         hton_ofp_phy_port(ofpbuf_put(buf, &port->opp, sizeof port->opp));
     }
 
-    queue_tx(buf, ofconn);
+    queue_tx(buf, ofconn, ofconn->reply_counter);
     return 0;
 }
 
@@ -1737,7 +1756,7 @@ handle_get_config_request(struct ofproto *p, struct ofconn *ofconn,
     osc = make_openflow_xid(sizeof *osc, OFPT_GET_CONFIG_REPLY, oh->xid, &buf);
     osc->flags = htons(flags);
     osc->miss_send_len = htons(ofconn->miss_send_len);
-    queue_tx(buf, ofconn);
+    queue_tx(buf, ofconn, ofconn->reply_counter);
 
     return 0;
 }
@@ -2156,7 +2175,7 @@ append_stats_reply(size_t nbytes, struct ofconn *ofconn, struct ofpbuf **msgp)
         struct ofp_stats_reply *reply = msg->data;
         reply->flags = htons(OFPSF_REPLY_MORE);
         *msgp = make_stats_reply(reply->header.xid, reply->type, nbytes);
-        queue_tx(msg, ofconn);
+        queue_tx(msg, ofconn, ofconn->reply_counter);
     }
     return ofpbuf_put_uninit(*msgp, nbytes);
 }
@@ -2174,7 +2193,7 @@ handle_desc_stats_request(struct ofproto *p, struct ofconn *ofconn,
     strncpy(ods->hw_desc, p->hardware, sizeof ods->hw_desc);
     strncpy(ods->sw_desc, p->software, sizeof ods->sw_desc);
     strncpy(ods->serial_num, p->serial, sizeof ods->serial_num);
-    queue_tx(msg, ofconn);
+    queue_tx(msg, ofconn, ofconn->reply_counter);
 
     return 0;
 }
@@ -2231,7 +2250,7 @@ handle_table_stats_request(struct ofproto *p, struct ofconn *ofconn,
     ots->lookup_count = htonll(0);              /* XXX */
     ots->matched_count = htonll(0);             /* XXX */
 
-    queue_tx(msg, ofconn);
+    queue_tx(msg, ofconn, ofconn->reply_counter);
     return 0;
 }
 
@@ -2270,7 +2289,7 @@ handle_port_stats_request(struct ofproto *p, struct ofconn *ofconn,
         ops->collisions = htonll(stats.collisions);
     }
 
-    queue_tx(msg, ofconn);
+    queue_tx(msg, ofconn, ofconn->reply_counter);
     return 0;
 }
 
@@ -2381,7 +2400,7 @@ handle_flow_stats_request(struct ofproto *p, struct ofconn *ofconn,
     classifier_for_each_match(&p->cls, &target,
                               table_id_to_include(fsr->table_id),
                               flow_stats_cb, &cbdata);
-    queue_tx(cbdata.msg, ofconn);
+    queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
     return 0;
 }
 
@@ -2442,7 +2461,7 @@ handle_aggregate_stats_request(struct ofproto *p, struct ofconn *ofconn,
     reply->flow_count = htonl(cbdata.n_flows);
     reply->packet_count = htonll(cbdata.packet_count);
     reply->byte_count = htonll(cbdata.byte_count);
-    queue_tx(msg, ofconn);
+    queue_tx(msg, ofconn, ofconn->reply_counter);
     return 0;
 }
 
@@ -2702,7 +2721,7 @@ send_capability_reply(struct ofproto *p, struct ofconn *ofconn, uint32_t xid)
 
     ofpbuf_put(b, capabilities, strlen(capabilities));
 
-    queue_tx(b, ofconn);
+    queue_tx(b, ofconn, ofconn->reply_counter);
 }
 
 static int
@@ -2970,11 +2989,17 @@ send_flow_exp(struct ofproto *p, struct rule *rule,
     struct ofconn *prev;
     struct ofpbuf *buf;
 
+    /* We limit the maximum number of queued flow expirations it by accounting
+     * them under the counter for replies.  That works because preventing
+     * OpenFlow requests from being processed also prevents new flows from
+     * being added (and expiring).  (It also prevents processing OpenFlow
+     * requests that would not add new flows, so it is imperfect.) */
+
     prev = NULL;
     LIST_FOR_EACH (ofconn, struct ofconn, node, &p->all_conns) {
         if (ofconn->send_flow_exp && rconn_is_connected(ofconn->rconn)) {
             if (prev) {
-                queue_tx(ofpbuf_clone(buf), prev);
+                queue_tx(ofpbuf_clone(buf), prev, ofconn->reply_counter);
             } else {
                 buf = compose_flow_exp(rule, now, reason);
             }
@@ -2982,7 +3007,7 @@ send_flow_exp(struct ofproto *p, struct rule *rule,
         }
     }
     if (prev) {
-        queue_tx(buf, prev);
+        queue_tx(buf, prev, ofconn->reply_counter);
     }
 }
 
@@ -3095,7 +3120,8 @@ do_send_packet_in(struct ofconn *ofconn, uint32_t buffer_id,
     opi->in_port = htons(odp_port_to_ofp_port(msg->port));
     opi->reason = msg->type == _ODPL_ACTION_NR ? OFPR_ACTION : OFPR_NO_MATCH;
     ofpbuf_put(buf, payload.data, MIN(send_len, payload.size));
-    queue_tx(buf, ofconn);
+    update_openflow_length(buf);
+    rconn_send_with_limit(ofconn->rconn, buf, ofconn->packet_in_counter, 100);
 }
 
 static void