lib/hmap.h \
lib/json.c \
lib/json.h \
+ lib/jsonrpc.c \
+ lib/jsonrpc.h \
lib/leak-checker.c \
lib/leak-checker.h \
lib/learning-switch.c \
--- /dev/null
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "jsonrpc.h"
+
+#include <errno.h>
+
+#include "byteq.h"
+#include "json.h"
+#include "list.h"
+#include "ofpbuf.h"
+#include "poll-loop.h"
+#include "queue.h"
+#include "stream.h"
+
+#define THIS_MODULE VLM_jsonrpc
+#include "vlog.h"
+\f
+struct jsonrpc {
+ struct stream *stream;
+ char *name;
+ int status;
+
+ /* Input. */
+ struct byteq input;
+ struct json_parser *parser;
+ struct jsonrpc_msg *received;
+
+ /* Output. */
+ struct ovs_queue output;
+ size_t backlog;
+};
+
+/* Rate limit for error messages. */
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+
+static void jsonrpc_received(struct jsonrpc *);
+static void jsonrpc_cleanup(struct jsonrpc *);
+
+struct jsonrpc *
+jsonrpc_open(struct stream *stream)
+{
+ struct jsonrpc *rpc;
+
+ assert(stream != NULL);
+
+ rpc = xzalloc(sizeof *rpc);
+ rpc->name = xstrdup(stream_get_name(stream));
+ rpc->stream = stream;
+ byteq_init(&rpc->input);
+ queue_init(&rpc->output);
+
+ return rpc;
+}
+
+void
+jsonrpc_close(struct jsonrpc *rpc)
+{
+ if (rpc) {
+ jsonrpc_cleanup(rpc);
+ free(rpc->name);
+ free(rpc);
+ }
+}
+
+void
+jsonrpc_run(struct jsonrpc *rpc)
+{
+ if (rpc->status) {
+ return;
+ }
+
+ while (!queue_is_empty(&rpc->output)) {
+ struct ofpbuf *buf = rpc->output.head;
+ int retval;
+
+ retval = stream_send(rpc->stream, buf->data, buf->size);
+ if (retval >= 0) {
+ rpc->backlog -= retval;
+ ofpbuf_pull(buf, retval);
+ if (!buf->size) {
+ ofpbuf_delete(queue_pop_head(&rpc->output));
+ }
+ } else {
+ if (retval != -EAGAIN) {
+ VLOG_WARN_RL(&rl, "%s: send error: %s",
+ rpc->name, strerror(-retval));
+ jsonrpc_error(rpc, -retval);
+ }
+ break;
+ }
+ }
+}
+
+void
+jsonrpc_wait(struct jsonrpc *rpc)
+{
+ if (!rpc->status && !queue_is_empty(&rpc->output)) {
+ stream_send_wait(rpc->stream);
+ }
+}
+
+int
+jsonrpc_get_status(const struct jsonrpc *rpc)
+{
+ return rpc->status;
+}
+
+size_t
+jsonrpc_get_backlog(const struct jsonrpc *rpc)
+{
+ return rpc->status ? 0 : rpc->backlog;
+}
+
+const char *
+jsonrpc_get_name(const struct jsonrpc *rpc)
+{
+ return rpc->name;
+}
+
+int
+jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
+{
+ struct ofpbuf *buf;
+ struct json *json;
+ size_t length;
+ char *s;
+
+ if (rpc->status) {
+ jsonrpc_msg_destroy(msg);
+ return rpc->status;
+ }
+
+ json = jsonrpc_msg_to_json(msg);
+ s = json_to_string(json, 0);
+ length = strlen(s);
+ json_destroy(json);
+
+ buf = xmalloc(sizeof *buf);
+ ofpbuf_use(buf, s, length);
+ buf->size = length;
+ queue_push_tail(&rpc->output, buf);
+ rpc->backlog += length;
+
+ if (rpc->output.n == 1) {
+ jsonrpc_run(rpc);
+ }
+ return rpc->status;
+}
+
+int
+jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
+{
+ *msgp = NULL;
+ if (rpc->status) {
+ return rpc->status;
+ }
+
+ while (!rpc->received) {
+ if (byteq_is_empty(&rpc->input)) {
+ size_t chunk;
+ int retval;
+
+ chunk = byteq_headroom(&rpc->input);
+ retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
+ if (retval < 0) {
+ if (retval == -EAGAIN) {
+ return EAGAIN;
+ } else {
+ VLOG_WARN_RL(&rl, "%s: receive error: %s",
+ rpc->name, strerror(-retval));
+ jsonrpc_error(rpc, -retval);
+ return rpc->status;
+ }
+ } else if (retval == 0) {
+ VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
+ jsonrpc_error(rpc, EOF);
+ return EOF;
+ }
+ byteq_advance_head(&rpc->input, retval);
+ } else {
+ size_t n, used;
+
+ if (!rpc->parser) {
+ rpc->parser = json_parser_create(0);
+ }
+ n = byteq_tailroom(&rpc->input);
+ used = json_parser_feed(rpc->parser,
+ (char *) byteq_tail(&rpc->input), n);
+ byteq_advance_tail(&rpc->input, used);
+ if (json_parser_is_done(rpc->parser)) {
+ jsonrpc_received(rpc);
+ if (rpc->status) {
+ return rpc->status;
+ }
+ }
+ }
+ }
+
+ *msgp = rpc->received;
+ rpc->received = NULL;
+ return 0;
+}
+
+void
+jsonrpc_recv_wait(struct jsonrpc *rpc)
+{
+ if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
+ poll_immediate_wake();
+ } else {
+ stream_recv_wait(rpc->stream);
+ }
+}
+
+int
+jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
+{
+ int error;
+
+ error = jsonrpc_send(rpc, msg);
+ if (error) {
+ return error;
+ }
+
+ while (!queue_is_empty(&rpc->output) && !rpc->status) {
+ jsonrpc_run(rpc);
+ jsonrpc_wait(rpc);
+ poll_block();
+ }
+ return rpc->status;
+}
+
+int
+jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
+{
+ for (;;) {
+ int error = jsonrpc_recv(rpc, msgp);
+ if (error != EAGAIN) {
+ return error;
+ }
+
+ jsonrpc_run(rpc);
+ jsonrpc_wait(rpc);
+ jsonrpc_recv_wait(rpc);
+ poll_block();
+ }
+}
+
+static void
+jsonrpc_received(struct jsonrpc *rpc)
+{
+ struct jsonrpc_msg *msg;
+ struct json *json;
+ char *error;
+
+ json = json_parser_finish(rpc->parser);
+ rpc->parser = NULL;
+ if (json->type == JSON_STRING) {
+ VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
+ rpc->name, json_string(json));
+ jsonrpc_error(rpc, EPROTO);
+ json_destroy(json);
+ return;
+ }
+
+ error = jsonrpc_msg_from_json(json, &msg);
+ if (error) {
+ VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
+ rpc->name, error);
+ free(error);
+ jsonrpc_error(rpc, EPROTO);
+ return;
+ }
+
+ rpc->received = msg;
+}
+
+void
+jsonrpc_error(struct jsonrpc *rpc, int error)
+{
+ assert(error);
+ if (!rpc->status) {
+ rpc->status = error;
+ jsonrpc_cleanup(rpc);
+ }
+}
+
+static void
+jsonrpc_cleanup(struct jsonrpc *rpc)
+{
+ stream_close(rpc->stream);
+ rpc->stream = NULL;
+
+ json_parser_abort(rpc->parser);
+ rpc->parser = NULL;
+
+ jsonrpc_msg_destroy(rpc->received);
+ rpc->received = NULL;
+
+ queue_clear(&rpc->output);
+ rpc->backlog = 0;
+}
+\f
+static struct jsonrpc_msg *
+jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
+ struct json *params, struct json *result, struct json *error,
+ struct json *id)
+{
+ struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
+ msg->type = type;
+ msg->method = method ? xstrdup(method) : NULL;
+ msg->params = params;
+ msg->result = result;
+ msg->error = error;
+ msg->id = id;
+ return msg;
+}
+
+static struct json *
+jsonrpc_create_id(void)
+{
+ static unsigned int id;
+ return json_integer_create(id++);
+}
+
+struct jsonrpc_msg *
+jsonrpc_create_request(const char *method, struct json *params)
+{
+ return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL,
+ jsonrpc_create_id());
+}
+
+struct jsonrpc_msg *
+jsonrpc_create_notify(const char *method, struct json *params)
+{
+ return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
+}
+
+struct jsonrpc_msg *
+jsonrpc_create_reply(struct json *result, const struct json *id)
+{
+ return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
+ json_clone(id));
+}
+
+struct jsonrpc_msg *
+jsonrpc_create_error(struct json *error, const struct json *id)
+{
+ return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
+ json_clone(id));
+}
+
+const char *
+jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
+{
+ switch (type) {
+ case JSONRPC_REQUEST:
+ return "request";
+
+ case JSONRPC_NOTIFY:
+ return "notification";
+
+ case JSONRPC_REPLY:
+ return "reply";
+
+ case JSONRPC_ERROR:
+ return "error";
+ }
+ return "(null)";
+}
+
+char *
+jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
+{
+ const char *type_name;
+ unsigned int pattern;
+
+ if (m->params && m->params->type != JSON_ARRAY) {
+ return xstrdup("\"params\" must be JSON array");
+ }
+
+ switch (m->type) {
+ case JSONRPC_REQUEST:
+ pattern = 0x11001;
+ break;
+
+ case JSONRPC_NOTIFY:
+ pattern = 0x11000;
+ break;
+
+ case JSONRPC_REPLY:
+ pattern = 0x00101;
+ break;
+
+ case JSONRPC_ERROR:
+ pattern = 0x00011;
+ break;
+
+ default:
+ return xasprintf("invalid JSON-RPC message type %d", m->type);
+ }
+
+ type_name = jsonrpc_msg_type_to_string(m->type);
+ if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
+ return xasprintf("%s must%s have \"method\"",
+ type_name, (pattern & 0x10000) ? "" : " not");
+
+ }
+ if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
+ return xasprintf("%s must%s have \"params\"",
+ type_name, (pattern & 0x1000) ? "" : " not");
+
+ }
+ if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
+ return xasprintf("%s must%s have \"result\"",
+ type_name, (pattern & 0x100) ? "" : " not");
+
+ }
+ if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
+ return xasprintf("%s must%s have \"error\"",
+ type_name, (pattern & 0x10) ? "" : " not");
+
+ }
+ if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
+ return xasprintf("%s must%s have \"id\"",
+ type_name, (pattern & 0x1) ? "" : " not");
+
+ }
+ return NULL;
+}
+
+void
+jsonrpc_msg_destroy(struct jsonrpc_msg *m)
+{
+ if (m) {
+ free(m->method);
+ json_destroy(m->params);
+ json_destroy(m->result);
+ json_destroy(m->error);
+ json_destroy(m->id);
+ free(m);
+ }
+}
+
+static struct json *
+null_from_json_null(struct json *json)
+{
+ if (json && json->type == JSON_NULL) {
+ json_destroy(json);
+ return NULL;
+ }
+ return json;
+}
+
+char *
+jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
+{
+ struct json *method = NULL;
+ struct jsonrpc_msg *msg = NULL;
+ struct shash *object;
+ char *error;
+
+ if (json->type != JSON_OBJECT) {
+ error = xstrdup("message is not a JSON object");
+ goto exit;
+ }
+ object = json_object(json);
+
+ method = shash_find_and_delete(object, "method");
+ if (method && method->type != JSON_STRING) {
+ error = xstrdup("method is not a JSON string");
+ goto exit;
+ }
+
+ msg = xzalloc(sizeof *msg);
+ msg->method = method ? xstrdup(method->u.string) : NULL;
+ msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
+ msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
+ msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
+ msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
+ msg->type = (msg->result ? JSONRPC_REPLY
+ : msg->error ? JSONRPC_ERROR
+ : msg->id ? JSONRPC_REQUEST
+ : JSONRPC_NOTIFY);
+ if (!shash_is_empty(object)) {
+ error = xasprintf("message has unexpected member \"%s\"",
+ shash_first(object)->name);
+ goto exit;
+ }
+ error = jsonrpc_msg_is_valid(msg);
+ if (error) {
+ goto exit;
+ }
+
+exit:
+ json_destroy(method);
+ json_destroy(json);
+ if (error) {
+ jsonrpc_msg_destroy(msg);
+ msg = NULL;
+ }
+ *msgp = msg;
+ return error;
+}
+
+struct json *
+jsonrpc_msg_to_json(struct jsonrpc_msg *m)
+{
+ struct json *json = json_object_create();
+
+ if (m->method) {
+ json_object_put(json, "method", json_string_create_nocopy(m->method));
+ }
+
+ if (m->params) {
+ json_object_put(json, "params", m->params);
+ }
+
+ if (m->result) {
+ json_object_put(json, "result", m->result);
+ } else if (m->type == JSONRPC_ERROR) {
+ json_object_put(json, "result", json_null_create());
+ }
+
+ if (m->error) {
+ json_object_put(json, "error", m->error);
+ } else if (m->type == JSONRPC_REPLY) {
+ json_object_put(json, "error", json_null_create());
+ }
+
+ if (m->id) {
+ json_object_put(json, "id", m->id);
+ } else if (m->type == JSONRPC_NOTIFY) {
+ json_object_put(json, "id", json_null_create());
+ }
+
+ free(m);
+
+ return json;
+}
--- /dev/null
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef JSONRPC_H
+#define JSONRPC_H 1
+
+/* This is an implementation of the JSON-RPC 1.0 specification defined at
+ * http://json-rpc.org/wiki/specification. */
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct json;
+struct jsonrpc_msg;
+struct stream;
+\f
+/* API for a JSON-RPC stream. */
+
+struct jsonrpc *jsonrpc_open(struct stream *);
+void jsonrpc_close(struct jsonrpc *);
+
+void jsonrpc_run(struct jsonrpc *);
+void jsonrpc_wait(struct jsonrpc *);
+
+void jsonrpc_error(struct jsonrpc *, int error);
+int jsonrpc_get_status(const struct jsonrpc *);
+size_t jsonrpc_get_backlog(const struct jsonrpc *);
+const char *jsonrpc_get_name(const struct jsonrpc *);
+
+int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *);
+int jsonrpc_recv(struct jsonrpc *, struct jsonrpc_msg **);
+void jsonrpc_recv_wait(struct jsonrpc *);
+
+int jsonrpc_send_block(struct jsonrpc *, struct jsonrpc_msg *);
+int jsonrpc_recv_block(struct jsonrpc *, struct jsonrpc_msg **);
+
+/* Messages. */
+enum jsonrpc_msg_type {
+ JSONRPC_REQUEST, /* Request. */
+ JSONRPC_NOTIFY, /* Notification. */
+ JSONRPC_REPLY, /* Successful reply. */
+ JSONRPC_ERROR /* Error reply. */
+};
+
+struct jsonrpc_msg {
+ enum jsonrpc_msg_type type;
+ char *method; /* Request or notification only. */
+ struct json *params; /* Request or notification only. */
+ struct json *result; /* Successful reply only. */
+ struct json *error; /* Error reply only. */
+ struct json *id; /* Request or reply only. */
+};
+
+struct jsonrpc_msg *jsonrpc_create_request(const char *method,
+ struct json *params);
+struct jsonrpc_msg *jsonrpc_create_notify(const char *method,
+ struct json *params);
+struct jsonrpc_msg *jsonrpc_create_reply(struct json *result,
+ const struct json *id);
+struct jsonrpc_msg *jsonrpc_create_error(struct json *error,
+ const struct json *id);
+
+const char *jsonrpc_msg_type_to_string(enum jsonrpc_msg_type);
+char *jsonrpc_msg_is_valid(const struct jsonrpc_msg *);
+void jsonrpc_msg_destroy(struct jsonrpc_msg *);
+
+char *jsonrpc_msg_from_json(struct json *, struct jsonrpc_msg **);
+struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *);
+
+#endif /* jsonrpc.h */
VLOG_MODULE(fault)
VLOG_MODULE(flow)
VLOG_MODULE(in_band)
+VLOG_MODULE(jsonrpc)
VLOG_MODULE(leak_checker)
VLOG_MODULE(learning_switch)
VLOG_MODULE(lockfile)
tests/aes128.at \
tests/uuid.at \
tests/json.at \
+ tests/jsonrpc.at \
tests/timeval.at \
tests/lockfile.at \
tests/stp.at \
tests_test_json_SOURCES = tests/test-json.c
tests_test_json_LDADD = lib/libopenvswitch.a
+noinst_PROGRAMS += tests/test-jsonrpc
+tests_test_jsonrpc_SOURCES = tests/test-jsonrpc.c
+tests_test_jsonrpc_LDADD = lib/libopenvswitch.a
+
noinst_PROGRAMS += tests/test-list
tests_test_list_SOURCES = tests/test-list.c
tests_test_list_LDADD = lib/libopenvswitch.a
--- /dev/null
+AT_BANNER([JSON-RPC])
+
+AT_SETUP([JSON-RPC request and successful reply])
+AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket])
+AT_CHECK([test -s pid])
+AT_CHECK([kill -0 `cat pid`])
+AT_CHECK(
+ [[test-jsonrpc request unix:socket echo '[{"a": "b", "x": null}]']], [0],
+ [[{"error":null,"id":0,"result":[{"a":"b","x":null}]}
+]], [ignore], [test ! -e pid || kill `cat pid`])
+AT_CHECK([kill `cat pid`])
+AT_CLEANUP
+
+AT_SETUP([JSON-RPC request and error reply])
+AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket])
+AT_CHECK([test -s pid])
+AT_CHECK([kill -0 `cat pid`])
+AT_CHECK(
+ [[test-jsonrpc request unix:socket bad-request '[]']], [0],
+ [[{"error":{"error":"unknown method"},"id":0,"result":null}
+]], [ignore], [test ! -e pid || kill `cat pid`])
+AT_CHECK([kill `cat pid`])
+AT_CLEANUP
+
+AT_SETUP([JSON-RPC notification])
+AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket])
+AT_CHECK([test -s pid])
+# When a daemon dies it deletes its pidfile, so make a copy.
+AT_CHECK([cp pid pid2])
+AT_CHECK([kill -0 `cat pid2`])
+OVS_CHECK_LCOV([[test-jsonrpc notify unix:socket shutdown '[]']], [0], [],
+ [ignore], [kill `cat pid2`])
+AT_CHECK(
+ [pid=`cat pid2`
+ # First try a quick sleep, so that the test completes very quickly
+ # in the normal case. POSIX doesn't require fractional times to
+ # work, so this might not work.
+ sleep 0.1; if kill -0 $pid; then :; else echo success; exit 0; fi
+ # Then wait up to 2 seconds.
+ sleep 1; if kill -0 $pid; then :; else echo success; exit 0; fi
+ sleep 1; if kill -0 $pid; then :; else echo success; exit 0; fi
+ echo failure; exit 1], [0], [success
+], [ignore])
+AT_CHECK([test ! -e pid])
+AT_CLEANUP
parser = json_parser_create(0);
}
- used = n - json_parser_feed(parser, &buffer[used], n - used);
+ used += json_parser_feed(parser, &buffer[used], n - used);
if (used < n) {
if (!print_and_free_json(json_parser_finish(parser))) {
ok = false;
--- /dev/null
+/*
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "jsonrpc.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "command-line.h"
+#include "daemon.h"
+#include "json.h"
+#include "poll-loop.h"
+#include "stream.h"
+#include "timeval.h"
+#include "util.h"
+#include "vlog.h"
+
+static struct command all_commands[];
+
+static void usage(void) NO_RETURN;
+static void parse_options(int argc, char *argv[]);
+
+int
+main(int argc, char *argv[])
+{
+ set_program_name(argv[0]);
+ time_init();
+ vlog_init();
+ parse_options(argc, argv);
+ run_command(argc - optind, argv + optind, all_commands);
+ return 0;
+}
+
+static void
+parse_options(int argc, char *argv[])
+{
+ static struct option long_options[] = {
+ {"verbose", optional_argument, 0, 'v'},
+ {"help", no_argument, 0, 'h'},
+ DAEMON_LONG_OPTIONS,
+ {0, 0, 0, 0},
+ };
+ char *short_options = long_options_to_short_options(long_options);
+
+ for (;;) {
+ int c = getopt_long(argc, argv, short_options, long_options, NULL);
+ if (c == -1) {
+ break;
+ }
+
+ switch (c) {
+ case 'h':
+ usage();
+
+ case 'v':
+ vlog_set_verbosity(optarg);
+ break;
+
+ DAEMON_OPTION_HANDLERS
+
+ case '?':
+ exit(EXIT_FAILURE);
+
+ default:
+ abort();
+ }
+ }
+ free(short_options);
+}
+
+static void
+usage(void)
+{
+ printf("%s: JSON-RPC test utility\n"
+ "usage: %s [OPTIONS] COMMAND [ARG...]\n"
+ " listen LOCAL listen for connections on LOCAL\n"
+ " request REMOTE METHOD PARAMS send request, print reply\n"
+ " notify REMOTE METHOD PARAMS send notification and exit\n",
+ program_name, program_name);
+ stream_usage("JSON-RPC", true, true);
+ daemon_usage();
+ vlog_usage();
+ printf("\nOther options:\n"
+ " -h, --help display this help message\n");
+ exit(EXIT_SUCCESS);
+}
+\f
+/* Command helper functions. */
+
+static struct json *
+parse_json(const char *s)
+{
+ struct json *json = json_from_string(s);
+ if (json->type == JSON_STRING) {
+ ovs_fatal(0, "\"%s\": %s", s, json->u.string);
+ }
+ return json;
+}
+
+static void
+print_and_free_json(struct json *json)
+{
+ char *string = json_to_string(json, JSSF_SORT);
+ json_destroy(json);
+ puts(string);
+ free(string);
+}
+\f
+/* Command implementations. */
+
+static void
+handle_rpc(struct jsonrpc *rpc, struct jsonrpc_msg *msg, bool *done)
+{
+ struct jsonrpc_msg *reply = NULL;
+ if (msg->type == JSONRPC_REQUEST) {
+ if (!strcmp(msg->method, "echo")) {
+ reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
+ } else {
+ struct json *error = json_object_create();
+ json_object_put_string(error, "error", "unknown method");
+ reply = jsonrpc_create_error(error, msg->id);
+ ovs_error(0, "unknown request %s", msg->method);
+ }
+
+ } else if (msg->type == JSONRPC_NOTIFY) {
+ if (!strcmp(msg->method, "shutdown")) {
+ *done = true;
+ } else {
+ jsonrpc_error(rpc, ENOTTY);
+ ovs_error(0, "unknown notification %s", msg->method);
+ }
+ } else {
+ jsonrpc_error(rpc, EPROTO);
+ ovs_error(0, "unsolicited JSON-RPC reply or error");
+ }
+
+ if (reply) {
+ jsonrpc_send(rpc, reply);
+ }
+}
+
+static void
+do_listen(int argc UNUSED, char *argv[])
+{
+ struct pstream *pstream;
+ struct jsonrpc **rpcs;
+ size_t n_rpcs, allocated_rpcs;
+ bool done;
+ int error;
+
+ die_if_already_running();
+
+ error = pstream_open(argv[1], &pstream);
+ if (error) {
+ ovs_fatal(error, "could not listen on \"%s\"", argv[1]);
+ }
+
+ daemonize();
+
+ rpcs = NULL;
+ n_rpcs = allocated_rpcs = 0;
+ done = false;
+ for (;;) {
+ struct stream *stream;
+ size_t i;
+
+ /* Accept new connections. */
+ error = pstream_accept(pstream, &stream);
+ if (!error) {
+ if (n_rpcs >= allocated_rpcs) {
+ rpcs = x2nrealloc(rpcs, &allocated_rpcs, sizeof *rpcs);
+ }
+ rpcs[n_rpcs++] = jsonrpc_open(stream);
+ } else if (error != EAGAIN) {
+ ovs_fatal(error, "pstream_accept failed");
+ }
+
+ /* Service existing connections. */
+ for (i = 0; i < n_rpcs; ) {
+ struct jsonrpc *rpc = rpcs[i];
+ struct jsonrpc_msg *msg;
+
+ jsonrpc_run(rpc);
+ if (!jsonrpc_get_backlog(rpc)) {
+ error = jsonrpc_recv(rpc, &msg);
+ if (!error) {
+ handle_rpc(rpc, msg, &done);
+ jsonrpc_msg_destroy(msg);
+ }
+ }
+
+ error = jsonrpc_get_status(rpc);
+ if (error) {
+ jsonrpc_close(rpc);
+ ovs_error(error, "connection closed");
+ memmove(&rpcs[i], &rpcs[i + 1],
+ (n_rpcs - i - 1) * sizeof *rpcs);
+ n_rpcs--;
+ } else {
+ i++;
+ }
+ }
+
+ /* Wait for something to do. */
+ if (done && !n_rpcs) {
+ break;
+ }
+ pstream_wait(pstream);
+ for (i = 0; i < n_rpcs; i++) {
+ struct jsonrpc *rpc = rpcs[i];
+
+ jsonrpc_wait(rpc);
+ if (!jsonrpc_get_backlog(rpc)) {
+ jsonrpc_recv_wait(rpc);
+ }
+ }
+ poll_block();
+ }
+}
+
+
+static void
+do_request(int argc UNUSED, char *argv[])
+{
+ struct jsonrpc_msg *msg;
+ struct jsonrpc *rpc;
+ struct json *params;
+ struct stream *stream;
+ const char *method;
+ char *string;
+ int error;
+
+ method = argv[2];
+ params = parse_json(argv[3]);
+ msg = jsonrpc_create_request(method, params);
+ string = jsonrpc_msg_is_valid(msg);
+ if (string) {
+ ovs_fatal(0, "not a valid JSON-RPC request: %s", string);
+ }
+
+ error = stream_open_block(argv[1], &stream);
+ if (error) {
+ ovs_fatal(error, "could not open \"%s\"", argv[1]);
+ }
+ rpc = jsonrpc_open(stream);
+
+ error = jsonrpc_send(rpc, msg);
+ if (error) {
+ ovs_fatal(error, "could not send request");
+ }
+
+ error = jsonrpc_recv_block(rpc, &msg);
+ if (error) {
+ ovs_fatal(error, "error waiting for reply");
+ }
+ print_and_free_json(jsonrpc_msg_to_json(msg));
+
+ jsonrpc_close(rpc);
+}
+
+static void
+do_notify(int argc UNUSED, char *argv[])
+{
+ struct jsonrpc_msg *msg;
+ struct jsonrpc *rpc;
+ struct json *params;
+ struct stream *stream;
+ const char *method;
+ char *string;
+ int error;
+
+ method = argv[2];
+ params = parse_json(argv[3]);
+ msg = jsonrpc_create_notify(method, params);
+ string = jsonrpc_msg_is_valid(msg);
+ if (string) {
+ ovs_fatal(0, "not a JSON RPC-valid notification: %s", string);
+ }
+
+ error = stream_open_block(argv[1], &stream);
+ if (error) {
+ ovs_fatal(error, "could not open \"%s\"", argv[1]);
+ }
+ rpc = jsonrpc_open(stream);
+
+ error = jsonrpc_send_block(rpc, msg);
+ if (error) {
+ ovs_fatal(error, "could not send request");
+ }
+ jsonrpc_close(rpc);
+}
+
+static void
+do_help(int argc UNUSED, char *argv[] UNUSED)
+{
+ usage();
+}
+
+static struct command all_commands[] = {
+ { "listen", 1, 1, do_listen },
+ { "request", 3, 3, do_request },
+ { "notify", 3, 3, do_notify },
+ { "help", 0, INT_MAX, do_help },
+ { NULL, 0, 0, NULL },
+};
m4_include([tests/aes128.at])
m4_include([tests/uuid.at])
m4_include([tests/json.at])
+m4_include([tests/jsonrpc.at])
m4_include([tests/timeval.at])
m4_include([tests/lockfile.at])
m4_include([tests/stp.at])