Implement JSON-RPC protocol.
authorBen Pfaff <blp@nicira.com>
Mon, 26 Oct 2009 22:04:05 +0000 (15:04 -0700)
committerBen Pfaff <blp@nicira.com>
Wed, 4 Nov 2009 23:24:40 +0000 (15:24 -0800)
lib/automake.mk
lib/jsonrpc.c [new file with mode: 0644]
lib/jsonrpc.h [new file with mode: 0644]
lib/vlog-modules.def
tests/automake.mk
tests/jsonrpc.at [new file with mode: 0644]
tests/test-json.c
tests/test-jsonrpc.c [new file with mode: 0644]
tests/testsuite.at

index 8dc2899065c3a6ab3eaa43aab3e1589a2cc9a7e4..07f7ce71ae556034f929ea3e5a508a6347e84c58 100644 (file)
@@ -55,6 +55,8 @@ lib_libopenvswitch_a_SOURCES = \
        lib/hmap.h \
        lib/json.c \
        lib/json.h \
+       lib/jsonrpc.c \
+       lib/jsonrpc.h \
        lib/leak-checker.c \
        lib/leak-checker.h \
        lib/learning-switch.c \
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
new file mode 100644 (file)
index 0000000..cf2a497
--- /dev/null
@@ -0,0 +1,555 @@
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "jsonrpc.h"
+
+#include <errno.h>
+
+#include "byteq.h"
+#include "json.h"
+#include "list.h"
+#include "ofpbuf.h"
+#include "poll-loop.h"
+#include "queue.h"
+#include "stream.h"
+
+#define THIS_MODULE VLM_jsonrpc
+#include "vlog.h"
+\f
+struct jsonrpc {
+    struct stream *stream;
+    char *name;
+    int status;
+
+    /* Input. */
+    struct byteq input;
+    struct json_parser *parser;
+    struct jsonrpc_msg *received;
+
+    /* Output. */
+    struct ovs_queue output;
+    size_t backlog;
+};
+
+/* Rate limit for error messages. */
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+
+static void jsonrpc_received(struct jsonrpc *);
+static void jsonrpc_cleanup(struct jsonrpc *);
+
+struct jsonrpc *
+jsonrpc_open(struct stream *stream)
+{
+    struct jsonrpc *rpc;
+
+    assert(stream != NULL);
+
+    rpc = xzalloc(sizeof *rpc);
+    rpc->name = xstrdup(stream_get_name(stream));
+    rpc->stream = stream;
+    byteq_init(&rpc->input);
+    queue_init(&rpc->output);
+
+    return rpc;
+}
+
+void
+jsonrpc_close(struct jsonrpc *rpc)
+{
+    if (rpc) {
+        jsonrpc_cleanup(rpc);
+        free(rpc->name);
+        free(rpc);
+    }
+}
+
+void
+jsonrpc_run(struct jsonrpc *rpc)
+{
+    if (rpc->status) {
+        return;
+    }
+
+    while (!queue_is_empty(&rpc->output)) {
+        struct ofpbuf *buf = rpc->output.head;
+        int retval;
+
+        retval = stream_send(rpc->stream, buf->data, buf->size);
+        if (retval >= 0) {
+            rpc->backlog -= retval;
+            ofpbuf_pull(buf, retval);
+            if (!buf->size) {
+                ofpbuf_delete(queue_pop_head(&rpc->output));
+            }
+        } else {
+            if (retval != -EAGAIN) {
+                VLOG_WARN_RL(&rl, "%s: send error: %s",
+                             rpc->name, strerror(-retval));
+                jsonrpc_error(rpc, -retval);
+            }
+            break;
+        }
+    }
+}
+
+void
+jsonrpc_wait(struct jsonrpc *rpc)
+{
+    if (!rpc->status && !queue_is_empty(&rpc->output)) {
+        stream_send_wait(rpc->stream);
+    }
+}
+
+int
+jsonrpc_get_status(const struct jsonrpc *rpc)
+{
+    return rpc->status;
+}
+
+size_t
+jsonrpc_get_backlog(const struct jsonrpc *rpc)
+{
+    return rpc->status ? 0 : rpc->backlog;
+}
+
+const char *
+jsonrpc_get_name(const struct jsonrpc *rpc)
+{
+    return rpc->name;
+}
+
+int
+jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
+{
+    struct ofpbuf *buf;
+    struct json *json;
+    size_t length;
+    char *s;
+
+    if (rpc->status) {
+        jsonrpc_msg_destroy(msg);
+        return rpc->status;
+    }
+
+    json = jsonrpc_msg_to_json(msg);
+    s = json_to_string(json, 0);
+    length = strlen(s);
+    json_destroy(json);
+
+    buf = xmalloc(sizeof *buf);
+    ofpbuf_use(buf, s, length);
+    buf->size = length;
+    queue_push_tail(&rpc->output, buf);
+    rpc->backlog += length;
+
+    if (rpc->output.n == 1) {
+        jsonrpc_run(rpc);
+    }
+    return rpc->status;
+}
+
+int
+jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
+{
+    *msgp = NULL;
+    if (rpc->status) {
+        return rpc->status;
+    }
+
+    while (!rpc->received) {
+        if (byteq_is_empty(&rpc->input)) {
+            size_t chunk;
+            int retval;
+
+            chunk = byteq_headroom(&rpc->input);
+            retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
+            if (retval < 0) {
+                if (retval == -EAGAIN) {
+                    return EAGAIN;
+                } else {
+                    VLOG_WARN_RL(&rl, "%s: receive error: %s",
+                                 rpc->name, strerror(-retval));
+                    jsonrpc_error(rpc, -retval);
+                    return rpc->status;
+                }
+            } else if (retval == 0) {
+                VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
+                jsonrpc_error(rpc, EOF);
+                return EOF;
+            }
+            byteq_advance_head(&rpc->input, retval);
+        } else {
+            size_t n, used;
+
+            if (!rpc->parser) {
+                rpc->parser = json_parser_create(0);
+            }
+            n = byteq_tailroom(&rpc->input);
+            used = json_parser_feed(rpc->parser,
+                                    (char *) byteq_tail(&rpc->input), n);
+            byteq_advance_tail(&rpc->input, used);
+            if (json_parser_is_done(rpc->parser)) {
+                jsonrpc_received(rpc);
+                if (rpc->status) {
+                    return rpc->status;
+                }
+            }
+        }
+    }
+
+    *msgp = rpc->received;
+    rpc->received = NULL;
+    return 0;
+}
+
+void
+jsonrpc_recv_wait(struct jsonrpc *rpc)
+{
+    if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
+        poll_immediate_wake();
+    } else {
+        stream_recv_wait(rpc->stream);
+    }
+}
+
+int
+jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
+{
+    int error;
+
+    error = jsonrpc_send(rpc, msg);
+    if (error) {
+        return error;
+    }
+
+    while (!queue_is_empty(&rpc->output) && !rpc->status) {
+        jsonrpc_run(rpc);
+        jsonrpc_wait(rpc);
+        poll_block();
+    }
+    return rpc->status;
+}
+
+int
+jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
+{
+    for (;;) {
+        int error = jsonrpc_recv(rpc, msgp);
+        if (error != EAGAIN) {
+            return error;
+        }
+
+        jsonrpc_run(rpc);
+        jsonrpc_wait(rpc);
+        jsonrpc_recv_wait(rpc);
+        poll_block();
+    }
+}
+
+static void
+jsonrpc_received(struct jsonrpc *rpc)
+{
+    struct jsonrpc_msg *msg;
+    struct json *json;
+    char *error;
+
+    json = json_parser_finish(rpc->parser);
+    rpc->parser = NULL;
+    if (json->type == JSON_STRING) {
+        VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
+                     rpc->name, json_string(json));
+        jsonrpc_error(rpc, EPROTO);
+        json_destroy(json);
+        return;
+    }
+
+    error = jsonrpc_msg_from_json(json, &msg);
+    if (error) {
+        VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
+                     rpc->name, error);
+        free(error);
+        jsonrpc_error(rpc, EPROTO);
+        return;
+    }
+
+    rpc->received = msg;
+}
+
+void
+jsonrpc_error(struct jsonrpc *rpc, int error)
+{
+    assert(error);
+    if (!rpc->status) {
+        rpc->status = error;
+        jsonrpc_cleanup(rpc);
+    }
+}
+
+static void
+jsonrpc_cleanup(struct jsonrpc *rpc)
+{
+    stream_close(rpc->stream);
+    rpc->stream = NULL;
+
+    json_parser_abort(rpc->parser);
+    rpc->parser = NULL;
+
+    jsonrpc_msg_destroy(rpc->received);
+    rpc->received = NULL;
+
+    queue_clear(&rpc->output);
+    rpc->backlog = 0;
+}
+\f
+static struct jsonrpc_msg *
+jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
+                struct json *params, struct json *result, struct json *error,
+                struct json *id)
+{
+    struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
+    msg->type = type;
+    msg->method = method ? xstrdup(method) : NULL;
+    msg->params = params;
+    msg->result = result;
+    msg->error = error;
+    msg->id = id;
+    return msg;
+}
+
+static struct json *
+jsonrpc_create_id(void)
+{
+    static unsigned int id;
+    return json_integer_create(id++);
+}
+
+struct jsonrpc_msg *
+jsonrpc_create_request(const char *method, struct json *params)
+{
+    return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL,
+                           jsonrpc_create_id());
+}
+
+struct jsonrpc_msg *
+jsonrpc_create_notify(const char *method, struct json *params)
+{
+    return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
+}
+
+struct jsonrpc_msg *
+jsonrpc_create_reply(struct json *result, const struct json *id)
+{
+    return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
+                           json_clone(id));
+}
+
+struct jsonrpc_msg *
+jsonrpc_create_error(struct json *error, const struct json *id)
+{
+    return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
+                           json_clone(id));
+}
+
+const char *
+jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
+{
+    switch (type) {
+    case JSONRPC_REQUEST:
+        return "request";
+
+    case JSONRPC_NOTIFY:
+        return "notification";
+
+    case JSONRPC_REPLY:
+        return "reply";
+
+    case JSONRPC_ERROR:
+        return "error";
+    }
+    return "(null)";
+}
+
+char *
+jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
+{
+    const char *type_name;
+    unsigned int pattern;
+
+    if (m->params && m->params->type != JSON_ARRAY) {
+        return xstrdup("\"params\" must be JSON array");
+    }
+
+    switch (m->type) {
+    case JSONRPC_REQUEST:
+        pattern = 0x11001;
+        break;
+
+    case JSONRPC_NOTIFY:
+        pattern = 0x11000;
+        break;
+
+    case JSONRPC_REPLY:
+        pattern = 0x00101;
+        break;
+
+    case JSONRPC_ERROR:
+        pattern = 0x00011;
+        break;
+
+    default:
+        return xasprintf("invalid JSON-RPC message type %d", m->type);
+    }
+
+    type_name = jsonrpc_msg_type_to_string(m->type);
+    if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
+        return xasprintf("%s must%s have \"method\"",
+                         type_name, (pattern & 0x10000) ? "" : " not");
+
+    }
+    if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
+        return xasprintf("%s must%s have \"params\"",
+                         type_name, (pattern & 0x1000) ? "" : " not");
+
+    }
+    if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
+        return xasprintf("%s must%s have \"result\"",
+                         type_name, (pattern & 0x100) ? "" : " not");
+
+    }
+    if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
+        return xasprintf("%s must%s have \"error\"",
+                         type_name, (pattern & 0x10) ? "" : " not");
+
+    }
+    if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
+        return xasprintf("%s must%s have \"id\"",
+                         type_name, (pattern & 0x1) ? "" : " not");
+
+    }
+    return NULL;
+}
+
+void
+jsonrpc_msg_destroy(struct jsonrpc_msg *m)
+{
+    if (m) {
+        free(m->method);
+        json_destroy(m->params);
+        json_destroy(m->result);
+        json_destroy(m->error);
+        json_destroy(m->id);
+        free(m);
+    }
+}
+
+static struct json *
+null_from_json_null(struct json *json)
+{
+    if (json && json->type == JSON_NULL) {
+        json_destroy(json);
+        return NULL;
+    }
+    return json;
+}
+
+char *
+jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
+{
+    struct json *method = NULL;
+    struct jsonrpc_msg *msg = NULL;
+    struct shash *object;
+    char *error;
+
+    if (json->type != JSON_OBJECT) {
+        error = xstrdup("message is not a JSON object");
+        goto exit;
+    }
+    object = json_object(json);
+
+    method = shash_find_and_delete(object, "method");
+    if (method && method->type != JSON_STRING) {
+        error = xstrdup("method is not a JSON string");
+        goto exit;
+    }
+
+    msg = xzalloc(sizeof *msg);
+    msg->method = method ? xstrdup(method->u.string) : NULL;
+    msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
+    msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
+    msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
+    msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
+    msg->type = (msg->result ? JSONRPC_REPLY
+                 : msg->error ? JSONRPC_ERROR
+                 : msg->id ? JSONRPC_REQUEST
+                 : JSONRPC_NOTIFY);
+    if (!shash_is_empty(object)) {
+        error = xasprintf("message has unexpected member \"%s\"",
+                          shash_first(object)->name);
+        goto exit;
+    }
+    error = jsonrpc_msg_is_valid(msg);
+    if (error) {
+        goto exit;
+    }
+
+exit:
+    json_destroy(method);
+    json_destroy(json);
+    if (error) {
+        jsonrpc_msg_destroy(msg);
+        msg = NULL;
+    }
+    *msgp = msg;
+    return error;
+}
+
+struct json *
+jsonrpc_msg_to_json(struct jsonrpc_msg *m)
+{
+    struct json *json = json_object_create();
+
+    if (m->method) {
+        json_object_put(json, "method", json_string_create_nocopy(m->method));
+    }
+
+    if (m->params) {
+        json_object_put(json, "params", m->params);
+    }
+
+    if (m->result) {
+        json_object_put(json, "result", m->result);
+    } else if (m->type == JSONRPC_ERROR) {
+        json_object_put(json, "result", json_null_create());
+    }
+
+    if (m->error) {
+        json_object_put(json, "error", m->error);
+    } else if (m->type == JSONRPC_REPLY) {
+        json_object_put(json, "error", json_null_create());
+    }
+
+    if (m->id) {
+        json_object_put(json, "id", m->id);
+    } else if (m->type == JSONRPC_NOTIFY) {
+        json_object_put(json, "id", json_null_create());
+    }
+
+    free(m);
+
+    return json;
+}
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
new file mode 100644 (file)
index 0000000..931983f
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef JSONRPC_H
+#define JSONRPC_H 1
+
+/* This is an implementation of the JSON-RPC 1.0 specification defined at
+ * http://json-rpc.org/wiki/specification. */
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct json;
+struct jsonrpc_msg;
+struct stream;
+\f
+/* API for a JSON-RPC stream. */
+
+struct jsonrpc *jsonrpc_open(struct stream *);
+void jsonrpc_close(struct jsonrpc *);
+
+void jsonrpc_run(struct jsonrpc *);
+void jsonrpc_wait(struct jsonrpc *);
+
+void jsonrpc_error(struct jsonrpc *, int error);
+int jsonrpc_get_status(const struct jsonrpc *);
+size_t jsonrpc_get_backlog(const struct jsonrpc *);
+const char *jsonrpc_get_name(const struct jsonrpc *);
+
+int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *);
+int jsonrpc_recv(struct jsonrpc *, struct jsonrpc_msg **);
+void jsonrpc_recv_wait(struct jsonrpc *);
+
+int jsonrpc_send_block(struct jsonrpc *, struct jsonrpc_msg *);
+int jsonrpc_recv_block(struct jsonrpc *, struct jsonrpc_msg **);
+
+/* Messages. */
+enum jsonrpc_msg_type {
+    JSONRPC_REQUEST,           /* Request. */
+    JSONRPC_NOTIFY,            /* Notification. */
+    JSONRPC_REPLY,             /* Successful reply. */
+    JSONRPC_ERROR              /* Error reply. */
+};
+
+struct jsonrpc_msg {
+    enum jsonrpc_msg_type type;
+    char *method;               /* Request or notification only. */
+    struct json *params;        /* Request or notification only. */
+    struct json *result;        /* Successful reply only. */
+    struct json *error;         /* Error reply only. */
+    struct json *id;            /* Request or reply only. */
+};
+
+struct jsonrpc_msg *jsonrpc_create_request(const char *method,
+                                           struct json *params);
+struct jsonrpc_msg *jsonrpc_create_notify(const char *method,
+                                          struct json *params);
+struct jsonrpc_msg *jsonrpc_create_reply(struct json *result,
+                                         const struct json *id);
+struct jsonrpc_msg *jsonrpc_create_error(struct json *error,
+                                         const struct json *id);
+
+const char *jsonrpc_msg_type_to_string(enum jsonrpc_msg_type);
+char *jsonrpc_msg_is_valid(const struct jsonrpc_msg *);
+void jsonrpc_msg_destroy(struct jsonrpc_msg *);
+
+char *jsonrpc_msg_from_json(struct json *, struct jsonrpc_msg **);
+struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *);
+
+#endif /* jsonrpc.h */
index da345df428fb864f1fde3872dd103497fd121b64..684954ea75779371060947e842ed92cce2e4c1c9 100644 (file)
@@ -40,6 +40,7 @@ VLOG_MODULE(fatal_signal)
 VLOG_MODULE(fault)
 VLOG_MODULE(flow)
 VLOG_MODULE(in_band)
