2 * Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include "dynamic-string.h"
26 #include "fatal-signal.h"
30 #include "poll-loop.h"
31 #include "reconnect.h"
36 VLOG_DEFINE_THIS_MODULE(jsonrpc);
39 struct stream *stream;
45 struct json_parser *parser;
46 struct jsonrpc_msg *received;
49 struct list output; /* Contains "struct ofpbuf"s. */
53 /* Rate limit for error messages. */
54 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
56 static void jsonrpc_received(struct jsonrpc *);
57 static void jsonrpc_cleanup(struct jsonrpc *);
58 static void jsonrpc_error(struct jsonrpc *, int error);
60 /* This is just the same as stream_open() except that it uses the default
61 * JSONRPC ports if none is specified. */
63 jsonrpc_stream_open(const char *name, struct stream **streamp)
65 return stream_open_with_default_ports(name, JSONRPC_TCP_PORT,
66 JSONRPC_SSL_PORT, streamp);
69 /* This is just the same as pstream_open() except that it uses the default
70 * JSONRPC ports if none is specified. */
72 jsonrpc_pstream_open(const char *name, struct pstream **pstreamp)
74 return pstream_open_with_default_ports(name, JSONRPC_TCP_PORT,
75 JSONRPC_SSL_PORT, pstreamp);
79 jsonrpc_open(struct stream *stream)
83 assert(stream != NULL);
85 rpc = xzalloc(sizeof *rpc);
86 rpc->name = xstrdup(stream_get_name(stream));
88 byteq_init(&rpc->input);
89 list_init(&rpc->output);
95 jsonrpc_close(struct jsonrpc *rpc)
105 jsonrpc_run(struct jsonrpc *rpc)
111 stream_run(rpc->stream);
112 while (!list_is_empty(&rpc->output)) {
113 struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
116 retval = stream_send(rpc->stream, buf->data, buf->size);
118 rpc->backlog -= retval;
119 ofpbuf_pull(buf, retval);
121 list_remove(&buf->list_node);
125 if (retval != -EAGAIN) {
126 VLOG_WARN_RL(&rl, "%s: send error: %s",
127 rpc->name, strerror(-retval));
128 jsonrpc_error(rpc, -retval);
136 jsonrpc_wait(struct jsonrpc *rpc)
139 stream_run_wait(rpc->stream);
140 if (!list_is_empty(&rpc->output)) {
141 stream_send_wait(rpc->stream);
147 * Possible status values:
150 * - EOF: end of file (remote end closed connection; not necessarily an error)
153 jsonrpc_get_status(const struct jsonrpc *rpc)
159 jsonrpc_get_backlog(const struct jsonrpc *rpc)
161 return rpc->status ? 0 : rpc->backlog;
165 jsonrpc_get_name(const struct jsonrpc *rpc)
171 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
172 const struct jsonrpc_msg *msg)
174 if (VLOG_IS_DBG_ENABLED()) {
175 struct ds s = DS_EMPTY_INITIALIZER;
177 ds_put_format(&s, ", method=\"%s\"", msg->method);
180 ds_put_cstr(&s, ", params=");
181 json_to_ds(msg->params, 0, &s);
184 ds_put_cstr(&s, ", result=");
185 json_to_ds(msg->result, 0, &s);
188 ds_put_cstr(&s, ", error=");
189 json_to_ds(msg->error, 0, &s);
192 ds_put_cstr(&s, ", id=");
193 json_to_ds(msg->id, 0, &s);
195 VLOG_DBG("%s: %s %s%s", rpc->name, title,
196 jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
201 /* Always takes ownership of 'msg', regardless of success. */
203 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
211 jsonrpc_msg_destroy(msg);
215 jsonrpc_log_msg(rpc, "send", msg);
217 json = jsonrpc_msg_to_json(msg);
218 s = json_to_string(json, 0);
222 buf = xmalloc(sizeof *buf);
223 ofpbuf_use(buf, s, length);
225 list_push_back(&rpc->output, &buf->list_node);
226 rpc->backlog += length;
228 if (rpc->backlog == length) {
235 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
242 while (!rpc->received) {
243 if (byteq_is_empty(&rpc->input)) {
247 chunk = byteq_headroom(&rpc->input);
248 retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
250 if (retval == -EAGAIN) {
253 VLOG_WARN_RL(&rl, "%s: receive error: %s",
254 rpc->name, strerror(-retval));
255 jsonrpc_error(rpc, -retval);
258 } else if (retval == 0) {
259 jsonrpc_error(rpc, EOF);
262 byteq_advance_head(&rpc->input, retval);
267 rpc->parser = json_parser_create(0);
269 n = byteq_tailroom(&rpc->input);
270 used = json_parser_feed(rpc->parser,
271 (char *) byteq_tail(&rpc->input), n);
272 byteq_advance_tail(&rpc->input, used);
273 if (json_parser_is_done(rpc->parser)) {
274 jsonrpc_received(rpc);
276 const struct byteq *q = &rpc->input;
277 if (q->head <= BYTEQ_SIZE) {
278 stream_report_content(q->buffer, q->head,
280 THIS_MODULE, rpc->name);
288 *msgp = rpc->received;
289 rpc->received = NULL;
294 jsonrpc_recv_wait(struct jsonrpc *rpc)
296 if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
297 (poll_immediate_wake)(rpc->name);
299 stream_recv_wait(rpc->stream);
303 /* Always takes ownership of 'msg', regardless of success. */
305 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
311 error = jsonrpc_send(rpc, msg);
318 if (list_is_empty(&rpc->output) || rpc->status) {
327 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
330 int error = jsonrpc_recv(rpc, msgp);
331 if (error != EAGAIN) {
338 jsonrpc_recv_wait(rpc);
343 /* Always takes ownership of 'request', regardless of success. */
345 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
346 struct jsonrpc_msg **replyp)
348 struct jsonrpc_msg *reply = NULL;
352 id = json_clone(request->id);
353 error = jsonrpc_send_block(rpc, request);
356 error = jsonrpc_recv_block(rpc, &reply);
358 || (reply->type == JSONRPC_REPLY
359 && json_equal(id, reply->id))) {
362 jsonrpc_msg_destroy(reply);
365 *replyp = error ? NULL : reply;
371 jsonrpc_received(struct jsonrpc *rpc)
373 struct jsonrpc_msg *msg;
377 json = json_parser_finish(rpc->parser);
379 if (json->type == JSON_STRING) {
380 VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
381 rpc->name, json_string(json));
382 jsonrpc_error(rpc, EPROTO);
387 error = jsonrpc_msg_from_json(json, &msg);
389 VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
392 jsonrpc_error(rpc, EPROTO);
396 jsonrpc_log_msg(rpc, "received", msg);
401 jsonrpc_error(struct jsonrpc *rpc, int error)
406 jsonrpc_cleanup(rpc);
411 jsonrpc_cleanup(struct jsonrpc *rpc)
413 stream_close(rpc->stream);
416 json_parser_abort(rpc->parser);
419 jsonrpc_msg_destroy(rpc->received);
420 rpc->received = NULL;
422 ofpbuf_list_delete(&rpc->output);
426 static struct jsonrpc_msg *
427 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
428 struct json *params, struct json *result, struct json *error,
431 struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
433 msg->method = method ? xstrdup(method) : NULL;
434 msg->params = params;
435 msg->result = result;
442 jsonrpc_create_id(void)
444 static unsigned int id;
445 return json_integer_create(id++);
449 jsonrpc_create_request(const char *method, struct json *params,
452 struct json *id = jsonrpc_create_id();
454 *idp = json_clone(id);
456 return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
460 jsonrpc_create_notify(const char *method, struct json *params)
462 return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
466 jsonrpc_create_reply(struct json *result, const struct json *id)
468 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
473 jsonrpc_create_error(struct json *error, const struct json *id)
475 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
480 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
483 case JSONRPC_REQUEST:
487 return "notification";
499 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
501 const char *type_name;
502 unsigned int pattern;
504 if (m->params && m->params->type != JSON_ARRAY) {
505 return xstrdup("\"params\" must be JSON array");
509 case JSONRPC_REQUEST:
526 return xasprintf("invalid JSON-RPC message type %d", m->type);
529 type_name = jsonrpc_msg_type_to_string(m->type);
530 if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
531 return xasprintf("%s must%s have \"method\"",
532 type_name, (pattern & 0x10000) ? "" : " not");
535 if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
536 return xasprintf("%s must%s have \"params\"",
537 type_name, (pattern & 0x1000) ? "" : " not");
540 if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
541 return xasprintf("%s must%s have \"result\"",
542 type_name, (pattern & 0x100) ? "" : " not");
545 if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
546 return xasprintf("%s must%s have \"error\"",
547 type_name, (pattern & 0x10) ? "" : " not");
550 if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
551 return xasprintf("%s must%s have \"id\"",
552 type_name, (pattern & 0x1) ? "" : " not");
559 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
563 json_destroy(m->params);
564 json_destroy(m->result);
565 json_destroy(m->error);
572 null_from_json_null(struct json *json)
574 if (json && json->type == JSON_NULL) {
582 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
584 struct json *method = NULL;
585 struct jsonrpc_msg *msg = NULL;
586 struct shash *object;
589 if (json->type != JSON_OBJECT) {
590 error = xstrdup("message is not a JSON object");
593 object = json_object(json);
595 method = shash_find_and_delete(object, "method");
596 if (method && method->type != JSON_STRING) {
597 error = xstrdup("method is not a JSON string");
601 msg = xzalloc(sizeof *msg);
602 msg->method = method ? xstrdup(method->u.string) : NULL;
603 msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
604 msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
605 msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
606 msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
607 msg->type = (msg->result ? JSONRPC_REPLY
608 : msg->error ? JSONRPC_ERROR
609 : msg->id ? JSONRPC_REQUEST
611 if (!shash_is_empty(object)) {
612 error = xasprintf("message has unexpected member \"%s\"",
613 shash_first(object)->name);
616 error = jsonrpc_msg_is_valid(msg);
622 json_destroy(method);
625 jsonrpc_msg_destroy(msg);
633 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
635 struct json *json = json_object_create();
638 json_object_put(json, "method", json_string_create_nocopy(m->method));
642 json_object_put(json, "params", m->params);
646 json_object_put(json, "result", m->result);
647 } else if (m->type == JSONRPC_ERROR) {
648 json_object_put(json, "result", json_null_create());
652 json_object_put(json, "error", m->error);
653 } else if (m->type == JSONRPC_REPLY) {
654 json_object_put(json, "error", json_null_create());
658 json_object_put(json, "id", m->id);
659 } else if (m->type == JSONRPC_NOTIFY) {
660 json_object_put(json, "id", json_null_create());
668 /* A JSON-RPC session with reconnection. */
670 struct jsonrpc_session {
671 struct reconnect *reconnect;
673 struct stream *stream;
674 struct pstream *pstream;
678 /* Creates and returns a jsonrpc_session to 'name', which should be a string
679 * acceptable to stream_open() or pstream_open().
681 * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
682 * jsonrpc_session connects and reconnects, with back-off, to 'name'.
684 * If 'name' is a passive connection method, e.g. "ptcp:", the new
685 * jsonrpc_session listens for connections to 'name'. It maintains at most one
686 * connection at any given time. Any new connection causes the previous one
687 * (if any) to be dropped. */
688 struct jsonrpc_session *
689 jsonrpc_session_open(const char *name)
691 struct jsonrpc_session *s;
693 s = xmalloc(sizeof *s);
694 s->reconnect = reconnect_create(time_msec());
695 reconnect_set_name(s->reconnect, name);
696 reconnect_enable(s->reconnect, time_msec());
702 if (!pstream_verify_name(name)) {
703 reconnect_set_passive(s->reconnect, true, time_msec());
709 /* Creates and returns a jsonrpc_session that is initially connected to
710 * 'jsonrpc'. If the connection is dropped, it will not be reconnected.
712 * On the assumption that such connections are likely to be short-lived
713 * (e.g. from ovs-vsctl), informational logging for them is suppressed. */
714 struct jsonrpc_session *
715 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
717 struct jsonrpc_session *s;
719 s = xmalloc(sizeof *s);
720 s->reconnect = reconnect_create(time_msec());
721 reconnect_set_quiet(s->reconnect, true);
722 reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
723 reconnect_set_max_tries(s->reconnect, 0);
724 reconnect_connected(s->reconnect, time_msec());
734 jsonrpc_session_close(struct jsonrpc_session *s)
737 jsonrpc_close(s->rpc);
738 reconnect_destroy(s->reconnect);
739 stream_close(s->stream);
740 pstream_close(s->pstream);
746 jsonrpc_session_disconnect(struct jsonrpc_session *s)
749 jsonrpc_error(s->rpc, EOF);
750 jsonrpc_close(s->rpc);
753 } else if (s->stream) {
754 stream_close(s->stream);
761 jsonrpc_session_connect(struct jsonrpc_session *s)
763 const char *name = reconnect_get_name(s->reconnect);
766 jsonrpc_session_disconnect(s);
767 if (!reconnect_is_passive(s->reconnect)) {
768 error = jsonrpc_stream_open(name, &s->stream);
770 reconnect_connecting(s->reconnect, time_msec());
773 error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream);
775 reconnect_listening(s->reconnect, time_msec());
780 reconnect_connect_failed(s->reconnect, time_msec(), error);
786 jsonrpc_session_run(struct jsonrpc_session *s)
789 struct stream *stream;
792 error = pstream_accept(s->pstream, &stream);
794 if (s->rpc || s->stream) {
796 "%s: new connection replacing active connection",
797 reconnect_get_name(s->reconnect));
798 jsonrpc_session_disconnect(s);
800 reconnect_connected(s->reconnect, time_msec());
801 s->rpc = jsonrpc_open(stream);
802 } else if (error != EAGAIN) {
803 reconnect_listen_error(s->reconnect, time_msec(), error);
804 pstream_close(s->pstream);
813 error = jsonrpc_get_status(s->rpc);
815 reconnect_disconnected(s->reconnect, time_msec(), error);
816 jsonrpc_session_disconnect(s);
818 } else if (s->stream) {
821 stream_run(s->stream);
822 error = stream_connect(s->stream);
824 reconnect_connected(s->reconnect, time_msec());
825 s->rpc = jsonrpc_open(s->stream);
827 } else if (error != EAGAIN) {
828 reconnect_connect_failed(s->reconnect, time_msec(), error);
829 stream_close(s->stream);
834 switch (reconnect_run(s->reconnect, time_msec())) {
835 case RECONNECT_CONNECT:
836 jsonrpc_session_connect(s);
839 case RECONNECT_DISCONNECT:
840 reconnect_disconnected(s->reconnect, time_msec(), 0);
841 jsonrpc_session_disconnect(s);
844 case RECONNECT_PROBE:
847 struct jsonrpc_msg *request;
849 params = json_array_create_empty();
850 request = jsonrpc_create_request("echo", params, NULL);
851 json_destroy(request->id);
852 request->id = json_string_create("echo");
853 jsonrpc_send(s->rpc, request);
860 jsonrpc_session_wait(struct jsonrpc_session *s)
863 jsonrpc_wait(s->rpc);
864 } else if (s->stream) {
865 stream_run_wait(s->stream);
866 stream_connect_wait(s->stream);
869 pstream_wait(s->pstream);
871 reconnect_wait(s->reconnect, time_msec());
875 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
877 return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
880 /* Always returns a pointer to a valid C string, assuming 's' was initialized
883 jsonrpc_session_get_name(const struct jsonrpc_session *s)
885 return reconnect_get_name(s->reconnect);
888 /* Always takes ownership of 'msg', regardless of success. */
890 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
893 return jsonrpc_send(s->rpc, msg);
895 jsonrpc_msg_destroy(msg);
901 jsonrpc_session_recv(struct jsonrpc_session *s)
904 struct jsonrpc_msg *msg;
905 jsonrpc_recv(s->rpc, &msg);
907 reconnect_received(s->reconnect, time_msec());
908 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
909 /* Echo request. Send reply. */
910 struct jsonrpc_msg *reply;
912 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
913 jsonrpc_session_send(s, reply);
914 } else if (msg->type == JSONRPC_REPLY
915 && msg->id && msg->id->type == JSON_STRING
916 && !strcmp(msg->id->u.string, "echo")) {
917 /* It's a reply to our echo request. Suppress it. */
921 jsonrpc_msg_destroy(msg);
928 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
931 jsonrpc_recv_wait(s->rpc);
936 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
938 return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
942 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
944 return s->rpc != NULL;
948 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
954 jsonrpc_session_get_status(const struct jsonrpc_session *s)
956 return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
960 jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
961 struct reconnect_stats *stats)
963 reconnect_get_stats(s->reconnect, time_msec(), stats);
967 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
969 reconnect_force_reconnect(s->reconnect, time_msec());
973 jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
975 reconnect_set_backoff(s->reconnect, 0, max_backoff);
979 jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
982 reconnect_set_probe_interval(s->reconnect, probe_interval);