ofproto: New feature to notify controllers of flow table changes.
authorBen Pfaff <blp@nicira.com>
Thu, 12 Jul 2012 21:18:05 +0000 (14:18 -0700)
committerBen Pfaff <blp@nicira.com>
Thu, 12 Jul 2012 21:18:05 +0000 (14:18 -0700)
OpenFlow switching monitoring and controller coordination can be made more
efficient if the switch can notify a controller of flow table changes as
they occur, rather than periodically polling for changes.  This commit
implements such a feature.

Feature #6633.
CC: Natasha Gude <natasha@nicira.com>
Signed-off-by: Ben Pfaff <blp@nicira.com>
17 files changed:
NEWS
include/openflow/nicira-ext.h
lib/learning-switch.c
lib/ofp-errors.h
lib/ofp-parse.c
lib/ofp-parse.h
lib/ofp-print.c
lib/ofp-util.c
lib/ofp-util.h
ofproto/connmgr.c
ofproto/connmgr.h
ofproto/ofproto-provider.h
ofproto/ofproto.c
tests/ofp-print.at
tests/ofproto.at
utilities/ovs-ofctl.8.in
utilities/ovs-ofctl.c

diff --git a/NEWS b/NEWS
index f5190c0f5b25bf3badab5ec25fabc52ca5584df7..5b9db40d6aff936453bb9b124b2a9f8cd3a4afea 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -22,6 +22,8 @@ post-v1.7.0
         queue does not exist, or for requests for a specific queue on all
         ports, if the specified queue does not exist on any port.  (Previous
         versions generally reported an empty set of results.)
+      - New "flow monitor" feature to allow controllers to be notified of
+        flow table changes as they happen.
     - Additional protocols are not mirrored and dropped when forward-bpdu is
       false.  For a full list, see the ovs-vswitchd.conf.db man page.
     - Open vSwitch now sends RARP packets in situations where it previously
index 82deeb0f72ae22fbaaac5ac540c6b931ad96f511..1104dbf8396e60da6e00ea940723b5647e7737a6 100644 (file)
@@ -117,6 +117,11 @@ enum nicira_type {
 
     NXT_SET_ASYNC_CONFIG = 19,  /* struct nx_async_config. */
     NXT_SET_CONTROLLER_ID = 20, /* struct nx_controller_id. */
+
+    /* Flow table monitoring (see also NXST_FLOW_MONITOR). */
+    NXT_FLOW_MONITOR_CANCEL = 21,  /* struct nx_flow_monitor_cancel. */
+    NXT_FLOW_MONITOR_PAUSED = 22,  /* struct nicira_header. */
+    NXT_FLOW_MONITOR_RESUMED = 23, /* struct nicira_header. */
 };
 
 /* Header for Nicira vendor stats request and reply messages. */
@@ -131,7 +136,10 @@ OFP_ASSERT(sizeof(struct nicira_stats_msg) == 24);
 enum nicira_stats_type {
     /* Flexible flow specification (aka NXM = Nicira Extended Match). */
     NXST_FLOW,                  /* Analogous to OFPST_FLOW. */
-    NXST_AGGREGATE              /* Analogous to OFPST_AGGREGATE. */
+    NXST_AGGREGATE,             /* Analogous to OFPST_AGGREGATE. */
+
+    /* Flow table monitoring. */
+    NXST_FLOW_MONITOR,
 };
 
 /* Fields to use when hashing flows. */
@@ -1976,5 +1984,240 @@ struct nx_action_controller {
     uint8_t zero;                   /* Must be zero. */
 };
 OFP_ASSERT(sizeof(struct nx_action_controller) == 16);
+\f
+/* Flow Table Monitoring
+ * =====================
+ *
+ * NXST_FLOW_MONITOR allows a controller to keep track of changes to OpenFlow
+ * flow table(s) or subsets of them, with the following workflow:
+ *
+ * 1. The controller sends an NXST_FLOW_MONITOR request to begin monitoring
+ *    flows.  The 'id' in the request must be unique among all monitors that
+ *    the controller has started and not yet canceled on this OpenFlow
+ *    connection.
+ *
+ * 2. The switch responds with an NXST_FLOW_MONITOR reply.  If the request's
+ *    'flags' included NXFMF_INITIAL, the reply includes all the flows that
+ *    matched the request at the time of the request (with event NXFME_ADDED).
+ *    If 'flags' did not include NXFMF_INITIAL, the reply is empty.
+ *
+ *    The reply uses the xid of the request (as do all replies to OpenFlow
+ *    requests).
+ *
+ * 3. Whenever a change to a flow table entry matches some outstanding monitor
+ *    request's criteria and flags, the switch sends a notification to the
+ *    controller as an additional NXST_FLOW_MONITOR reply with xid 0.
+ *
+ *    When multiple outstanding monitors match a single change, only a single
+ *    notification is sent.  This merged notification includes the information
+ *    requested in any of the individual monitors.  That is, if any of the
+ *    matching monitors requests actions (NXFMF_ACTIONS), the notification
+ *    includes actions, and if any of the monitors request full changes for the
+ *    controller's own changes (NXFMF_OWN), the controller's own changes will
+ *    be included in full.
+ *
+ * 4. The controller may cancel a monitor with NXT_FLOW_MONITOR_CANCEL.  No
+ *    further notifications will be sent on the basis of the canceled monitor
+ *    afterward.
+ *
+ *
+ * Buffer Management
+ * =================
+ *
+ * OpenFlow messages for flow monitor notifications can overflow the buffer
+ * space available to the switch, either temporarily (e.g. due to network
+ * conditions slowing OpenFlow traffic) or more permanently (e.g. the sustained
+ * rate of flow table change exceeds the network bandwidth between switch and
+ * controller).
+ *
+ * When Open vSwitch's notification buffer space reaches a limiting threshold,
+ * OVS reacts as follows:
+ *
+ * 1. OVS sends an NXT_FLOW_MONITOR_PAUSED message to the controller, following
+ *    all the already queued notifications.  After it receives this message,
+ *    the controller knows that its view of the flow table, as represented by
+ *    flow monitor notifications, is incomplete.
+ *
+ * 2. As long as the notification buffer is not empty:
+ *
+ *        - NXMFE_ADD and NXFME_MODIFIED notifications will not be sent.
+ *
+ *        - NXFME_DELETED notifications will still be sent, but only for flows
+ *          that existed before OVS sent NXT_FLOW_MONITOR_PAUSED.
+ *
+ *        - NXFME_ABBREV notifications will not be sent.  They are treated as
+ *          the expanded version (and therefore only the NXFME_DELETED
+ *          components, if any, are sent).
+ *
+ * 3. When the notification buffer empties, OVS sends NXFME_ADD notifications
+ *    for flows added since the buffer reached its limit and NXFME_MODIFIED
+ *    notifications for flows that existed before the limit was reached and
+ *    changed after the limit was reached.
+ *
+ * 4. OVS sends an NXT_FLOW_MONITOR_RESUMED message to the controller.  After
+ *    it receives this message, the controller knows that its view of the flow
+ *    table, as represented by flow monitor notifications, is again complete.
+ *
+ * This allows the maximum buffer space requirement for notifications to be
+ * bounded by the limit plus the maximum number of supported flows.
+ *
+ *
+ * "Flow Removed" messages
+ * =======================
+ *
+ * The flow monitor mechanism is independent of OFPT_FLOW_REMOVED and
+ * NXT_FLOW_REMOVED.  Flow monitor updates for deletion are sent if
+ * NXFMF_DELETE is set on a monitor, regardless of whether the
+ * OFPFF_SEND_FLOW_REM flag was set when the flow was added. */
+
+/* NXST_FLOW_MONITOR request.
+ *
+ * The NXST_FLOW_MONITOR request's body consists of an array of zero or more
+ * instances of this structure.  The request arranges to monitor the flows
+ * that match the specified criteria, which are interpreted in the same way as
+ * for NXST_FLOW.
+ *
+ * 'id' identifies a particular monitor for the purpose of allowing it to be
+ * canceled later with NXT_FLOW_MONITOR_CANCEL.  'id' must be unique among
+ * existing monitors that have not already been canceled.
+ *
+ * The reply includes the initial flow matches for monitors that have the
+ * NXFMF_INITIAL flag set.  No single flow will be included in the reply more
+ * than once, even if more than one requested monitor matches that flow.  The
+ * reply will be empty if none of the monitors has NXFMF_INITIAL set or if none
+ * of the monitors initially matches any flows.
+ *
+ * For NXFMF_ADD, an event will be reported if 'out_port' matches against the
+ * actions of the flow being added or, for a flow that is replacing an existing
+ * flow, if 'out_port' matches against the actions of the flow being replaced.
+ * For NXFMF_DELETE, 'out_port' matches against the actions of a flow being
+ * deleted.  For NXFMF_MODIFY, an event will be reported if 'out_port' matches
+ * either the old or the new actions. */
+struct nx_flow_monitor_request {
+    ovs_be32 id;                /* Controller-assigned ID for this monitor. */
+    ovs_be16 flags;             /* NXFMF_*. */
+    ovs_be16 out_port;          /* Required output port, if not OFPP_NONE. */
+    ovs_be16 match_len;         /* Length of nx_match. */
+    uint8_t table_id;           /* One table's ID or 0xff for all tables. */
+    uint8_t zeros[5];           /* Align to 64 bits (must be zero). */
+    /* Followed by:
+     *   - Exactly match_len (possibly 0) bytes containing the nx_match, then
+     *   - Exactly (match_len + 7)/8*8 - match_len (between 0 and 7) bytes of
+     *     all-zero bytes. */
+};
+OFP_ASSERT(sizeof(struct nx_flow_monitor_request) == 16);
+
+/* 'flags' bits in struct nx_flow_monitor_request. */
+enum nx_flow_monitor_flags {
+    /* When to send updates. */
+    NXFMF_INITIAL = 1 << 0,     /* Initially matching flows. */
+    NXFMF_ADD = 1 << 1,         /* New matching flows as they are added. */
+    NXFMF_DELETE = 1 << 2,      /* Old matching flows as they are removed. */
+    NXFMF_MODIFY = 1 << 3,      /* Matching flows as they are changed. */
+
+    /* What to include in updates. */
+    NXFMF_ACTIONS = 1 << 4,     /* If set, actions are included. */
+    NXFMF_OWN = 1 << 5,         /* If set, include own changes in full. */
+};
+
+/* NXST_FLOW_MONITOR reply header.
+ *
+ * The body of an NXST_FLOW_MONITOR reply is an array of variable-length
+ * structures, each of which begins with this header.  The 'length' member may
+ * be used to traverse the array, and the 'event' member may be used to
+ * determine the particular structure.
+ *
+ * Every instance is a multiple of 8 bytes long. */
+struct nx_flow_update_header {
+    ovs_be16 length;            /* Length of this entry. */
+    ovs_be16 event;             /* One of NXFME_*. */
+    /* ...other data depending on 'event'... */
+};
+OFP_ASSERT(sizeof(struct nx_flow_update_header) == 4);
+
+/* 'event' values in struct nx_flow_update_header. */
+enum nx_flow_update_event {
+    /* struct nx_flow_update_full. */
+    NXFME_ADDED = 0,            /* Flow was added. */
+    NXFME_DELETED = 1,          /* Flow was deleted. */
+    NXFME_MODIFIED = 2,         /* Flow (generally its actions) was changed. */
+
+    /* struct nx_flow_update_abbrev. */
+    NXFME_ABBREV = 3,           /* Abbreviated reply. */
+};
+
+/* NXST_FLOW_MONITOR reply for NXFME_ADDED, NXFME_DELETED, and
+ * NXFME_MODIFIED. */
+struct nx_flow_update_full {
+    ovs_be16 length;            /* Length is 24. */
+    ovs_be16 event;             /* One of NXFME_*. */
+    ovs_be16 reason;            /* OFPRR_* for NXFME_DELETED, else zero. */
+    ovs_be16 priority;          /* Priority of the entry. */
+    ovs_be16 idle_timeout;      /* Number of seconds idle before expiration. */
+    ovs_be16 hard_timeout;      /* Number of seconds before expiration. */
+    ovs_be16 match_len;         /* Length of nx_match. */
+    uint8_t table_id;           /* ID of flow's table. */
+    uint8_t pad;                /* Reserved, currently zeroed. */
+    ovs_be64 cookie;            /* Opaque controller-issued identifier. */
+    /* Followed by:
+     *   - Exactly match_len (possibly 0) bytes containing the nx_match, then
+     *   - Exactly (match_len + 7)/8*8 - match_len (between 0 and 7) bytes of
+     *     all-zero bytes, then
+     *   - Actions to fill out the remainder 'length' bytes (always a multiple
+     *     of 8).  If NXFMF_ACTIONS was not specified, or 'event' is
+     *     NXFME_DELETED, no actions are included.
+     */
+};
+OFP_ASSERT(sizeof(struct nx_flow_update_full) == 24);
+
+/* NXST_FLOW_MONITOR reply for NXFME_ABBREV.
+ *
+ * When the controller does not specify NXFMF_OWN in a monitor request, any
+ * flow tables changes due to the controller's own requests (on the same
+ * OpenFlow channel) will be abbreviated, when possible, to this form, which
+ * simply specifies the 'xid' of the OpenFlow request (e.g. an OFPT_FLOW_MOD or
+ * NXT_FLOW_MOD) that caused the change.
+ *
+ * Some changes cannot be abbreviated and will be sent in full:
+ *
+ *   - Changes that only partially succeed.  This can happen if, for example,
+ *     a flow_mod with type OFPFC_MODIFY affects multiple flows, but only some
+ *     of those modifications succeed (e.g. due to hardware limitations).
+ *
+ *     This cannot occur with the current implementation of the Open vSwitch
+ *     software datapath.  It could happen with other datapath implementations.
+ *
+ *   - Changes that race with conflicting changes made by other controllers or
+ *     other flow_mods (not separated by barriers) by the same controller.
+ *
+ *     This cannot occur with the current Open vSwitch implementation
+ *     (regardless of datapath) because Open vSwitch internally serializes
+ *     potentially conflicting changes.
+ *
+ * A flow_mod that does not change the flow table will not trigger any
+ * notification, even an abbreviated one.  For example, a "modify" or "delete"
+ * flow_mod that does not match any flows will not trigger a notification.
+ * Whether an "add" or "modify" that specifies all the same parameters that a
+ * flow already has triggers a notification is unspecified and subject to
+ * change in future versions of Open vSwitch.
+ *
+ * OVS will always send the notifications for a given flow table change before
+ * the reply to a OFPT_BARRIER_REQUEST request that precedes the flow table
+ * change.  Thus, if the controller does not receive an abbreviated
+ * notification for a flow_mod before the next OFPT_BARRIER_REPLY, it will
+ * never receive one. */
+struct nx_flow_update_abbrev {
+    ovs_be16 length;            /* Length is 8. */
+    ovs_be16 event;             /* NXFME_ABBREV. */
+    ovs_be32 xid;               /* Controller-specified xid from flow_mod. */
+};
+OFP_ASSERT(sizeof(struct nx_flow_update_abbrev) == 8);
+
+/* Used by a controller to cancel an outstanding monitor. */
+struct nx_flow_monitor_cancel {
+    struct nicira_header nxh;   /* Type NXT_FLOW_MONITOR_CANCEL. */
+    ovs_be32 id;                /* 'id' from nx_flow_monitor_request. */
+};
+OFP_ASSERT(sizeof(struct nx_flow_monitor_cancel) == 20);
 
 #endif /* openflow/nicira-ext.h */