+VLOG_MODULE(jsonrpc)
 VLOG_MODULE(leak_checker)
 VLOG_MODULE(learning_switch)
 VLOG_MODULE(lockfile)
index a9edee0e5ce0640acf6cf945e510f2201dd032c4..6284511250629b7cc6bcbb9afd888f6e0cbd1f61 100644 (file)
@@ -12,6 +12,7 @@ TESTSUITE_AT = \
        tests/aes128.at \
        tests/uuid.at \
        tests/json.at \
+       tests/jsonrpc.at \
        tests/timeval.at \
        tests/lockfile.at \
        tests/stp.at \
@@ -76,6 +77,10 @@ noinst_PROGRAMS += tests/test-json
 tests_test_json_SOURCES = tests/test-json.c
 tests_test_json_LDADD = lib/libopenvswitch.a
 
+noinst_PROGRAMS += tests/test-jsonrpc
+tests_test_jsonrpc_SOURCES = tests/test-jsonrpc.c
+tests_test_jsonrpc_LDADD = lib/libopenvswitch.a
+
 noinst_PROGRAMS += tests/test-list
 tests_test_list_SOURCES = tests/test-list.c
 tests_test_list_LDADD = lib/libopenvswitch.a
diff --git a/tests/jsonrpc.at b/tests/jsonrpc.at
new file mode 100644 (file)
index 0000000..d5ebf94
--- /dev/null
@@ -0,0 +1,45 @@
+AT_BANNER([JSON-RPC])
+
+AT_SETUP([JSON-RPC request and successful reply])
+AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket])
+AT_CHECK([test -s pid])
+AT_CHECK([kill -0 `cat pid`])
+AT_CHECK(
+  [[test-jsonrpc request unix:socket echo '[{"a": "b", "x": null}]']], [0],
+  [[{"error":null,"id":0,"result":[{"a":"b","x":null}]}
+]], [ignore], [test ! -e pid || kill `cat pid`])
+AT_CHECK([kill `cat pid`])
+AT_CLEANUP
+
+AT_SETUP([JSON-RPC request and error reply])
+AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket])
+AT_CHECK([test -s pid])
+AT_CHECK([kill -0 `cat pid`])
+AT_CHECK(
+  [[test-jsonrpc request unix:socket bad-request '[]']], [0],
+  [[{"error":{"error":"unknown method"},"id":0,"result":null}
+]], [ignore], [test ! -e pid || kill `cat pid`])
+AT_CHECK([kill `cat pid`])
+AT_CLEANUP
+
+AT_SETUP([JSON-RPC notification])
+AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket])
+AT_CHECK([test -s pid])
+# When a daemon dies it deletes its pidfile, so make a copy.
+AT_CHECK([cp pid pid2])
+AT_CHECK([kill -0 `cat pid2`])
+OVS_CHECK_LCOV([[test-jsonrpc notify unix:socket shutdown '[]']], [0], [], 
+  [ignore], [kill `cat pid2`])
+AT_CHECK(
+  [pid=`cat pid2`
+   # First try a quick sleep, so that the test completes very quickly
+   # in the normal case.  POSIX doesn't require fractional times to
+   # work, so this might not work.
+   sleep 0.1; if kill -0 $pid; then :; else echo success; exit 0; fi
+   # Then wait up to 2 seconds.
+   sleep 1; if kill -0 $pid; then :; else echo success; exit 0; fi
+   sleep 1; if kill -0 $pid; then :; else echo success; exit 0; fi
+   echo failure; exit 1], [0], [success
+], [ignore])
+AT_CHECK([test ! -e pid])
+AT_CLEANUP
index bb9fadb0641231d34034a346a8e5d87aff9b9220..6261786db0a0e4a6139d33230b50dbd6553a7b69 100644 (file)
@@ -92,7 +92,7 @@ parse_multiple(const char *input_file)
                 parser = json_parser_create(0);
             }
 
