#include "json.h"
#include "jsonrpc.h"
#include "ovsdb.h"
+#include "reconnect.h"
#include "stream.h"
#include "svec.h"
#include "timeval.h"
struct ovsdb_jsonrpc_session {
struct ovsdb_jsonrpc_server *server;
struct list node; /* Element in server's sessions list. */
- struct jsonrpc *rpc;
struct hmap triggers;
struct list completions; /* Completed triggers. */
+
+ struct reconnect *reconnect; /* For back-off. */
+ bool active; /* Active or passive connection? */
+ struct jsonrpc *rpc;
+ struct stream *stream; /* Only if active == false and rpc == NULL. */
};
-static void ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *,
- struct stream *);
+static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
+ const char *name);
+static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
+ struct stream *);
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
+static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
struct jsonrpc_msg *);
static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
list_init(&server->sessions);
SVEC_FOR_EACH (i, name, active) {
- struct stream *stream;
- int error;
-
- error = stream_open(name, &stream);
- if (!error) {
- ovsdb_jsonrpc_session_open(server, stream);
- } else {
- ovs_error(error, "%s: connection failed", name);
- retval = error;
- }
+ ovsdb_jsonrpc_session_create_active(server, name);
}
SVEC_FOR_EACH (i, name, passive) {
error = pstream_accept(listener, &stream);
if (!error) {
- ovsdb_jsonrpc_session_open(svr, stream);
+ ovsdb_jsonrpc_session_create_passive(svr, stream);
} else if (error == EAGAIN) {
i++;
} else if (error) {
/* Handle each session. */
LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
&svr->sessions) {
- struct jsonrpc_msg *msg;
- int error;
-
- jsonrpc_run(s->rpc);
-
- while (!list_is_empty(&s->completions)) {
- struct ovsdb_jsonrpc_trigger *t
- = CONTAINER_OF(s->completions.next,
- struct ovsdb_jsonrpc_trigger, trigger.node);
- ovsdb_jsonrpc_trigger_complete(t);
- }
-
- if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
- if (msg->type == JSONRPC_REQUEST) {
- ovsdb_jsonrpc_session_got_request(s, msg);
- } else if (msg->type == JSONRPC_NOTIFY) {
- ovsdb_jsonrpc_session_got_notify(s, msg);
- } else {
- VLOG_WARN("%s: received unexpected %s message",
- jsonrpc_get_name(s->rpc),
- jsonrpc_msg_type_to_string(msg->type));
- jsonrpc_error(s->rpc, EPROTO);
- jsonrpc_msg_destroy(msg);
- }
- }
-
- error = jsonrpc_get_status(s->rpc);
+ int error = ovsdb_jsonrpc_session_run(s);
if (error) {
ovsdb_jsonrpc_session_close(s);
}
}
LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
- jsonrpc_wait(s->rpc);
- if (!jsonrpc_get_backlog(s->rpc)) {
- jsonrpc_recv_wait(s->rpc);
- }
+ ovsdb_jsonrpc_session_wait(s);
}
}
{
struct ovsdb_jsonrpc_session *s = t->session;
- if (!jsonrpc_get_status(s->rpc)) {
+ if (s->rpc && !jsonrpc_get_status(s->rpc)) {
struct jsonrpc_msg *reply;
struct json *result;
free(t);
}
-static void
-ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *svr,
- struct stream *stream)
+static struct ovsdb_jsonrpc_session *
+ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
+ const char *name, bool active)
{
struct ovsdb_jsonrpc_session *s;
s = xzalloc(sizeof *s);
s->server = svr;
list_push_back(&svr->sessions, &s->node);
- s->rpc = jsonrpc_open(stream);
hmap_init(&s->triggers);
list_init(&s->completions);
+ s->reconnect = reconnect_create(time_msec());
+ reconnect_set_name(s->reconnect, name);
+ reconnect_enable(s->reconnect, time_msec());
+ s->active = active;
+
svr->n_sessions++;
+
+ return s;
}
static void
-ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
+ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
+ const char *name)
{
- struct ovsdb_jsonrpc_trigger *t, *next;
+ ovsdb_jsonrpc_session_create(svr, name, true);
+}
- jsonrpc_error(s->rpc, EOF);
- HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
- &s->triggers) {
- ovsdb_jsonrpc_trigger_complete(t);
- }
+static void
+ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
+ struct stream *stream)
+{
+ struct ovsdb_jsonrpc_session *s;
- jsonrpc_close(s->rpc);
+ s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
+ reconnect_connected(s->reconnect, time_msec());
+ s->rpc = jsonrpc_open(stream);
+}
+static void
+ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
+{
+ ovsdb_jsonrpc_session_disconnect(s);
list_remove(&s->node);
s->server->n_sessions--;
}
+static void
+ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
+{
+ reconnect_disconnected(s->reconnect, time_msec(), 0);
+ if (s->rpc) {
+ struct ovsdb_jsonrpc_trigger *t, *next;
+
+ jsonrpc_error(s->rpc, EOF);
+ HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
+ &s->triggers) {
+ ovsdb_jsonrpc_trigger_complete(t);
+ }
+
+ jsonrpc_close(s->rpc);
+ s->rpc = NULL;
+ } else if (s->stream) {
+ stream_close(s->stream);
+ s->stream = NULL;
+ }
+}
+
+static void
+ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
+{
+ ovsdb_jsonrpc_session_disconnect(s);
+ if (s->active) {
+ int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
+ if (error) {
+ reconnect_connect_failed(s->reconnect, time_msec(), error);
+ } else {
+ reconnect_connecting(s->reconnect, time_msec());
+ }
+ }
+}
+
+static int
+ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
+{
+ if (s->rpc) {
+ struct jsonrpc_msg *msg;
+ int error;
+
+ jsonrpc_run(s->rpc);
+
+ while (!list_is_empty(&s->completions)) {
+ struct ovsdb_jsonrpc_trigger *t
+ = CONTAINER_OF(s->completions.next,
+ struct ovsdb_jsonrpc_trigger, trigger.node);
+ ovsdb_jsonrpc_trigger_complete(t);
+ }
+
+ if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
+ reconnect_received(s->reconnect, time_msec());
+ if (msg->type == JSONRPC_REQUEST) {
+ ovsdb_jsonrpc_session_got_request(s, msg);
+ } else if (msg->type == JSONRPC_NOTIFY) {
+ ovsdb_jsonrpc_session_got_notify(s, msg);
+ } else {
+ VLOG_WARN("%s: received unexpected %s message",
+ jsonrpc_get_name(s->rpc),
+ jsonrpc_msg_type_to_string(msg->type));
+ jsonrpc_error(s->rpc, EPROTO);
+ jsonrpc_msg_destroy(msg);
+ }
+ }
+
+ error = jsonrpc_get_status(s->rpc);
+ if (error) {
+ if (s->active) {
+ ovsdb_jsonrpc_session_disconnect(s);
+ } else {
+ return error;
+ }
+ }
+ } else if (s->stream) {
+ int error = stream_connect(s->stream);
+ if (!error) {
+ reconnect_connected(s->reconnect, time_msec());
+ s->rpc = jsonrpc_open(s->stream);
+ s->stream = NULL;
+ } else if (error != EAGAIN) {
+ reconnect_connect_failed(s->reconnect, time_msec(), error);
+ stream_close(s->stream);
+ s->stream = NULL;
+ }
+ }
+
+ switch (reconnect_run(s->reconnect, time_msec())) {
+ case RECONNECT_CONNECT:
+ ovsdb_jsonrpc_session_connect(s);
+ break;
+
+ case RECONNECT_DISCONNECT:
+ ovsdb_jsonrpc_session_disconnect(s);
+ break;
+
+ case RECONNECT_PROBE:
+ if (s->rpc) {
+ struct json *params = json_integer_create(0);
+ jsonrpc_send(s->rpc, jsonrpc_create_request("echo", params));
+ }
+ break;
+ }
+ return s->active || s->rpc ? 0 : ETIMEDOUT;
+
+}
+
+static void
+ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
+{
+ if (s->rpc) {
+ jsonrpc_wait(s->rpc);
+ if (!jsonrpc_get_backlog(s->rpc)) {
+ jsonrpc_recv_wait(s->rpc);
+ }
+ } else if (s->stream) {
+ stream_connect_wait(s->stream);
+ }
+ reconnect_wait(s->reconnect, time_msec());
+}
+
static struct jsonrpc_msg *
execute_transaction(struct ovsdb_jsonrpc_session *s,
struct jsonrpc_msg *request)
} else if (!strcmp(request->method, "get_schema")) {
reply = jsonrpc_create_reply(
ovsdb_schema_to_json(s->server->db->schema), request->id);
+ } else if (!strcmp(request->method, "echo")) {
+ reply = jsonrpc_create_reply(json_clone(request->params), request->id);
} else {
reply = jsonrpc_create_error(json_string_create("unknown method"),
request->id);