index cb0e49bc63b83a80056d7231268c7fbad1f2d77d..b41bea0d21a9bd425782b05df9da0ed606251a47 100644 (file)
@@ -295,12 +295,17 @@ lswitch_process_packet(struct lswitch *sw, struct rconn *rconn,
     case OFPUTIL_NXT_FLOW_MOD:
     case OFPUTIL_NXT_FLOW_REMOVED:
     case OFPUTIL_NXT_FLOW_AGE:
+    case OFPUTIL_NXT_FLOW_MONITOR_CANCEL:
+    case OFPUTIL_NXT_FLOW_MONITOR_PAUSED:
+    case OFPUTIL_NXT_FLOW_MONITOR_RESUMED:
     case OFPUTIL_NXT_SET_ASYNC_CONFIG:
     case OFPUTIL_NXT_SET_CONTROLLER_ID:
     case OFPUTIL_NXST_FLOW_REQUEST:
     case OFPUTIL_NXST_AGGREGATE_REQUEST:
+    case OFPUTIL_NXST_FLOW_MONITOR_REQUEST:
     case OFPUTIL_NXST_FLOW_REPLY:
     case OFPUTIL_NXST_AGGREGATE_REPLY:
+    case OFPUTIL_NXST_FLOW_MONITOR_REPLY:
     default:
         if (VLOG_IS_DBG_ENABLED()) {
             char *s = ofp_to_string(msg->data, msg->size, 2);
index 61cef41859a7443e7885520beb0d9ebf52dee18b..dddf8d0469cb9a5fcac3c930e7a6bf2e5d186f94 100644 (file)
@@ -129,6 +129,20 @@ enum ofperr {
      * valid. */
     OFPERR_NXBRC_BAD_REASON,
 
+    /* NX1.0+(1,517).  The 'id' in an NXST_FLOW_MONITOR request is the same as
+     * an existing monitor id (or two monitors in the same NXST_FLOW_MONITOR
+     * request have the same 'id').  */
+    OFPERR_NXBRC_FM_DUPLICATE_ID,
+
+    /* NX1.0+(1,518).  The 'flags' in an NXST_FLOW_MONITOR request either does
+     * not specify at least one of the NXFMF_ADD, NXFMF_DELETE, or NXFMF_MODIFY
+     * flags, or specifies a flag bit that is not defined. */
+    OFPERR_NXBRC_FM_BAD_FLAGS,
+
+    /* NX1.0+(1,519).  The 'id' in an NXT_FLOW_MONITOR_CANCEL request is not
+     * the id of any existing monitor. */
+    OFPERR_NXBRC_FM_BAD_ID,
+
 /* ## ---------------- ## */
 /* ## OFPET_BAD_ACTION ## */
 /* ## ---------------- ## */
@@ -469,7 +483,6 @@ enum ofperr {
     /* NX1.0(1,513), NX1.1(1,513), OF1.2+(11,2).  Invalid role. */
     OFPERR_OFPRRFC_BAD_ROLE,
 
-
 /* ## ------------------ ## */
 /* ## OFPET_EXPERIMENTER ## */
 /* ## ------------------ ## */
index 922e2968e4d84e62b4f8bbcf9e03ffc9366f0f28..32d38368908bed37a6f271e639ba7d317e4788f1 100644 (file)
@@ -700,6 +700,68 @@ parse_ofp_str(struct ofputil_flow_mod *fm, int command, const char *str_,
     free(string);
 }
 
+/* Convert 'str_' (as described in the documentation for the "monitor" command
+ * in the ovs-ofctl man page) into 'fmr'. */
+void
+parse_flow_monitor_request(struct ofputil_flow_monitor_request *fmr,
+                           const char *str_)
+{
+    static uint32_t id;
+
+    char *string = xstrdup(str_);
+    char *save_ptr = NULL;
+    char *name;
+
+    fmr->id = id++;
+    fmr->flags = (NXFMF_INITIAL | NXFMF_ADD | NXFMF_DELETE | NXFMF_MODIFY
+                  | NXFMF_OWN | NXFMF_ACTIONS);
+    fmr->out_port = OFPP_NONE;
+    fmr->table_id = 0xff;
+    cls_rule_init_catchall(&fmr->match, 0);
+
+    for (name = strtok_r(string, "=, \t\r\n", &save_ptr); name;
+         name = strtok_r(NULL, "=, \t\r\n", &save_ptr)) {
+        const struct protocol *p;
+
+        if (!strcmp(name, "!initial")) {
+            fmr->flags &= ~NXFMF_INITIAL;
+        } else if (!strcmp(name, "!add")) {
+            fmr->flags &= ~NXFMF_ADD;
+        } else if (!strcmp(name, "!delete")) {
+            fmr->flags &= ~NXFMF_DELETE;
+        } else if (!strcmp(name, "!modify")) {
+            fmr->flags &= ~NXFMF_MODIFY;
+        } else if (!strcmp(name, "!actions")) {
+            fmr->flags &= ~NXFMF_ACTIONS;
+        } else if (!strcmp(name, "!own")) {
+            fmr->flags &= ~NXFMF_OWN;
+        } else if (parse_protocol(name, &p)) {
+            cls_rule_set_dl_type(&fmr->match, htons(p->dl_type));
+            if (p->nw_proto) {
+                cls_rule_set_nw_proto(&fmr->match, p->nw_proto);
+            }
+        } else {
+            char *value;
+
+            value = strtok_r(NULL, ", \t\r\n", &save_ptr);
+            if (!value) {
+                ovs_fatal(0, "%s: field %s missing value", str_, name);
+            }
+
+            if (!strcmp(name, "table")) {
+                fmr->table_id = str_to_table_id(value);
+            } else if (!strcmp(name, "out_port")) {
+                fmr->out_port = atoi(value);
+            } else if (mf_from_name(name)) {
+                parse_field(mf_from_name(name), value, &fmr->match);
+            } else {
+                ovs_fatal(0, "%s: unknown keyword %s", str_, name);
+            }
+        }
+    }
+    free(string);
+}
+
 /* Parses 's' as a set of OpenFlow actions and appends the actions to
  * 'actions'.
  *
index e930388528838e94f71cdb02f963c10a77e19787..d2d3c3cffcafd986ebef682807a6a045f747fbd0 100644 (file)
@@ -26,6 +26,7 @@
 struct flow;
 struct ofpbuf;
 struct ofputil_flow_mod;
+struct ofputil_flow_monitor_request;
 struct ofputil_flow_stats_request;
 
 void parse_ofp_str(struct ofputil_flow_mod *, int command, const char *str_,
@@ -44,4 +45,7 @@ void parse_ofpacts(const char *, struct ofpbuf *ofpacts);
 
 char *parse_ofp_exact_flow(struct flow *, const char *);
 
+void parse_flow_monitor_request(struct ofputil_flow_monitor_request *,
+                                const char *);
+
 #endif /* ofp-parse.h */
index 03de5f1e58a6f2840f06e22ab3c8a890fed11e9b..48b8daa964570c52e3a0c3e894c1bdb9e727def7 100644 (file)
@@ -1368,6 +1368,137 @@ ofp_print_nxt_set_controller_id(struct ds *string,
     ds_put_format(string, " id=%"PRIu16, ntohs(nci->controller_id));
 }
 
+static void
+ofp_print_nxt_flow_monitor_cancel(struct ds *string,
+                                  const struct ofp_header *oh)
+{
+    ds_put_format(string, " id=%"PRIu32,
+                  ofputil_decode_flow_monitor_cancel(oh));
+}
+
+static const char *
+nx_flow_monitor_flags_to_name(uint32_t bit)
+{
+    enum nx_flow_monitor_flags fmf = bit;
+
+    switch (fmf) {
+    case NXFMF_INITIAL: return "initial";
+    case NXFMF_ADD: return "add";
+    case NXFMF_DELETE: return "delete";
+    case NXFMF_MODIFY: return "modify";
+    case NXFMF_ACTIONS: return "actions";
+    case NXFMF_OWN: return "own";
+    }
+
+    return NULL;
+}
+
+static void
+ofp_print_nxst_flow_monitor_request(struct ds *string,
+                                    const struct ofp_header *oh)
+{
+    struct ofpbuf b;
+
+    ofpbuf_use_const(&b, oh, ntohs(oh->length));
+    for (;;) {
+        struct ofputil_flow_monitor_request request;
+        int retval;
+
+        retval = ofputil_decode_flow_monitor_request(&request, &b);
+        if (retval) {
+            if (retval != EOF) {
+                ofp_print_error(string, retval);
+            }
+            return;
+        }
+
+        ds_put_format(string, "\n id=%"PRIu32" flags=", request.id);
+        ofp_print_bit_names(string, request.flags,
+                            nx_flow_monitor_flags_to_name, ',');
+
+        if (request.out_port != OFPP_NONE) {
+            ds_put_cstr(string, " out_port=");
+            ofputil_format_port(request.out_port, string);
+        }
+
+        if (request.table_id != 0xff) {
+            ds_put_format(string, " table=%"PRIu8, request.table_id);
+        }
+
+        ds_put_char(string, ' ');
+        cls_rule_format(&request.match, string);
+        ds_chomp(string, ' ');
+    }
+}
+
+static void
+ofp_print_nxst_flow_monitor_reply(struct ds *string,
+                                  const struct ofp_header *oh)
+{
+    uint64_t ofpacts_stub[1024 / 8];
+    struct ofpbuf ofpacts;
+    struct ofpbuf b;
+
+    ofpbuf_use_const(&b, oh, ntohs(oh->length));
+    ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub);
+    for (;;) {
+        struct ofputil_flow_update update;
+        struct cls_rule match;
+        int retval;
+
+        update.match = &match;
+        retval = ofputil_decode_flow_update(&update, &b, &ofpacts);
+        if (retval) {
+            if (retval != EOF) {
+                ofp_print_error(string, retval);
+            }
+            ofpbuf_uninit(&ofpacts);
+            return;
+        }
+
+        ds_put_cstr(string, "\n event=");
+        switch (update.event) {
+        case NXFME_ADDED:
+            ds_put_cstr(string, "ADDED");
+            break;
+
+        case NXFME_DELETED:
+            ds_put_format(string, "DELETED reason=%s",
+                          ofp_flow_removed_reason_to_string(update.reason));
+            break;
+
+        case NXFME_MODIFIED:
+            ds_put_cstr(string, "MODIFIED");
+            break;
+
+        case NXFME_ABBREV:
+            ds_put_format(string, "ABBREV xid=0x%"PRIx32, ntohl(update.xid));
+            continue;
+        }
+
+        ds_put_format(string, " table=%"PRIu8, update.table_id);
+        if (update.idle_timeout != OFP_FLOW_PERMANENT) {
+            ds_put_format(string, " idle_timeout=%"PRIu16,
+                          update.idle_timeout);
+        }
+        if (update.hard_timeout != OFP_FLOW_PERMANENT) {
+            ds_put_format(string, " hard_timeout=%"PRIu16,
+                          update.hard_timeout);
+        }
+        ds_put_format(string, " cookie=%#"PRIx64, ntohll(update.cookie));
+
+        ds_put_char(string, ' ');
+        cls_rule_format(update.match, string);
+
+        if (update.ofpacts_len) {
+            if (string->string[string->length - 1] != ' ') {
+                ds_put_char(string, ' ');
+            }
+            ofpacts_format(update.ofpacts, update.ofpacts_len, string);
+        }
+    }
+}
+
 void
 ofp_print_version(const struct ofp_header *oh,
                   struct ds *string)
@@ -1564,6 +1695,22 @@ ofp_to_string__(const struct ofp_header *oh,
         ofp_print_stats_reply(string, oh);
         ofp_print_nxst_aggregate_reply(string, msg);
         break;
+
+    case OFPUTIL_NXT_FLOW_MONITOR_CANCEL:
+        ofp_print_nxt_flow_monitor_cancel(string, msg);
+        break;
+
+    case OFPUTIL_NXT_FLOW_MONITOR_PAUSED:
+    case OFPUTIL_NXT_FLOW_MONITOR_RESUMED:
+        break;
+
+    case OFPUTIL_NXST_FLOW_MONITOR_REQUEST:
+        ofp_print_nxst_flow_monitor_request(string, msg);
+        break;
+
+    case OFPUTIL_NXST_FLOW_MONITOR_REPLY:
+        ofp_print_nxst_flow_monitor_reply(string, msg);
+        break;
     }
 }
 
index cd261439f6e40b183f3b7589b7846a515fe77191..5fb8d8f2d1caa81704c73a54868474f833d107f2 100644 (file)
@@ -678,6 +678,18 @@ ofputil_decode_vendor(const struct ofp_header *oh, size_t length,
         { OFPUTIL_NXT_SET_CONTROLLER_ID, OFP10_VERSION,
           NXT_SET_CONTROLLER_ID, "NXT_SET_CONTROLLER_ID",
           sizeof(struct nx_controller_id), 0 },
+
+        { OFPUTIL_NXT_FLOW_MONITOR_CANCEL, OFP10_VERSION,
+          NXT_FLOW_MONITOR_CANCEL, "NXT_FLOW_MONITOR_CANCEL",
+          sizeof(struct nx_flow_monitor_cancel), 0 },
+
+        { OFPUTIL_NXT_FLOW_MONITOR_PAUSED, OFP10_VERSION,
+          NXT_FLOW_MONITOR_PAUSED, "NXT_FLOW_MONITOR_PAUSED",
+          sizeof(struct nicira_header), 0 },
+
+        { OFPUTIL_NXT_FLOW_MONITOR_RESUMED, OFP10_VERSION,
+          NXT_FLOW_MONITOR_RESUMED, "NXT_FLOW_MONITOR_RESUMED",
+          sizeof(struct nicira_header), 0 },
     };
 
     static const struct ofputil_msg_category nxt_category = {
@@ -760,6 +772,10 @@ ofputil_decode_nxst_request(const struct ofp_header *oh, size_t length,
         { OFPUTIL_NXST_AGGREGATE_REQUEST, OFP10_VERSION,
           NXST_AGGREGATE, "NXST_AGGREGATE request",
           sizeof(struct nx_aggregate_stats_request), 8 },
+
+        { OFPUTIL_NXST_FLOW_MONITOR_REQUEST, OFP10_VERSION,
+          NXST_FLOW_MONITOR, "NXST_FLOW_MONITOR request",
+          sizeof(struct nicira_stats_msg), 8 },
     };
 
     static const struct ofputil_msg_category nxst_request_category = {
@@ -793,6 +809,10 @@ ofputil_decode_nxst_reply(const struct ofp_header *oh, size_t length,
         { OFPUTIL_NXST_AGGREGATE_REPLY, OFP10_VERSION,
           NXST_AGGREGATE, "NXST_AGGREGATE reply",
           sizeof(struct nx_aggregate_stats_reply), 0 },
+
+        { OFPUTIL_NXST_FLOW_MONITOR_REPLY, OFP10_VERSION,
+          NXST_FLOW_MONITOR, "NXST_FLOW_MONITOR reply",
+          sizeof(struct nicira_stats_msg), 8 },
     };
 
     static const struct ofputil_msg_category nxst_reply_category = {
@@ -3095,7 +3115,268 @@ ofputil_encode_port_mod(const struct ofputil_port_mod *pm,
 
     return b;
 }
+\f
+/* ofputil_flow_monitor_request */
+
+/* Converts an NXST_FLOW_MONITOR request in 'msg' into an abstract
+ * ofputil_flow_monitor_request in 'rq'.
+ *
+ * Multiple NXST_FLOW_MONITOR requests can be packed into a single OpenFlow
+ * message.  Calling this function multiple times for a single 'msg' iterates
+ * through the requests.  The caller must initially leave 'msg''s layer
+ * pointers null and not modify them between calls.
+ *
+ * Returns 0 if successful, EOF if no requests were left in this 'msg',
+ * otherwise an OFPERR_* value. */
+int
+ofputil_decode_flow_monitor_request(struct ofputil_flow_monitor_request *rq,
+                                    struct ofpbuf *msg)
+{
+    struct nx_flow_monitor_request *nfmr;
+    uint16_t flags;
+
+    if (!msg->l2) {
+        msg->l2 = msg->data;
+        ofpbuf_pull(msg, sizeof(struct nicira_stats_msg));
+    }
+
+    if (!msg->size) {
+        return EOF;
+    }
+
+    nfmr = ofpbuf_try_pull(msg, sizeof *nfmr);
+    if (!nfmr) {
+        VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR request has %zu "
+                     "leftover bytes at end", msg->size);
+        return OFPERR_OFPBRC_BAD_LEN;
+    }
+
+    flags = ntohs(nfmr->flags);
+    if (!(flags & (NXFMF_ADD | NXFMF_DELETE | NXFMF_MODIFY))
+        || flags & ~(NXFMF_INITIAL | NXFMF_ADD | NXFMF_DELETE
+                     | NXFMF_MODIFY | NXFMF_ACTIONS | NXFMF_OWN)) {
+        VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR has bad flags %#"PRIx16,
+                     flags);
+        return OFPERR_NXBRC_FM_BAD_FLAGS;
+    }
+
+    if (!is_all_zeros(nfmr->zeros, sizeof nfmr->zeros)) {
+        return OFPERR_NXBRC_MUST_BE_ZERO;
+    }
+
+    rq->id = ntohl(nfmr->id);
+    rq->flags = flags;
+    rq->out_port = ntohs(nfmr->out_port);
+    rq->table_id = nfmr->table_id;
+
+    return nx_pull_match(msg, ntohs(nfmr->match_len), OFP_DEFAULT_PRIORITY,
+                         &rq->match, NULL, NULL);
+}
+
+void
+ofputil_append_flow_monitor_request(
+    const struct ofputil_flow_monitor_request *rq, struct ofpbuf *msg)
+{
+    struct nx_flow_monitor_request *nfmr;
+    size_t start_ofs;
+    int match_len;
+
+    if (!msg->size) {
+        ofputil_put_stats_header(alloc_xid(), OFPT10_STATS_REQUEST,
+                                 htons(OFPST_VENDOR),
+                                 htonl(NXST_FLOW_MONITOR), msg);
+    }
+
+    start_ofs = msg->size;
+    ofpbuf_put_zeros(msg, sizeof *nfmr);
+    match_len = nx_put_match(msg, false, &rq->match, htonll(0), htonll(0));
+
+    nfmr = ofpbuf_at_assert(msg, start_ofs, sizeof *nfmr);
+    nfmr->id = htonl(rq->id);
+    nfmr->flags = htons(rq->flags);
+    nfmr->out_port = htons(rq->out_port);
+    nfmr->match_len = htons(match_len);
+    nfmr->table_id = rq->table_id;
+}
+
+/* Converts an NXST_FLOW_MONITOR reply (also known as a flow update) in 'msg'
+ * into an abstract ofputil_flow_update in 'update'.  The caller must have
+ * initialized update->match to point to space allocated for a cls_rule.
+ *
+ * Uses 'ofpacts' to store the abstract OFPACT_* version of the update's
+ * actions (except for NXFME_ABBREV, which never includes actions).  The caller
+ * must initialize 'ofpacts' and retains ownership of it.  'update->ofpacts'
+ * will point into the 'ofpacts' buffer.
+ *
+ * Multiple flow updates can be packed into a single OpenFlow message.  Calling
+ * this function multiple times for a single 'msg' iterates through the
+ * updates.  The caller must initially leave 'msg''s layer pointers null and
+ * not modify them between calls.
+ *
+ * Returns 0 if successful, EOF if no updates were left in this 'msg',
+ * otherwise an OFPERR_* value. */
+int
+ofputil_decode_flow_update(struct ofputil_flow_update *update,
+                           struct ofpbuf *msg, struct ofpbuf *ofpacts)
+{
+    struct nx_flow_update_header *nfuh;
+    unsigned int length;
+
+    if (!msg->l2) {
+        msg->l2 = msg->data;
+        ofpbuf_pull(msg, sizeof(struct nicira_stats_msg));
+    }
+
+    if (!msg->size) {
+        return EOF;
+    }
+
+    if (msg->size < sizeof(struct nx_flow_update_header)) {
+        goto bad_len;
+    }
+
+    nfuh = msg->data;
+    update->event = ntohs(nfuh->event);
+    length = ntohs(nfuh->length);
+    if (length > msg->size || length % 8) {
+        goto bad_len;
+    }
 
+    if (update->event == NXFME_ABBREV) {
+        struct nx_flow_update_abbrev *nfua;
+
+        if (length != sizeof *nfua) {
+            goto bad_len;
+        }
+
+        nfua = ofpbuf_pull(msg, sizeof *nfua);
+        update->xid = nfua->xid;
+        return 0;
+    } else if (update->event == NXFME_ADDED
+               || update->event == NXFME_DELETED
+               || update->event == NXFME_MODIFIED) {
+        struct nx_flow_update_full *nfuf;
+        unsigned int actions_len;
+        unsigned int match_len;
+        enum ofperr error;
+
+        if (length < sizeof *nfuf) {
+            goto bad_len;
+        }
+
+        nfuf = ofpbuf_pull(msg, sizeof *nfuf);
+        match_len = ntohs(nfuf->match_len);
+        if (sizeof *nfuf + match_len > length) {
+            goto bad_len;
+        }
+
+        update->reason = ntohs(nfuf->reason);
+        update->idle_timeout = ntohs(nfuf->idle_timeout);
+        update->hard_timeout = ntohs(nfuf->hard_timeout);
+        update->table_id = nfuf->table_id;
+        update->cookie = nfuf->cookie;
+
+        error = nx_pull_match(msg, match_len, ntohs(nfuf->priority),
+                              update->match, NULL, NULL);
+        if (error) {
+            return error;
+        }
+
+        actions_len = length - sizeof *nfuf - ROUND_UP(match_len, 8);
+        error = ofpacts_pull_openflow10(msg, actions_len, ofpacts);
+        if (error) {
+            return error;
+        }
+
+        update->ofpacts = ofpacts->data;
+        update->ofpacts_len = ofpacts->size;
+        return 0;
+    } else {
+        VLOG_WARN_RL(&bad_ofmsg_rl,
+                     "NXST_FLOW_MONITOR reply has bad event %"PRIu16,
+                     ntohs(nfuh->event));
+        return OFPERR_OFPET_BAD_REQUEST;
+    }
+
+bad_len:
+    VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR reply has %zu "
+                 "leftover bytes at end", msg->size);
+    return OFPERR_OFPBRC_BAD_LEN;
+}
+
+uint32_t
+ofputil_decode_flow_monitor_cancel(const struct ofp_header *oh)
+{
+    return ntohl(((const struct nx_flow_monitor_cancel *) oh)->id);
+}
+
+struct ofpbuf *
+ofputil_encode_flow_monitor_cancel(uint32_t id)
+{
+    struct nx_flow_monitor_cancel *nfmc;
+    struct ofpbuf *msg;
+
+    nfmc = make_nxmsg(sizeof *nfmc, NXT_FLOW_MONITOR_CANCEL, &msg);
+    nfmc->id = htonl(id);
+    return msg;
+}
+
+void
+ofputil_start_flow_update(struct list *replies)
+{
+    struct ofpbuf *msg;
+
+    msg = ofpbuf_new(1024);
+    ofputil_put_stats_header(htonl(0), OFPT10_STATS_REPLY,
+                             htons(OFPST_VENDOR),
+                             htonl(NXST_FLOW_MONITOR), msg);
+
+    list_init(replies);
+    list_push_back(replies, &msg->list_node);
+}
+
+void
+ofputil_append_flow_update(const struct ofputil_flow_update *update,
+                           struct list *replies)
+{
+    struct nx_flow_update_header *nfuh;
+    struct ofpbuf *msg;
+    size_t start_ofs;
+
+    msg = ofpbuf_from_list(list_back(replies));
+    start_ofs = msg->size;
+
+    if (update->event == NXFME_ABBREV) {
+        struct nx_flow_update_abbrev *nfua;
+
+        nfua = ofpbuf_put_zeros(msg, sizeof *nfua);
+        nfua->xid = update->xid;
+    } else {
+        struct nx_flow_update_full *nfuf;
+        int match_len;
+
+        ofpbuf_put_zeros(msg, sizeof *nfuf);
+        match_len = nx_put_match(msg, false, update->match,
+                                 htonll(0), htonll(0));
+        ofpacts_put_openflow10(update->ofpacts, update->ofpacts_len, msg);
+
+        nfuf = ofpbuf_at_assert(msg, start_ofs, sizeof *nfuf);
+        nfuf->reason = htons(update->reason);
+        nfuf->priority = htons(update->match->priority);
+        nfuf->idle_timeout = htons(update->idle_timeout);
+        nfuf->hard_timeout = htons(update->hard_timeout);
+        nfuf->match_len = htons(match_len);
+        nfuf->table_id = update->table_id;
+        nfuf->cookie = update->cookie;
+    }
+
+    nfuh = ofpbuf_at_assert(msg, start_ofs, sizeof *nfuh);
+    nfuh->length = htons(msg->size - start_ofs);
+    nfuh->event = htons(update->event);
+
+    ofputil_postappend_stats_reply(start_ofs, replies);
+}
+\f
 struct ofpbuf *
 ofputil_encode_packet_out(const struct ofputil_packet_out *po)
 {
index 5b1e8edb9915c8c8215ec34cabaa2d18bcf74b37..f7d3307f6cecfc6fb696d1be31fb073446c1acea 100644 (file)
@@ -86,14 +86,19 @@ enum ofputil_msg_code {
     OFPUTIL_NXT_FLOW_AGE,
     OFPUTIL_NXT_SET_ASYNC_CONFIG,
     OFPUTIL_NXT_SET_CONTROLLER_ID,
+    OFPUTIL_NXT_FLOW_MONITOR_CANCEL,
+    OFPUTIL_NXT_FLOW_MONITOR_PAUSED,
+    OFPUTIL_NXT_FLOW_MONITOR_RESUMED,
 
     /* NXST_* stat requests. */
     OFPUTIL_NXST_FLOW_REQUEST,
     OFPUTIL_NXST_AGGREGATE_REQUEST,
+    OFPUTIL_NXST_FLOW_MONITOR_REQUEST,
 
     /* NXST_* stat replies. */
     OFPUTIL_NXST_FLOW_REPLY,
-    OFPUTIL_NXST_AGGREGATE_REPLY
+    OFPUTIL_NXST_AGGREGATE_REPLY,
+    OFPUTIL_NXST_FLOW_MONITOR_REPLY,
 };
 
 struct ofputil_msg_type;
@@ -506,6 +511,48 @@ enum ofperr ofputil_decode_port_mod(const struct ofp_header *,
 struct ofpbuf *ofputil_encode_port_mod(const struct ofputil_port_mod *,
                                        enum ofputil_protocol);
 
+/* Abstract nx_flow_monitor_request. */
+struct ofputil_flow_monitor_request {
+    uint32_t id;
+    enum nx_flow_monitor_flags flags;
+    uint16_t out_port;
+    uint8_t table_id;
+    struct cls_rule match;
+};
+
+int ofputil_decode_flow_monitor_request(struct ofputil_flow_monitor_request *,
+                                        struct ofpbuf *msg);
+void ofputil_append_flow_monitor_request(
+    const struct ofputil_flow_monitor_request *, struct ofpbuf *msg);
+
+/* Abstract nx_flow_update. */
+struct ofputil_flow_update {
+    enum nx_flow_update_event event;
+
+    /* Used only for NXFME_ADDED, NXFME_DELETED, NXFME_MODIFIED. */
+    enum ofp_flow_removed_reason reason;
+    uint16_t idle_timeout;
+    uint16_t hard_timeout;
+    uint8_t table_id;
+    ovs_be64 cookie;
+    struct cls_rule *match;
+    struct ofpact *ofpacts;
+    size_t ofpacts_len;
+
+    /* Used only for NXFME_ABBREV. */
+    ovs_be32 xid;
+};
+
+int ofputil_decode_flow_update(struct ofputil_flow_update *,
+                               struct ofpbuf *msg, struct ofpbuf *ofpacts);
+void ofputil_start_flow_update(struct list *replies);
+void ofputil_append_flow_update(const struct ofputil_flow_update *,
+                                struct list *replies);
+
+/* Abstract nx_flow_monitor_cancel. */
+uint32_t ofputil_decode_flow_monitor_cancel(const struct ofp_header *);
+struct ofpbuf *ofputil_encode_flow_monitor_cancel(uint32_t id);
+
 /* OpenFlow protocol utility functions. */
 void *make_openflow(size_t openflow_len, uint8_t type, struct ofpbuf **);
 void *make_nxmsg(size_t openflow_len, uint32_t subtype, struct ofpbuf **);
index 3e750d24cc446eaf8bc355db08a726b37eebcacf..b70b0708531a7c398bedd938519ebc86835defbe 100644 (file)
@@ -88,6 +88,13 @@ struct ofconn {
      * that the message might be generated, a 0-bit disables it. */
     uint32_t master_async_config[OAM_N_TYPES]; /* master, other */
     uint32_t slave_async_config[OAM_N_TYPES];  /* slave */
+
+    /* Flow monitors. */
+    struct hmap monitors;       /* Contains "struct ofmonitor"s. */
+    struct list updates;        /* List of "struct ofpbuf"s. */
+    bool sent_abbrev_update;    /* Does 'updates' contain NXFME_ABBREV? */
+    struct rconn_packet_counter *monitor_counter;
+    uint64_t monitor_paused;
 };
 
 static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
@@ -162,6 +169,8 @@ struct connmgr {
 
 static void update_in_band_remotes(struct connmgr *);
 static void add_snooper(struct connmgr *, struct vconn *);
+static void ofmonitor_run(struct connmgr *);
+static void ofmonitor_wait(struct connmgr *);
 
 /* Creates and returns a new connection manager owned by 'ofproto'.  'name' is
  * a name for the ofproto suitable for using in log messages.
@@ -267,6 +276,7 @@ connmgr_run(struct connmgr *mgr,
     LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
         ofconn_run(ofconn, handle_openflow);
     }
+    ofmonitor_run(mgr);
 
     /* Fail-open maintenance.  Do this after processing the ofconns since
      * fail-open checks the status of the controller rconn. */
@@ -326,6 +336,7 @@ connmgr_wait(struct connmgr *mgr, bool handling_openflow)
     LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
         ofconn_wait(ofconn, handling_openflow);
     }
+    ofmonitor_wait(mgr);
     if (handling_openflow && mgr->in_band) {
         in_band_wait(mgr->in_band);
     }
@@ -1002,6 +1013,9 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
 
     list_init(&ofconn->opgroups);
 
+    hmap_init(&ofconn->monitors);
+    list_init(&ofconn->updates);
+
     ofconn_flush(ofconn);
 
     return ofconn;
@@ -1012,6 +1026,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
 static void
 ofconn_flush(struct ofconn *ofconn)
 {
+    struct ofmonitor *monitor, *next_monitor;
     int i;
 
     ofconn->role = NX_ROLE_OTHER;
@@ -1080,6 +1095,14 @@ ofconn_flush(struct ofconn *ofconn)
         memset(ofconn->slave_async_config, 0,
                sizeof ofconn->slave_async_config);
     }
+
+    HMAP_FOR_EACH_SAFE (monitor, next_monitor, ofconn_node,
+                        &ofconn->monitors) {
+        ofmonitor_destroy(monitor);
+    }
+    rconn_packet_counter_destroy(ofconn->monitor_counter);
+    ofconn->monitor_counter = rconn_packet_counter_create();
+    ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */
 }
 
 static void
@@ -1096,6 +1119,7 @@ ofconn_destroy(struct ofconn *ofconn)
     rconn_packet_counter_destroy(ofconn->packet_in_counter);
     rconn_packet_counter_destroy(ofconn->reply_counter);
     pktbuf_destroy(ofconn->pktbuf);
+    rconn_packet_counter_destroy(ofconn->monitor_counter);
     free(ofconn);
 }
 
@@ -1646,3 +1670,239 @@ ofservice_lookup(struct connmgr *mgr, const char *target)
     }
     return NULL;
 }
