Make port status change messages reliable.
authorBen Pfaff <blp@nicira.com>
Wed, 7 Jan 2009 17:28:19 +0000 (09:28 -0800)
committerBen Pfaff <blp@nicira.com>
Fri, 23 Jan 2009 18:52:06 +0000 (10:52 -0800)
Until now, port status change messages were sent out by the datapath
implementations and simply relayed by secchan.  In the kernel
implementation, they were unreliable because they were sent out over the
multicast socket used for packet-in events: if many packet-in messages
arrived and filled up that socket's receive buffer, then any subsequent
port status change messages were dropped.

This change moves port status change detection from the datapath
implementations into secchan, making them reliable, by using a
netdev_monitor.

(An alternate implementation would have been to detect the socket
receive buffer overflow and poll the network devices.  The current
implementation was chosen because it removes code from the datapaths,
which is the direction we want to move in for the future.)

datapath/dp_notify.c
secchan/port-watcher.c
udatapath/datapath.c

index cd3163afad8937a3e02fa48985f8d6fe13e9439e..54c8840239cdc3ad8b584ff93e56b12ddc75aecd 100644 (file)
@@ -17,7 +17,6 @@ static int dp_device_event(struct notifier_block *unused, unsigned long event,
        struct net_device *dev = ptr;
        struct net_bridge_port *p = dev->br_port;
        unsigned long int flags;
-       uint32_t orig_state, orig_config;
 
 
        /* Check if monitored port */
@@ -25,25 +24,7 @@ static int dp_device_event(struct notifier_block *unused, unsigned long event,
                return NOTIFY_DONE;
 
        spin_lock_irqsave(&p->lock, flags);
-       orig_state = p->state;
-       orig_config = p->config;
-
        switch (event) {
-               case NETDEV_CHANGE:
-                       if (netif_carrier_ok(p->dev))
-                               p->state &= ~OFPPS_LINK_DOWN;
-                       else
-                               p->state |= OFPPS_LINK_DOWN;
-                       break;
-
-               case NETDEV_DOWN:
-                       p->config |= OFPPC_PORT_DOWN;
-                       break;
-
-               case NETDEV_UP:
-                       p->config &= ~OFPPC_PORT_DOWN;
-                       break;
-
                case NETDEV_UNREGISTER:
                        spin_unlock_irqrestore(&p->lock, flags);
                        mutex_lock(&dp_mutex);
@@ -54,9 +35,6 @@ static int dp_device_event(struct notifier_block *unused, unsigned long event,
        }
        spin_unlock_irqrestore(&p->lock, flags);
 
-       if ((orig_state != p->state) || (orig_config != p->config))
-               dp_send_port_status(p, OFPPR_MODIFY);
-
        return NOTIFY_DONE;
 }
 
index 97e5a9ffa4f885b23681d320a436ddb4a5aebf07..1d8d69d4096c4b10a1a66c5336f7a1c33b35e335 100644 (file)
@@ -44,6 +44,8 @@
 #include "poll-loop.h"
 #include "port-array.h"
 #include "rconn.h"
+#include "shash.h"
+#include "svec.h"
 #include "timeval.h"
 #include "vconn.h"
 #include "xtoxll.h"
@@ -74,6 +76,8 @@ struct port_watcher {
     struct port_watcher_local_cb local_cbs[4];
     int n_local_cbs;
     char local_port_name[OFP_MAX_PORT_NAME_LEN + 1];
+    struct netdev_monitor *mon;
+    struct shash port_by_name;
 };
 
 /* Returns the number of fields that differ from 'a' to 'b'. */
@@ -203,6 +207,25 @@ update_phy_port(struct port_watcher *pw, struct ofp_phy_port *opp,
     }
 }
 
+static void
+update_netdev_monitor_devices(struct port_watcher *pw)
+{
+    struct ofp_phy_port *p;
+    struct svec netdevs;
+    unsigned int port_no;
+
+    svec_init(&netdevs);
+    shash_clear(&pw->port_by_name);
+    for (p = port_array_first(&pw->ports, &port_no); p;
+         p = port_array_next(&pw->ports, &port_no)) {
+        const char *name = (const char *) p->name;
+        svec_add(&netdevs, name);
+        shash_add(&pw->port_by_name, name, p);
+    }
+    netdev_monitor_set_devices(pw->mon, netdevs.names, netdevs.n);
+    svec_destroy(&netdevs);
+}
+
 static bool
 port_watcher_local_packet_cb(struct relay *r, void *pw_)
 {
@@ -243,6 +266,8 @@ port_watcher_local_packet_cb(struct relay *r, void *pw_)
             }
         }
 
+        update_netdev_monitor_devices(pw);
+
         call_local_port_changed_callbacks(pw);
     } else if (oh->type == OFPT_PORT_STATUS
                && msg->size >= sizeof(struct ofp_port_status)) {
@@ -251,6 +276,9 @@ port_watcher_local_packet_cb(struct relay *r, void *pw_)
         if (ops->desc.port_no == htons(OFPP_LOCAL)) {
             call_local_port_changed_callbacks(pw);
         }
+        if (ops->reason == OFPPR_ADD || OFPPR_DELETE) {
+            update_netdev_monitor_devices(pw);
+        }
     }
     return false;
 }
@@ -310,10 +338,22 @@ port_watcher_remote_packet_cb(struct relay *r, void *pw_)
     return false;
 }
 
