vswitchd: Implement stats request manager.
authorBen Pfaff <blp@nicira.com>
Wed, 24 Dec 2008 23:09:41 +0000 (15:09 -0800)
committerBen Pfaff <blp@nicira.com>
Fri, 26 Dec 2008 21:27:24 +0000 (13:27 -0800)
lib/rconn.c
lib/rconn.h
lib/vlog-modules.def
vswitchd/automake.mk
vswitchd/stats.c [new file with mode: 0644]
vswitchd/stats.h [new file with mode: 0644]

index 086cf7d9d3cc77c0efffdd84a7235a9951809bd7..2051bc1a440fada63d260ffc85780dc7dd74d255 100644 (file)
@@ -91,6 +91,7 @@ struct rconn {
     time_t last_received;
     time_t last_connected;
     unsigned int packets_sent;
+    unsigned int seqno;
 
     /* In S_ACTIVE and S_IDLE, probably_admitted reports whether we believe
      * that the peer has made a (positive) admission control decision on our
@@ -194,6 +195,7 @@ rconn_create(int probe_interval, int max_backoff)
     rc->backoff_deadline = TIME_MIN;
     rc->last_received = time_now();
     rc->last_connected = time_now();
+    rc->seqno = 0;
 
     rc->packets_sent = 0;
 
@@ -693,6 +695,14 @@ rconn_get_state_elapsed(const struct rconn *rc)
 {
     return elapsed_in_this_state(rc);
 }
+
+/* Returns 'rc''s current connection sequence number, a number that changes
+ * every time that 'rconn' connects or disconnects. */
+unsigned int
+rconn_get_connection_seqno(const struct rconn *rc)
+{
+    return rc->seqno;
+}
 \f
 /* Tries to send a packet from 'rc''s send buffer.  Returns 0 if successful,
  * otherwise a positive errno value. */
@@ -805,6 +815,7 @@ timed_out(const struct rconn *rc)
 static void
 state_transition(struct rconn *rc, enum state state)
 {
+    rc->seqno += (rc->state == S_ACTIVE) != (state == S_ACTIVE);
     if (is_connected_state(state) && !is_connected_state(rc->state)) {
         rc->probably_admitted = false;
     }
index 1ce73e9c9a99ccb312c4ebc398dd153d2e64cc1f..694665ef4e166fbbc3cece0ac44f85abf098f848 100644 (file)
@@ -91,5 +91,6 @@ time_t rconn_get_creation_time(const struct rconn *);
 unsigned long int rconn_get_total_time_connected(const struct rconn *);
 int rconn_get_backoff(const struct rconn *);
 unsigned int rconn_get_state_elapsed(const struct rconn *);
+unsigned int rconn_get_connection_seqno(const struct rconn *);
 
 #endif /* rconn.h */
index 2bb5919a61f892dbb205b0172d4365ccf893fc6d..3fcd61f0a8bcd1143823264f03e1fde5ddbb103e 100644 (file)
@@ -29,6 +29,7 @@ VLOG_MODULE(rconn)
 VLOG_MODULE(snat)
 VLOG_MODULE(stp)
 VLOG_MODULE(stp_secchan)
+VLOG_MODULE(stats)
 VLOG_MODULE(status)
 VLOG_MODULE(switch)
 VLOG_MODULE(terminal)
index d059a07f3f24931fa2bbe6b5440f7f5a8278152a..98bc31b6871101bcd5ed54f4bbc77df4d8a17066 100644 (file)
@@ -9,6 +9,8 @@ vswitchd_vswitchd_SOURCES = \
        vswitchd/cfg.h \
        vswitchd/flowtrack.c \
        vswitchd/flowtrack.h \
+       vswitchd/stats.c \
+       vswitchd/stats.h \
        vswitchd/vswitchd.c
 vswitchd_vswitchd_LDADD = lib/libopenflow.a $(FAULT_LIBS) $(SSL_LIBS)
 
diff --git a/vswitchd/stats.c b/vswitchd/stats.c
new file mode 100644 (file)
index 0000000..ab2dc14
--- /dev/null
@@ -0,0 +1,313 @@
+/* Copyright (c) 2008 The Board of Trustees of The Leland Stanford
+ * Junior University
+ *
+ * We are making the OpenFlow specification and associated documentation
+ * (Software) available for public use and benefit with the expectation
+ * that others will use, modify and enhance the Software and contribute
+ * those enhancements back to the community. However, since we would
+ * like to make the Software available for broadest use, with as few
+ * restrictions as possible permission is hereby granted, free of
+ * charge, to any person obtaining a copy of this Software to deal in
+ * the Software under the copyrights without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * The name and trademarks of copyright holder(s) may NOT be used in
+ * advertising or publicity pertaining to the Software or any
+ * derivatives without specific, written prior permission.
+ */
+
+#include <config.h>
+#include "stats.h"
+#include <assert.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <arpa/inet.h>
+#include <stdlib.h>
+#include "list.h"
+#include "ofpbuf.h"
+#include "openflow/openflow.h"
+#include "poll-loop.h"
+#include "rconn.h"
+#include "timeval.h"
+#include "util.h"
+#include "vconn.h"
+
+#define THIS_MODULE VLM_stats
+#include "vlog.h"
+
+/* XXX when an ongoing stats requests is canceled, might want to wait for the
+ * completed replies before sending the next request */
+
+struct stats_mgr {
+    struct stats_request *rq;
+    struct list requests;
+    struct rconn *rconn;
+    unsigned int rconn_seqno;
+    int txqlen;
+    time_t timeout;
+};
+
+/* Number of seconds we'll wait for a stats reply before giving up. */
+#define STATS_TIMEOUT 5
+
+struct stats_request {
+    struct stats_mgr *mgr;
+    struct list node;
+    struct ofpbuf *osr;
+    uint32_t xid;
+    uint16_t type;
+    enum {
+        SR_INIT,
+        SR_PARTIAL,
+        SR_DONE
+    } state;
+    int error;
+    struct ofp_queue replies;
+};
+
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+static void complete_request(struct stats_request *, int error);
+static void send_stats_request(struct stats_mgr *);
+static void cancel_all_requests(struct stats_mgr *, int error);
+\f
+/* Creates and returns a new stats_mgr that sends stats requests on 'rconn'. */
+struct stats_mgr *
+stats_mgr_create(struct rconn *rconn)
+{
+    struct stats_mgr *mgr = xcalloc(1, sizeof *mgr);
+    mgr->rq = NULL;
+    list_init(&mgr->requests);
+    mgr->rconn = rconn;
+    mgr->rconn_seqno = rconn_get_connection_seqno(rconn);
+    mgr->txqlen = 0;
+    return mgr;
+}
+
+/* Destroys 'mgr'.  Careful: the rconn underlying 'mgr' must be destroyed
+ * *before* calling this function (because destroying the rconn will update
+ * 'mgr->txqlen').
+ *
+ * stats_requests on 'mgr' are canceled, but not destroyed; the client must
+ * destroy them individually with stats_request_destroy(). */
+void
+stats_mgr_destroy(struct stats_mgr *mgr)
+{
+    if (mgr) {
+        cancel_all_requests(mgr, ECONNABORTED);
+        free(mgr);
+    }
+}
+
+/* Allows 'mgr' to process 'osr' that was received on 'mgr''s rconn. */
+void
+stats_mgr_receive_stats_reply(struct stats_mgr *mgr,
+                              const struct ofp_stats_reply *osr)
+{
+    struct stats_request *rq = mgr->rq;
+
+    if (!rq || check_ofp_message_array(&osr->header, OFPT_STATS_REPLY,
+                                       sizeof *osr, 1, NULL)) {
+        return;
+    }
+    if (osr->header.xid != rq->xid) {
+        VLOG_DBG_RL(&rl, "stats reply has xid %08"PRIx32" != "
+                    "expected %08"PRIx32, osr->header.xid, rq->xid);
+        return;
+    }
+    if (osr->type != rq->type) {
+        VLOG_WARN_RL(&rl, "stats reply has type %04"PRIx16" != "
+                     "expected %04"PRIx16, osr->type, rq->type);
+        return;
+    }
+
+    queue_push_tail(&rq->replies,
+                    ofpbuf_clone_data(osr, ntohs(osr->header.length)));
+    if (osr->flags & htons(OFPSF_REPLY_MORE)) {
+        rq->state = SR_PARTIAL;
+        mgr->timeout = time_now() + STATS_TIMEOUT;
+    } else {
+        complete_request(rq, 0);
+        send_stats_request(mgr);
+    }
+}
+
+/* Allows 'mgr' to process 'oem' that was received on 'mgr''s rconn. */
+void
+stats_mgr_receive_error_msg(struct stats_mgr *mgr,
+                            const struct ofp_error_msg *oem)
+{
+    struct stats_request *rq = mgr->rq;
+
+    if (!rq
+        || check_ofp_message_array(&oem->header, OFPT_ERROR,
+                                   sizeof *oem, 1, NULL)
+        || oem->header.xid != rq->xid) {
+        return;
+    }
+
+    VLOG_WARN_RL(&rl, "error reply to stats request (%"PRIu16", %"PRIu16")",
+                 ntohs(oem->type), ntohs(oem->code));
+    complete_request(rq, EPROTO);
+    send_stats_request(mgr);
+}
+
+/* Performs internal processing for 'mgr'. */
+void
+stats_mgr_run(struct stats_mgr *mgr)
+{
+    if (mgr->rq && time_now() > mgr->timeout) {
+        VLOG_WARN_RL(&rl, "stats request timed out");
+        complete_request(mgr->rq, ETIMEDOUT);
+        send_stats_request(mgr);
+    }
+}
+
+/* Causes the next call to poll_block() to wake up when stats_mgr_run()
+ * should be called on 'mgr'. */
+void
+stats_mgr_wait(struct stats_mgr *mgr)
+{
+    if (mgr->rq) {
+        poll_timer_wait((mgr->timeout - time_now()) * 1000);
+    }
+}
+\f
+/* Creates a new stats_request, adds it to 'mgr', and returns it.  'msg' will
+ * be sent as the statistics request. */
+struct stats_request *
+stats_request_create(struct stats_mgr *mgr, struct ofpbuf *msg)
+{
+    struct ofp_stats_request *osr = msg->data;
+    struct stats_request *rq;
+
+    /* Flush out any dead requests if the rconn connected or disconnected, and
+     * update mgr->rconn_seqno so that the request we are about to queue can be
+     * sent. */
+    send_stats_request(mgr);
+
+    /* Queue the request. */
+    rq = xcalloc(1, sizeof *rq);
+    rq->mgr = mgr;
+    list_push_back(&mgr->requests, &rq->node);
+    rq->xid = osr->header.xid;
+    rq->type = osr->type;
+    rq->osr = msg;
+    rq->state = SR_INIT;
+    queue_init(&rq->replies);
+
+    /* Send the request, if possible. */
+    send_stats_request(mgr);
+
+    return rq;
+}
+
+/* Destroys 'rq'. */
+void
+stats_request_destroy(struct stats_request *rq)
+{
+    if (rq) {
+        complete_request(rq, ECANCELED);
+        queue_destroy(&rq->replies);
+        free(rq);
+    }
+}
+
+/* Reports the status of 'rq': 0 if the request completed successfully, EAGAIN
+ * if the request is in progress, otherwise a positive errno value if the
+ * request completed with an error. */
+int
+stats_request_get_status(const struct stats_request *rq)
+{
+    return rq->state == SR_DONE ? rq->error : EAGAIN;
+}
+
+/*
+ * If statistics reply messages remain in the queue in 'rq', stores the first
+ * one in '*msg' and returns true.  Otherwise, if no statistics reply messages
+ * remain in 'rq''s queue, stores a null pointer in '*msg' and returns false.
+ *
+ * This function may only be called for 'rq' if stats_request_get_status(rq)
+ * returns 0. */
+bool
+stats_request_get_reply(struct stats_request *rq, struct ofpbuf **msg)
+{
+    assert(rq->state == SR_DONE && !rq->error);
+    if (rq->replies.n) {
+        *msg = queue_pop_head(&rq->replies);
+        return true;
+    } else {
+        *msg = NULL;
+        return false;
+    }
+}
+\f
+static void
+complete_request(struct stats_request *rq, int error)
+{
+    struct stats_mgr *mgr = rq->mgr;
+    if (mgr) {
+        if (mgr->rq == rq) {
+            mgr->rq = NULL;
+        } else {
+            list_remove(&rq->node);
+        }
+        rq->mgr = NULL;
+    }
+    if (rq->osr) {
+        ofpbuf_delete(rq->osr);
+        rq->osr = NULL;
+    }
+    rq->state = SR_DONE;
+    rq->error = error;
+}
+
+static void
+cancel_all_requests(struct stats_mgr *mgr, int error)
+{
+    struct stats_request *rq, *next;
+    LIST_FOR_EACH_SAFE (rq, next, struct stats_request, node, &mgr->requests) {
+        complete_request(rq, error);
+    }
+}
+
+static void
+send_stats_request(struct stats_mgr *mgr)
+{
+    if (mgr->rconn_seqno != rconn_get_connection_seqno(mgr->rconn)
+        || !rconn_is_connected(mgr->rconn)) {
+        mgr->rconn_seqno = rconn_get_connection_seqno(mgr->rconn);
+        cancel_all_requests(mgr, ENOTCONN);
+    } else if (!mgr->txqlen && !mgr->rq && !list_is_empty(&mgr->requests)) {
+        struct stats_request *rq;
+        int retval;
+
+        rq = mgr->rq = CONTAINER_OF(list_pop_front(&mgr->requests),
+                                  struct stats_request, node);
+        retval = rconn_send(mgr->rconn, rq->osr, &mgr->txqlen);
+        if (!retval) {
+            /* rconn_send() consumed the message. */
+            rq->osr = NULL;
+            mgr->timeout = time_now() + STATS_TIMEOUT;
+        } else {
+            /* Sending failed, although it shouldn't have since we already
+             * checked that we were connected.  Odd. */
+            cancel_all_requests(mgr, ENOTCONN);
+        }
+    }
+}
diff --git a/vswitchd/stats.h b/vswitchd/stats.h
new file mode 100644 (file)
index 0000000..9aa0585
--- /dev/null
@@ -0,0 +1,63 @@
+/* Copyright (c) 2008 The Board of Trustees of The Leland Stanford
+ * Junior University
+ *
+ * We are making the OpenFlow specification and associated documentation
+ * (Software) available for public use and benefit with the expectation
+ * that others will use, modify and enhance the Software and contribute
+ * those enhancements back to the community. However, since we would
+ * like to make the Software available for broadest use, with as few
+ * restrictions as possible permission is hereby granted, free of
+ * charge, to any person obtaining a copy of this Software to deal in
+ * the Software under the copyrights without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * The name and trademarks of copyright holder(s) may NOT be used in
+ * advertising or publicity pertaining to the Software or any
+ * derivatives without specific, written prior permission.
+ */
+
+#ifndef VSWITCHD_STATS_H
+#define VSWITCHD_STATS_H 1
+
+#include <stdbool.h>
+
+struct ofp_error_msg;
+struct ofp_stats_reply;
+struct ofpbuf;
+struct rconn;
+
+/* A stats_mgr managing sending OpenFlow statistics requests over an rconn
+ * and receiving the replies. */
+struct stats_mgr *stats_mgr_create(struct rconn *);
+void stats_mgr_destroy(struct stats_mgr *);
+void stats_mgr_receive_stats_reply(struct stats_mgr *,
+                                   const struct ofp_stats_reply *);
+void stats_mgr_receive_error_msg(struct stats_mgr *,
+                                 const struct ofp_error_msg *);
+void stats_mgr_run(struct stats_mgr *);
+void stats_mgr_wait(struct stats_mgr *);
+
+/* A stats_request represents a single OpenFlow statistics request within a
+ * stats_mgr. */
+struct stats_request *stats_request_create(struct stats_mgr *,
+                                           struct ofpbuf *);
+void stats_request_destroy(struct stats_request *);
+int stats_request_get_status(const struct stats_request *);
+bool stats_request_get_reply(struct stats_request *, struct ofpbuf **);
+
+#endif /* vswitchd/stats.h */