+\f
+/* Flow monitors (NXST_FLOW_MONITOR). */
+
+/* A counter incremented when something significant happens to an OpenFlow
+ * rule.
+ *
+ *     - When a rule is added, its 'add_seqno' and 'modify_seqno' are set to
+ *       the current value (which is then incremented).
+ *
+ *     - When a rule is modified, its 'modify_seqno' is set to the current
+ *       value (which is then incremented).
+ *
+ * Thus, by comparing an old value of monitor_seqno against a rule's
+ * 'add_seqno', one can tell whether the rule was added before or after the old
+ * value was read, and similarly for 'modify_seqno'.
+ *
+ * 32 bits should normally be sufficient (and would be nice, to save space in
+ * each rule) but then we'd have to have some special cases for wraparound.
+ *
+ * We initialize monitor_seqno to 1 to allow 0 to be used as an invalid
+ * value. */
+static uint64_t monitor_seqno = 1;
+
+COVERAGE_DEFINE(ofmonitor_pause);
+COVERAGE_DEFINE(ofmonitor_resume);
+
+enum ofperr
+ofmonitor_create(const struct ofputil_flow_monitor_request *request,
+                 struct ofconn *ofconn, struct ofmonitor **monitorp)
+{
+    struct ofmonitor *m;
+
+    *monitorp = NULL;
+
+    m = ofmonitor_lookup(ofconn, request->id);
+    if (m) {
+        return OFPERR_NXBRC_FM_DUPLICATE_ID;
+    }
+
+    m = xmalloc(sizeof *m);
+    m->ofconn = ofconn;
+    hmap_insert(&ofconn->monitors, &m->ofconn_node, hash_int(request->id, 0));
+    m->id = request->id;
+    m->flags = request->flags;
+    m->out_port = request->out_port;
+    m->table_id = request->table_id;
+    m->match = request->match;
+
+    *monitorp = m;
+    return 0;
+}
+
+struct ofmonitor *
+ofmonitor_lookup(struct ofconn *ofconn, uint32_t id)
+{
+    struct ofmonitor *m;
+
+    HMAP_FOR_EACH_IN_BUCKET (m, ofconn_node, hash_int(id, 0),
+                             &ofconn->monitors) {
+        if (m->id == id) {
+            return m;
+        }
+    }
+    return NULL;
+}
+
+void
+ofmonitor_destroy(struct ofmonitor *m)
+{
+    if (m) {
+        hmap_remove(&m->ofconn->monitors, &m->ofconn_node);
+        free(m);
+    }
+}
+
+void
+ofmonitor_report(struct connmgr *mgr, struct rule *rule,
+                 enum nx_flow_update_event event,
+                 enum ofp_flow_removed_reason reason,
+                 const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid)
+{
+    enum nx_flow_monitor_flags update;
+    struct ofconn *ofconn;
+
+    switch (event) {
+    case NXFME_ADDED:
+        update = NXFMF_ADD;
+        rule->add_seqno = rule->modify_seqno = monitor_seqno++;
+        break;
+
+    case NXFME_DELETED:
+        update = NXFMF_DELETE;
+        break;
+
+    case NXFME_MODIFIED:
+        update = NXFMF_MODIFY;
+        rule->modify_seqno = monitor_seqno++;
+        break;
+
+    default:
+    case NXFME_ABBREV:
+        NOT_REACHED();
+    }
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        enum nx_flow_monitor_flags flags = 0;
+        struct ofmonitor *m;
+
+        if (ofconn->monitor_paused) {
+            /* Only send NXFME_DELETED notifications for flows that were added
+             * before we paused. */
+            if (event != NXFME_DELETED
+                || rule->add_seqno > ofconn->monitor_paused) {
+                continue;
+            }
+        }
+
+        HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
+            if (m->flags & update
+                && (m->table_id == 0xff || m->table_id == rule->table_id)
+                && ofoperation_has_out_port(rule->pending, m->out_port)
+                && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+                flags |= m->flags;
+            }
+        }
+
+        if (flags) {
+            if (list_is_empty(&ofconn->updates)) {
+                ofputil_start_flow_update(&ofconn->updates);
+                ofconn->sent_abbrev_update = false;
+            }
+
+            if (ofconn != abbrev_ofconn || ofconn->monitor_paused) {
+                struct ofputil_flow_update fu;
+
+                fu.event = event;
+                fu.reason = event == NXFME_DELETED ? reason : 0;
+                fu.idle_timeout = rule->idle_timeout;
+                fu.hard_timeout = rule->hard_timeout;
+                fu.table_id = rule->table_id;
+                fu.cookie = rule->flow_cookie;
+                fu.match = &rule->cr;
+                if (flags & NXFMF_ACTIONS) {
+                    fu.ofpacts = rule->ofpacts;
+                    fu.ofpacts_len = rule->ofpacts_len;
+                } else {
+                    fu.ofpacts = NULL;
+                    fu.ofpacts_len = 0;
+                }
+                ofputil_append_flow_update(&fu, &ofconn->updates);
+            } else if (!ofconn->sent_abbrev_update) {
+                struct ofputil_flow_update fu;
+
+                fu.event = NXFME_ABBREV;
+                fu.xid = abbrev_xid;
+                ofputil_append_flow_update(&fu, &ofconn->updates);
+
+                ofconn->sent_abbrev_update = true;
+            }
+        }
+    }
+}
+
+void
+ofmonitor_flush(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        struct ofpbuf *msg, *next;
+
+        LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) {
+            list_remove(&msg->list_node);
+            ofconn_send(ofconn, msg, ofconn->monitor_counter);
+            if (!ofconn->monitor_paused
+                && ofconn->monitor_counter->n_bytes > 128 * 1024) {
+                struct ofpbuf *pause;
+
+                COVERAGE_INC(ofmonitor_pause);
+                ofconn->monitor_paused = monitor_seqno++;
+                make_nxmsg_xid(sizeof(struct nicira_header),
+                               NXT_FLOW_MONITOR_PAUSED, htonl(0), &pause);
+                ofconn_send(ofconn, pause, ofconn->monitor_counter);
+            }
+        }
+    }
+}
+
+static void
+ofmonitor_resume(struct ofconn *ofconn)
+{
+    struct ofpbuf *resume;
+    struct ofmonitor *m;
+    struct list rules;
+    struct list msgs;
+
+    list_init(&rules);
+    HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
+        ofmonitor_collect_resume_rules(m, ofconn->monitor_paused, &rules);
+    }
+
+    list_init(&msgs);
+    ofmonitor_compose_refresh_updates(&rules, &msgs);
+
+    make_nxmsg_xid(sizeof(struct nicira_header),
+                   NXT_FLOW_MONITOR_RESUMED, htonl(0), &resume);
+    list_push_back(&msgs, &resume->list_node);
+    ofconn_send_replies(ofconn, &msgs);
+
+    ofconn->monitor_paused = 0;
+}
+
+static void
+ofmonitor_run(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+            COVERAGE_INC(ofmonitor_resume);
+            ofmonitor_resume(ofconn);
+        }
+    }
+}
+
+static void
+ofmonitor_wait(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+            poll_immediate_wake();
+        }
+    }
+}
index dec5b71c6244fb1e5e0dd5430766ff2c571acd63..24a33fb0e1c6908962e663a4894f496574d54e06 100644 (file)
@@ -17,6 +17,7 @@
 #ifndef CONNMGR_H
 #define CONNMGR_H 1
 
