ofpbuf: New function ofpbuf_push_zeros().
[openvswitch] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009, 2010 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <assert.h>
22 #include <errno.h>
23
24 #include "byteq.h"
25 #include "dynamic-string.h"
26 #include "fatal-signal.h"
27 #include "json.h"
28 #include "list.h"
29 #include "ofpbuf.h"
30 #include "poll-loop.h"
31 #include "queue.h"
32 #include "reconnect.h"
33 #include "stream.h"
34 #include "timeval.h"
35
36 #define THIS_MODULE VLM_jsonrpc
37 #include "vlog.h"
38 \f
39 struct jsonrpc {
40     struct stream *stream;
41     char *name;
42     int status;
43
44     /* Input. */
45     struct byteq input;
46     struct json_parser *parser;
47     struct jsonrpc_msg *received;
48
49     /* Output. */
50     struct ovs_queue output;
51     size_t backlog;
52 };
53
54 /* Rate limit for error messages. */
55 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
56
57 static void jsonrpc_received(struct jsonrpc *);
58 static void jsonrpc_cleanup(struct jsonrpc *);
59
60 /* This is just the same as stream_open() except that it uses the default
61  * JSONRPC ports if none is specified. */
62 int
63 jsonrpc_stream_open(const char *name, struct stream **streamp)
64 {
65     return stream_open_with_default_ports(name, JSONRPC_TCP_PORT,
66                                           JSONRPC_SSL_PORT, streamp);
67 }
68
69 /* This is just the same as pstream_open() except that it uses the default
70  * JSONRPC ports if none is specified. */
71 int
72 jsonrpc_pstream_open(const char *name, struct pstream **pstreamp)
73 {
74     return pstream_open_with_default_ports(name, JSONRPC_TCP_PORT,
75                                            JSONRPC_SSL_PORT, pstreamp);
76 }
77
78 struct jsonrpc *
79 jsonrpc_open(struct stream *stream)
80 {
81     struct jsonrpc *rpc;
82
83     assert(stream != NULL);
84
85     rpc = xzalloc(sizeof *rpc);
86     rpc->name = xstrdup(stream_get_name(stream));
87     rpc->stream = stream;
88     byteq_init(&rpc->input);
89     queue_init(&rpc->output);
90
91     return rpc;
92 }
93
94 void
95 jsonrpc_close(struct jsonrpc *rpc)
96 {
97     if (rpc) {
98         jsonrpc_cleanup(rpc);
99         free(rpc->name);
100         free(rpc);
101     }
102 }
103
104 void
105 jsonrpc_run(struct jsonrpc *rpc)
106 {
107     if (rpc->status) {
108         return;
109     }
110
111     stream_run(rpc->stream);
112     while (!queue_is_empty(&rpc->output)) {
113         struct ofpbuf *buf = rpc->output.head;
114         int retval;
115
116         retval = stream_send(rpc->stream, buf->data, buf->size);
117         if (retval >= 0) {
118             rpc->backlog -= retval;
119             ofpbuf_pull(buf, retval);
120             if (!buf->size) {
121                 ofpbuf_delete(queue_pop_head(&rpc->output));
122             }
123         } else {
124             if (retval != -EAGAIN) {
125                 VLOG_WARN_RL(&rl, "%s: send error: %s",
126                              rpc->name, strerror(-retval));
127                 jsonrpc_error(rpc, -retval);
128             }
129             break;
130         }
131     }
132 }
133
134 void
135 jsonrpc_wait(struct jsonrpc *rpc)
136 {
137     if (!rpc->status) {
138         stream_run_wait(rpc->stream);
139         if (!queue_is_empty(&rpc->output)) {
140             stream_send_wait(rpc->stream);
141         }
142     }
143 }
144
145 int
146 jsonrpc_get_status(const struct jsonrpc *rpc)
147 {
148     return rpc->status;
149 }
150
151 size_t
152 jsonrpc_get_backlog(const struct jsonrpc *rpc)
153 {
154     return rpc->status ? 0 : rpc->backlog;
155 }
156
157 const char *
158 jsonrpc_get_name(const struct jsonrpc *rpc)
159 {
160     return rpc->name;
161 }
162
163 static void
164 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
165                 const struct jsonrpc_msg *msg)
166 {
167     if (VLOG_IS_DBG_ENABLED()) {
168         struct ds s = DS_EMPTY_INITIALIZER;
169         if (msg->method) {
170             ds_put_format(&s, ", method=\"%s\"", msg->method);
171         }
172         if (msg->params) {
173             ds_put_cstr(&s, ", params=");
174             json_to_ds(msg->params, 0, &s);
175         }
176         if (msg->result) {
177             ds_put_cstr(&s, ", result=");
178             json_to_ds(msg->result, 0, &s);
179         }
180         if (msg->error) {
181             ds_put_cstr(&s, ", error=");
182             json_to_ds(msg->error, 0, &s);
183         }
184         if (msg->id) {
185             ds_put_cstr(&s, ", id=");
186             json_to_ds(msg->id, 0, &s);
187         }
188         VLOG_DBG("%s: %s %s%s", rpc->name, title,
189                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
190         ds_destroy(&s);
191     }
192 }
193
194 /* Always takes ownership of 'msg', regardless of success. */
195 int
196 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
197 {
198     struct ofpbuf *buf;
199     struct json *json;
200     size_t length;
201     char *s;
202
203     if (rpc->status) {
204         jsonrpc_msg_destroy(msg);
205         return rpc->status;
206     }
207
208     jsonrpc_log_msg(rpc, "send", msg);
209
210     json = jsonrpc_msg_to_json(msg);
211     s = json_to_string(json, 0);
212     length = strlen(s);
213     json_destroy(json);
214
215     buf = xmalloc(sizeof *buf);
216     ofpbuf_use(buf, s, length);
217     buf->size = length;
218     queue_push_tail(&rpc->output, buf);
219     rpc->backlog += length;
220
221     if (rpc->output.n == 1) {
222         jsonrpc_run(rpc);
223     }
224     return rpc->status;
225 }
226
227 int
228 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
229 {
230     *msgp = NULL;
231     if (rpc->status) {
232         return rpc->status;
233     }
234
235     while (!rpc->received) {
236         if (byteq_is_empty(&rpc->input)) {
237             size_t chunk;
238             int retval;
239
240             chunk = byteq_headroom(&rpc->input);
241             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
242             if (retval < 0) {
243                 if (retval == -EAGAIN) {
244                     return EAGAIN;
245                 } else {
246                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
247                                  rpc->name, strerror(-retval));
248                     jsonrpc_error(rpc, -retval);
249                     return rpc->status;
250                 }
251             } else if (retval == 0) {
252                 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
253                 jsonrpc_error(rpc, EOF);
254                 return EOF;
255             }
256             byteq_advance_head(&rpc->input, retval);
257         } else {
258             size_t n, used;
259
260             if (!rpc->parser) {
261                 rpc->parser = json_parser_create(0);
262             }
263             n = byteq_tailroom(&rpc->input);
264             used = json_parser_feed(rpc->parser,
265                                     (char *) byteq_tail(&rpc->input), n);
266             byteq_advance_tail(&rpc->input, used);
267             if (json_parser_is_done(rpc->parser)) {
268                 jsonrpc_received(rpc);
269                 if (rpc->status) {
270                     return rpc->status;
271                 }
272             }
273         }
274     }
275
276     *msgp = rpc->received;
277     rpc->received = NULL;
278     return 0;
279 }
280
281 void
282 jsonrpc_recv_wait(struct jsonrpc *rpc)
283 {
284     if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
285         poll_immediate_wake();
286     } else {
287         stream_recv_wait(rpc->stream);
288     }
289 }
290
291 /* Always takes ownership of 'msg', regardless of success. */
292 int
293 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
294 {
295     int error;
296
297     fatal_signal_run();
298
299     error = jsonrpc_send(rpc, msg);
300     if (error) {
301         return error;
302     }
303
304     for (;;) {
305         jsonrpc_run(rpc);
306         if (queue_is_empty(&rpc->output) || rpc->status) {
307             return rpc->status;
308         }
309         jsonrpc_wait(rpc);
310         poll_block();
311     }
312 }
313
314 int
315 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
316 {
317     for (;;) {
318         int error = jsonrpc_recv(rpc, msgp);
319         if (error != EAGAIN) {
320             fatal_signal_run();
321             return error;
322         }
323
324         jsonrpc_run(rpc);
325         jsonrpc_wait(rpc);
326         jsonrpc_recv_wait(rpc);
327         poll_block();
328     }
329 }
330
331 /* Always takes ownership of 'request', regardless of success. */
332 int
333 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
334                        struct jsonrpc_msg **replyp)
335 {
336     struct jsonrpc_msg *reply = NULL;
337     struct json *id;
338     int error;
339
340     id = json_clone(request->id);
341     error = jsonrpc_send_block(rpc, request);
342     if (!error) {
343         for (;;) {
344             error = jsonrpc_recv_block(rpc, &reply);
345             if (error
346                 || (reply->type == JSONRPC_REPLY
347                     && json_equal(id, reply->id))) {
348                 break;
349             }
350             jsonrpc_msg_destroy(reply);
351         }
352     }
353     *replyp = error ? NULL : reply;
354     json_destroy(id);
355     return error;
356 }
357
358 static void
359 jsonrpc_received(struct jsonrpc *rpc)
360 {
361     struct jsonrpc_msg *msg;
362     struct json *json;
363     char *error;
364
365     json = json_parser_finish(rpc->parser);
366     rpc->parser = NULL;
367     if (json->type == JSON_STRING) {
368         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
369                      rpc->name, json_string(json));
370         jsonrpc_error(rpc, EPROTO);
371         json_destroy(json);
372         return;
373     }
374
375     error = jsonrpc_msg_from_json(json, &msg);
376     if (error) {
377         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
378                      rpc->name, error);
379         free(error);
380         jsonrpc_error(rpc, EPROTO);
381         return;
382     }
383
384     jsonrpc_log_msg(rpc, "received", msg);
385     rpc->received = msg;
386 }
387
388 void
389 jsonrpc_error(struct jsonrpc *rpc, int error)
390 {
391     assert(error);
392     if (!rpc->status) {
393         rpc->status = error;
394         jsonrpc_cleanup(rpc);
395     }
396 }
397
398 static void
399 jsonrpc_cleanup(struct jsonrpc *rpc)
400 {
401     stream_close(rpc->stream);
402     rpc->stream = NULL;
403
404     json_parser_abort(rpc->parser);
405     rpc->parser = NULL;
406
407     jsonrpc_msg_destroy(rpc->received);
408     rpc->received = NULL;
409
410     queue_clear(&rpc->output);
411     rpc->backlog = 0;
412 }
413 \f
414 static struct jsonrpc_msg *
415 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
416                 struct json *params, struct json *result, struct json *error,
417                 struct json *id)
418 {
419     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
420     msg->type = type;
421     msg->method = method ? xstrdup(method) : NULL;
422     msg->params = params;
423     msg->result = result;
424     msg->error = error;
425     msg->id = id;
426     return msg;
427 }
428
429 static struct json *
430 jsonrpc_create_id(void)
431 {
432     static unsigned int id;
433     return json_integer_create(id++);
434 }
435
436 struct jsonrpc_msg *
437 jsonrpc_create_request(const char *method, struct json *params,
438                        struct json **idp)
439 {
440     struct json *id = jsonrpc_create_id();
441     if (idp) {
442         *idp = json_clone(id);
443     }
444     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
445 }
446
447 struct jsonrpc_msg *
448 jsonrpc_create_notify(const char *method, struct json *params)
449 {
450     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
451 }
452
453 struct jsonrpc_msg *
454 jsonrpc_create_reply(struct json *result, const struct json *id)
455 {
456     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
457                            json_clone(id));
458 }
459
460 struct jsonrpc_msg *
461 jsonrpc_create_error(struct json *error, const struct json *id)
462 {
463     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
464                            json_clone(id));
465 }
466
467 const char *
468 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
469 {
470     switch (type) {
471     case JSONRPC_REQUEST:
472         return "request";
473
474     case JSONRPC_NOTIFY:
475         return "notification";
476
477     case JSONRPC_REPLY:
478         return "reply";
479
480     case JSONRPC_ERROR:
481         return "error";
482     }
483     return "(null)";
484 }
485
486 char *
487 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
488 {
489     const char *type_name;
490     unsigned int pattern;
491
492     if (m->params && m->params->type != JSON_ARRAY) {
493         return xstrdup("\"params\" must be JSON array");
494     }
495
496     switch (m->type) {
497     case JSONRPC_REQUEST:
498         pattern = 0x11001;
499         break;
500
501     case JSONRPC_NOTIFY:
502         pattern = 0x11000;
503         break;
504
505     case JSONRPC_REPLY:
506         pattern = 0x00101;
507         break;
508
509     case JSONRPC_ERROR:
510         pattern = 0x00011;
511         break;
512
513     default:
514         return xasprintf("invalid JSON-RPC message type %d", m->type);
515     }
516
517     type_name = jsonrpc_msg_type_to_string(m->type);
518     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
519         return xasprintf("%s must%s have \"method\"",
520                          type_name, (pattern & 0x10000) ? "" : " not");
521
522     }
523     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
524         return xasprintf("%s must%s have \"params\"",
525                          type_name, (pattern & 0x1000) ? "" : " not");
526
527     }
528     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
529         return xasprintf("%s must%s have \"result\"",
530                          type_name, (pattern & 0x100) ? "" : " not");
531
532     }
533     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
534         return xasprintf("%s must%s have \"error\"",
535                          type_name, (pattern & 0x10) ? "" : " not");
536
537     }
538     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
539         return xasprintf("%s must%s have \"id\"",
540                          type_name, (pattern & 0x1) ? "" : " not");
541
542     }
543     return NULL;
544 }
545
546 void
547 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
548 {
549     if (m) {
550         free(m->method);
551         json_destroy(m->params);
552         json_destroy(m->result);
553         json_destroy(m->error);
554         json_destroy(m->id);
555         free(m);
556     }
557 }
558
559 static struct json *
560 null_from_json_null(struct json *json)
561 {
562     if (json && json->type == JSON_NULL) {
563         json_destroy(json);
564         return NULL;
565     }
566     return json;
567 }
568
569 char *
570 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
571 {
572     struct json *method = NULL;
573     struct jsonrpc_msg *msg = NULL;
574     struct shash *object;
575     char *error;
576
577     if (json->type != JSON_OBJECT) {
578         error = xstrdup("message is not a JSON object");
579         goto exit;
580     }
581     object = json_object(json);
582
583     method = shash_find_and_delete(object, "method");
584     if (method && method->type != JSON_STRING) {
585         error = xstrdup("method is not a JSON string");
586         goto exit;
587     }
588
589     msg = xzalloc(sizeof *msg);
590     msg->method = method ? xstrdup(method->u.string) : NULL;
591     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
592     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
593     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
594     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
595     msg->type = (msg->result ? JSONRPC_REPLY
596                  : msg->error ? JSONRPC_ERROR
597                  : msg->id ? JSONRPC_REQUEST
598                  : JSONRPC_NOTIFY);
599     if (!shash_is_empty(object)) {
600         error = xasprintf("message has unexpected member \"%s\"",
601                           shash_first(object)->name);
602         goto exit;
603     }
604     error = jsonrpc_msg_is_valid(msg);
605     if (error) {
606         goto exit;
607     }
608
609 exit:
610     json_destroy(method);
611     json_destroy(json);
612     if (error) {
613         jsonrpc_msg_destroy(msg);
614         msg = NULL;
615     }
616     *msgp = msg;
617     return error;
618 }
619
620 struct json *
621 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
622 {
623     struct json *json = json_object_create();
624
625     if (m->method) {
626         json_object_put(json, "method", json_string_create_nocopy(m->method));
627     }
628
629     if (m->params) {
630         json_object_put(json, "params", m->params);
631     }
632
633     if (m->result) {
634         json_object_put(json, "result", m->result);
635     } else if (m->type == JSONRPC_ERROR) {
636         json_object_put(json, "result", json_null_create());
637     }
638
639     if (m->error) {
640         json_object_put(json, "error", m->error);
641     } else if (m->type == JSONRPC_REPLY) {
642         json_object_put(json, "error", json_null_create());
643     }
644
645     if (m->id) {
646         json_object_put(json, "id", m->id);
647     } else if (m->type == JSONRPC_NOTIFY) {
648         json_object_put(json, "id", json_null_create());
649     }
650
651     free(m);
652
653     return json;
654 }
655 \f
656 /* A JSON-RPC session with reconnection. */
657
658 struct jsonrpc_session {
659     struct reconnect *reconnect;
660     struct jsonrpc *rpc;
661     struct stream *stream;
662     struct pstream *pstream;
663     unsigned int seqno;
664 };
665
666 /* Creates and returns a jsonrpc_session to 'name', which should be a string
667  * acceptable to stream_open() or pstream_open().
668  *
669  * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
670  * jsonrpc_session connects and reconnects, with back-off, to 'name'.
671  *
672  * If 'name' is a passive connection method, e.g. "ptcp:", the new
673  * jsonrpc_session listens for connections to 'name'.  It maintains at most one
674  * connection at any given time.  Any new connection causes the previous one
675  * (if any) to be dropped. */
676 struct jsonrpc_session *
677 jsonrpc_session_open(const char *name)
678 {
679     struct jsonrpc_session *s;
680
681     s = xmalloc(sizeof *s);
682     s->reconnect = reconnect_create(time_msec());
683     reconnect_set_name(s->reconnect, name);
684     reconnect_enable(s->reconnect, time_msec());
685     s->rpc = NULL;
686     s->stream = NULL;
687     s->pstream = NULL;
688     s->seqno = 0;
689
690     if (!pstream_verify_name(name)) {
691         reconnect_set_passive(s->reconnect, true, time_msec());
692     }
693
694     return s;
695 }
696
697 /* Creates and returns a jsonrpc_session that is initially connected to
698  * 'jsonrpc'.  If the connection is dropped, it will not be reconnected. */
699 struct jsonrpc_session *
700 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
701 {
702     struct jsonrpc_session *s;
703
704     s = xmalloc(sizeof *s);
705     s->reconnect = reconnect_create(time_msec());
706     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
707     reconnect_set_max_tries(s->reconnect, 0);
708     reconnect_connected(s->reconnect, time_msec());
709     s->rpc = jsonrpc;
710     s->stream = NULL;
711     s->pstream = NULL;
712     s->seqno = 0;
713
714     return s;
715 }
716
717 void
718 jsonrpc_session_close(struct jsonrpc_session *s)
719 {
720     if (s) {
721         jsonrpc_close(s->rpc);
722         reconnect_destroy(s->reconnect);
723         stream_close(s->stream);
724         pstream_close(s->pstream);
725         free(s);
726     }
727 }
728
729 static void
730 jsonrpc_session_disconnect(struct jsonrpc_session *s)
731 {
732     if (s->rpc) {
733         jsonrpc_error(s->rpc, EOF);
734         jsonrpc_close(s->rpc);
735         s->rpc = NULL;
736         s->seqno++;
737     } else if (s->stream) {
738         stream_close(s->stream);
739         s->stream = NULL;
740         s->seqno++;
741     }
742 }
743
744 static void
745 jsonrpc_session_connect(struct jsonrpc_session *s)
746 {
747     const char *name = reconnect_get_name(s->reconnect);
748     int error;
749
750     jsonrpc_session_disconnect(s);
751     if (!reconnect_is_passive(s->reconnect)) {
752         error = jsonrpc_stream_open(name, &s->stream);
753         if (!error) {
754             reconnect_connecting(s->reconnect, time_msec());
755         }
756     } else {
757         error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream);
758         if (!error) {
759             reconnect_listening(s->reconnect, time_msec());
760         }
761     }
762
763     if (error) {
764         reconnect_connect_failed(s->reconnect, time_msec(), error);
765     }
766     s->seqno++;
767 }
768
769 void
770 jsonrpc_session_run(struct jsonrpc_session *s)
771 {
772     if (s->pstream) {
773         struct stream *stream;
774         int error;
775
776         error = pstream_accept(s->pstream, &stream);
777         if (!error) {
778             if (s->rpc || s->stream) {
779                 VLOG_INFO_RL(&rl,
780                              "%s: new connection replacing active connection",
781                              reconnect_get_name(s->reconnect));
782                 jsonrpc_session_disconnect(s);
783             }
784             reconnect_connected(s->reconnect, time_msec());
785             s->rpc = jsonrpc_open(stream);
786         } else if (error != EAGAIN) {
787             reconnect_listen_error(s->reconnect, time_msec(), error);
788             pstream_close(s->pstream);
789             s->pstream = NULL;
790         }
791     }
792
793     if (s->rpc) {
794         int error;
795
796         jsonrpc_run(s->rpc);
797         error = jsonrpc_get_status(s->rpc);
798         if (error) {
799             reconnect_disconnected(s->reconnect, time_msec(), 0);
800             jsonrpc_session_disconnect(s);
801         }
802     } else if (s->stream) {
803         int error;
804
805         stream_run(s->stream);
806         error = stream_connect(s->stream);
807         if (!error) {
808             reconnect_connected(s->reconnect, time_msec());
809             s->rpc = jsonrpc_open(s->stream);
810             s->stream = NULL;
811         } else if (error != EAGAIN) {
812             reconnect_connect_failed(s->reconnect, time_msec(), error);
813             stream_close(s->stream);
814             s->stream = NULL;
815         }
816     }
817
818     switch (reconnect_run(s->reconnect, time_msec())) {
819     case RECONNECT_CONNECT:
820         jsonrpc_session_connect(s);
821         break;
822
823     case RECONNECT_DISCONNECT:
824         reconnect_disconnected(s->reconnect, time_msec(), 0);
825         jsonrpc_session_disconnect(s);
826         break;
827
828     case RECONNECT_PROBE:
829         if (s->rpc) {
830             struct json *params;
831             struct jsonrpc_msg *request;
832
833             params = json_array_create_empty();
834             request = jsonrpc_create_request("echo", params, NULL);
835             json_destroy(request->id);
836             request->id = json_string_create("echo");
837             jsonrpc_send(s->rpc, request);
838         }
839         break;
840     }
841 }
842
843 void
844 jsonrpc_session_wait(struct jsonrpc_session *s)
845 {
846     if (s->rpc) {
847         jsonrpc_wait(s->rpc);
848     } else if (s->stream) {
849         stream_run_wait(s->stream);
850         stream_connect_wait(s->stream);
851     }
852     if (s->pstream) {
853         pstream_wait(s->pstream);
854     }
855     reconnect_wait(s->reconnect, time_msec());
856 }
857
858 size_t
859 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
860 {
861     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
862 }
863
864 const char *
865 jsonrpc_session_get_name(const struct jsonrpc_session *s)
866 {
867     return reconnect_get_name(s->reconnect);
868 }
869
870 /* Always takes ownership of 'msg', regardless of success. */
871 int
872 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
873 {
874     if (s->rpc) {
875         return jsonrpc_send(s->rpc, msg);
876     } else {
877         jsonrpc_msg_destroy(msg);
878         return ENOTCONN;
879     }
880 }
881
882 struct jsonrpc_msg *
883 jsonrpc_session_recv(struct jsonrpc_session *s)
884 {
885     if (s->rpc) {
886         struct jsonrpc_msg *msg;
887         jsonrpc_recv(s->rpc, &msg);
888         if (msg) {
889             reconnect_received(s->reconnect, time_msec());
890             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
891                 /* Echo request.  Send reply. */
892                 struct jsonrpc_msg *reply;
893
894                 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
895                 jsonrpc_session_send(s, reply);
896             } else if (msg->type == JSONRPC_REPLY
897                 && msg->id && msg->id->type == JSON_STRING
898                 && !strcmp(msg->id->u.string, "echo")) {
899                 /* It's a reply to our echo request.  Suppress it. */
900             } else {
901                 return msg;
902             }
903             jsonrpc_msg_destroy(msg);
904         }
905     }
906     return NULL;
907 }
908
909 void
910 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
911 {
912     if (s->rpc) {
913         jsonrpc_recv_wait(s->rpc);
914     }
915 }
916
917 bool
918 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
919 {
920     return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
921 }
922
923 bool
924 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
925 {
926     return s->rpc != NULL;
927 }
928
929 unsigned int
930 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
931 {
932     return s->seqno;
933 }
934
935 void
936 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
937 {
938     reconnect_force_reconnect(s->reconnect, time_msec());
939 }