From: Ben Pfaff Date: Mon, 26 Oct 2009 22:04:05 +0000 (-0700) Subject: Implement JSON-RPC protocol. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f212909325be9bc7e296e1a32e2fc89694a0049f;p=openvswitch Implement JSON-RPC protocol. --- diff --git a/lib/automake.mk b/lib/automake.mk index 8dc28990..07f7ce71 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -55,6 +55,8 @@ lib_libopenvswitch_a_SOURCES = \ lib/hmap.h \ lib/json.c \ lib/json.h \ + lib/jsonrpc.c \ + lib/jsonrpc.h \ lib/leak-checker.c \ lib/leak-checker.h \ lib/learning-switch.c \ diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c new file mode 100644 index 00000000..cf2a4971 --- /dev/null +++ b/lib/jsonrpc.c @@ -0,0 +1,555 @@ +/* + * Copyright (c) 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "jsonrpc.h" + +#include + +#include "byteq.h" +#include "json.h" +#include "list.h" +#include "ofpbuf.h" +#include "poll-loop.h" +#include "queue.h" +#include "stream.h" + +#define THIS_MODULE VLM_jsonrpc +#include "vlog.h" + +struct jsonrpc { + struct stream *stream; + char *name; + int status; + + /* Input. */ + struct byteq input; + struct json_parser *parser; + struct jsonrpc_msg *received; + + /* Output. */ + struct ovs_queue output; + size_t backlog; +}; + +/* Rate limit for error messages. */ +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); + +static void jsonrpc_received(struct jsonrpc *); +static void jsonrpc_cleanup(struct jsonrpc *); + +struct jsonrpc * +jsonrpc_open(struct stream *stream) +{ + struct jsonrpc *rpc; + + assert(stream != NULL); + + rpc = xzalloc(sizeof *rpc); + rpc->name = xstrdup(stream_get_name(stream)); + rpc->stream = stream; + byteq_init(&rpc->input); + queue_init(&rpc->output); + + return rpc; +} + +void +jsonrpc_close(struct jsonrpc *rpc) +{ + if (rpc) { + jsonrpc_cleanup(rpc); + free(rpc->name); + free(rpc); + } +} + +void +jsonrpc_run(struct jsonrpc *rpc) +{ + if (rpc->status) { + return; + } + + while (!queue_is_empty(&rpc->output)) { + struct ofpbuf *buf = rpc->output.head; + int retval; + + retval = stream_send(rpc->stream, buf->data, buf->size); + if (retval >= 0) { + rpc->backlog -= retval; + ofpbuf_pull(buf, retval); + if (!buf->size) { + ofpbuf_delete(queue_pop_head(&rpc->output)); + } + } else { + if (retval != -EAGAIN) { + VLOG_WARN_RL(&rl, "%s: send error: %s", + rpc->name, strerror(-retval)); + jsonrpc_error(rpc, -retval); + } + break; + } + } +} + +void +jsonrpc_wait(struct jsonrpc *rpc) +{ + if (!rpc->status && !queue_is_empty(&rpc->output)) { + stream_send_wait(rpc->stream); + } +} + +int +jsonrpc_get_status(const struct jsonrpc *rpc) +{ + return rpc->status; +} + +size_t +jsonrpc_get_backlog(const struct jsonrpc *rpc) +{ + return rpc->status ? 0 : rpc->backlog; +} + +const char * +jsonrpc_get_name(const struct jsonrpc *rpc) +{ + return rpc->name; +} + +int +jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) +{ + struct ofpbuf *buf; + struct json *json; + size_t length; + char *s; + + if (rpc->status) { + jsonrpc_msg_destroy(msg); + return rpc->status; + } + + json = jsonrpc_msg_to_json(msg); + s = json_to_string(json, 0); + length = strlen(s); + json_destroy(json); + + buf = xmalloc(sizeof *buf); + ofpbuf_use(buf, s, length); + buf->size = length; + queue_push_tail(&rpc->output, buf); + rpc->backlog += length; + + if (rpc->output.n == 1) { + jsonrpc_run(rpc); + } + return rpc->status; +} + +int +jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) +{ + *msgp = NULL; + if (rpc->status) { + return rpc->status; + } + + while (!rpc->received) { + if (byteq_is_empty(&rpc->input)) { + size_t chunk; + int retval; + + chunk = byteq_headroom(&rpc->input); + retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk); + if (retval < 0) { + if (retval == -EAGAIN) { + return EAGAIN; + } else { + VLOG_WARN_RL(&rl, "%s: receive error: %s", + rpc->name, strerror(-retval)); + jsonrpc_error(rpc, -retval); + return rpc->status; + } + } else if (retval == 0) { + VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name); + jsonrpc_error(rpc, EOF); + return EOF; + } + byteq_advance_head(&rpc->input, retval); + } else { + size_t n, used; + + if (!rpc->parser) { + rpc->parser = json_parser_create(0); + } + n = byteq_tailroom(&rpc->input); + used = json_parser_feed(rpc->parser, + (char *) byteq_tail(&rpc->input), n); + byteq_advance_tail(&rpc->input, used); + if (json_parser_is_done(rpc->parser)) { + jsonrpc_received(rpc); + if (rpc->status) { + return rpc->status; + } + } + } + } + + *msgp = rpc->received; + rpc->received = NULL; + return 0; +} + +void +jsonrpc_recv_wait(struct jsonrpc *rpc) +{ + if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) { + poll_immediate_wake(); + } else { + stream_recv_wait(rpc->stream); + } +} + +int +jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg) +{ + int error; + + error = jsonrpc_send(rpc, msg); + if (error) { + return error; + } + + while (!queue_is_empty(&rpc->output) && !rpc->status) { + jsonrpc_run(rpc); + jsonrpc_wait(rpc); + poll_block(); + } + return rpc->status; +} + +int +jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) +{ + for (;;) { + int error = jsonrpc_recv(rpc, msgp); + if (error != EAGAIN) { + return error; + } + + jsonrpc_run(rpc); + jsonrpc_wait(rpc); + jsonrpc_recv_wait(rpc); + poll_block(); + } +} + +static void +jsonrpc_received(struct jsonrpc *rpc) +{ + struct jsonrpc_msg *msg; + struct json *json; + char *error; + + json = json_parser_finish(rpc->parser); + rpc->parser = NULL; + if (json->type == JSON_STRING) { + VLOG_WARN_RL(&rl, "%s: error parsing stream: %s", + rpc->name, json_string(json)); + jsonrpc_error(rpc, EPROTO); + json_destroy(json); + return; + } + + error = jsonrpc_msg_from_json(json, &msg); + if (error) { + VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s", + rpc->name, error); + free(error); + jsonrpc_error(rpc, EPROTO); + return; + } + + rpc->received = msg; +} + +void +jsonrpc_error(struct jsonrpc *rpc, int error) +{ + assert(error); + if (!rpc->status) { + rpc->status = error; + jsonrpc_cleanup(rpc); + } +} + +static void +jsonrpc_cleanup(struct jsonrpc *rpc) +{ + stream_close(rpc->stream); + rpc->stream = NULL; + + json_parser_abort(rpc->parser); + rpc->parser = NULL; + + jsonrpc_msg_destroy(rpc->received); + rpc->received = NULL; + + queue_clear(&rpc->output); + rpc->backlog = 0; +} + +static struct jsonrpc_msg * +jsonrpc_create(enum jsonrpc_msg_type type, const char *method, + struct json *params, struct json *result, struct json *error, + struct json *id) +{ + struct jsonrpc_msg *msg = xmalloc(sizeof *msg); + msg->type = type; + msg->method = method ? xstrdup(method) : NULL; + msg->params = params; + msg->result = result; + msg->error = error; + msg->id = id; + return msg; +} + +static struct json * +jsonrpc_create_id(void) +{ + static unsigned int id; + return json_integer_create(id++); +} + +struct jsonrpc_msg * +jsonrpc_create_request(const char *method, struct json *params) +{ + return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, + jsonrpc_create_id()); +} + +struct jsonrpc_msg * +jsonrpc_create_notify(const char *method, struct json *params) +{ + return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL); +} + +struct jsonrpc_msg * +jsonrpc_create_reply(struct json *result, const struct json *id) +{ + return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL, + json_clone(id)); +} + +struct jsonrpc_msg * +jsonrpc_create_error(struct json *error, const struct json *id) +{ + return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error, + json_clone(id)); +} + +const char * +jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type) +{ + switch (type) { + case JSONRPC_REQUEST: + return "request"; + + case JSONRPC_NOTIFY: + return "notification"; + + case JSONRPC_REPLY: + return "reply"; + + case JSONRPC_ERROR: + return "error"; + } + return "(null)"; +} + +char * +jsonrpc_msg_is_valid(const struct jsonrpc_msg *m) +{ + const char *type_name; + unsigned int pattern; + + if (m->params && m->params->type != JSON_ARRAY) { + return xstrdup("\"params\" must be JSON array"); + } + + switch (m->type) { + case JSONRPC_REQUEST: + pattern = 0x11001; + break; + + case JSONRPC_NOTIFY: + pattern = 0x11000; + break; + + case JSONRPC_REPLY: + pattern = 0x00101; + break; + + case JSONRPC_ERROR: + pattern = 0x00011; + break; + + default: + return xasprintf("invalid JSON-RPC message type %d", m->type); + } + + type_name = jsonrpc_msg_type_to_string(m->type); + if ((m->method != NULL) != ((pattern & 0x10000) != 0)) { + return xasprintf("%s must%s have \"method\"", + type_name, (pattern & 0x10000) ? "" : " not"); + + } + if ((m->params != NULL) != ((pattern & 0x1000) != 0)) { + return xasprintf("%s must%s have \"params\"", + type_name, (pattern & 0x1000) ? "" : " not"); + + } + if ((m->result != NULL) != ((pattern & 0x100) != 0)) { + return xasprintf("%s must%s have \"result\"", + type_name, (pattern & 0x100) ? "" : " not"); + + } + if ((m->error != NULL) != ((pattern & 0x10) != 0)) { + return xasprintf("%s must%s have \"error\"", + type_name, (pattern & 0x10) ? "" : " not"); + + } + if ((m->id != NULL) != ((pattern & 0x1) != 0)) { + return xasprintf("%s must%s have \"id\"", + type_name, (pattern & 0x1) ? "" : " not"); + + } + return NULL; +} + +void +jsonrpc_msg_destroy(struct jsonrpc_msg *m) +{ + if (m) { + free(m->method); + json_destroy(m->params); + json_destroy(m->result); + json_destroy(m->error); + json_destroy(m->id); + free(m); + } +} + +static struct json * +null_from_json_null(struct json *json) +{ + if (json && json->type == JSON_NULL) { + json_destroy(json); + return NULL; + } + return json; +} + +char * +jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp) +{ + struct json *method = NULL; + struct jsonrpc_msg *msg = NULL; + struct shash *object; + char *error; + + if (json->type != JSON_OBJECT) { + error = xstrdup("message is not a JSON object"); + goto exit; + } + object = json_object(json); + + method = shash_find_and_delete(object, "method"); + if (method && method->type != JSON_STRING) { + error = xstrdup("method is not a JSON string"); + goto exit; + } + + msg = xzalloc(sizeof *msg); + msg->method = method ? xstrdup(method->u.string) : NULL; + msg->params = null_from_json_null(shash_find_and_delete(object, "params")); + msg->result = null_from_json_null(shash_find_and_delete(object, "result")); + msg->error = null_from_json_null(shash_find_and_delete(object, "error")); + msg->id = null_from_json_null(shash_find_and_delete(object, "id")); + msg->type = (msg->result ? JSONRPC_REPLY + : msg->error ? JSONRPC_ERROR + : msg->id ? JSONRPC_REQUEST + : JSONRPC_NOTIFY); + if (!shash_is_empty(object)) { + error = xasprintf("message has unexpected member \"%s\"", + shash_first(object)->name); + goto exit; + } + error = jsonrpc_msg_is_valid(msg); + if (error) { + goto exit; + } + +exit: + json_destroy(method); + json_destroy(json); + if (error) { + jsonrpc_msg_destroy(msg); + msg = NULL; + } + *msgp = msg; + return error; +} + +struct json * +jsonrpc_msg_to_json(struct jsonrpc_msg *m) +{ + struct json *json = json_object_create(); + + if (m->method) { + json_object_put(json, "method", json_string_create_nocopy(m->method)); + } + + if (m->params) { + json_object_put(json, "params", m->params); + } + + if (m->result) { + json_object_put(json, "result", m->result); + } else if (m->type == JSONRPC_ERROR) { + json_object_put(json, "result", json_null_create()); + } + + if (m->error) { + json_object_put(json, "error", m->error); + } else if (m->type == JSONRPC_REPLY) { + json_object_put(json, "error", json_null_create()); + } + + if (m->id) { + json_object_put(json, "id", m->id); + } else if (m->type == JSONRPC_NOTIFY) { + json_object_put(json, "id", json_null_create()); + } + + free(m); + + return json; +} diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h new file mode 100644 index 00000000..931983ff --- /dev/null +++ b/lib/jsonrpc.h @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef JSONRPC_H +#define JSONRPC_H 1 + +/* This is an implementation of the JSON-RPC 1.0 specification defined at + * http://json-rpc.org/wiki/specification. */ + +#include +#include + +struct json; +struct jsonrpc_msg; +struct stream; + +/* API for a JSON-RPC stream. */ + +struct jsonrpc *jsonrpc_open(struct stream *); +void jsonrpc_close(struct jsonrpc *); + +void jsonrpc_run(struct jsonrpc *); +void jsonrpc_wait(struct jsonrpc *); + +void jsonrpc_error(struct jsonrpc *, int error); +int jsonrpc_get_status(const struct jsonrpc *); +size_t jsonrpc_get_backlog(const struct jsonrpc *); +const char *jsonrpc_get_name(const struct jsonrpc *); + +int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *); +int jsonrpc_recv(struct jsonrpc *, struct jsonrpc_msg **); +void jsonrpc_recv_wait(struct jsonrpc *); + +int jsonrpc_send_block(struct jsonrpc *, struct jsonrpc_msg *); +int jsonrpc_recv_block(struct jsonrpc *, struct jsonrpc_msg **); + +/* Messages. */ +enum jsonrpc_msg_type { + JSONRPC_REQUEST, /* Request. */ + JSONRPC_NOTIFY, /* Notification. */ + JSONRPC_REPLY, /* Successful reply. */ + JSONRPC_ERROR /* Error reply. */ +}; + +struct jsonrpc_msg { + enum jsonrpc_msg_type type; + char *method; /* Request or notification only. */ + struct json *params; /* Request or notification only. */ + struct json *result; /* Successful reply only. */ + struct json *error; /* Error reply only. */ + struct json *id; /* Request or reply only. */ +}; + +struct jsonrpc_msg *jsonrpc_create_request(const char *method, + struct json *params); +struct jsonrpc_msg *jsonrpc_create_notify(const char *method, + struct json *params); +struct jsonrpc_msg *jsonrpc_create_reply(struct json *result, + const struct json *id); +struct jsonrpc_msg *jsonrpc_create_error(struct json *error, + const struct json *id); + +const char *jsonrpc_msg_type_to_string(enum jsonrpc_msg_type); +char *jsonrpc_msg_is_valid(const struct jsonrpc_msg *); +void jsonrpc_msg_destroy(struct jsonrpc_msg *); + +char *jsonrpc_msg_from_json(struct json *, struct jsonrpc_msg **); +struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *); + +#endif /* jsonrpc.h */ diff --git a/lib/vlog-modules.def b/lib/vlog-modules.def index da345df4..684954ea 100644 --- a/lib/vlog-modules.def +++ b/lib/vlog-modules.def @@ -40,6 +40,7 @@ VLOG_MODULE(fatal_signal) VLOG_MODULE(fault) VLOG_MODULE(flow) VLOG_MODULE(in_band) +VLOG_MODULE(jsonrpc) VLOG_MODULE(leak_checker) VLOG_MODULE(learning_switch) VLOG_MODULE(lockfile) diff --git a/tests/automake.mk b/tests/automake.mk index a9edee0e..62845112 100644 --- a/tests/automake.mk +++ b/tests/automake.mk @@ -12,6 +12,7 @@ TESTSUITE_AT = \ tests/aes128.at \ tests/uuid.at \ tests/json.at \ + tests/jsonrpc.at \ tests/timeval.at \ tests/lockfile.at \ tests/stp.at \ @@ -76,6 +77,10 @@ noinst_PROGRAMS += tests/test-json tests_test_json_SOURCES = tests/test-json.c tests_test_json_LDADD = lib/libopenvswitch.a +noinst_PROGRAMS += tests/test-jsonrpc +tests_test_jsonrpc_SOURCES = tests/test-jsonrpc.c +tests_test_jsonrpc_LDADD = lib/libopenvswitch.a + noinst_PROGRAMS += tests/test-list tests_test_list_SOURCES = tests/test-list.c tests_test_list_LDADD = lib/libopenvswitch.a diff --git a/tests/jsonrpc.at b/tests/jsonrpc.at new file mode 100644 index 00000000..d5ebf948 --- /dev/null +++ b/tests/jsonrpc.at @@ -0,0 +1,45 @@ +AT_BANNER([JSON-RPC]) + +AT_SETUP([JSON-RPC request and successful reply]) +AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket]) +AT_CHECK([test -s pid]) +AT_CHECK([kill -0 `cat pid`]) +AT_CHECK( + [[test-jsonrpc request unix:socket echo '[{"a": "b", "x": null}]']], [0], + [[{"error":null,"id":0,"result":[{"a":"b","x":null}]} +]], [ignore], [test ! -e pid || kill `cat pid`]) +AT_CHECK([kill `cat pid`]) +AT_CLEANUP + +AT_SETUP([JSON-RPC request and error reply]) +AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket]) +AT_CHECK([test -s pid]) +AT_CHECK([kill -0 `cat pid`]) +AT_CHECK( + [[test-jsonrpc request unix:socket bad-request '[]']], [0], + [[{"error":{"error":"unknown method"},"id":0,"result":null} +]], [ignore], [test ! -e pid || kill `cat pid`]) +AT_CHECK([kill `cat pid`]) +AT_CLEANUP + +AT_SETUP([JSON-RPC notification]) +AT_CHECK([test-jsonrpc --detach --pidfile=$PWD/pid listen punix:socket]) +AT_CHECK([test -s pid]) +# When a daemon dies it deletes its pidfile, so make a copy. +AT_CHECK([cp pid pid2]) +AT_CHECK([kill -0 `cat pid2`]) +OVS_CHECK_LCOV([[test-jsonrpc notify unix:socket shutdown '[]']], [0], [], + [ignore], [kill `cat pid2`]) +AT_CHECK( + [pid=`cat pid2` + # First try a quick sleep, so that the test completes very quickly + # in the normal case. POSIX doesn't require fractional times to + # work, so this might not work. + sleep 0.1; if kill -0 $pid; then :; else echo success; exit 0; fi + # Then wait up to 2 seconds. + sleep 1; if kill -0 $pid; then :; else echo success; exit 0; fi + sleep 1; if kill -0 $pid; then :; else echo success; exit 0; fi + echo failure; exit 1], [0], [success +], [ignore]) +AT_CHECK([test ! -e pid]) +AT_CLEANUP diff --git a/tests/test-json.c b/tests/test-json.c index bb9fadb0..6261786d 100644 --- a/tests/test-json.c +++ b/tests/test-json.c @@ -92,7 +92,7 @@ parse_multiple(const char *input_file) parser = json_parser_create(0); } - used = n - json_parser_feed(parser, &buffer[used], n - used); + used += json_parser_feed(parser, &buffer[used], n - used); if (used < n) { if (!print_and_free_json(json_parser_finish(parser))) { ok = false; diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c new file mode 100644 index 00000000..7f2166f4 --- /dev/null +++ b/tests/test-jsonrpc.c @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "jsonrpc.h" + +#include +#include +#include +#include +#include + +#include "command-line.h" +#include "daemon.h" +#include "json.h" +#include "poll-loop.h" +#include "stream.h" +#include "timeval.h" +#include "util.h" +#include "vlog.h" + +static struct command all_commands[]; + +static void usage(void) NO_RETURN; +static void parse_options(int argc, char *argv[]); + +int +main(int argc, char *argv[]) +{ + set_program_name(argv[0]); + time_init(); + vlog_init(); + parse_options(argc, argv); + run_command(argc - optind, argv + optind, all_commands); + return 0; +} + +static void +parse_options(int argc, char *argv[]) +{ + static struct option long_options[] = { + {"verbose", optional_argument, 0, 'v'}, + {"help", no_argument, 0, 'h'}, + DAEMON_LONG_OPTIONS, + {0, 0, 0, 0}, + }; + char *short_options = long_options_to_short_options(long_options); + + for (;;) { + int c = getopt_long(argc, argv, short_options, long_options, NULL); + if (c == -1) { + break; + } + + switch (c) { + case 'h': + usage(); + + case 'v': + vlog_set_verbosity(optarg); + break; + + DAEMON_OPTION_HANDLERS + + case '?': + exit(EXIT_FAILURE); + + default: + abort(); + } + } + free(short_options); +} + +static void +usage(void) +{ + printf("%s: JSON-RPC test utility\n" + "usage: %s [OPTIONS] COMMAND [ARG...]\n" + " listen LOCAL listen for connections on LOCAL\n" + " request REMOTE METHOD PARAMS send request, print reply\n" + " notify REMOTE METHOD PARAMS send notification and exit\n", + program_name, program_name); + stream_usage("JSON-RPC", true, true); + daemon_usage(); + vlog_usage(); + printf("\nOther options:\n" + " -h, --help display this help message\n"); + exit(EXIT_SUCCESS); +} + +/* Command helper functions. */ + +static struct json * +parse_json(const char *s) +{ + struct json *json = json_from_string(s); + if (json->type == JSON_STRING) { + ovs_fatal(0, "\"%s\": %s", s, json->u.string); + } + return json; +} + +static void +print_and_free_json(struct json *json) +{ + char *string = json_to_string(json, JSSF_SORT); + json_destroy(json); + puts(string); + free(string); +} + +/* Command implementations. */ + +static void +handle_rpc(struct jsonrpc *rpc, struct jsonrpc_msg *msg, bool *done) +{ + struct jsonrpc_msg *reply = NULL; + if (msg->type == JSONRPC_REQUEST) { + if (!strcmp(msg->method, "echo")) { + reply = jsonrpc_create_reply(json_clone(msg->params), msg->id); + } else { + struct json *error = json_object_create(); + json_object_put_string(error, "error", "unknown method"); + reply = jsonrpc_create_error(error, msg->id); + ovs_error(0, "unknown request %s", msg->method); + } + + } else if (msg->type == JSONRPC_NOTIFY) { + if (!strcmp(msg->method, "shutdown")) { + *done = true; + } else { + jsonrpc_error(rpc, ENOTTY); + ovs_error(0, "unknown notification %s", msg->method); + } + } else { + jsonrpc_error(rpc, EPROTO); + ovs_error(0, "unsolicited JSON-RPC reply or error"); + } + + if (reply) { + jsonrpc_send(rpc, reply); + } +} + +static void +do_listen(int argc UNUSED, char *argv[]) +{ + struct pstream *pstream; + struct jsonrpc **rpcs; + size_t n_rpcs, allocated_rpcs; + bool done; + int error; + + die_if_already_running(); + + error = pstream_open(argv[1], &pstream); + if (error) { + ovs_fatal(error, "could not listen on \"%s\"", argv[1]); + } + + daemonize(); + + rpcs = NULL; + n_rpcs = allocated_rpcs = 0; + done = false; + for (;;) { + struct stream *stream; + size_t i; + + /* Accept new connections. */ + error = pstream_accept(pstream, &stream); + if (!error) { + if (n_rpcs >= allocated_rpcs) { + rpcs = x2nrealloc(rpcs, &allocated_rpcs, sizeof *rpcs); + } + rpcs[n_rpcs++] = jsonrpc_open(stream); + } else if (error != EAGAIN) { + ovs_fatal(error, "pstream_accept failed"); + } + + /* Service existing connections. */ + for (i = 0; i < n_rpcs; ) { + struct jsonrpc *rpc = rpcs[i]; + struct jsonrpc_msg *msg; + + jsonrpc_run(rpc); + if (!jsonrpc_get_backlog(rpc)) { + error = jsonrpc_recv(rpc, &msg); + if (!error) { + handle_rpc(rpc, msg, &done); + jsonrpc_msg_destroy(msg); + } + } + + error = jsonrpc_get_status(rpc); + if (error) { + jsonrpc_close(rpc); + ovs_error(error, "connection closed"); + memmove(&rpcs[i], &rpcs[i + 1], + (n_rpcs - i - 1) * sizeof *rpcs); + n_rpcs--; + } else { + i++; + } + } + + /* Wait for something to do. */ + if (done && !n_rpcs) { + break; + } + pstream_wait(pstream); + for (i = 0; i < n_rpcs; i++) { + struct jsonrpc *rpc = rpcs[i]; + + jsonrpc_wait(rpc); + if (!jsonrpc_get_backlog(rpc)) { + jsonrpc_recv_wait(rpc); + } + } + poll_block(); + } +} + + +static void +do_request(int argc UNUSED, char *argv[]) +{ + struct jsonrpc_msg *msg; + struct jsonrpc *rpc; + struct json *params; + struct stream *stream; + const char *method; + char *string; + int error; + + method = argv[2]; + params = parse_json(argv[3]); + msg = jsonrpc_create_request(method, params); + string = jsonrpc_msg_is_valid(msg); + if (string) { + ovs_fatal(0, "not a valid JSON-RPC request: %s", string); + } + + error = stream_open_block(argv[1], &stream); + if (error) { + ovs_fatal(error, "could not open \"%s\"", argv[1]); + } + rpc = jsonrpc_open(stream); + + error = jsonrpc_send(rpc, msg); + if (error) { + ovs_fatal(error, "could not send request"); + } + + error = jsonrpc_recv_block(rpc, &msg); + if (error) { + ovs_fatal(error, "error waiting for reply"); + } + print_and_free_json(jsonrpc_msg_to_json(msg)); + + jsonrpc_close(rpc); +} + +static void +do_notify(int argc UNUSED, char *argv[]) +{ + struct jsonrpc_msg *msg; + struct jsonrpc *rpc; + struct json *params; + struct stream *stream; + const char *method; + char *string; + int error; + + method = argv[2]; + params = parse_json(argv[3]); + msg = jsonrpc_create_notify(method, params); + string = jsonrpc_msg_is_valid(msg); + if (string) { + ovs_fatal(0, "not a JSON RPC-valid notification: %s", string); + } + + error = stream_open_block(argv[1], &stream); + if (error) { + ovs_fatal(error, "could not open \"%s\"", argv[1]); + } + rpc = jsonrpc_open(stream); + + error = jsonrpc_send_block(rpc, msg); + if (error) { + ovs_fatal(error, "could not send request"); + } + jsonrpc_close(rpc); +} + +static void +do_help(int argc UNUSED, char *argv[] UNUSED) +{ + usage(); +} + +static struct command all_commands[] = { + { "listen", 1, 1, do_listen }, + { "request", 3, 3, do_request }, + { "notify", 3, 3, do_notify }, + { "help", 0, INT_MAX, do_help }, + { NULL, 0, 0, NULL }, +}; diff --git a/tests/testsuite.at b/tests/testsuite.at index 816bed1c..781b6a6c 100644 --- a/tests/testsuite.at +++ b/tests/testsuite.at @@ -23,6 +23,7 @@ m4_include([tests/dir_name.at]) m4_include([tests/aes128.at]) m4_include([tests/uuid.at]) m4_include([tests/json.at]) +m4_include([tests/jsonrpc.at]) m4_include([tests/timeval.at]) m4_include([tests/lockfile.at]) m4_include([tests/stp.at])