+#include "classifier.h"
 #include "hmap.h"
 #include "list.h"
 #include "ofp-errors.h"
@@ -30,6 +31,7 @@ struct ofopgroup;
 struct ofputil_flow_removed;
 struct ofputil_packet_in;
 struct ofputil_phy_port;
+struct rule;
 struct simap;
 struct sset;
 
@@ -159,4 +161,34 @@ bool connmgr_may_set_up_flow(struct connmgr *, const struct flow *,
 /* Fail-open and in-band implementation. */
 void connmgr_flushed(struct connmgr *);
 
+/* A flow monitor managed by NXST_FLOW_MONITOR and related requests. */
+struct ofmonitor {
+    struct ofconn *ofconn;      /* Owning 'ofconn'. */
+    struct hmap_node ofconn_node; /* In ofconn's 'monitors' hmap. */
+    uint32_t id;
+
+    enum nx_flow_monitor_flags flags;
+
+    /* Matching. */
+    uint16_t out_port;
+    uint8_t table_id;
+    struct cls_rule match;
+};
+
+struct ofputil_flow_monitor_request;
+
+enum ofperr ofmonitor_create(const struct ofputil_flow_monitor_request *,
+                             struct ofconn *, struct ofmonitor **);
+struct ofmonitor *ofmonitor_lookup(struct ofconn *, uint32_t id);
+void ofmonitor_destroy(struct ofmonitor *);
+
+void ofmonitor_report(struct connmgr *, struct rule *,
+                      enum nx_flow_update_event, enum ofp_flow_removed_reason,
+                      const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid);
+void ofmonitor_flush(struct connmgr *);
+
+void ofmonitor_collect_resume_rules(struct ofmonitor *, uint64_t seqno,
+                                    struct list *rules);
+void ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs);
+
 #endif /* connmgr.h */