+/* Sets 'bit' in '*word' to 0 or 1 according to 'value'. */
+static void
+set_bit(uint32_t bit, bool value, uint32_t *word)
+{
+    if (value) {
+        *word |= bit;
+    } else {
+        *word &= ~bit;
+    }
+}
+
 static void
 port_watcher_periodic_cb(void *pw_)
 {
     struct port_watcher *pw = pw_;
+    const char *name;
 
     if (!pw->got_feature_reply
         && time_now() >= pw->last_feature_request + 5
@@ -323,6 +363,47 @@ port_watcher_periodic_cb(void *pw_)
         rconn_send_with_limit(pw->local_rconn, b, &pw->n_txq, 1);
         pw->last_feature_request = time_now();
     }
+
+    netdev_monitor_run(pw->mon);
+    while ((name = netdev_monitor_poll(pw->mon)) != NULL) {
+        struct ofp_phy_port *opp;
+        struct ofp_phy_port new_opp;
+        enum netdev_flags flags;
+        int retval;
+
+        opp = shash_find_data(&pw->port_by_name, name);
+        if (!opp) {
+            continue;
+        }
+
+        retval = netdev_nodev_get_flags(name, &flags);
+        if (retval) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+            VLOG_WARN_RL(&rl, "could not get flags for %s", name);
+            continue;
+        }
+
+        new_opp = *opp;
+        set_bit(htonl(OFPPC_PORT_DOWN), flags & NETDEV_UP, &new_opp.config);
+        set_bit(htonl(OFPPS_LINK_DOWN), flags & NETDEV_CARRIER,
+                &new_opp.state);
+        if (opp->config != new_opp.config || opp->state != new_opp.state) {
+            struct ofp_port_status *ops;
+            struct ofpbuf *b;
+
+            /* Notify other secchan modules. */
+            update_phy_port(pw, &new_opp, OFPPR_MODIFY);
+            if (new_opp.port_no == htons(OFPP_LOCAL)) {
+                call_local_port_changed_callbacks(pw);
+            }
+
+            /* Notify the controller that the flags changed. */
+            ops = make_openflow(sizeof *ops, OFPT_PORT_STATUS, &b);
+            ops->reason = OFPPR_MODIFY;
+            ops->desc = new_opp;
+            rconn_send(pw->remote_rconn, b, NULL);
+        }
+    }
 }
 
 static void
@@ -336,6 +417,7 @@ port_watcher_wait_cb(void *pw_)
             poll_immediate_wake();
         }
     }
+    netdev_monitor_wait(pw->mon);
 }
 
 static void
@@ -520,6 +602,7 @@ port_watcher_start(struct secchan *secchan,
                    struct port_watcher **pwp)
 {
     struct port_watcher *pw;
+    int retval;
 
     pw = *pwp = xcalloc(1, sizeof *pw);
     pw->local_rconn = local_rconn;
@@ -527,6 +610,11 @@ port_watcher_start(struct secchan *secchan,
     pw->last_feature_request = TIME_MIN;
     port_array_init(&pw->ports);
     pw->local_port_name[0] = '\0';
+    retval = netdev_monitor_create(&pw->mon);
+    if (retval) {
+        ofp_fatal(retval, "failed to start network device monitoring");
+    }
+    shash_init(&pw->port_by_name);
     port_watcher_register_callback(pw, log_port_status, NULL);
     add_hook(secchan, &port_watcher_hook_class, pw);
 }
index 61b5c1fdee47c850e7f2083f4d82110e256107dd..06c31164dfe23bd2cc040ac4b2e1361f308345cb 100644 (file)
@@ -113,7 +113,6 @@ static void remote_wait(struct remote *);
 static void remote_destroy(struct remote *);
 
 static void update_port_flags(struct datapath *, const struct ofp_port_mod *);
-static int update_port_status(struct sw_port *p);
 static void send_port_status(struct sw_port *p, uint8_t status);
 
 /* Buffers are identified by a 31-bit opaque ID.  We divide the ID
@@ -297,12 +296,6 @@ dp_run(struct datapath *dp)
         struct list deleted = LIST_INITIALIZER(&deleted);
         struct sw_flow *f, *n;
 
-        LIST_FOR_EACH (p, struct sw_port, node, &dp->port_list) {
-            if (update_port_status(p)) {
-                send_port_status(p, OFPPR_MODIFY);
-            }
-        }
-
         chain_timeout(dp->chain, &deleted);
         LIST_FOR_EACH_SAFE (f, n, struct sw_flow, node, &deleted) {
             dp_send_flow_end(dp, f, f->reason);
@@ -720,41 +713,6 @@ update_port_flags(struct datapath *dp, const struct ofp_port_mod *opm)
     }
 }
 
-/* Update the port status field of the bridge port.  A non-zero return
- * value indicates some field has changed. 
- *
- * NB: Callers of this function may hold the RCU read lock, so any
- * additional checks must not sleep.
- */
-static int
-update_port_status(struct sw_port *p)
-{
-    int retval;
-    enum netdev_flags flags;
-    uint32_t orig_config = p->config;
-    uint32_t orig_state = p->state;
-
-    if (netdev_get_flags(p->netdev, &flags) < 0) {
-        VLOG_WARN_RL(&rl, "could not get netdev flags for %s", 
-                     netdev_get_name(p->netdev));
-        return 0;
-    }
-
-    if (flags & NETDEV_UP) {
-        p->config &= ~OFPPC_PORT_DOWN;
-    } else {
-        p->config |= OFPPC_PORT_DOWN;
-    }
-
-    if (flags & NETDEV_CARRIER) {
-        p->state &= ~OFPPS_LINK_DOWN;
-    } else {
-        p->state |= OFPPS_LINK_DOWN;
-    }
-
-    return ((orig_config != p->config) || (orig_state != p->state));
-}
-
 static void
 send_port_status(struct sw_port *p, uint8_t status) 
 {