Revert DSCP update changes.
[openvswitch] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <assert.h>
22 #include <errno.h>
23
24 #include "byteq.h"
25 #include "dynamic-string.h"
26 #include "fatal-signal.h"
27 #include "json.h"
28 #include "list.h"
29 #include "ofpbuf.h"
30 #include "poll-loop.h"
31 #include "reconnect.h"
32 #include "stream.h"
33 #include "timeval.h"
34 #include "vlog.h"
35
36 VLOG_DEFINE_THIS_MODULE(jsonrpc);
37 \f
38 struct jsonrpc {
39     struct stream *stream;
40     char *name;
41     int status;
42
43     /* Input. */
44     struct byteq input;
45     struct json_parser *parser;
46     struct jsonrpc_msg *received;
47
48     /* Output. */
49     struct list output;         /* Contains "struct ofpbuf"s. */
50     size_t backlog;
51 };
52
53 /* Rate limit for error messages. */
54 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
55
56 static void jsonrpc_received(struct jsonrpc *);
57 static void jsonrpc_cleanup(struct jsonrpc *);
58 static void jsonrpc_error(struct jsonrpc *, int error);
59
60 /* This is just the same as stream_open() except that it uses the default
61  * JSONRPC ports if none is specified. */
62 int
63 jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
64 {
65     return stream_open_with_default_ports(name, JSONRPC_TCP_PORT,
66                                           JSONRPC_SSL_PORT, streamp,
67                                           dscp);
68 }
69
70 /* This is just the same as pstream_open() except that it uses the default
71  * JSONRPC ports if none is specified. */
72 int
73 jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
74 {
75     return pstream_open_with_default_ports(name, JSONRPC_TCP_PORT,
76                                            JSONRPC_SSL_PORT, pstreamp, dscp);
77 }
78
79 /* Returns a new JSON-RPC stream that uses 'stream' for input and output.  The
80  * new jsonrpc object takes ownership of 'stream'. */
81 struct jsonrpc *
82 jsonrpc_open(struct stream *stream)
83 {
84     struct jsonrpc *rpc;
85
86     assert(stream != NULL);
87
88     rpc = xzalloc(sizeof *rpc);
89     rpc->name = xstrdup(stream_get_name(stream));
90     rpc->stream = stream;
91     byteq_init(&rpc->input);
92     list_init(&rpc->output);
93
94     return rpc;
95 }
96
97 /* Destroys 'rpc', closing the stream on which it is based, and frees its
98  * memory. */
99 void
100 jsonrpc_close(struct jsonrpc *rpc)
101 {
102     if (rpc) {
103         jsonrpc_cleanup(rpc);
104         free(rpc->name);
105         free(rpc);
106     }
107 }
108
109 /* Performs periodic maintenance on 'rpc', such as flushing output buffers. */
110 void
111 jsonrpc_run(struct jsonrpc *rpc)
112 {
113     if (rpc->status) {
114         return;
115     }
116
117     stream_run(rpc->stream);
118     while (!list_is_empty(&rpc->output)) {
119         struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
120         int retval;
121
122         retval = stream_send(rpc->stream, buf->data, buf->size);
123         if (retval >= 0) {
124             rpc->backlog -= retval;
125             ofpbuf_pull(buf, retval);
126             if (!buf->size) {
127                 list_remove(&buf->list_node);
128                 ofpbuf_delete(buf);
129             }
130         } else {
131             if (retval != -EAGAIN) {
132                 VLOG_WARN_RL(&rl, "%s: send error: %s",
133                              rpc->name, strerror(-retval));
134                 jsonrpc_error(rpc, -retval);
135             }
136             break;
137         }
138     }
139 }
140
141 /* Arranges for the poll loop to wake up when 'rpc' needs to perform
142  * maintenance activities. */
143 void
144 jsonrpc_wait(struct jsonrpc *rpc)
145 {
146     if (!rpc->status) {
147         stream_run_wait(rpc->stream);
148         if (!list_is_empty(&rpc->output)) {
149             stream_send_wait(rpc->stream);
150         }
151     }
152 }
153
154 /*
155  * Returns the current status of 'rpc'.  The possible return values are:
156  * - 0: no error yet
157  * - >0: errno value
158  * - EOF: end of file (remote end closed connection; not necessarily an error).
159  *
160  * When this functions nonzero, 'rpc' is effectively out of commission.  'rpc'
161  * will not receive any more messages and any further messages that one
162  * attempts to send with 'rpc' will be discarded.  The caller can keep 'rpc'
163  * around as long as it wants, but it's not going to provide any more useful
164  * services.
165  */
166 int
167 jsonrpc_get_status(const struct jsonrpc *rpc)
168 {
169     return rpc->status;
170 }
171
172 /* Returns the number of bytes buffered by 'rpc' to be written to the
173  * underlying stream.  Always returns 0 if 'rpc' has encountered an error or if
174  * the remote end closed the connection. */
175 size_t
176 jsonrpc_get_backlog(const struct jsonrpc *rpc)
177 {
178     return rpc->status ? 0 : rpc->backlog;
179 }
180
181 /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
182  * the stream underlying 'rpc' when 'rpc' was created. */
183 const char *
184 jsonrpc_get_name(const struct jsonrpc *rpc)
185 {
186     return rpc->name;
187 }
188
189 static void
190 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
191                 const struct jsonrpc_msg *msg)
192 {
193     if (VLOG_IS_DBG_ENABLED()) {
194         struct ds s = DS_EMPTY_INITIALIZER;
195         if (msg->method) {
196             ds_put_format(&s, ", method=\"%s\"", msg->method);
197         }
198         if (msg->params) {
199             ds_put_cstr(&s, ", params=");
200             json_to_ds(msg->params, 0, &s);
201         }
202         if (msg->result) {
203             ds_put_cstr(&s, ", result=");
204             json_to_ds(msg->result, 0, &s);
205         }
206         if (msg->error) {
207             ds_put_cstr(&s, ", error=");
208             json_to_ds(msg->error, 0, &s);
209         }
210         if (msg->id) {
211             ds_put_cstr(&s, ", id=");
212             json_to_ds(msg->id, 0, &s);
213         }
214         VLOG_DBG("%s: %s %s%s", rpc->name, title,
215                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
216         ds_destroy(&s);
217     }
218 }
219
220 /* Schedules 'msg' to be sent on 'rpc' and returns 'rpc''s status (as with
221  * jsonrpc_get_status()).
222  *
223  * If 'msg' cannot be sent immediately, it is appended to a buffer.  The caller
224  * is responsible for ensuring that the amount of buffered data is somehow
225  * limited.  (jsonrpc_get_backlog() returns the amount of data currently
226  * buffered in 'rpc'.)
227  *
228  * Always takes ownership of 'msg', regardless of success. */
229 int
230 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
231 {
232     struct ofpbuf *buf;
233     struct json *json;
234     size_t length;
235     char *s;
236
237     if (rpc->status) {
238         jsonrpc_msg_destroy(msg);
239         return rpc->status;
240     }
241
242     jsonrpc_log_msg(rpc, "send", msg);
243
244     json = jsonrpc_msg_to_json(msg);
245     s = json_to_string(json, 0);
246     length = strlen(s);
247     json_destroy(json);
248
249     buf = xmalloc(sizeof *buf);
250     ofpbuf_use(buf, s, length);
251     buf->size = length;
252     list_push_back(&rpc->output, &buf->list_node);
253     rpc->backlog += length;
254
255     if (rpc->backlog == length) {
256         jsonrpc_run(rpc);
257     }
258     return rpc->status;
259 }
260
261 /* Attempts to receive a message from 'rpc'.
262  *
263  * If successful, stores the received message in '*msgp' and returns 0.  The
264  * caller takes ownership of '*msgp' and must eventually destroy it with
265  * jsonrpc_msg_destroy().
266  *
267  * Otherwise, stores NULL in '*msgp' and returns one of the following:
268  *
269  *   - EAGAIN: No message has been received.
270  *
271  *   - EOF: The remote end closed the connection gracefully.
272  *
273  *   - Otherwise an errno value that represents a JSON-RPC protocol violation
274  *     or another error fatal to the connection.  'rpc' will not send or
275  *     receive any more messages.
276  */
277 int
278 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
279 {
280     int i;
281
282     *msgp = NULL;
283     if (rpc->status) {
284         return rpc->status;
285     }
286
287     for (i = 0; i < 50; i++) {
288         if (rpc->received) {
289             *msgp = rpc->received;
290             rpc->received = NULL;
291             return 0;
292         } else if (byteq_is_empty(&rpc->input)) {
293             size_t chunk;
294             int retval;
295
296             chunk = byteq_headroom(&rpc->input);
297             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
298             if (retval < 0) {
299                 if (retval == -EAGAIN) {
300                     return EAGAIN;
301                 } else {
302                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
303                                  rpc->name, strerror(-retval));
304                     jsonrpc_error(rpc, -retval);
305                     return rpc->status;
306                 }
307             } else if (retval == 0) {
308                 jsonrpc_error(rpc, EOF);
309                 return EOF;
310             }
311             byteq_advance_head(&rpc->input, retval);
312         } else {
313             size_t n, used;
314
315             if (!rpc->parser) {
316                 rpc->parser = json_parser_create(0);
317             }
318             n = byteq_tailroom(&rpc->input);
319             used = json_parser_feed(rpc->parser,
320                                     (char *) byteq_tail(&rpc->input), n);
321             byteq_advance_tail(&rpc->input, used);
322             if (json_parser_is_done(rpc->parser)) {
323                 jsonrpc_received(rpc);
324                 if (rpc->status) {
325                     const struct byteq *q = &rpc->input;
326                     if (q->head <= BYTEQ_SIZE) {
327                         stream_report_content(q->buffer, q->head,
328                                               STREAM_JSONRPC,
329                                               THIS_MODULE, rpc->name);
330                     }
331                     return rpc->status;
332                 }
333             }
334         }
335     }
336
337     return EAGAIN;
338 }
339
340 /* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
341  * than EAGAIN. */
342 void
343 jsonrpc_recv_wait(struct jsonrpc *rpc)
344 {
345     if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
346         (poll_immediate_wake)(rpc->name);
347     } else {
348         stream_recv_wait(rpc->stream);
349     }
350 }
351
352 /* Sends 'msg' on 'rpc' and waits for it to be successfully queued to the
353  * underlying stream.  Returns 0 if 'msg' was sent successfully, otherwise a
354  * status value (see jsonrpc_get_status()).
355  *
356  * Always takes ownership of 'msg', regardless of success. */
357 int
358 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
359 {
360     int error;
361
362     fatal_signal_run();
363
364     error = jsonrpc_send(rpc, msg);
365     if (error) {
366         return error;
367     }
368
369     for (;;) {
370         jsonrpc_run(rpc);
371         if (list_is_empty(&rpc->output) || rpc->status) {
372             return rpc->status;
373         }
374         jsonrpc_wait(rpc);
375         poll_block();
376     }
377 }
378
379 /* Waits for a message to be received on 'rpc'.  Same semantics as
380  * jsonrpc_recv() except that EAGAIN will never be returned. */
381 int
382 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
383 {
384     for (;;) {
385         int error = jsonrpc_recv(rpc, msgp);
386         if (error != EAGAIN) {
387             fatal_signal_run();
388             return error;
389         }
390
391         jsonrpc_run(rpc);
392         jsonrpc_wait(rpc);
393         jsonrpc_recv_wait(rpc);
394         poll_block();
395     }
396 }
397
398 /* Sends 'request' to 'rpc' then waits for a reply.  The return value is 0 if
399  * successful, in which case '*replyp' is set to the reply, which the caller
400  * must eventually free with jsonrpc_msg_destroy().  Otherwise returns a status
401  * value (see jsonrpc_get_status()).
402  *
403  * Discards any message received on 'rpc' that is not a reply to 'request'
404  * (based on message id).
405  *
406  * Always takes ownership of 'request', regardless of success. */
407 int
408 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
409                        struct jsonrpc_msg **replyp)
410 {
411     struct jsonrpc_msg *reply = NULL;
412     struct json *id;
413     int error;
414
415     id = json_clone(request->id);
416     error = jsonrpc_send_block(rpc, request);
417     if (!error) {
418         for (;;) {
419             error = jsonrpc_recv_block(rpc, &reply);
420             if (error) {
421                 break;
422             }
423             if ((reply->type == JSONRPC_REPLY || reply->type == JSONRPC_ERROR)
424                 && json_equal(id, reply->id)) {
425                 break;
426             }
427             jsonrpc_msg_destroy(reply);
428         }
429     }
430     *replyp = error ? NULL : reply;
431     json_destroy(id);
432     return error;
433 }
434
435 static void
436 jsonrpc_received(struct jsonrpc *rpc)
437 {
438     struct jsonrpc_msg *msg;
439     struct json *json;
440     char *error;
441
442     json = json_parser_finish(rpc->parser);
443     rpc->parser = NULL;
444     if (json->type == JSON_STRING) {
445         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
446                      rpc->name, json_string(json));
447         jsonrpc_error(rpc, EPROTO);
448         json_destroy(json);
449         return;
450     }
451
452     error = jsonrpc_msg_from_json(json, &msg);
453     if (error) {
454         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
455                      rpc->name, error);
456         free(error);
457         jsonrpc_error(rpc, EPROTO);
458         return;
459     }
460
461     jsonrpc_log_msg(rpc, "received", msg);
462     rpc->received = msg;
463 }
464
465 static void
466 jsonrpc_error(struct jsonrpc *rpc, int error)
467 {
468     assert(error);
469     if (!rpc->status) {
470         rpc->status = error;
471         jsonrpc_cleanup(rpc);
472     }
473 }
474
475 static void
476 jsonrpc_cleanup(struct jsonrpc *rpc)
477 {
478     stream_close(rpc->stream);
479     rpc->stream = NULL;
480
481     json_parser_abort(rpc->parser);
482     rpc->parser = NULL;
483
484     jsonrpc_msg_destroy(rpc->received);
485     rpc->received = NULL;
486
487     ofpbuf_list_delete(&rpc->output);
488     rpc->backlog = 0;
489 }
490 \f
491 static struct jsonrpc_msg *
492 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
493                 struct json *params, struct json *result, struct json *error,
494                 struct json *id)
495 {
496     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
497     msg->type = type;
498     msg->method = method ? xstrdup(method) : NULL;
499     msg->params = params;
500     msg->result = result;
501     msg->error = error;
502     msg->id = id;
503     return msg;
504 }
505
506 static struct json *
507 jsonrpc_create_id(void)
508 {
509     static unsigned int id;
510     return json_integer_create(id++);
511 }
512
513 struct jsonrpc_msg *
514 jsonrpc_create_request(const char *method, struct json *params,
515                        struct json **idp)
516 {
517     struct json *id = jsonrpc_create_id();
518     if (idp) {
519         *idp = json_clone(id);
520     }
521     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
522 }
523
524 struct jsonrpc_msg *
525 jsonrpc_create_notify(const char *method, struct json *params)
526 {
527     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
528 }
529
530 struct jsonrpc_msg *
531 jsonrpc_create_reply(struct json *result, const struct json *id)
532 {
533     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
534                            json_clone(id));
535 }
536
537 struct jsonrpc_msg *
538 jsonrpc_create_error(struct json *error, const struct json *id)
539 {
540     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
541                            json_clone(id));
542 }
543
544 const char *
545 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
546 {
547     switch (type) {
548     case JSONRPC_REQUEST:
549         return "request";
550
551     case JSONRPC_NOTIFY:
552         return "notification";
553
554     case JSONRPC_REPLY:
555         return "reply";
556
557     case JSONRPC_ERROR:
558         return "error";
559     }
560     return "(null)";
561 }
562
563 char *
564 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
565 {
566     const char *type_name;
567     unsigned int pattern;
568
569     if (m->params && m->params->type != JSON_ARRAY) {
570         return xstrdup("\"params\" must be JSON array");
571     }
572
573     switch (m->type) {
574     case JSONRPC_REQUEST:
575         pattern = 0x11001;
576         break;
577
578     case JSONRPC_NOTIFY:
579         pattern = 0x11000;
580         break;
581
582     case JSONRPC_REPLY:
583         pattern = 0x00101;
584         break;
585
586     case JSONRPC_ERROR:
587         pattern = 0x00011;
588         break;
589
590     default:
591         return xasprintf("invalid JSON-RPC message type %d", m->type);
592     }
593
594     type_name = jsonrpc_msg_type_to_string(m->type);
595     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
596         return xasprintf("%s must%s have \"method\"",
597                          type_name, (pattern & 0x10000) ? "" : " not");
598
599     }
600     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
601         return xasprintf("%s must%s have \"params\"",
602                          type_name, (pattern & 0x1000) ? "" : " not");
603
604     }
605     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
606         return xasprintf("%s must%s have \"result\"",
607                          type_name, (pattern & 0x100) ? "" : " not");
608
609     }
610     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
611         return xasprintf("%s must%s have \"error\"",
612                          type_name, (pattern & 0x10) ? "" : " not");
613
614     }
615     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
616         return xasprintf("%s must%s have \"id\"",
617                          type_name, (pattern & 0x1) ? "" : " not");
618
619     }
620     return NULL;
621 }
622
623 void
624 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
625 {
626     if (m) {
627         free(m->method);
628         json_destroy(m->params);
629         json_destroy(m->result);
630         json_destroy(m->error);
631         json_destroy(m->id);
632         free(m);
633     }
634 }
635
636 static struct json *
637 null_from_json_null(struct json *json)
638 {
639     if (json && json->type == JSON_NULL) {
640         json_destroy(json);
641         return NULL;
642     }
643     return json;
644 }
645
646 char *
647 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
648 {
649     struct json *method = NULL;
650     struct jsonrpc_msg *msg = NULL;
651     struct shash *object;
652     char *error;
653
654     if (json->type != JSON_OBJECT) {
655         error = xstrdup("message is not a JSON object");
656         goto exit;
657     }
658     object = json_object(json);
659
660     method = shash_find_and_delete(object, "method");
661     if (method && method->type != JSON_STRING) {
662         error = xstrdup("method is not a JSON string");
663         goto exit;
664     }
665
666     msg = xzalloc(sizeof *msg);
667     msg->method = method ? xstrdup(method->u.string) : NULL;
668     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
669     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
670     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
671     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
672     msg->type = (msg->result ? JSONRPC_REPLY
673                  : msg->error ? JSONRPC_ERROR
674                  : msg->id ? JSONRPC_REQUEST
675                  : JSONRPC_NOTIFY);
676     if (!shash_is_empty(object)) {
677         error = xasprintf("message has unexpected member \"%s\"",
678                           shash_first(object)->name);
679         goto exit;
680     }
681     error = jsonrpc_msg_is_valid(msg);
682     if (error) {
683         goto exit;
684     }
685
686 exit:
687     json_destroy(method);
688     json_destroy(json);
689     if (error) {
690         jsonrpc_msg_destroy(msg);
691         msg = NULL;
692     }
693     *msgp = msg;
694     return error;
695 }
696
697 struct json *
698 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
699 {
700     struct json *json = json_object_create();
701
702     if (m->method) {
703         json_object_put(json, "method", json_string_create_nocopy(m->method));
704     }
705
706     if (m->params) {
707         json_object_put(json, "params", m->params);
708     }
709
710     if (m->result) {
711         json_object_put(json, "result", m->result);
712     } else if (m->type == JSONRPC_ERROR) {
713         json_object_put(json, "result", json_null_create());
714     }
715
716     if (m->error) {
717         json_object_put(json, "error", m->error);
718     } else if (m->type == JSONRPC_REPLY) {
719         json_object_put(json, "error", json_null_create());
720     }
721
722     if (m->id) {
723         json_object_put(json, "id", m->id);
724     } else if (m->type == JSONRPC_NOTIFY) {
725         json_object_put(json, "id", json_null_create());
726     }
727
728     free(m);
729
730     return json;
731 }
732 \f
733 /* A JSON-RPC session with reconnection. */
734
735 struct jsonrpc_session {
736     struct reconnect *reconnect;
737     struct jsonrpc *rpc;
738     struct stream *stream;
739     struct pstream *pstream;
740     unsigned int seqno;
741     uint8_t dscp;
742 };
743
744 /* Creates and returns a jsonrpc_session to 'name', which should be a string
745  * acceptable to stream_open() or pstream_open().
746  *
747  * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
748  * jsonrpc_session connects and reconnects, with back-off, to 'name'.
749  *
750  * If 'name' is a passive connection method, e.g. "ptcp:", the new
751  * jsonrpc_session listens for connections to 'name'.  It maintains at most one
752  * connection at any given time.  Any new connection causes the previous one
753  * (if any) to be dropped. */
754 struct jsonrpc_session *
755 jsonrpc_session_open(const char *name)
756 {
757     struct jsonrpc_session *s;
758
759     s = xmalloc(sizeof *s);
760     s->reconnect = reconnect_create(time_msec());
761     reconnect_set_name(s->reconnect, name);
762     reconnect_enable(s->reconnect, time_msec());
763     s->rpc = NULL;
764     s->stream = NULL;
765     s->pstream = NULL;
766     s->seqno = 0;
767     s->dscp = 0;
768
769     if (!pstream_verify_name(name)) {
770         reconnect_set_passive(s->reconnect, true, time_msec());
771     }
772
773     if (!stream_or_pstream_needs_probes(name)) {
774         reconnect_set_probe_interval(s->reconnect, 0);
775     }
776
777     return s;
778 }
779
780 /* Creates and returns a jsonrpc_session that is initially connected to
781  * 'jsonrpc'.  If the connection is dropped, it will not be reconnected.
782  *
783  * On the assumption that such connections are likely to be short-lived
784  * (e.g. from ovs-vsctl), informational logging for them is suppressed. */
785 struct jsonrpc_session *
786 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
787 {
788     struct jsonrpc_session *s;
789
790     s = xmalloc(sizeof *s);
791     s->reconnect = reconnect_create(time_msec());
792     reconnect_set_quiet(s->reconnect, true);
793     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
794     reconnect_set_max_tries(s->reconnect, 0);
795     reconnect_connected(s->reconnect, time_msec());
796     s->rpc = jsonrpc;
797     s->stream = NULL;
798     s->pstream = NULL;
799     s->seqno = 0;
800
801     return s;
802 }
803
804 void
805 jsonrpc_session_close(struct jsonrpc_session *s)
806 {
807     if (s) {
808         jsonrpc_close(s->rpc);
809         reconnect_destroy(s->reconnect);
810         stream_close(s->stream);
811         pstream_close(s->pstream);
812         free(s);
813     }
814 }
815
816 static void
817 jsonrpc_session_disconnect(struct jsonrpc_session *s)
818 {
819     if (s->rpc) {
820         jsonrpc_error(s->rpc, EOF);
821         jsonrpc_close(s->rpc);
822         s->rpc = NULL;
823         s->seqno++;
824     } else if (s->stream) {
825         stream_close(s->stream);
826         s->stream = NULL;
827         s->seqno++;
828     }
829 }
830
831 static void
832 jsonrpc_session_connect(struct jsonrpc_session *s)
833 {
834     const char *name = reconnect_get_name(s->reconnect);
835     int error;
836
837     jsonrpc_session_disconnect(s);
838     if (!reconnect_is_passive(s->reconnect)) {
839         error = jsonrpc_stream_open(name, &s->stream, s->dscp);
840         if (!error) {
841             reconnect_connecting(s->reconnect, time_msec());
842         }
843     } else {
844         error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream,
845                                                       s->dscp);
846         if (!error) {
847             reconnect_listening(s->reconnect, time_msec());
848         }
849     }
850
851     if (error) {
852         reconnect_connect_failed(s->reconnect, time_msec(), error);
853     }
854     s->seqno++;
855 }
856
857 void
858 jsonrpc_session_run(struct jsonrpc_session *s)
859 {
860     if (s->pstream) {
861         struct stream *stream;
862         int error;
863
864         error = pstream_accept(s->pstream, &stream);
865         if (!error) {
866             if (s->rpc || s->stream) {
867                 VLOG_INFO_RL(&rl,
868                              "%s: new connection replacing active connection",
869                              reconnect_get_name(s->reconnect));
870                 jsonrpc_session_disconnect(s);
871             }
872             reconnect_connected(s->reconnect, time_msec());
873             s->rpc = jsonrpc_open(stream);
874         } else if (error != EAGAIN) {
875             reconnect_listen_error(s->reconnect, time_msec(), error);
876             pstream_close(s->pstream);
877             s->pstream = NULL;
878         }
879     }
880
881     if (s->rpc) {
882         int error;
883
884         jsonrpc_run(s->rpc);
885         error = jsonrpc_get_status(s->rpc);
886         if (error) {
887             reconnect_disconnected(s->reconnect, time_msec(), error);
888             jsonrpc_session_disconnect(s);
889         }
890     } else if (s->stream) {
891         int error;
892
893         stream_run(s->stream);
894         error = stream_connect(s->stream);
895         if (!error) {
896             reconnect_connected(s->reconnect, time_msec());
897             s->rpc = jsonrpc_open(s->stream);
898             s->stream = NULL;
899         } else if (error != EAGAIN) {
900             reconnect_connect_failed(s->reconnect, time_msec(), error);
901             stream_close(s->stream);
902             s->stream = NULL;
903         }
904     }
905
906     switch (reconnect_run(s->reconnect, time_msec())) {
907     case RECONNECT_CONNECT:
908         jsonrpc_session_connect(s);
909         break;
910
911     case RECONNECT_DISCONNECT:
912         reconnect_disconnected(s->reconnect, time_msec(), 0);
913         jsonrpc_session_disconnect(s);
914         break;
915
916     case RECONNECT_PROBE:
917         if (s->rpc) {
918             struct json *params;
919             struct jsonrpc_msg *request;
920
921             params = json_array_create_empty();
922             request = jsonrpc_create_request("echo", params, NULL);
923             json_destroy(request->id);
924             request->id = json_string_create("echo");
925             jsonrpc_send(s->rpc, request);
926         }
927         break;
928     }
929 }
930
931 void
932 jsonrpc_session_wait(struct jsonrpc_session *s)
933 {
934     if (s->rpc) {
935         jsonrpc_wait(s->rpc);
936     } else if (s->stream) {
937         stream_run_wait(s->stream);
938         stream_connect_wait(s->stream);
939     }
940     if (s->pstream) {
941         pstream_wait(s->pstream);
942     }
943     reconnect_wait(s->reconnect, time_msec());
944 }
945
946 size_t
947 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
948 {
949     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
950 }
951
952 /* Always returns a pointer to a valid C string, assuming 's' was initialized
953  * correctly. */
954 const char *
955 jsonrpc_session_get_name(const struct jsonrpc_session *s)
956 {
957     return reconnect_get_name(s->reconnect);
958 }
959
960 /* Always takes ownership of 'msg', regardless of success. */
961 int
962 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
963 {
964     if (s->rpc) {
965         return jsonrpc_send(s->rpc, msg);
966     } else {
967         jsonrpc_msg_destroy(msg);
968         return ENOTCONN;
969     }
970 }
971
972 struct jsonrpc_msg *
973 jsonrpc_session_recv(struct jsonrpc_session *s)
974 {
975     if (s->rpc) {
976         struct jsonrpc_msg *msg;
977         jsonrpc_recv(s->rpc, &msg);
978         if (msg) {
979             reconnect_received(s->reconnect, time_msec());
980             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
981                 /* Echo request.  Send reply. */
982                 struct jsonrpc_msg *reply;
983
984                 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
985                 jsonrpc_session_send(s, reply);
986             } else if (msg->type == JSONRPC_REPLY
987                        && msg->id && msg->id->type == JSON_STRING
988                        && !strcmp(msg->id->u.string, "echo")) {
989                 /* It's a reply to our echo request.  Suppress it. */
990             } else {
991                 return msg;
992             }
993             jsonrpc_msg_destroy(msg);
994         }
995     }
996     return NULL;
997 }
998
999 void
1000 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
1001 {
1002     if (s->rpc) {
1003         jsonrpc_recv_wait(s->rpc);
1004     }
1005 }
1006
1007 bool
1008 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
1009 {
1010     return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
1011 }
1012
1013 bool
1014 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
1015 {
1016     return s->rpc != NULL;
1017 }
1018
1019 unsigned int
1020 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
1021 {
1022     return s->seqno;
1023 }
1024
1025 int
1026 jsonrpc_session_get_status(const struct jsonrpc_session *s)
1027 {
1028     return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
1029 }
1030
1031 void
1032 jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
1033                                     struct reconnect_stats *stats)
1034 {
1035     reconnect_get_stats(s->reconnect, time_msec(), stats);
1036 }
1037
1038 void
1039 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
1040 {
1041     reconnect_force_reconnect(s->reconnect, time_msec());
1042 }
1043
1044 void
1045 jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
1046 {
1047     reconnect_set_backoff(s->reconnect, 0, max_backoff);
1048 }
1049
1050 void
1051 jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
1052                                    int probe_interval)
1053 {
1054     reconnect_set_probe_interval(s->reconnect, probe_interval);
1055 }
1056
1057 void
1058 jsonrpc_session_set_dscp(struct jsonrpc_session *s,
1059                          uint8_t dscp)
1060 {
1061     s->dscp = dscp;
1062 }