index f22c9f61da021e67f76f374a3442adc448e34ffd..6eef1063ea9db3c28635bf19f910ead4d6bb569b 100644 (file)
@@ -187,6 +187,11 @@ struct rule {
 
     struct ofpact *ofpacts;      /* Sequence of "struct ofpacts". */
     unsigned int ofpacts_len;    /* Size of 'ofpacts', in bytes. */
+
+    /* Flow monitors. */
+    enum nx_flow_monitor_flags monitor_flags;
+    uint64_t add_seqno;         /* Sequence number when added. */
+    uint64_t modify_seqno;      /* Sequence number when changed. */
 };
 
 static inline struct rule *
@@ -199,9 +204,15 @@ void ofproto_rule_update_used(struct rule *, long long int used);
 void ofproto_rule_expire(struct rule *, uint8_t reason);
 void ofproto_rule_destroy(struct rule *);
 
+bool ofproto_rule_has_out_port(const struct rule *, uint16_t out_port);
+
 void ofoperation_complete(struct ofoperation *, enum ofperr);
 struct rule *ofoperation_get_victim(struct ofoperation *);
 
+bool ofoperation_has_out_port(const struct ofoperation *, uint16_t out_port);
+
+bool ofproto_rule_is_hidden(const struct rule *);
+
 /* ofproto class structure, to be defined by each ofproto implementation.
  *
  *
index 93401919e00d554a373fd4e1e20e1ea6704d9ff5..b187c86fbc3c7fb89e3d6123bbddd481bc7bd859 100644 (file)
@@ -126,13 +126,17 @@ struct ofoperation {
     struct ofpact *ofpacts;
     size_t ofpacts_len;
 
+    /* OFOPERATION_DELETE. */
+    enum ofp_flow_removed_reason reason; /* Reason flow was removed. */
+
     ovs_be64 flow_cookie;       /* Rule's old flow cookie. */
     enum ofperr error;          /* 0 if no error. */
 };
 
 static struct ofoperation *ofoperation_create(struct ofopgroup *,
                                               struct rule *,
-                                              enum ofoperation_type);
+                                              enum ofoperation_type,
+                                              enum ofp_flow_removed_reason);
 static void ofoperation_destroy(struct ofoperation *);
 
 /* oftable. */
