#include <errno.h>
+#include "column.h"
#include "json.h"
#include "jsonrpc.h"
#include "ovsdb.h"
#include "reconnect.h"
#include "stream.h"
-#include "svec.h"
#include "timeval.h"
#include "trigger.h"
#define THIS_MODULE VLM_ovsdb_jsonrpc_server
#include "vlog.h"
-struct ovsdb_jsonrpc_trigger {
- struct ovsdb_trigger trigger;
- struct ovsdb_jsonrpc_session *session;
- struct hmap_node hmap_node; /* Element in session's trigger table. */
- struct json *id;
-};
-
-static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
- struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
-static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
-
-struct ovsdb_jsonrpc_session {
- struct ovsdb_jsonrpc_server *server;
- struct list node; /* Element in server's sessions list. */
- struct hmap triggers;
- struct list completions; /* Completed triggers. */
-
- struct reconnect *reconnect; /* For back-off. */
- bool active; /* Active or passive connection? */
- struct jsonrpc *rpc;
- struct stream *stream; /* Only if active == false and rpc == NULL. */
-};
+struct ovsdb_jsonrpc_session;
static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
const char *name);
static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
struct stream *);
-static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
-static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
-static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
-static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
-static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
- struct jsonrpc_msg *);
-static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
- struct jsonrpc_msg *);
+static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
+static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
+
+static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
+ struct json *id, struct json *params);
+static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
+ struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
+static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
+static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_trigger_complete_done(
+ struct ovsdb_jsonrpc_session *);
+\f
+/* JSON-RPC database server. */
struct ovsdb_jsonrpc_server {
struct ovsdb *db;
size_t n_listeners, allocated_listeners;
};
-static void ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *,
- struct pstream *);
-
-int
-ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active,
- const struct svec *passive,
- struct ovsdb_jsonrpc_server **serverp)
+struct ovsdb_jsonrpc_server *
+ovsdb_jsonrpc_server_create(struct ovsdb *db)
{
- struct ovsdb_jsonrpc_server *server;
- const char *name;
- int retval = 0;
- size_t i;
-
- server = xzalloc(sizeof *server);
+ struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
server->db = db;
server->max_sessions = 64;
server->max_triggers = 64;
list_init(&server->sessions);
+ return server;
+}
- SVEC_FOR_EACH (i, name, active) {
- ovsdb_jsonrpc_session_create_active(server, name);
- }
+int
+ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr, const char *name)
+{
+ struct pstream *pstream;
+ int error;
- SVEC_FOR_EACH (i, name, passive) {
- struct pstream *pstream;
- int error;
+ error = pstream_open(name, &pstream);
+ if (error) {
+ return error;
+ }
- error = pstream_open(name, &pstream);
- if (!error) {
- ovsdb_jsonrpc_server_listen(server, pstream);
- } else {
- ovs_error(error, "failed to listen on %s", name);
- retval = error;
- }
+ if (svr->n_listeners >= svr->allocated_listeners) {
+ svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
+ sizeof *svr->listeners);
}
+ svr->listeners[svr->n_listeners++] = pstream;
+ return 0;
+}
- *serverp = server;
- return retval;
+void
+ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
+ const char *name)
+{
+ ovsdb_jsonrpc_session_create_active(svr, name);
}
void
ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
{
- struct ovsdb_jsonrpc_session *s, *next;
size_t i;
/* Accept new connections. */
}
/* Handle each session. */
- LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
- &svr->sessions) {
- int error = ovsdb_jsonrpc_session_run(s);
- if (error) {
- ovsdb_jsonrpc_session_close(s);
- }
- }
+ ovsdb_jsonrpc_session_run_all(svr);
}
void
ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
{
- struct ovsdb_jsonrpc_session *s;
-
if (svr->n_sessions < svr->max_sessions) {
size_t i;
}
}
- LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
- ovsdb_jsonrpc_session_wait(s);
- }
-}
-
-static void
-ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
- struct pstream *pstream)
-{
- if (svr->n_listeners >= svr->allocated_listeners) {
- svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
- sizeof *svr->listeners);
- }
- svr->listeners[svr->n_listeners++] = pstream;
-}
-
-static struct ovsdb_jsonrpc_trigger *
-ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
- const struct json *id, size_t hash)
-{
- struct ovsdb_jsonrpc_trigger *t;
-
- HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
- &s->triggers) {
- if (json_equal(t->id, id)) {
- return t;
- }
- }
-
- return NULL;
+ ovsdb_jsonrpc_session_wait_all(svr);
}
+\f
+/* JSON-RPC database server session. */
-static void
-ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
-{
- struct ovsdb_jsonrpc_session *s = t->session;
+struct ovsdb_jsonrpc_session {
+ struct ovsdb_jsonrpc_server *server;
+ struct list node; /* Element in server's sessions list. */
- if (s->rpc && !jsonrpc_get_status(s->rpc)) {
- struct jsonrpc_msg *reply;
- struct json *result;
+ /* Triggers. */
+ struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
+ struct list completions; /* Completed triggers. */
- result = ovsdb_trigger_steal_result(&t->trigger);
- if (result) {
- reply = jsonrpc_create_reply(result, t->id);
- } else {
- reply = jsonrpc_create_error(json_string_create("canceled"),
- t->id);
- }
- jsonrpc_send(s->rpc, reply);
- }
+ /* Connecting and reconnecting. */
+ struct reconnect *reconnect; /* For back-off. */
+ bool active; /* Active or passive connection? */
+ struct jsonrpc *rpc;
+ struct stream *stream; /* Only if active == false and rpc == NULL. */
+};
- json_destroy(t->id);
- ovsdb_trigger_destroy(&t->trigger);
- hmap_remove(&s->triggers, &t->hmap_node);
- free(t);
-}
+static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
+static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
+ struct jsonrpc_msg *);
+static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
+ struct jsonrpc_msg *);
static struct ovsdb_jsonrpc_session *
ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
{
reconnect_disconnected(s->reconnect, time_msec(), 0);
if (s->rpc) {
- struct ovsdb_jsonrpc_trigger *t, *next;
-
jsonrpc_error(s->rpc, EOF);
- HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
- &s->triggers) {
- ovsdb_jsonrpc_trigger_complete(t);
- }
-
+ ovsdb_jsonrpc_trigger_complete_all(s);
jsonrpc_close(s->rpc);
s->rpc = NULL;
} else if (s->stream) {
jsonrpc_run(s->rpc);
- while (!list_is_empty(&s->completions)) {
- struct ovsdb_jsonrpc_trigger *t
- = CONTAINER_OF(s->completions.next,
- struct ovsdb_jsonrpc_trigger, trigger.node);
- ovsdb_jsonrpc_trigger_complete(t);
- }
+ ovsdb_jsonrpc_trigger_complete_done(s);
if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
reconnect_received(s->reconnect, time_msec());
break;
}
return s->active || s->rpc ? 0 : ETIMEDOUT;
+}
+static void
+ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
+{
+ struct ovsdb_jsonrpc_session *s, *next;
+
+ LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
+ &svr->sessions) {
+ int error = ovsdb_jsonrpc_session_run(s);
+ if (error) {
+ ovsdb_jsonrpc_session_close(s);
+ }
+ }
}
static void
reconnect_wait(s->reconnect, time_msec());
}
-static struct jsonrpc_msg *
-execute_transaction(struct ovsdb_jsonrpc_session *s,
- struct jsonrpc_msg *request)
+static void
+ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
{
- struct ovsdb_jsonrpc_trigger *t;
- size_t hash;
+ struct ovsdb_jsonrpc_session *s;
- /* Check for duplicate ID. */
- hash = json_hash(request->id, 0);
- t = ovsdb_jsonrpc_trigger_find(s, request->id, hash);
- if (t) {
- return jsonrpc_create_error(
- json_string_create("duplicate request ID"), request->id);
+ LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
+ ovsdb_jsonrpc_session_wait(s);
}
+}
- /* Insert into trigger table. */
- t = xmalloc(sizeof *t);
- ovsdb_trigger_init(s->server->db,
- &t->trigger, request->params, &s->completions,
- time_msec());
- t->session = s;
- t->id = request->id;
- hmap_insert(&s->triggers, &t->hmap_node, hash);
-
+static struct jsonrpc_msg *
+execute_transaction(struct ovsdb_jsonrpc_session *s,
+ struct jsonrpc_msg *request)
+{
+ ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
request->id = NULL;
request->params = NULL;
-
- /* Complete early if possible. */
- if (ovsdb_trigger_is_complete(&t->trigger)) {
- ovsdb_jsonrpc_trigger_complete(t);
- }
-
return NULL;
}
}
jsonrpc_msg_destroy(request);
}
+\f
+/* JSON-RPC database server triggers.
+ *
+ * (Every transaction is treated as a trigger even if it doesn't actually have
+ * any "wait" operations.) */
+
+struct ovsdb_jsonrpc_trigger {
+ struct ovsdb_trigger trigger;
+ struct ovsdb_jsonrpc_session *session;
+ struct hmap_node hmap_node; /* In session's "triggers" hmap. */
+ struct json *id;
+};
+
+static void
+ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
+ struct json *id, struct json *params)
+{
+ struct ovsdb_jsonrpc_trigger *t;
+ size_t hash;
+
+ /* Check for duplicate ID. */
+ hash = json_hash(id, 0);
+ t = ovsdb_jsonrpc_trigger_find(s, id, hash);
+ if (t) {
+ jsonrpc_send(s->rpc, jsonrpc_create_error(
+ json_string_create("duplicate request ID"), id));
+ json_destroy(id);
+ json_destroy(params);
+ return;
+ }
+
+ /* Insert into trigger table. */
+ t = xmalloc(sizeof *t);
+ ovsdb_trigger_init(s->server->db,
+ &t->trigger, params, &s->completions,
+ time_msec());
+ t->session = s;
+ t->id = id;
+ hmap_insert(&s->triggers, &t->hmap_node, hash);
+
+ /* Complete early if possible. */
+ if (ovsdb_trigger_is_complete(&t->trigger)) {
+ ovsdb_jsonrpc_trigger_complete(t);
+ }
+}
+
+static struct ovsdb_jsonrpc_trigger *
+ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
+ const struct json *id, size_t hash)
+{
+ struct ovsdb_jsonrpc_trigger *t;
+
+ HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
+ &s->triggers) {
+ if (json_equal(t->id, id)) {
+ return t;
+ }
+ }
+
+ return NULL;
+}
+
+static void
+ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
+{
+ struct ovsdb_jsonrpc_session *s = t->session;
+
+ if (s->rpc && !jsonrpc_get_status(s->rpc)) {
+ struct jsonrpc_msg *reply;
+ struct json *result;
+
+ result = ovsdb_trigger_steal_result(&t->trigger);
+ if (result) {
+ reply = jsonrpc_create_reply(result, t->id);
+ } else {
+ reply = jsonrpc_create_error(json_string_create("canceled"),
+ t->id);
+ }
+ jsonrpc_send(s->rpc, reply);
+ }
+
+ json_destroy(t->id);
+ ovsdb_trigger_destroy(&t->trigger);
+ hmap_remove(&s->triggers, &t->hmap_node);
+ free(t);
+}
+
+static void
+ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
+{
+ struct ovsdb_jsonrpc_trigger *t, *next;
+ HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
+ &s->triggers) {
+ ovsdb_jsonrpc_trigger_complete(t);
+ }
+}
+
+static void
+ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
+{
+ while (!list_is_empty(&s->completions)) {
+ struct ovsdb_jsonrpc_trigger *t
+ = CONTAINER_OF(s->completions.next,
+ struct ovsdb_jsonrpc_trigger, trigger.node);
+ ovsdb_jsonrpc_trigger_complete(t);
+ }
+}