-            used = n - json_parser_feed(parser, &buffer[used], n - used);
+            used += json_parser_feed(parser, &buffer[used], n - used);
             if (used < n) {
                 if (!print_and_free_json(json_parser_finish(parser))) {
                     ok = false;
diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c
new file mode 100644 (file)
index 0000000..7f2166f
--- /dev/null
@@ -0,0 +1,323 @@
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "jsonrpc.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "command-line.h"
+#include "daemon.h"
+#include "json.h"
+#include "poll-loop.h"
+#include "stream.h"
+#include "timeval.h"
+#include "util.h"
+#include "vlog.h"
+
+static struct command all_commands[];
+
+static void usage(void) NO_RETURN;
+static void parse_options(int argc, char *argv[]);
+
+int
+main(int argc, char *argv[])
+{
+    set_program_name(argv[0]);
+    time_init();
+    vlog_init();
+    parse_options(argc, argv);
+    run_command(argc - optind, argv + optind, all_commands);
+    return 0;
+}
+
+static void
+parse_options(int argc, char *argv[])
+{
+    static struct option long_options[] = {
+        {"verbose", optional_argument, 0, 'v'},
+        {"help", no_argument, 0, 'h'},
+        DAEMON_LONG_OPTIONS,
+        {0, 0, 0, 0},
+    };
+    char *short_options = long_options_to_short_options(long_options);
+
+    for (;;) {
+        int c = getopt_long(argc, argv, short_options, long_options, NULL);
+        if (c == -1) {
+            break;
+        }
+
+        switch (c) {
+        case 'h':
+            usage();
+
+        case 'v':
+            vlog_set_verbosity(optarg);
+            break;
+
+        DAEMON_OPTION_HANDLERS
+
+        case '?':
+            exit(EXIT_FAILURE);
+
+        default:
+            abort();
+        }
+    }
+    free(short_options);
+}
+
+static void
+usage(void)
+{
+    printf("%s: JSON-RPC test utility\n"
+           "usage: %s [OPTIONS] COMMAND [ARG...]\n"
+           "  listen LOCAL             listen for connections on LOCAL\n"
+           "  request REMOTE METHOD PARAMS   send request, print reply\n"
+           "  notify REMOTE METHOD PARAMS  send notification and exit\n",
+           program_name, program_name);
+    stream_usage("JSON-RPC", true, true);
+    daemon_usage();
+    vlog_usage();
+    printf("\nOther options:\n"
+           "  -h, --help                  display this help message\n");
+    exit(EXIT_SUCCESS);
+}
+\f
+/* Command helper functions. */
+
+static struct json *
+parse_json(const char *s)
+{
+    struct json *json = json_from_string(s);
+    if (json->type == JSON_STRING) {
+        ovs_fatal(0, "\"%s\": %s", s, json->u.string);
+    }
+    return json;
+}
+
+static void
+print_and_free_json(struct json *json)
+{
+    char *string = json_to_string(json, JSSF_SORT);
+    json_destroy(json);
+    puts(string);
+    free(string);
+}
+\f
+/* Command implementations. */
+
+static void
+handle_rpc(struct jsonrpc *rpc, struct jsonrpc_msg *msg, bool *done)
+{
+    struct jsonrpc_msg *reply = NULL;
+    if (msg->type == JSONRPC_REQUEST) {
+        if (!strcmp(msg->method, "echo")) {
+            reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
+        } else {
+            struct json *error = json_object_create();
+            json_object_put_string(error, "error", "unknown method");
+            reply = jsonrpc_create_error(error, msg->id);
+            ovs_error(0, "unknown request %s", msg->method);
+        }
+
+    } else if (msg->type == JSONRPC_NOTIFY) {
+        if (!strcmp(msg->method, "shutdown")) {
+            *done = true;
+        } else {
+            jsonrpc_error(rpc, ENOTTY);
+            ovs_error(0, "unknown notification %s", msg->method);
+        }
+    } else {
+        jsonrpc_error(rpc, EPROTO);
+        ovs_error(0, "unsolicited JSON-RPC reply or error");
+    }
+
+    if (reply) {
+        jsonrpc_send(rpc, reply);
+    }
+}
+
+static void
+do_listen(int argc UNUSED, char *argv[])
+{
+    struct pstream *pstream;
+    struct jsonrpc **rpcs;
+    size_t n_rpcs, allocated_rpcs;
+    bool done;
+    int error;
+
+    die_if_already_running();
+
+    error = pstream_open(argv[1], &pstream);
+    if (error) {
+        ovs_fatal(error, "could not listen on \"%s\"", argv[1]);
+    }
+
+    daemonize();
+
+    rpcs = NULL;
+    n_rpcs = allocated_rpcs = 0;
+    done = false;
+    for (;;) {
+        struct stream *stream;
+        size_t i;
+
+        /* Accept new connections. */
+        error = pstream_accept(pstream, &stream);
+        if (!error) {
+            if (n_rpcs >= allocated_rpcs) {
+                rpcs = x2nrealloc(rpcs, &allocated_rpcs, sizeof *rpcs);
+            }
+            rpcs[n_rpcs++] = jsonrpc_open(stream);
+        } else if (error != EAGAIN) {
+            ovs_fatal(error, "pstream_accept failed");
+        }
+
+        /* Service existing connections. */
+        for (i = 0; i < n_rpcs; ) {
+            struct jsonrpc *rpc = rpcs[i];
+            struct jsonrpc_msg *msg;
+
+            jsonrpc_run(rpc);
+            if (!jsonrpc_get_backlog(rpc)) {
+                error = jsonrpc_recv(rpc, &msg);
+                if (!error) {
+                    handle_rpc(rpc, msg, &done);
+                    jsonrpc_msg_destroy(msg);
+                }
+            }
+
+            error = jsonrpc_get_status(rpc);
+            if (error) {
+                jsonrpc_close(rpc);
+                ovs_error(error, "connection closed");
+                memmove(&rpcs[i], &rpcs[i + 1],
+                        (n_rpcs - i - 1) * sizeof *rpcs);
+                n_rpcs--;
+            } else {
+                i++;
+            }
+        }
+
+        /* Wait for something to do. */
+        if (done && !n_rpcs) {
+            break;
+        }
+        pstream_wait(pstream);
+        for (i = 0; i < n_rpcs; i++) {
+            struct jsonrpc *rpc = rpcs[i];
+
+            jsonrpc_wait(rpc);
+            if (!jsonrpc_get_backlog(rpc)) {
+                jsonrpc_recv_wait(rpc);
+            }
+        }
+        poll_block();
+    }
+}
+
+
+static void
+do_request(int argc UNUSED, char *argv[])
+{
+    struct jsonrpc_msg *msg;
+    struct jsonrpc *rpc;
+    struct json *params;
+    struct stream *stream;
+    const char *method;
+    char *string;
+    int error;
+
+    method = argv[2];
+    params = parse_json(argv[3]);
+    msg = jsonrpc_create_request(method, params);
+    string = jsonrpc_msg_is_valid(msg);
+    if (string) {
+        ovs_fatal(0, "not a valid JSON-RPC request: %s", string);
+    }
+
+    error = stream_open_block(argv[1], &stream);
+    if (error) {
+        ovs_fatal(error, "could not open \"%s\"", argv[1]);
+    }
+    rpc = jsonrpc_open(stream);
+
+    error = jsonrpc_send(rpc, msg);
+    if (error) {
+        ovs_fatal(error, "could not send request");
+    }
+
+    error = jsonrpc_recv_block(rpc, &msg);
+    if (error) {
+        ovs_fatal(error, "error waiting for reply");
+    }
+    print_and_free_json(jsonrpc_msg_to_json(msg));
+
+    jsonrpc_close(rpc);
+}
+
+static void
+do_notify(int argc UNUSED, char *argv[])
+{
+    struct jsonrpc_msg *msg;
+    struct jsonrpc *rpc;
+    struct json *params;
+    struct stream *stream;
+    const char *method;
+    char *string;
+    int error;
+
+    method = argv[2];
+    params = parse_json(argv[3]);
+    msg = jsonrpc_create_notify(method, params);
+    string = jsonrpc_msg_is_valid(msg);
+    if (string) {
+        ovs_fatal(0, "not a JSON RPC-valid notification: %s", string);
+    }
+
+    error = stream_open_block(argv[1], &stream);
+    if (error) {
+        ovs_fatal(error, "could not open \"%s\"", argv[1]);
+    }
+    rpc = jsonrpc_open(stream);
+
+    error = jsonrpc_send_block(rpc, msg);
+    if (error) {
+        ovs_fatal(error, "could not send request");
+    }
+    jsonrpc_close(rpc);
+}
+
+static void
+do_help(int argc UNUSED, char *argv[] UNUSED)
+{
+    usage();
+}
+
+static struct command all_commands[] = {
+    { "listen", 1, 1, do_listen },
+    { "request", 3, 3, do_request },
+    { "notify", 3, 3, do_notify },
+    { "help", 0, INT_MAX, do_help },
+    { NULL, 0, 0, NULL },
+};
index 816bed1c023eba3106de5bc327a4a34b2a52b653..781b6a6ce4b2d0fe538d2dff89f646e0c4f81a34 100644 (file)
@@ -23,6 +23,7 @@ m4_include([tests/dir_name.at])
 m4_include([tests/aes128.at])
 m4_include([tests/uuid.at])
 m4_include([tests/json.at])
+m4_include([tests/jsonrpc.at])
 m4_include([tests/timeval.at])
 m4_include([tests/lockfile.at])
 m4_include([tests/stp.at])