@@ -188,7 +192,6 @@ static void reinit_ports(struct ofproto *);
 static void ofproto_rule_destroy__(struct rule *);
 static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
 static bool rule_is_modifiable(const struct rule *);
-static bool rule_is_hidden(const struct rule *);
 
 /* OpenFlow. */
 static enum ofperr add_flow(struct ofproto *, struct ofconn *,
@@ -952,7 +955,8 @@ ofproto_flush__(struct ofproto *ofproto)
         cls_cursor_init(&cursor, &table->cls, NULL);
         CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
             if (!rule->pending) {
-                ofoperation_create(group, rule, OFOPERATION_DELETE);
+                ofoperation_create(group, rule, OFOPERATION_DELETE,
+                                   OFPRR_DELETE);
                 oftable_remove_rule(rule);
                 ofproto->ofproto_class->rule_destruct(rule);
             }
@@ -1445,7 +1449,7 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
     } else {
         /* Initiate deletion -> success. */
         struct ofopgroup *group = ofopgroup_create_unattached(ofproto);
-        ofoperation_create(group, rule, OFOPERATION_DELETE);
+        ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
         oftable_remove_rule(rule);
         ofproto->ofproto_class->rule_destruct(rule);
         ofopgroup_submit(group);
@@ -1894,13 +1898,36 @@ ofproto_rule_destroy(struct rule *rule)
 
 /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
  * that outputs to 'port' (output to OFPP_FLOOD and OFPP_ALL doesn't count). */
-static bool
-rule_has_out_port(const struct rule *rule, uint16_t port)
+bool
+ofproto_rule_has_out_port(const struct rule *rule, uint16_t port)
 {
     return (port == OFPP_NONE
             || ofpacts_output_to_port(rule->ofpacts, rule->ofpacts_len, port));
 }
 
+/* Returns true if a rule related to 'op' has an OpenFlow OFPAT_OUTPUT or
+ * OFPAT_ENQUEUE action that outputs to 'out_port'. */
+bool
+ofoperation_has_out_port(const struct ofoperation *op, uint16_t out_port)
+{
+    if (ofproto_rule_has_out_port(op->rule, out_port)) {
+        return true;
+    }
+
+    switch (op->type) {
+    case OFOPERATION_ADD:
+        return op->victim && ofproto_rule_has_out_port(op->victim, out_port);
+
+    case OFOPERATION_DELETE:
+        return false;
+
+    case OFOPERATION_MODIFY:
+        return ofpacts_output_to_port(op->ofpacts, op->ofpacts_len, out_port);
+    }
+
+    NOT_REACHED();
+}
+
 /* Executes the actions indicated by 'rule' on 'packet' and credits 'rule''s
  * statistics appropriately.  'packet' must have at least sizeof(struct
  * ofp_packet_in) bytes of headroom.
@@ -1925,8 +1952,8 @@ rule_execute(struct rule *rule, uint16_t in_port, struct ofpbuf *packet)
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
  * (e.g. by in-band control) and are intentionally hidden from the
  * controller. */
-static bool
-rule_is_hidden(const struct rule *rule)
+bool
+ofproto_rule_is_hidden(const struct rule *rule)
 {
     return rule->cr.priority > UINT16_MAX;
 }
@@ -2393,7 +2420,8 @@ collect_rules_loose(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2437,7 +2465,8 @@ collect_rules_strict(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2855,6 +2884,9 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
     rule->ofpacts_len = fm->ofpacts_len;
     rule->evictable = true;
     rule->eviction_group = NULL;
+    rule->monitor_flags = 0;
+    rule->add_seqno = 0;
+    rule->modify_seqno = 0;
 
     /* Insert new rule. */
     victim = oftable_replace_rule(rule);
@@ -2886,7 +2918,7 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
         }
 
         group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
-        op = ofoperation_create(group, rule, OFOPERATION_ADD);
+        op = ofoperation_create(group, rule, OFOPERATION_ADD, 0);
         op->victim = victim;
 
         error = ofproto->ofproto_class->rule_construct(rule);
@@ -2950,7 +2982,7 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn,
             continue;
         }
 
-        op = ofoperation_create(group, rule, OFOPERATION_MODIFY);
+        op = ofoperation_create(group, rule, OFOPERATION_MODIFY, 0);
         rule->flow_cookie = new_cookie;
         if (actions_changed) {
             op->ofpacts = rule->ofpacts;
@@ -3029,7 +3061,7 @@ delete_flow__(struct rule *rule, struct ofopgroup *group)
 
     ofproto_rule_send_removed(rule, OFPRR_DELETE);
 
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
 }
@@ -3094,7 +3126,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
 {
     struct ofputil_flow_removed fr;
 
-    if (rule_is_hidden(rule) || !rule->send_flow_removed) {
+    if (ofproto_rule_is_hidden(rule) || !rule->send_flow_removed) {
         return;
     }
 
@@ -3144,7 +3176,7 @@ ofproto_rule_expire(struct rule *rule, uint8_t reason)
     ofproto_rule_send_removed(rule, reason);
 
     group = ofopgroup_create_unattached(ofproto);
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, reason);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
     ofopgroup_submit(group);
@@ -3397,6 +3429,255 @@ handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh)
     return 0;
 }
 
+static void
+ofproto_compose_flow_refresh_update(const struct rule *rule,
+                                    enum nx_flow_monitor_flags flags,
+                                    struct list *msgs)
+{
+    struct ofoperation *op = rule->pending;
+    struct ofputil_flow_update fu;
+
+    if (op && op->type == OFOPERATION_ADD && !op->victim) {
+        /* We'll report the final flow when the operation completes.  Reporting
+         * it now would cause a duplicate report later. */
+        return;
+    }
+
+    fu.event = (flags & (NXFMF_INITIAL | NXFMF_ADD)
+                ? NXFME_ADDED : NXFME_MODIFIED);
+    fu.reason = 0;
+    fu.idle_timeout = rule->idle_timeout;
+    fu.hard_timeout = rule->hard_timeout;
+    fu.table_id = rule->table_id;
+    fu.cookie = rule->flow_cookie;
+    fu.match = (struct cls_rule *) &rule->cr;
+    if (!(flags & NXFMF_ACTIONS)) {
+        fu.ofpacts = NULL;
+        fu.ofpacts_len = 0;
+    } else if (!op) {
+        fu.ofpacts = rule->ofpacts;
+        fu.ofpacts_len = rule->ofpacts_len;
+    } else {
+        /* An operation is in progress.  Use the previous version of the flow's
+         * actions, so that when the operation commits we report the change. */
+        switch (op->type) {
+        case OFOPERATION_ADD:
+            /* We already verified that there was a victim. */
+            fu.ofpacts = op->victim->ofpacts;
+            fu.ofpacts_len = op->victim->ofpacts_len;
+            break;
+
+        case OFOPERATION_MODIFY:
+            if (op->ofpacts) {
+                fu.ofpacts = op->ofpacts;
+                fu.ofpacts_len = op->ofpacts_len;
+            } else {
+                fu.ofpacts = rule->ofpacts;
+                fu.ofpacts_len = rule->ofpacts_len;
+            }
+            break;
+
+        case OFOPERATION_DELETE:
+            fu.ofpacts = rule->ofpacts;
+            fu.ofpacts_len = rule->ofpacts_len;
+            break;
+
+        default:
+            NOT_REACHED();
+        }
+    }
+
+    if (list_is_empty(msgs)) {
+        ofputil_start_flow_update(msgs);
+    }
+    ofputil_append_flow_update(&fu, msgs);
+}
+
+void
+ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs)
+{
+    struct rule *rule;
+
+    LIST_FOR_EACH (rule, ofproto_node, rules) {
+        enum nx_flow_monitor_flags flags = rule->monitor_flags;
+        rule->monitor_flags = 0;
+
+        ofproto_compose_flow_refresh_update(rule, flags, msgs);
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rule(const struct ofmonitor *m,
+                                       struct rule *rule, uint64_t seqno,
+                                       struct list *rules)
+{
+    enum nx_flow_monitor_flags update;
+
+    if (ofproto_rule_is_hidden(rule)) {
+        return;
+    }
+
+    if (!(rule->pending
+          ? ofoperation_has_out_port(rule->pending, m->out_port)
+          : ofproto_rule_has_out_port(rule, m->out_port))) {
+        return;
+    }
+
+    if (seqno) {
+        if (rule->add_seqno > seqno) {
+            update = NXFMF_ADD | NXFMF_MODIFY;
+        } else if (rule->modify_seqno > seqno) {
+            update = NXFMF_MODIFY;
+        } else {
+            return;
+        }
+
+        if (!(m->flags & update)) {
+            return;
+        }
+    } else {
+        update = NXFMF_INITIAL;
+    }
+
+    if (!rule->monitor_flags) {
+        list_push_back(rules, &rule->ofproto_node);
+    }
+    rule->monitor_flags |= update | (m->flags & NXFMF_ACTIONS);
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rules(const struct ofmonitor *m,
+                                        uint64_t seqno,
+                                        struct list *rules)
+{
+    const struct ofproto *ofproto = ofconn_get_ofproto(m->ofconn);
+    const struct ofoperation *op;
+    const struct oftable *table;
+
+    FOR_EACH_MATCHING_TABLE (table, m->table_id, ofproto) {
+        struct cls_cursor cursor;
+        struct rule *rule;
+
+        cls_cursor_init(&cursor, &table->cls, &m->match);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            assert(!rule->pending); /* XXX */
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+
+    HMAP_FOR_EACH (op, hmap_node, &ofproto->deletions) {
+        struct rule *rule = op->rule;
+
+        if (((m->table_id == 0xff
+              ? !(ofproto->tables[rule->table_id].flags & OFTABLE_HIDDEN)
+              : m->table_id == rule->table_id))
+            && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_initial_rules(struct ofmonitor *m,
+                                        struct list *rules)
+{
+    if (m->flags & NXFMF_INITIAL) {
+        ofproto_collect_ofmonitor_refresh_rules(m, 0, rules);
+    }
+}
+
+void
+ofmonitor_collect_resume_rules(struct ofmonitor *m,
+                               uint64_t seqno, struct list *rules)
+{
+    ofproto_collect_ofmonitor_refresh_rules(m, seqno, rules);
+}
+
+static enum ofperr
+handle_flow_monitor_request(struct ofconn *ofconn,
+                            const struct ofp_stats_msg *osm)
+{
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    struct ofmonitor **monitors;
+    size_t n_monitors, allocated_monitors;
+    struct list replies;
+    enum ofperr error;
+    struct list rules;
+    struct ofpbuf b;
+    size_t i;
+
+    error = 0;
+    ofpbuf_use_const(&b, osm, ntohs(osm->header.length));
+    monitors = NULL;
+    n_monitors = allocated_monitors = 0;
+    for (;;) {
+        struct ofputil_flow_monitor_request request;
+        struct ofmonitor *m;
+        int retval;
+
+        retval = ofputil_decode_flow_monitor_request(&request, &b);
+        if (retval == EOF) {
+            break;
+        } else if (retval) {
+            error = retval;
+            goto error;
+        }
+
+        if (request.table_id != 0xff
+            && request.table_id >= ofproto->n_tables) {
+            error = OFPERR_OFPBRC_BAD_TABLE_ID;
+            goto error;
+        }
+
+        error = ofmonitor_create(&request, ofconn, &m);
+        if (error) {
+            goto error;
+        }
+
+        if (n_monitors >= allocated_monitors) {
+            monitors = x2nrealloc(monitors, &allocated_monitors,
+                                  sizeof *monitors);
+        }
+        monitors[n_monitors++] = m;
+    }
+
+    list_init(&rules);
+    for (i = 0; i < n_monitors; i++) {
+        ofproto_collect_ofmonitor_initial_rules(monitors[i], &rules);
+    }
+
+    ofputil_start_stats_reply(osm, &replies);
+    ofmonitor_compose_refresh_updates(&rules, &replies);
+    ofconn_send_replies(ofconn, &replies);
+
+    free(monitors);
+
+    return 0;
+
+error:
+    for (i = 0; i < n_monitors; i++) {
+        ofmonitor_destroy(monitors[i]);
+    }
+    free(monitors);
+    return error;
+}
+
+static enum ofperr
+handle_flow_monitor_cancel(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofmonitor *m;
+    uint32_t id;
+
+    id = ofputil_decode_flow_monitor_cancel(oh);
+    m = ofmonitor_lookup(ofconn, id);
+    if (!m) {
+        return OFPERR_NXBRC_FM_BAD_ID;
+    }
+
+    ofmonitor_destroy(m);
+    return 0;
+}
+
 static enum ofperr
 handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
 {
@@ -3462,6 +3743,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
         /* Nothing to do. */
         return 0;
 
+    case OFPUTIL_NXT_FLOW_MONITOR_CANCEL:
+        return handle_flow_monitor_cancel(ofconn, oh);
+
     case OFPUTIL_NXT_SET_ASYNC_CONFIG:
         return handle_nxt_set_async_config(ofconn, oh);
 
@@ -3489,6 +3773,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
     case OFPUTIL_OFPST_PORT_DESC_REQUEST:
         return handle_port_desc_stats_request(ofconn, msg->data);
 
+    case OFPUTIL_NXST_FLOW_MONITOR_REQUEST:
+        return handle_flow_monitor_request(ofconn, msg->data);
+
     case OFPUTIL_MSG_INVALID:
     case OFPUTIL_OFPT_HELLO:
     case OFPUTIL_OFPT_ERROR:
@@ -3510,8 +3797,11 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
     case OFPUTIL_NXT_ROLE_REPLY:
     case OFPUTIL_NXT_FLOW_REMOVED:
     case OFPUTIL_NXT_PACKET_IN:
+    case OFPUTIL_NXT_FLOW_MONITOR_PAUSED:
+    case OFPUTIL_NXT_FLOW_MONITOR_RESUMED:
     case OFPUTIL_NXST_FLOW_REPLY:
     case OFPUTIL_NXST_AGGREGATE_REPLY:
+    case OFPUTIL_NXST_FLOW_MONITOR_REPLY:
     default:
         return (oh->type == OFPT10_STATS_REQUEST ||
                 oh->type == OFPT10_STATS_REPLY
@@ -3600,6 +3890,10 @@ static void
 ofopgroup_complete(struct ofopgroup *group)
 {
     struct ofproto *ofproto = group->ofproto;
+
+    struct ofconn *abbrev_ofconn;
+    ovs_be32 abbrev_xid;
+
     struct ofoperation *op, *next_op;
     int error;
 
@@ -3630,8 +3924,28 @@ ofopgroup_complete(struct ofopgroup *group)
         }
     }
 
+    if (!error && !list_is_empty(&group->ofconn_node)) {
+        abbrev_ofconn = group->ofconn;
+        abbrev_xid = group->request->xid;
+    } else {
+        abbrev_ofconn = NULL;
+        abbrev_xid = htonl(0);
+    }
     LIST_FOR_EACH_SAFE (op, next_op, group_node, &group->ops) {
         struct rule *rule = op->rule;
+
+        if (!op->error && !ofproto_rule_is_hidden(rule)) {
+            /* Check that we can just cast from ofoperation_type to
+             * nx_flow_update_event. */
+            BUILD_ASSERT_DECL(OFOPERATION_ADD == NXFME_ADDED);
+            BUILD_ASSERT_DECL(OFOPERATION_DELETE == NXFME_DELETED);
+            BUILD_ASSERT_DECL(OFOPERATION_MODIFY == NXFME_MODIFIED);
+
+            ofmonitor_report(ofproto->connmgr, rule,
+                             (enum nx_flow_update_event) op->type,
+                             op->reason, abbrev_ofconn, abbrev_xid);
+        }
+
         rule->pending = NULL;
 
         switch (op->type) {
@@ -3685,6 +3999,8 @@ ofopgroup_complete(struct ofopgroup *group)
         ofoperation_destroy(op);
     }
 
+    ofmonitor_flush(ofproto->connmgr);
+
     if (!list_is_empty(&group->ofproto_node)) {
         assert(ofproto->n_pending > 0);
         ofproto->n_pending--;
@@ -3704,11 +4020,15 @@ ofopgroup_complete(struct ofopgroup *group)
 /* Initiates a new operation on 'rule', of the specified 'type', within
  * 'group'.  Prior to calling, 'rule' must not have any pending operation.
  *
+ * For a 'type' of OFOPERATION_DELETE, 'reason' should specify the reason that
+ * the flow is being deleted.  For other 'type's, 'reason' is ignored (use 0).
+ *
  * Returns the newly created ofoperation (which is also available as
  * rule->pending). */
 static struct ofoperation *
 ofoperation_create(struct ofopgroup *group, struct rule *rule,
-                   enum ofoperation_type type)
+                   enum ofoperation_type type,
+                   enum ofp_flow_removed_reason reason)
 {
     struct ofproto *ofproto = group->ofproto;
     struct ofoperation *op;
@@ -3720,6 +4040,7 @@ ofoperation_create(struct ofopgroup *group, struct rule *rule,
     list_push_back(&group->ops, &op->group_node);
     op->rule = rule;
     op->type = type;
+    op->reason = reason;
     op->flow_cookie = rule->flow_cookie;
 
     group->n_running++;
@@ -3891,7 +4212,8 @@ ofproto_evict(struct ofproto *ofproto)
                 break;
             }
 
-            ofoperation_create(group, rule, OFOPERATION_DELETE);
+            ofoperation_create(group, rule,
+                               OFOPERATION_DELETE, OFPRR_EVICTION);
             oftable_remove_rule(rule);
             ofproto->ofproto_class->rule_destruct(rule);
         }
index 20148512dc769bd972a4d0ee3f18d96d1f1fbc2d..1fe54f1702a7a07d94786567b74c900806983abd 100644 (file)
@@ -810,6 +810,34 @@ NXT_SET_CONTROLLER_ID (xid=0x3): id=123
 ])
 AT_CLEANUP
 
+AT_SETUP([NXT_FLOW_MONITOR_CANCEL])
+AT_KEYWORDS([ofp-print])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 04 00 14 00 00 00 03 00 00 23 20 00 00 00 15 \
+01 02 30 40 \
+"], [0], [dnl
+NXT_FLOW_MONITOR_CANCEL (xid=0x3): id=16920640
+])
+AT_CLEANUP
+
+AT_SETUP([NXT_FLOW_MONITOR_PAUSED])
+AT_KEYWORDS([ofp-print])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 04 00 10 00 00 00 03 00 00 23 20 00 00 00 16 \
+"], [0], [dnl
+NXT_FLOW_MONITOR_PAUSED (xid=0x3):
+])
+AT_CLEANUP
+
+AT_SETUP([NXT_FLOW_MONITOR_RESUMED])
+AT_KEYWORDS([ofp-print])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 04 00 10 00 00 00 03 00 00 23 20 00 00 00 17 \
+"], [0], [dnl
+NXT_FLOW_MONITOR_RESUMED (xid=0x3):
+])
+AT_CLEANUP
+
 AT_SETUP([NXT_SET_FLOW_FORMAT])
 AT_KEYWORDS([ofp-print])
 AT_CHECK([ovs-ofctl ofp-print "\
@@ -1061,3 +1089,30 @@ AT_CHECK([ovs-ofctl ofp-print "\
 NXST_AGGREGATE reply (xid=0x4): packet_count=7 byte_count=420 flow_count=7
 ])
 AT_CLEANUP
+
+AT_SETUP([NXST_FLOW_MONITOR request])
+AT_KEYWORDS([ofp-print OFPT_STATS_REPLY])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 10 00 40 00 00 00 04 ff ff 00 00 00 00 23 20 00 00 00 02 00 00 00 00 \
+00 00 40 00 00 3f ff fe 00 00 01 00 00 00 00 00 \
+00 00 20 00 00 04 ff ff 00 06 02 00 00 00 00 00 00 00 00 02 00 01 00 00 \
+"], [0], [dnl
+NXST_FLOW_MONITOR request (xid=0x4):
+ id=16384 flags=initial,add,delete,modify,actions,own out_port=LOCAL table=1
+ id=8192 flags=delete table=2 in_port=1
+])
+AT_CLEANUP
+
+AT_SETUP([NXST_FLOW_MONITOR reply])
+AT_KEYWORDS([ofp-print OFPT_STATS_REPLY])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 11 00 40 00 00 00 04 ff ff 00 00 00 00 23 20 00 00 00 02 00 00 00 00 \
+00 20 00 01 00 04 80 00 00 05 00 10 00 06 01 00 12 34 56 78 9a bc de f0 \
+00 00 00 02 00 01 00 00 \
+00 08 00 03 00 01 86 a0 \
+"], [0], [dnl
+NXST_FLOW_MONITOR reply (xid=0x4):
+ event=DELETED reason=eviction table=1 idle_timeout=5 hard_timeout=16 cookie=0x123456789abcdef0 in_port=1
+ event=ABBREV xid=0x186a0
+])
+AT_CLEANUP
index d703fa839cd8ab3d0c26adadcd15afda4396c6a6..804965b6f1ae3a10f78055a0f41c58be945bacac 100644 (file)
@@ -736,3 +736,133 @@ OFPT_BARRIER_REPLY:
 
 OVS_VSWITCHD_STOP
 AT_CLEANUP
