From: Ben Pfaff Date: Fri, 13 Nov 2009 22:47:25 +0000 (-0800) Subject: ovsdb: Refactor JSON-RPC database server implementation. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b93d3b6cb28a925f530aef2009602c06f2f5ae60;p=openvswitch ovsdb: Refactor JSON-RPC database server implementation. This refactoring breaks up jsonrpc-server.c in a more modular fashion, in preparation for adding code for table monitors. --- diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index cea5ddc7..a42696ac 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -19,53 +19,37 @@ #include +#include "column.h" #include "json.h" #include "jsonrpc.h" #include "ovsdb.h" #include "reconnect.h" #include "stream.h" -#include "svec.h" #include "timeval.h" #include "trigger.h" #define THIS_MODULE VLM_ovsdb_jsonrpc_server #include "vlog.h" -struct ovsdb_jsonrpc_trigger { - struct ovsdb_trigger trigger; - struct ovsdb_jsonrpc_session *session; - struct hmap_node hmap_node; /* Element in session's trigger table. */ - struct json *id; -}; - -static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find( - struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash); -static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *); - -struct ovsdb_jsonrpc_session { - struct ovsdb_jsonrpc_server *server; - struct list node; /* Element in server's sessions list. */ - struct hmap triggers; - struct list completions; /* Completed triggers. */ - - struct reconnect *reconnect; /* For back-off. */ - bool active; /* Active or passive connection? */ - struct jsonrpc *rpc; - struct stream *stream; /* Only if active == false and rpc == NULL. */ -}; +struct ovsdb_jsonrpc_session; static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *, const char *name); static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *, struct stream *); -static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *); -static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s); -static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *); -static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *); -static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *, - struct jsonrpc_msg *); -static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *, - struct jsonrpc_msg *); +static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *); +static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *); + +static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *, + struct json *id, struct json *params); +static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find( + struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash); +static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *); +static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *); +static void ovsdb_jsonrpc_trigger_complete_done( + struct ovsdb_jsonrpc_session *); + +/* JSON-RPC database server. */ struct ovsdb_jsonrpc_server { struct ovsdb *db; @@ -78,50 +62,46 @@ struct ovsdb_jsonrpc_server { size_t n_listeners, allocated_listeners; }; -static void ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *, - struct pstream *); - -int -ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active, - const struct svec *passive, - struct ovsdb_jsonrpc_server **serverp) +struct ovsdb_jsonrpc_server * +ovsdb_jsonrpc_server_create(struct ovsdb *db) { - struct ovsdb_jsonrpc_server *server; - const char *name; - int retval = 0; - size_t i; - - server = xzalloc(sizeof *server); + struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server); server->db = db; server->max_sessions = 64; server->max_triggers = 64; list_init(&server->sessions); + return server; +} - SVEC_FOR_EACH (i, name, active) { - ovsdb_jsonrpc_session_create_active(server, name); - } +int +ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr, const char *name) +{ + struct pstream *pstream; + int error; - SVEC_FOR_EACH (i, name, passive) { - struct pstream *pstream; - int error; + error = pstream_open(name, &pstream); + if (error) { + return error; + } - error = pstream_open(name, &pstream); - if (!error) { - ovsdb_jsonrpc_server_listen(server, pstream); - } else { - ovs_error(error, "failed to listen on %s", name); - retval = error; - } + if (svr->n_listeners >= svr->allocated_listeners) { + svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners, + sizeof *svr->listeners); } + svr->listeners[svr->n_listeners++] = pstream; + return 0; +} - *serverp = server; - return retval; +void +ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr, + const char *name) +{ + ovsdb_jsonrpc_session_create_active(svr, name); } void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) { - struct ovsdb_jsonrpc_session *s, *next; size_t i; /* Accept new connections. */ @@ -144,20 +124,12 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) } /* Handle each session. */ - LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node, - &svr->sessions) { - int error = ovsdb_jsonrpc_session_run(s); - if (error) { - ovsdb_jsonrpc_session_close(s); - } - } + ovsdb_jsonrpc_session_run_all(svr); } void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr) { - struct ovsdb_jsonrpc_session *s; - if (svr->n_sessions < svr->max_sessions) { size_t i; @@ -166,62 +138,34 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr) } } - LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) { - ovsdb_jsonrpc_session_wait(s); - } -} - -static void -ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr, - struct pstream *pstream) -{ - if (svr->n_listeners >= svr->allocated_listeners) { - svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners, - sizeof *svr->listeners); - } - svr->listeners[svr->n_listeners++] = pstream; -} - -static struct ovsdb_jsonrpc_trigger * -ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s, - const struct json *id, size_t hash) -{ - struct ovsdb_jsonrpc_trigger *t; - - HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash, - &s->triggers) { - if (json_equal(t->id, id)) { - return t; - } - } - - return NULL; + ovsdb_jsonrpc_session_wait_all(svr); } + +/* JSON-RPC database server session. */ -static void -ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t) -{ - struct ovsdb_jsonrpc_session *s = t->session; +struct ovsdb_jsonrpc_session { + struct ovsdb_jsonrpc_server *server; + struct list node; /* Element in server's sessions list. */ - if (s->rpc && !jsonrpc_get_status(s->rpc)) { - struct jsonrpc_msg *reply; - struct json *result; + /* Triggers. */ + struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */ + struct list completions; /* Completed triggers. */ - result = ovsdb_trigger_steal_result(&t->trigger); - if (result) { - reply = jsonrpc_create_reply(result, t->id); - } else { - reply = jsonrpc_create_error(json_string_create("canceled"), - t->id); - } - jsonrpc_send(s->rpc, reply); - } + /* Connecting and reconnecting. */ + struct reconnect *reconnect; /* For back-off. */ + bool active; /* Active or passive connection? */ + struct jsonrpc *rpc; + struct stream *stream; /* Only if active == false and rpc == NULL. */ +}; - json_destroy(t->id); - ovsdb_trigger_destroy(&t->trigger); - hmap_remove(&s->triggers, &t->hmap_node); - free(t); -} +static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *); +static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s); +static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *); +static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *); +static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *, + struct jsonrpc_msg *); +static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *, + struct jsonrpc_msg *); static struct ovsdb_jsonrpc_session * ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr, @@ -275,14 +219,8 @@ ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s) { reconnect_disconnected(s->reconnect, time_msec(), 0); if (s->rpc) { - struct ovsdb_jsonrpc_trigger *t, *next; - jsonrpc_error(s->rpc, EOF); - HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node, - &s->triggers) { - ovsdb_jsonrpc_trigger_complete(t); - } - + ovsdb_jsonrpc_trigger_complete_all(s); jsonrpc_close(s->rpc); s->rpc = NULL; } else if (s->stream) { @@ -314,12 +252,7 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) jsonrpc_run(s->rpc); - while (!list_is_empty(&s->completions)) { - struct ovsdb_jsonrpc_trigger *t - = CONTAINER_OF(s->completions.next, - struct ovsdb_jsonrpc_trigger, trigger.node); - ovsdb_jsonrpc_trigger_complete(t); - } + ovsdb_jsonrpc_trigger_complete_done(s); if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) { reconnect_received(s->reconnect, time_msec()); @@ -374,7 +307,20 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) break; } return s->active || s->rpc ? 0 : ETIMEDOUT; +} +static void +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr) +{ + struct ovsdb_jsonrpc_session *s, *next; + + LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node, + &svr->sessions) { + int error = ovsdb_jsonrpc_session_run(s); + if (error) { + ovsdb_jsonrpc_session_close(s); + } + } } static void @@ -391,38 +337,23 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s) reconnect_wait(s->reconnect, time_msec()); } -static struct jsonrpc_msg * -execute_transaction(struct ovsdb_jsonrpc_session *s, - struct jsonrpc_msg *request) +static void +ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr) { - struct ovsdb_jsonrpc_trigger *t; - size_t hash; + struct ovsdb_jsonrpc_session *s; - /* Check for duplicate ID. */ - hash = json_hash(request->id, 0); - t = ovsdb_jsonrpc_trigger_find(s, request->id, hash); - if (t) { - return jsonrpc_create_error( - json_string_create("duplicate request ID"), request->id); + LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) { + ovsdb_jsonrpc_session_wait(s); } +} - /* Insert into trigger table. */ - t = xmalloc(sizeof *t); - ovsdb_trigger_init(s->server->db, - &t->trigger, request->params, &s->completions, - time_msec()); - t->session = s; - t->id = request->id; - hmap_insert(&s->triggers, &t->hmap_node, hash); - +static struct jsonrpc_msg * +execute_transaction(struct ovsdb_jsonrpc_session *s, + struct jsonrpc_msg *request) +{ + ovsdb_jsonrpc_trigger_create(s, request->id, request->params); request->id = NULL; request->params = NULL; - - /* Complete early if possible. */ - if (ovsdb_trigger_is_complete(&t->trigger)) { - ovsdb_jsonrpc_trigger_complete(t); - } - return NULL; } @@ -474,3 +405,110 @@ ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s, } jsonrpc_msg_destroy(request); } + +/* JSON-RPC database server triggers. + * + * (Every transaction is treated as a trigger even if it doesn't actually have + * any "wait" operations.) */ + +struct ovsdb_jsonrpc_trigger { + struct ovsdb_trigger trigger; + struct ovsdb_jsonrpc_session *session; + struct hmap_node hmap_node; /* In session's "triggers" hmap. */ + struct json *id; +}; + +static void +ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, + struct json *id, struct json *params) +{ + struct ovsdb_jsonrpc_trigger *t; + size_t hash; + + /* Check for duplicate ID. */ + hash = json_hash(id, 0); + t = ovsdb_jsonrpc_trigger_find(s, id, hash); + if (t) { + jsonrpc_send(s->rpc, jsonrpc_create_error( + json_string_create("duplicate request ID"), id)); + json_destroy(id); + json_destroy(params); + return; + } + + /* Insert into trigger table. */ + t = xmalloc(sizeof *t); + ovsdb_trigger_init(s->server->db, + &t->trigger, params, &s->completions, + time_msec()); + t->session = s; + t->id = id; + hmap_insert(&s->triggers, &t->hmap_node, hash); + + /* Complete early if possible. */ + if (ovsdb_trigger_is_complete(&t->trigger)) { + ovsdb_jsonrpc_trigger_complete(t); + } +} + +static struct ovsdb_jsonrpc_trigger * +ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s, + const struct json *id, size_t hash) +{ + struct ovsdb_jsonrpc_trigger *t; + + HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash, + &s->triggers) { + if (json_equal(t->id, id)) { + return t; + } + } + + return NULL; +} + +static void +ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t) +{ + struct ovsdb_jsonrpc_session *s = t->session; + + if (s->rpc && !jsonrpc_get_status(s->rpc)) { + struct jsonrpc_msg *reply; + struct json *result; + + result = ovsdb_trigger_steal_result(&t->trigger); + if (result) { + reply = jsonrpc_create_reply(result, t->id); + } else { + reply = jsonrpc_create_error(json_string_create("canceled"), + t->id); + } + jsonrpc_send(s->rpc, reply); + } + + json_destroy(t->id); + ovsdb_trigger_destroy(&t->trigger); + hmap_remove(&s->triggers, &t->hmap_node); + free(t); +} + +static void +ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s) +{ + struct ovsdb_jsonrpc_trigger *t, *next; + HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node, + &s->triggers) { + ovsdb_jsonrpc_trigger_complete(t); + } +} + +static void +ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s) +{ + while (!list_is_empty(&s->completions)) { + struct ovsdb_jsonrpc_trigger *t + = CONTAINER_OF(s->completions.next, + struct ovsdb_jsonrpc_trigger, trigger.node); + ovsdb_jsonrpc_trigger_complete(t); + } +} diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h index 49b5f8a9..0a80ac61 100644 --- a/ovsdb/jsonrpc-server.h +++ b/ovsdb/jsonrpc-server.h @@ -17,12 +17,14 @@ #define OVSDB_JSONRPC_SERVER_H 1 struct ovsdb; -struct ovsdb_jsonrpc_server; -struct svec; -int ovsdb_jsonrpc_server_create(struct ovsdb *, const struct svec *active, - const struct svec *passive, - struct ovsdb_jsonrpc_server **); +struct ovsdb_jsonrpc_server *ovsdb_jsonrpc_server_create(struct ovsdb *); + +int ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *, + const char *name); +void ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *, + const char *name); + void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *); void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *); diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index a91e7789..00c465d7 100644 --- a/ovsdb/ovsdb-server.c +++ b/ovsdb/ovsdb-server.c @@ -57,8 +57,10 @@ main(int argc, char *argv[]) struct svec active, passive; struct ovsdb_error *error; struct ovsdb *db; + const char *name; char *file_name; int retval; + size_t i; set_program_name(argv[0]); register_fault_handlers(); @@ -74,9 +76,15 @@ main(int argc, char *argv[]) ovs_fatal(0, "%s", ovsdb_error_to_string(error)); } - retval = ovsdb_jsonrpc_server_create(db, &active, &passive, &jsonrpc); - if (retval) { - ovs_fatal(retval, "failed to initialize JSON-RPC server for OVSDB"); + jsonrpc = ovsdb_jsonrpc_server_create(db); + SVEC_FOR_EACH (i, name, &active) { + ovsdb_jsonrpc_server_connect(jsonrpc, name); + } + SVEC_FOR_EACH (i, name, &passive) { + retval = ovsdb_jsonrpc_server_listen(jsonrpc, name); + if (retval) { + ovs_fatal(retval, "failed to listen on %s", name); + } } svec_destroy(&active); svec_destroy(&passive);