2 * Copyright (c) 2009 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include "dynamic-string.h"
28 #include "poll-loop.h"
30 #include "reconnect.h"
34 #define THIS_MODULE VLM_jsonrpc
38 struct stream *stream;
44 struct json_parser *parser;
45 struct jsonrpc_msg *received;
48 struct ovs_queue output;
52 /* Rate limit for error messages. */
53 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
55 static void jsonrpc_received(struct jsonrpc *);
56 static void jsonrpc_cleanup(struct jsonrpc *);
59 jsonrpc_open(struct stream *stream)
63 assert(stream != NULL);
65 rpc = xzalloc(sizeof *rpc);
66 rpc->name = xstrdup(stream_get_name(stream));
68 byteq_init(&rpc->input);
69 queue_init(&rpc->output);
75 jsonrpc_close(struct jsonrpc *rpc)
85 jsonrpc_run(struct jsonrpc *rpc)
91 while (!queue_is_empty(&rpc->output)) {
92 struct ofpbuf *buf = rpc->output.head;
95 retval = stream_send(rpc->stream, buf->data, buf->size);
97 rpc->backlog -= retval;
98 ofpbuf_pull(buf, retval);
100 ofpbuf_delete(queue_pop_head(&rpc->output));
103 if (retval != -EAGAIN) {
104 VLOG_WARN_RL(&rl, "%s: send error: %s",
105 rpc->name, strerror(-retval));
106 jsonrpc_error(rpc, -retval);
114 jsonrpc_wait(struct jsonrpc *rpc)
116 if (!rpc->status && !queue_is_empty(&rpc->output)) {
117 stream_send_wait(rpc->stream);
122 jsonrpc_get_status(const struct jsonrpc *rpc)
128 jsonrpc_get_backlog(const struct jsonrpc *rpc)
130 return rpc->status ? 0 : rpc->backlog;
134 jsonrpc_get_name(const struct jsonrpc *rpc)
140 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
141 const struct jsonrpc_msg *msg)
143 if (VLOG_IS_DBG_ENABLED()) {
144 struct ds s = DS_EMPTY_INITIALIZER;
146 ds_put_format(&s, ", method=\"%s\"", msg->method);
149 ds_put_cstr(&s, ", params=");
150 ds_put_and_free_cstr(&s, json_to_string(msg->params, 0));
153 ds_put_cstr(&s, ", result=");
154 ds_put_and_free_cstr(&s, json_to_string(msg->result, 0));
157 ds_put_cstr(&s, ", error=");
158 ds_put_and_free_cstr(&s, json_to_string(msg->error, 0));
161 ds_put_cstr(&s, ", id=");
162 ds_put_and_free_cstr(&s, json_to_string(msg->id, 0));
164 VLOG_DBG("%s: %s %s%s", rpc->name, title,
165 jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
171 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
179 jsonrpc_msg_destroy(msg);
183 jsonrpc_log_msg(rpc, "send", msg);
185 json = jsonrpc_msg_to_json(msg);
186 s = json_to_string(json, 0);
190 buf = xmalloc(sizeof *buf);
191 ofpbuf_use(buf, s, length);
193 queue_push_tail(&rpc->output, buf);
194 rpc->backlog += length;
196 if (rpc->output.n == 1) {
203 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
210 while (!rpc->received) {
211 if (byteq_is_empty(&rpc->input)) {
215 chunk = byteq_headroom(&rpc->input);
216 retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
218 if (retval == -EAGAIN) {
221 VLOG_WARN_RL(&rl, "%s: receive error: %s",
222 rpc->name, strerror(-retval));
223 jsonrpc_error(rpc, -retval);
226 } else if (retval == 0) {
227 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
228 jsonrpc_error(rpc, EOF);
231 byteq_advance_head(&rpc->input, retval);
236 rpc->parser = json_parser_create(0);
238 n = byteq_tailroom(&rpc->input);
239 used = json_parser_feed(rpc->parser,
240 (char *) byteq_tail(&rpc->input), n);
241 byteq_advance_tail(&rpc->input, used);
242 if (json_parser_is_done(rpc->parser)) {
243 jsonrpc_received(rpc);
251 *msgp = rpc->received;
252 rpc->received = NULL;
257 jsonrpc_recv_wait(struct jsonrpc *rpc)
259 if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
260 poll_immediate_wake();
262 stream_recv_wait(rpc->stream);
267 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
271 error = jsonrpc_send(rpc, msg);
276 while (!queue_is_empty(&rpc->output) && !rpc->status) {
285 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
288 int error = jsonrpc_recv(rpc, msgp);
289 if (error != EAGAIN) {
295 jsonrpc_recv_wait(rpc);
301 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
302 struct jsonrpc_msg **replyp)
304 struct jsonrpc_msg *reply = NULL;
308 id = json_clone(request->id);
309 error = jsonrpc_send_block(rpc, request);
312 error = jsonrpc_recv_block(rpc, &reply);
314 || (reply->type == JSONRPC_REPLY
315 && json_equal(id, reply->id))) {
318 jsonrpc_msg_destroy(reply);
321 *replyp = error ? NULL : reply;
327 jsonrpc_received(struct jsonrpc *rpc)
329 struct jsonrpc_msg *msg;
333 json = json_parser_finish(rpc->parser);
335 if (json->type == JSON_STRING) {
336 VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
337 rpc->name, json_string(json));
338 jsonrpc_error(rpc, EPROTO);
343 error = jsonrpc_msg_from_json(json, &msg);
345 VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
348 jsonrpc_error(rpc, EPROTO);
352 jsonrpc_log_msg(rpc, "received", msg);
357 jsonrpc_error(struct jsonrpc *rpc, int error)
362 jsonrpc_cleanup(rpc);
367 jsonrpc_cleanup(struct jsonrpc *rpc)
369 stream_close(rpc->stream);
372 json_parser_abort(rpc->parser);
375 jsonrpc_msg_destroy(rpc->received);
376 rpc->received = NULL;
378 queue_clear(&rpc->output);
382 static struct jsonrpc_msg *
383 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
384 struct json *params, struct json *result, struct json *error,
387 struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
389 msg->method = method ? xstrdup(method) : NULL;
390 msg->params = params;
391 msg->result = result;
398 jsonrpc_create_id(void)
400 static unsigned int id;
401 return json_integer_create(id++);
405 jsonrpc_create_request(const char *method, struct json *params,
408 struct json *id = jsonrpc_create_id();
410 *idp = json_clone(id);
412 return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
416 jsonrpc_create_notify(const char *method, struct json *params)
418 return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
422 jsonrpc_create_reply(struct json *result, const struct json *id)
424 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
429 jsonrpc_create_error(struct json *error, const struct json *id)
431 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
436 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
439 case JSONRPC_REQUEST:
443 return "notification";
455 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
457 const char *type_name;
458 unsigned int pattern;
460 if (m->params && m->params->type != JSON_ARRAY) {
461 return xstrdup("\"params\" must be JSON array");
465 case JSONRPC_REQUEST:
482 return xasprintf("invalid JSON-RPC message type %d", m->type);
485 type_name = jsonrpc_msg_type_to_string(m->type);
486 if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
487 return xasprintf("%s must%s have \"method\"",
488 type_name, (pattern & 0x10000) ? "" : " not");
491 if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
492 return xasprintf("%s must%s have \"params\"",
493 type_name, (pattern & 0x1000) ? "" : " not");
496 if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
497 return xasprintf("%s must%s have \"result\"",
498 type_name, (pattern & 0x100) ? "" : " not");
501 if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
502 return xasprintf("%s must%s have \"error\"",
503 type_name, (pattern & 0x10) ? "" : " not");
506 if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
507 return xasprintf("%s must%s have \"id\"",
508 type_name, (pattern & 0x1) ? "" : " not");
515 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
519 json_destroy(m->params);
520 json_destroy(m->result);
521 json_destroy(m->error);
528 null_from_json_null(struct json *json)
530 if (json && json->type == JSON_NULL) {
538 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
540 struct json *method = NULL;
541 struct jsonrpc_msg *msg = NULL;
542 struct shash *object;
545 if (json->type != JSON_OBJECT) {
546 error = xstrdup("message is not a JSON object");
549 object = json_object(json);
551 method = shash_find_and_delete(object, "method");
552 if (method && method->type != JSON_STRING) {
553 error = xstrdup("method is not a JSON string");
557 msg = xzalloc(sizeof *msg);
558 msg->method = method ? xstrdup(method->u.string) : NULL;
559 msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
560 msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
561 msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
562 msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
563 msg->type = (msg->result ? JSONRPC_REPLY
564 : msg->error ? JSONRPC_ERROR
565 : msg->id ? JSONRPC_REQUEST
567 if (!shash_is_empty(object)) {
568 error = xasprintf("message has unexpected member \"%s\"",
569 shash_first(object)->name);
572 error = jsonrpc_msg_is_valid(msg);
578 json_destroy(method);
581 jsonrpc_msg_destroy(msg);
589 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
591 struct json *json = json_object_create();
594 json_object_put(json, "method", json_string_create_nocopy(m->method));
598 json_object_put(json, "params", m->params);
602 json_object_put(json, "result", m->result);
603 } else if (m->type == JSONRPC_ERROR) {
604 json_object_put(json, "result", json_null_create());
608 json_object_put(json, "error", m->error);
609 } else if (m->type == JSONRPC_REPLY) {
610 json_object_put(json, "error", json_null_create());
614 json_object_put(json, "id", m->id);
615 } else if (m->type == JSONRPC_NOTIFY) {
616 json_object_put(json, "id", json_null_create());
624 /* A JSON-RPC session with reconnection. */
626 struct jsonrpc_session {
627 struct reconnect *reconnect;
629 struct stream *stream;
633 /* Creates and returns a jsonrpc_session that connects and reconnects, with
634 * back-off, to 'name', which should be a string acceptable to
636 struct jsonrpc_session *
637 jsonrpc_session_open(const char *name)
639 struct jsonrpc_session *s;
641 s = xmalloc(sizeof *s);
642 s->reconnect = reconnect_create(time_msec());
643 reconnect_set_name(s->reconnect, name);
644 reconnect_enable(s->reconnect, time_msec());
652 /* Creates and returns a jsonrpc_session that is initially connected to
653 * 'jsonrpc'. If the connection is dropped, it will not be reconnected. */
654 struct jsonrpc_session *
655 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
657 struct jsonrpc_session *s;
659 s = xmalloc(sizeof *s);
660 s->reconnect = reconnect_create(time_msec());
661 reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
662 reconnect_set_max_tries(s->reconnect, 0);
663 reconnect_connected(s->reconnect, time_msec());
672 jsonrpc_session_close(struct jsonrpc_session *s)
675 jsonrpc_close(s->rpc);
676 reconnect_destroy(s->reconnect);
682 jsonrpc_session_disconnect(struct jsonrpc_session *s)
684 reconnect_disconnected(s->reconnect, time_msec(), 0);
686 jsonrpc_error(s->rpc, EOF);
687 jsonrpc_close(s->rpc);
690 } else if (s->stream) {
691 stream_close(s->stream);
698 jsonrpc_session_connect(struct jsonrpc_session *s)
702 jsonrpc_session_disconnect(s);
703 error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
705 reconnect_connect_failed(s->reconnect, time_msec(), error);
707 reconnect_connecting(s->reconnect, time_msec());
713 jsonrpc_session_run(struct jsonrpc_session *s)
719 error = jsonrpc_get_status(s->rpc);
721 jsonrpc_session_disconnect(s);
723 } else if (s->stream) {
724 int error = stream_connect(s->stream);
726 reconnect_connected(s->reconnect, time_msec());
727 s->rpc = jsonrpc_open(s->stream);
729 } else if (error != EAGAIN) {
730 reconnect_connect_failed(s->reconnect, time_msec(), error);
731 stream_close(s->stream);
736 switch (reconnect_run(s->reconnect, time_msec())) {
737 case RECONNECT_CONNECT:
738 jsonrpc_session_connect(s);
741 case RECONNECT_DISCONNECT:
742 jsonrpc_session_disconnect(s);
745 case RECONNECT_PROBE:
748 struct jsonrpc_msg *request;
750 params = json_array_create_empty();
751 request = jsonrpc_create_request("echo", params, NULL);
752 json_destroy(request->id);
753 request->id = json_string_create("echo");
754 jsonrpc_send(s->rpc, request);
761 jsonrpc_session_wait(struct jsonrpc_session *s)
764 jsonrpc_wait(s->rpc);
765 } else if (s->stream) {
766 stream_connect_wait(s->stream);
768 reconnect_wait(s->reconnect, time_msec());
772 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
774 return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
778 jsonrpc_session_get_name(const struct jsonrpc_session *s)
780 return reconnect_get_name(s->reconnect);
784 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
786 return s->rpc ? jsonrpc_send(s->rpc, msg) : ENOTCONN;
790 jsonrpc_session_recv(struct jsonrpc_session *s)
793 struct jsonrpc_msg *msg;
794 jsonrpc_recv(s->rpc, &msg);
796 reconnect_received(s->reconnect, time_msec());
797 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
798 /* Echo request. Send reply. */
799 struct jsonrpc_msg *reply;
801 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
802 jsonrpc_session_send(s, reply);
803 } else if (msg->type == JSONRPC_REPLY
804 && msg->id && msg->id->type == JSON_STRING
805 && !strcmp(msg->id->u.string, "echo")) {
806 /* It's a reply to our echo request. Suppress it. */
810 jsonrpc_msg_destroy(msg);
817 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
820 jsonrpc_recv_wait(s->rpc);
825 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
827 return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
831 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
833 return s->rpc != NULL;
837 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
843 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
845 reconnect_force_reconnect(s->reconnect, time_msec());