+
+AT_SETUP([ofproto - flow monitoring])
+AT_KEYWORDS([monitor])
+OVS_VSWITCHD_START
+
+ovs-ofctl add-flow br0 in_port=0,dl_vlan=123,actions=output:1
+
+# Start a monitor watching the flow table and check the initial reply.
+ovs-ofctl monitor br0 watch: --detach --no-chdir --pidfile >monitor.log 2>&1
+AT_CAPTURE_FILE([monitor.log])
+ovs-appctl -t ovs-ofctl ofctl/barrier
+AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0],
+  [NXST_FLOW_MONITOR reply:
+ event=ADDED table=0 cookie=0 in_port=0,dl_vlan=123 actions=output:1
+OFPT_BARRIER_REPLY:
+])
+
+# Add, delete, and modify some flows and check the updates.
+ovs-appctl -t ovs-ofctl ofctl/set-output-file monitor.log
+ovs-ofctl add-flow br0 in_port=0,dl_vlan=124,actions=output:2
+ovs-ofctl add-flow br0 in_port=0,dl_vlan=123,actions=output:5
+ovs-ofctl mod-flows br0 cookie=5,dl_vlan=123,actions=output:3
+ovs-ofctl del-flows br0 dl_vlan=123
+ovs-ofctl del-flows br0
+ovs-appctl -t ovs-ofctl ofctl/barrier
+AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0],
+[NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ADDED table=0 cookie=0 in_port=0,dl_vlan=124 actions=output:2
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ADDED table=0 cookie=0 in_port=0,dl_vlan=123 actions=output:5
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=MODIFIED table=0 cookie=0x5 in_port=0,dl_vlan=123 actions=output:3
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=DELETED reason=delete table=0 cookie=0x5 in_port=0,dl_vlan=123 actions=output:3
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=DELETED reason=delete table=0 cookie=0 in_port=0,dl_vlan=124 actions=output:2
+OFPT_BARRIER_REPLY:
+])
+
+# Check that our own changes are reported as abbreviations.
+ovs-appctl -t ovs-ofctl ofctl/set-output-file monitor.log
+ovs-ofctl add-flow br0 in_port=1,actions=output:2
+ovs-ofctl add-flow br0 in_port=2,actions=output:1
+ovs-appctl -t ovs-ofctl ofctl/send 010e004812345678003fffff00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003000000000000ffffffffffff0000
+ovs-appctl -t ovs-ofctl ofctl/barrier
+AT_CHECK([ovs-ofctl dump-flows br0 | ofctl_strip], [0], [NXST_FLOW reply:
+])
+AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0],
+[NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ADDED table=0 cookie=0 in_port=1 actions=output:2
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ADDED table=0 cookie=0 in_port=2 actions=output:1
+send: OFPT_FLOW_MOD: DEL priority=0 actions=drop
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ABBREV xid=0x12345678
+OFPT_BARRIER_REPLY:
+])
+
+ovs-appctl -t ovs-ofctl exit
+OVS_VSWITCHD_STOP
+AT_CLEANUP
+
+AT_SETUP([ofproto - flow monitoring pause and resume])
+AT_KEYWORDS([monitor])
+
+# With a Linux kernel, this file has the maximum socket receive buffer
+# size.  That's important for this test, which tests behavior when the
+# receive buffer overflows.
+AT_SKIP_IF([test ! -e /proc/sys/net/core/rmem_max])
+
+# Calculate the total amount of queuing: rmem_max in the kernel, 128 kB
+# in ofproto sending userspace (see ofmonitor_flush() in connmgr.c).
+rmem_max=`cat /proc/sys/net/core/rmem_max`
+queue_size=`expr $rmem_max + 128 \* 1024`
+echo rmem_max=$rmem_max queue_size=$queue_size
+
+# Each flow update message takes up at least 48 bytes of space in queues
+# and in practice more than that.
+n_msgs=`expr $queue_size / 48`
+echo n_msgs=$n_msgs
+
+OVS_VSWITCHD_START
+
+# Start a monitor watching the flow table, then make it block.
+ovs-ofctl monitor br0 watch: --detach --no-chdir --pidfile >monitor.log 2>&1
+AT_CAPTURE_FILE([monitor.log])
+ovs-appctl -t ovs-ofctl ofctl/block
+
+# Add $n_msgs flows.
+(echo "in_port=2,actions=output:2"
+perl -e '
+    for ($i = 0; $i < '$n_msgs'; $i++) {
+        print "cookie=1,reg1=$i,actions=drop\n";
+    }
+') > flows.txt
+AT_CHECK([ovs-ofctl add-flows br0 flows.txt])
+AT_CHECK([ovs-ofctl add-flow br0 in_port=1,cookie=3,actions=drop])
+AT_CHECK([ovs-ofctl mod-flows br0 in_port=2,cookie=2,actions=output:2])
+AT_CHECK([ovs-ofctl del-flows br0 cookie=1/-1])
+
+ovs-appctl -t ovs-ofctl ofctl/unblock
+ovs-appctl -t ovs-ofctl ofctl/barrier
+
+ovs-appctl -t ovs-ofctl exit
+
+# Check that the flow monitor reported the same number of flows
+# added and deleted, but fewer than we actually added and deleted.
+adds=`grep -c 'ADDED.*reg1=' monitor.log`
+deletes=`grep -c 'DELETED.*reg1=' monitor.log`
+echo adds=$adds deletes=$deletes
+AT_CHECK([test $adds -gt 100 && test $adds -lt $n_msgs])
+AT_CHECK([test $adds = $deletes])
+
+# Check that the flow monitor reported everything in the expected order.
+AT_CHECK([ofctl_strip < monitor.log | sed -n -e '
+/reg1=0x22\b/p
+/cookie=0x[[23]]/p
+/NXT_FLOW_MONITOR_PAUSED:/p
+/NXT_FLOW_MONITOR_RESUMED:/p
+'], [0],
+[ event=ADDED table=0 cookie=0x1 reg1=0x22
+NXT_FLOW_MONITOR_PAUSED:
+ event=DELETED reason=delete table=0 cookie=0x1 reg1=0x22
+ event=ADDED table=0 cookie=0x3 in_port=1
+ event=MODIFIED table=0 cookie=0x2 in_port=2 actions=output:2
+NXT_FLOW_MONITOR_RESUMED:
+])
+
+OVS_VSWITCHD_STOP
+AT_CLEANUP
index ebfde0fa81ddcf962d99b226179ccda00c196973..65fc6e8e6dedbb05d9dfe5e3a4e1066a30a850d8 100644 (file)
@@ -282,7 +282,7 @@ If a switch has no controller configured, or if
 the configured controller is disconnected, no traffic is sent, so
 monitoring will not show any traffic.
 .
-.IP "\fBmonitor \fIswitch\fR [\fImiss-len\fR] [\fBinvalid_ttl\fR]"
+.IP "\fBmonitor \fIswitch\fR [\fImiss-len\fR] [\fBinvalid_ttl\fR] [\fBwatch:\fR[\fIspec\fR...]]"
 Connects to \fIswitch\fR and prints to the console all OpenFlow
 messages received.  Usually, \fIswitch\fR should specify the name of a
 bridge in the \fBovs\-vswitchd\fR database.
@@ -295,15 +295,46 @@ does not send these and other asynchronous messages to an
 specified on this argument.  (Thus, if \fImiss\-len\fR is not
 specified, very little traffic will ordinarily be printed.)
 .IP
-.IP
 If \fBinvalid_ttl\fR is passed, \fBovs\-ofctl\fR sends an OpenFlow ``set
 configuration'' message at connection setup time that requests
 \fBINVALID_TTL_TO_CONTROLLER\fR, so that \fBovs\-ofctl monitor\fR can
 receive ``packet-in'' messages when TTL reaches zero on \fBdec_ttl\fR action.
 .IP
-
+\fBwatch:\fR[\fB\fIspec\fR...] causes \fBovs\-ofctl\fR to send a
+``monitor request'' Nicira extension message to the switch at
+connection setup time.  This message causes the switch to send
+information about flow table changes as they occur.  The following
+comma-separated \fIspec\fR syntax is available:
+.RS
+.IP "\fB!initial\fR"
+Do not report the switch's initial flow table contents.
+.IP "\fB!add\fR"
+Do not report newly added flows.
+.IP "\fB!delete\fR"
+Do not report deleted flows.
+.IP "\fB!modify\fR"
+Do not report modifications to existing flows.
+.IP "\fB!own\fR"
+Abbreviate changes made to the flow table by \fBovs\-ofctl\fR's own
+connection to the switch.  (These could only occur using the
+\fBofctl/send\fR command described below under \fBRUNTIME MANAGEMENT
+COMMANDS\fR.)
+.IP "\fB!actions\fR"
+Do not report actions as part of flow updates.
+.IP "\fBtable=\fInumber\fR"
+Limits the monitoring to the table with the given \fInumber\fR between
+0 and 254.  By default, all tables are monitored.
+.IP "\fBout_port=\fIport\fR"
+If set, only flows that output to \fIport\fR are monitored.
+.IP "\fIfield\fB=\fIvalue\fR"
+Monitors only flows that have \fIfield\fR specified as the given
+\fIvalue\fR.  Any syntax valid for matching on \fBdump\-flows\fR may
+be used.
+.RE
+.IP
 This command may be useful for debugging switch or controller
-implementations.
+implementations.  With \fBwatch:\fR, it is particularly useful for
+observing how a controller updates flow tables.
 .
 .SS "OpenFlow Switch and Controller Commands"
 .
index 1c75f4647624f60a19f623dfa2511cdd22d46092..d633d1c6c46a991618d8f284029ff2c2083aaaa5 100644 (file)
@@ -15,6 +15,7 @@
  */
 
 #include <config.h>
+#include <ctype.h>
 #include <errno.h>
 #include <getopt.h>
 #include <inttypes.h>
@@ -278,7 +279,7 @@ usage(void)
            "  diff-flows SOURCE1 SOURCE2  compare flows from two sources\n"
            "  packet-out SWITCH IN_PORT ACTIONS PACKET...\n"
            "                              execute ACTIONS on PACKET\n"
-           "  monitor SWITCH [MISSLEN] [invalid_ttl]\n"
+           "  monitor SWITCH [MISSLEN] [invalid_ttl] [watch:[...]]\n"
            "                              print packets received from SWITCH\n"
            "  snoop SWITCH                snoop on SWITCH and its controller\n"
            "\nFor OpenFlow switches and controllers:\n"
@@ -1255,10 +1256,57 @@ ofctl_set_output_file(struct unixctl_conn *conn, int argc OVS_UNUSED,
     unixctl_command_reply(conn, NULL);
 }
 
+struct block_aux {
+    struct vconn *vconn;
+    struct unixctl_server *server;
+    bool blocked;
+};
+
+static void
+ofctl_block(struct unixctl_conn *conn, int argc OVS_UNUSED,
+            const char *argv[] OVS_UNUSED, void *block_)
+{
+    struct block_aux *block = block_;
+
+    if (block->blocked) {
+        unixctl_command_reply(conn, "already blocking");
+        return;
+    }
+
+    block->blocked = true;
+    unixctl_command_reply(conn, NULL);
+    for (;;) {
+        unixctl_server_run(block->server);
+        if (!block->blocked) {
+            break;
+        }
+        vconn_run(block->vconn);
+
+        unixctl_server_wait(block->server);
+        vconn_run_wait(block->vconn);
+        poll_block();
+    }
+}
+
+static void
+ofctl_unblock(struct unixctl_conn *conn, int argc OVS_UNUSED,
+              const char *argv[] OVS_UNUSED, void *block_)
+{
+    struct block_aux *block = block_;
+
+    if (!block->blocked) {
+        unixctl_command_reply(conn, "not blocking");
+    } else {
+        block->blocked = false;
+        unixctl_command_reply(conn, NULL);
+    }
+}
+
 static void
 monitor_vconn(struct vconn *vconn)
 {
     struct barrier_aux barrier_aux = { vconn, NULL };
+    struct block_aux block;
     struct unixctl_server *server;
     bool exiting = false;
     int error;
@@ -1276,6 +1324,13 @@ monitor_vconn(struct vconn *vconn)
                              ofctl_barrier, &barrier_aux);
     unixctl_command_register("ofctl/set-output-file", "FILE", 1, 1,
                              ofctl_set_output_file, NULL);
+
+    block.vconn = vconn;
+    block.server = server;
+    block.blocked = false;
+    unixctl_command_register("ofctl/block", "", 0, 0, ofctl_block, &block);
+    unixctl_command_register("ofctl/unblock", "", 0, 0, ofctl_unblock, &block);
+
     daemonize_complete();
 
     for (;;) {
@@ -1329,20 +1384,34 @@ static void
 ofctl_monitor(int argc, char *argv[])
 {
     struct vconn *vconn;
+    int i;
 
     open_vconn(argv[1], &vconn);
-    if (argc > 2) {
-        struct ofp_switch_config config;
+    for (i = 2; i < argc; i++) {
+        const char *arg = argv[i];
 
-        fetch_switch_config(vconn, &config);
-        config.miss_send_len = htons(atoi(argv[2]));
-        set_switch_config(vconn, &config);
-    }
-    if (argc > 3) {
-        if (!strcmp(argv[3], "invalid_ttl")) {
+        if (isdigit((unsigned char) *arg)) {
+            struct ofp_switch_config config;
+
+            fetch_switch_config(vconn, &config);
+            config.miss_send_len = htons(atoi(arg));
+            set_switch_config(vconn, &config);
+        } else if (!strcmp(arg, "invalid_ttl")) {
             monitor_set_invalid_ttl_to_controller(vconn);
+        } else if (!strncmp(arg, "watch:", 6)) {
+            struct ofputil_flow_monitor_request fmr;
+            struct ofpbuf *msg;
+
+            parse_flow_monitor_request(&fmr, arg + 6);
+
+            msg = ofpbuf_new(0);
+            ofputil_append_flow_monitor_request(&fmr, msg);
+            dump_stats_transaction__(vconn, msg);
+        } else {
+            ovs_fatal(0, "%s: unsupported \"monitor\" argument", arg);
         }
     }
+
     if (preferred_packet_in_format >= 0) {
         set_packet_in_format(vconn, preferred_packet_in_format);
     } else {