ovsdb-idl: fix compile warning of lib/ovsdb-idl.c
[openvswitch] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009, 2010 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <assert.h>
22 #include <errno.h>
23
24 #include "byteq.h"
25 #include "dynamic-string.h"
26 #include "fatal-signal.h"
27 #include "json.h"
28 #include "list.h"
29 #include "ofpbuf.h"
30 #include "poll-loop.h"
31 #include "queue.h"
32 #include "reconnect.h"
33 #include "stream.h"
34 #include "timeval.h"
35
36 #define THIS_MODULE VLM_jsonrpc
37 #include "vlog.h"
38 \f
39 struct jsonrpc {
40     struct stream *stream;
41     char *name;
42     int status;
43
44     /* Input. */
45     struct byteq input;
46     struct json_parser *parser;
47     struct jsonrpc_msg *received;
48
49     /* Output. */
50     struct ovs_queue output;
51     size_t backlog;
52 };
53
54 /* Rate limit for error messages. */
55 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
56
57 static void jsonrpc_received(struct jsonrpc *);
58 static void jsonrpc_cleanup(struct jsonrpc *);
59
60 /* This is just the same as stream_open() except that it uses the default
61  * JSONRPC ports if none is specified. */
62 int
63 jsonrpc_stream_open(const char *name, struct stream **streamp)
64 {
65     return stream_open_with_default_ports(name, JSONRPC_TCP_PORT,
66                                           JSONRPC_SSL_PORT, streamp);
67 }
68
69 /* This is just the same as pstream_open() except that it uses the default
70  * JSONRPC ports if none is specified. */
71 int
72 jsonrpc_pstream_open(const char *name, struct pstream **pstreamp)
73 {
74     return pstream_open_with_default_ports(name, JSONRPC_TCP_PORT,
75                                            JSONRPC_SSL_PORT, pstreamp);
76 }
77
78 struct jsonrpc *
79 jsonrpc_open(struct stream *stream)
80 {
81     struct jsonrpc *rpc;
82
83     assert(stream != NULL);
84
85     rpc = xzalloc(sizeof *rpc);
86     rpc->name = xstrdup(stream_get_name(stream));
87     rpc->stream = stream;
88     byteq_init(&rpc->input);
89     queue_init(&rpc->output);
90
91     return rpc;
92 }
93
94 void
95 jsonrpc_close(struct jsonrpc *rpc)
96 {
97     if (rpc) {
98         jsonrpc_cleanup(rpc);
99         free(rpc->name);
100         free(rpc);
101     }
102 }
103
104 void
105 jsonrpc_run(struct jsonrpc *rpc)
106 {
107     if (rpc->status) {
108         return;
109     }
110
111     stream_run(rpc->stream);
112     while (!queue_is_empty(&rpc->output)) {
113         struct ofpbuf *buf = rpc->output.head;
114         int retval;
115
116         retval = stream_send(rpc->stream, buf->data, buf->size);
117         if (retval >= 0) {
118             rpc->backlog -= retval;
119             ofpbuf_pull(buf, retval);
120             if (!buf->size) {
121                 ofpbuf_delete(queue_pop_head(&rpc->output));
122             }
123         } else {
124             if (retval != -EAGAIN) {
125                 VLOG_WARN_RL(&rl, "%s: send error: %s",
126                              rpc->name, strerror(-retval));
127                 jsonrpc_error(rpc, -retval);
128             }
129             break;
130         }
131     }
132 }
133
134 void
135 jsonrpc_wait(struct jsonrpc *rpc)
136 {
137     if (!rpc->status) {
138         stream_run_wait(rpc->stream);
139         if (!queue_is_empty(&rpc->output)) {
140             stream_send_wait(rpc->stream);
141         }
142     }
143 }
144
145 int
146 jsonrpc_get_status(const struct jsonrpc *rpc)
147 {
148     return rpc->status;
149 }
150
151 size_t
152 jsonrpc_get_backlog(const struct jsonrpc *rpc)
153 {
154     return rpc->status ? 0 : rpc->backlog;
155 }
156
157 const char *
158 jsonrpc_get_name(const struct jsonrpc *rpc)
159 {
160     return rpc->name;
161 }
162
163 static void
164 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
165                 const struct jsonrpc_msg *msg)
166 {
167     if (VLOG_IS_DBG_ENABLED()) {
168         struct ds s = DS_EMPTY_INITIALIZER;
169         if (msg->method) {
170             ds_put_format(&s, ", method=\"%s\"", msg->method);
171         }
172         if (msg->params) {
173             ds_put_cstr(&s, ", params=");
174             json_to_ds(msg->params, 0, &s);
175         }
176         if (msg->result) {
177             ds_put_cstr(&s, ", result=");
178             json_to_ds(msg->result, 0, &s);
179         }
180         if (msg->error) {
181             ds_put_cstr(&s, ", error=");
182             json_to_ds(msg->error, 0, &s);
183         }
184         if (msg->id) {
185             ds_put_cstr(&s, ", id=");
186             json_to_ds(msg->id, 0, &s);
187         }
188         VLOG_DBG("%s: %s %s%s", rpc->name, title,
189                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
190         ds_destroy(&s);
191     }
192 }
193
194 /* Always takes ownership of 'msg', regardless of success. */
195 int
196 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
197 {
198     struct ofpbuf *buf;
199     struct json *json;
200     size_t length;
201     char *s;
202
203     if (rpc->status) {
204         jsonrpc_msg_destroy(msg);
205         return rpc->status;
206     }
207
208     jsonrpc_log_msg(rpc, "send", msg);
209
210     json = jsonrpc_msg_to_json(msg);
211     s = json_to_string(json, 0);
212     length = strlen(s);
213     json_destroy(json);
214
215     buf = xmalloc(sizeof *buf);
216     ofpbuf_use(buf, s, length);
217     buf->size = length;
218     queue_push_tail(&rpc->output, buf);
219     rpc->backlog += length;
220
221     if (rpc->output.n == 1) {
222         jsonrpc_run(rpc);
223     }
224     return rpc->status;
225 }
226
227 int
228 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
229 {
230     *msgp = NULL;
231     if (rpc->status) {
232         return rpc->status;
233     }
234
235     while (!rpc->received) {
236         if (byteq_is_empty(&rpc->input)) {
237             size_t chunk;
238             int retval;
239
240             chunk = byteq_headroom(&rpc->input);
241             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
242             if (retval < 0) {
243                 if (retval == -EAGAIN) {
244                     return EAGAIN;
245                 } else {
246                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
247                                  rpc->name, strerror(-retval));
248                     jsonrpc_error(rpc, -retval);
249                     return rpc->status;
250                 }
251             } else if (retval == 0) {
252                 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
253                 jsonrpc_error(rpc, EOF);
254                 return EOF;
255             }
256             byteq_advance_head(&rpc->input, retval);
257         } else {
258             size_t n, used;
259
260             if (!rpc->parser) {
261                 rpc->parser = json_parser_create(0);
262             }
263             n = byteq_tailroom(&rpc->input);
264             used = json_parser_feed(rpc->parser,
265                                     (char *) byteq_tail(&rpc->input), n);
266             byteq_advance_tail(&rpc->input, used);
267             if (json_parser_is_done(rpc->parser)) {
268                 jsonrpc_received(rpc);
269                 if (rpc->status) {
270                     const struct byteq *q = &rpc->input;
271                     if (q->head <= BYTEQ_SIZE) {
272                         stream_report_content(q->buffer, q->head,
273                                               STREAM_JSONRPC,
274                                               THIS_MODULE, rpc->name);
275                     }
276                     return rpc->status;
277                 }
278             }
279         }
280     }
281
282     *msgp = rpc->received;
283     rpc->received = NULL;
284     return 0;
285 }
286
287 void
288 jsonrpc_recv_wait(struct jsonrpc *rpc)
289 {
290     if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
291         poll_immediate_wake();
292     } else {
293         stream_recv_wait(rpc->stream);
294     }
295 }
296
297 /* Always takes ownership of 'msg', regardless of success. */
298 int
299 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
300 {
301     int error;
302
303     fatal_signal_run();
304
305     error = jsonrpc_send(rpc, msg);
306     if (error) {
307         return error;
308     }
309
310     for (;;) {
311         jsonrpc_run(rpc);
312         if (queue_is_empty(&rpc->output) || rpc->status) {
313             return rpc->status;
314         }
315         jsonrpc_wait(rpc);
316         poll_block();
317     }
318 }
319
320 int
321 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
322 {
323     for (;;) {
324         int error = jsonrpc_recv(rpc, msgp);
325         if (error != EAGAIN) {
326             fatal_signal_run();
327             return error;
328         }
329
330         jsonrpc_run(rpc);
331         jsonrpc_wait(rpc);
332         jsonrpc_recv_wait(rpc);
333         poll_block();
334     }
335 }
336
337 /* Always takes ownership of 'request', regardless of success. */
338 int
339 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
340                        struct jsonrpc_msg **replyp)
341 {
342     struct jsonrpc_msg *reply = NULL;
343     struct json *id;
344     int error;
345
346     id = json_clone(request->id);
347     error = jsonrpc_send_block(rpc, request);
348     if (!error) {
349         for (;;) {
350             error = jsonrpc_recv_block(rpc, &reply);
351             if (error
352                 || (reply->type == JSONRPC_REPLY
353                     && json_equal(id, reply->id))) {
354                 break;
355             }
356             jsonrpc_msg_destroy(reply);
357         }
358     }
359     *replyp = error ? NULL : reply;
360     json_destroy(id);
361     return error;
362 }
363
364 static void
365 jsonrpc_received(struct jsonrpc *rpc)
366 {
367     struct jsonrpc_msg *msg;
368     struct json *json;
369     char *error;
370
371     json = json_parser_finish(rpc->parser);
372     rpc->parser = NULL;
373     if (json->type == JSON_STRING) {
374         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
375                      rpc->name, json_string(json));
376         jsonrpc_error(rpc, EPROTO);
377         json_destroy(json);
378         return;
379     }
380
381     error = jsonrpc_msg_from_json(json, &msg);
382     if (error) {
383         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
384                      rpc->name, error);
385         free(error);
386         jsonrpc_error(rpc, EPROTO);
387         return;
388     }
389
390     jsonrpc_log_msg(rpc, "received", msg);
391     rpc->received = msg;
392 }
393
394 void
395 jsonrpc_error(struct jsonrpc *rpc, int error)
396 {
397     assert(error);
398     if (!rpc->status) {
399         rpc->status = error;
400         jsonrpc_cleanup(rpc);
401     }
402 }
403
404 static void
405 jsonrpc_cleanup(struct jsonrpc *rpc)
406 {
407     stream_close(rpc->stream);
408     rpc->stream = NULL;
409
410     json_parser_abort(rpc->parser);
411     rpc->parser = NULL;
412
413     jsonrpc_msg_destroy(rpc->received);
414     rpc->received = NULL;
415
416     queue_clear(&rpc->output);
417     rpc->backlog = 0;
418 }
419 \f
420 static struct jsonrpc_msg *
421 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
422                 struct json *params, struct json *result, struct json *error,
423                 struct json *id)
424 {
425     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
426     msg->type = type;
427     msg->method = method ? xstrdup(method) : NULL;
428     msg->params = params;
429     msg->result = result;
430     msg->error = error;
431     msg->id = id;
432     return msg;
433 }
434
435 static struct json *
436 jsonrpc_create_id(void)
437 {
438     static unsigned int id;
439     return json_integer_create(id++);
440 }
441
442 struct jsonrpc_msg *
443 jsonrpc_create_request(const char *method, struct json *params,
444                        struct json **idp)
445 {
446     struct json *id = jsonrpc_create_id();
447     if (idp) {
448         *idp = json_clone(id);
449     }
450     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
451 }
452
453 struct jsonrpc_msg *
454 jsonrpc_create_notify(const char *method, struct json *params)
455 {
456     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
457 }
458
459 struct jsonrpc_msg *
460 jsonrpc_create_reply(struct json *result, const struct json *id)
461 {
462     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
463                            json_clone(id));
464 }
465
466 struct jsonrpc_msg *
467 jsonrpc_create_error(struct json *error, const struct json *id)
468 {
469     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
470                            json_clone(id));
471 }
472
473 const char *
474 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
475 {
476     switch (type) {
477     case JSONRPC_REQUEST:
478         return "request";
479
480     case JSONRPC_NOTIFY:
481         return "notification";
482
483     case JSONRPC_REPLY:
484         return "reply";
485
486     case JSONRPC_ERROR:
487         return "error";
488     }
489     return "(null)";
490 }
491
492 char *
493 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
494 {
495     const char *type_name;
496     unsigned int pattern;
497
498     if (m->params && m->params->type != JSON_ARRAY) {
499         return xstrdup("\"params\" must be JSON array");
500     }
501
502     switch (m->type) {
503     case JSONRPC_REQUEST:
504         pattern = 0x11001;
505         break;
506
507     case JSONRPC_NOTIFY:
508         pattern = 0x11000;
509         break;
510
511     case JSONRPC_REPLY:
512         pattern = 0x00101;
513         break;
514
515     case JSONRPC_ERROR:
516         pattern = 0x00011;
517         break;
518
519     default:
520         return xasprintf("invalid JSON-RPC message type %d", m->type);
521     }
522
523     type_name = jsonrpc_msg_type_to_string(m->type);
524     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
525         return xasprintf("%s must%s have \"method\"",
526                          type_name, (pattern & 0x10000) ? "" : " not");
527
528     }
529     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
530         return xasprintf("%s must%s have \"params\"",
531                          type_name, (pattern & 0x1000) ? "" : " not");
532
533     }
534     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
535         return xasprintf("%s must%s have \"result\"",
536                          type_name, (pattern & 0x100) ? "" : " not");
537
538     }
539     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
540         return xasprintf("%s must%s have \"error\"",
541                          type_name, (pattern & 0x10) ? "" : " not");
542
543     }
544     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
545         return xasprintf("%s must%s have \"id\"",
546                          type_name, (pattern & 0x1) ? "" : " not");
547
548     }
549     return NULL;
550 }
551
552 void
553 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
554 {
555     if (m) {
556         free(m->method);
557         json_destroy(m->params);
558         json_destroy(m->result);
559         json_destroy(m->error);
560         json_destroy(m->id);
561         free(m);
562     }
563 }
564
565 static struct json *
566 null_from_json_null(struct json *json)
567 {
568     if (json && json->type == JSON_NULL) {
569         json_destroy(json);
570         return NULL;
571     }
572     return json;
573 }
574
575 char *
576 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
577 {
578     struct json *method = NULL;
579     struct jsonrpc_msg *msg = NULL;
580     struct shash *object;
581     char *error;
582
583     if (json->type != JSON_OBJECT) {
584         error = xstrdup("message is not a JSON object");
585         goto exit;
586     }
587     object = json_object(json);
588
589     method = shash_find_and_delete(object, "method");
590     if (method && method->type != JSON_STRING) {
591         error = xstrdup("method is not a JSON string");
592         goto exit;
593     }
594
595     msg = xzalloc(sizeof *msg);
596     msg->method = method ? xstrdup(method->u.string) : NULL;
597     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
598     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
599     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
600     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
601     msg->type = (msg->result ? JSONRPC_REPLY
602                  : msg->error ? JSONRPC_ERROR
603                  : msg->id ? JSONRPC_REQUEST
604                  : JSONRPC_NOTIFY);
605     if (!shash_is_empty(object)) {
606         error = xasprintf("message has unexpected member \"%s\"",
607                           shash_first(object)->name);
608         goto exit;
609     }
610     error = jsonrpc_msg_is_valid(msg);
611     if (error) {
612         goto exit;
613     }
614
615 exit:
616     json_destroy(method);
617     json_destroy(json);
618     if (error) {
619         jsonrpc_msg_destroy(msg);
620         msg = NULL;
621     }
622     *msgp = msg;
623     return error;
624 }
625
626 struct json *
627 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
628 {
629     struct json *json = json_object_create();
630
631     if (m->method) {
632         json_object_put(json, "method", json_string_create_nocopy(m->method));
633     }
634
635     if (m->params) {
636         json_object_put(json, "params", m->params);
637     }
638
639     if (m->result) {
640         json_object_put(json, "result", m->result);
641     } else if (m->type == JSONRPC_ERROR) {
642         json_object_put(json, "result", json_null_create());
643     }
644
645     if (m->error) {
646         json_object_put(json, "error", m->error);
647     } else if (m->type == JSONRPC_REPLY) {
648         json_object_put(json, "error", json_null_create());
649     }
650
651     if (m->id) {
652         json_object_put(json, "id", m->id);
653     } else if (m->type == JSONRPC_NOTIFY) {
654         json_object_put(json, "id", json_null_create());
655     }
656
657     free(m);
658
659     return json;
660 }
661 \f
662 /* A JSON-RPC session with reconnection. */
663
664 struct jsonrpc_session {
665     struct reconnect *reconnect;
666     struct jsonrpc *rpc;
667     struct stream *stream;
668     struct pstream *pstream;
669     unsigned int seqno;
670 };
671
672 /* Creates and returns a jsonrpc_session to 'name', which should be a string
673  * acceptable to stream_open() or pstream_open().
674  *
675  * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
676  * jsonrpc_session connects and reconnects, with back-off, to 'name'.
677  *
678  * If 'name' is a passive connection method, e.g. "ptcp:", the new
679  * jsonrpc_session listens for connections to 'name'.  It maintains at most one
680  * connection at any given time.  Any new connection causes the previous one
681  * (if any) to be dropped. */
682 struct jsonrpc_session *
683 jsonrpc_session_open(const char *name)
684 {
685     struct jsonrpc_session *s;
686
687     s = xmalloc(sizeof *s);
688     s->reconnect = reconnect_create(time_msec());
689     reconnect_set_name(s->reconnect, name);
690     reconnect_enable(s->reconnect, time_msec());
691     s->rpc = NULL;
692     s->stream = NULL;
693     s->pstream = NULL;
694     s->seqno = 0;
695
696     if (!pstream_verify_name(name)) {
697         reconnect_set_passive(s->reconnect, true, time_msec());
698     }
699
700     return s;
701 }
702
703 /* Creates and returns a jsonrpc_session that is initially connected to
704  * 'jsonrpc'.  If the connection is dropped, it will not be reconnected. */
705 struct jsonrpc_session *
706 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
707 {
708     struct jsonrpc_session *s;
709
710     s = xmalloc(sizeof *s);
711     s->reconnect = reconnect_create(time_msec());
712     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
713     reconnect_set_max_tries(s->reconnect, 0);
714     reconnect_connected(s->reconnect, time_msec());
715     s->rpc = jsonrpc;
716     s->stream = NULL;
717     s->pstream = NULL;
718     s->seqno = 0;
719
720     return s;
721 }
722
723 void
724 jsonrpc_session_close(struct jsonrpc_session *s)
725 {
726     if (s) {
727         jsonrpc_close(s->rpc);
728         reconnect_destroy(s->reconnect);
729         stream_close(s->stream);
730         pstream_close(s->pstream);
731         free(s);
732     }
733 }
734
735 static void
736 jsonrpc_session_disconnect(struct jsonrpc_session *s)
737 {
738     if (s->rpc) {
739         jsonrpc_error(s->rpc, EOF);
740         jsonrpc_close(s->rpc);
741         s->rpc = NULL;
742         s->seqno++;
743     } else if (s->stream) {
744         stream_close(s->stream);
745         s->stream = NULL;
746         s->seqno++;
747     }
748 }
749
750 static void
751 jsonrpc_session_connect(struct jsonrpc_session *s)
752 {
753     const char *name = reconnect_get_name(s->reconnect);
754     int error;
755
756     jsonrpc_session_disconnect(s);
757     if (!reconnect_is_passive(s->reconnect)) {
758         error = jsonrpc_stream_open(name, &s->stream);
759         if (!error) {
760             reconnect_connecting(s->reconnect, time_msec());
761         }
762     } else {
763         error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream);
764         if (!error) {
765             reconnect_listening(s->reconnect, time_msec());
766         }
767     }
768
769     if (error) {
770         reconnect_connect_failed(s->reconnect, time_msec(), error);
771     }
772     s->seqno++;
773 }
774
775 void
776 jsonrpc_session_run(struct jsonrpc_session *s)
777 {
778     if (s->pstream) {
779         struct stream *stream;
780         int error;
781
782         error = pstream_accept(s->pstream, &stream);
783         if (!error) {
784             if (s->rpc || s->stream) {
785                 VLOG_INFO_RL(&rl,
786                              "%s: new connection replacing active connection",
787                              reconnect_get_name(s->reconnect));
788                 jsonrpc_session_disconnect(s);
789             }
790             reconnect_connected(s->reconnect, time_msec());
791             s->rpc = jsonrpc_open(stream);
792         } else if (error != EAGAIN) {
793             reconnect_listen_error(s->reconnect, time_msec(), error);
794             pstream_close(s->pstream);
795             s->pstream = NULL;
796         }
797     }
798
799     if (s->rpc) {
800         int error;
801
802         jsonrpc_run(s->rpc);
803         error = jsonrpc_get_status(s->rpc);
804         if (error) {
805             reconnect_disconnected(s->reconnect, time_msec(), 0);
806             jsonrpc_session_disconnect(s);
807         }
808     } else if (s->stream) {
809         int error;
810
811         stream_run(s->stream);
812         error = stream_connect(s->stream);
813         if (!error) {
814             reconnect_connected(s->reconnect, time_msec());
815             s->rpc = jsonrpc_open(s->stream);
816             s->stream = NULL;
817         } else if (error != EAGAIN) {
818             reconnect_connect_failed(s->reconnect, time_msec(), error);
819             stream_close(s->stream);
820             s->stream = NULL;
821         }
822     }
823
824     switch (reconnect_run(s->reconnect, time_msec())) {
825     case RECONNECT_CONNECT:
826         jsonrpc_session_connect(s);
827         break;
828
829     case RECONNECT_DISCONNECT:
830         reconnect_disconnected(s->reconnect, time_msec(), 0);
831         jsonrpc_session_disconnect(s);
832         break;
833
834     case RECONNECT_PROBE:
835         if (s->rpc) {
836             struct json *params;
837             struct jsonrpc_msg *request;
838
839             params = json_array_create_empty();
840             request = jsonrpc_create_request("echo", params, NULL);
841             json_destroy(request->id);
842             request->id = json_string_create("echo");
843             jsonrpc_send(s->rpc, request);
844         }
845         break;
846     }
847 }
848
849 void
850 jsonrpc_session_wait(struct jsonrpc_session *s)
851 {
852     if (s->rpc) {
853         jsonrpc_wait(s->rpc);
854     } else if (s->stream) {
855         stream_run_wait(s->stream);
856         stream_connect_wait(s->stream);
857     }
858     if (s->pstream) {
859         pstream_wait(s->pstream);
860     }
861     reconnect_wait(s->reconnect, time_msec());
862 }
863
864 size_t
865 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
866 {
867     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
868 }
869
870 const char *
871 jsonrpc_session_get_name(const struct jsonrpc_session *s)
872 {
873     return reconnect_get_name(s->reconnect);
874 }
875
876 /* Always takes ownership of 'msg', regardless of success. */
877 int
878 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
879 {
880     if (s->rpc) {
881         return jsonrpc_send(s->rpc, msg);
882     } else {
883         jsonrpc_msg_destroy(msg);
884         return ENOTCONN;
885     }
886 }
887
888 struct jsonrpc_msg *
889 jsonrpc_session_recv(struct jsonrpc_session *s)
890 {
891     if (s->rpc) {
892         struct jsonrpc_msg *msg;
893         jsonrpc_recv(s->rpc, &msg);
894         if (msg) {
895             reconnect_received(s->reconnect, time_msec());
896             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
897                 /* Echo request.  Send reply. */
898                 struct jsonrpc_msg *reply;
899
900                 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
901                 jsonrpc_session_send(s, reply);
902             } else if (msg->type == JSONRPC_REPLY
903                 && msg->id && msg->id->type == JSON_STRING
904                 && !strcmp(msg->id->u.string, "echo")) {
905                 /* It's a reply to our echo request.  Suppress it. */
906             } else {
907                 return msg;
908             }
909             jsonrpc_msg_destroy(msg);
910         }
911     }
912     return NULL;
913 }
914
915 void
916 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
917 {
918     if (s->rpc) {
919         jsonrpc_recv_wait(s->rpc);
920     }
921 }
922
923 bool
924 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
925 {
926     return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
927 }
928
929 bool
930 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
931 {
932     return s->rpc != NULL;
933 }
934
935 unsigned int
936 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
937 {
938     return s->seqno;
939 }
940
941 void
942 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
943 {
944     reconnect_force_reconnect(s->reconnect, time_msec());
945 }