1 /* Copyright (c) 2009 Nicira Networks
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
26 #include "reconnect.h"
31 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
/* Forward declarations.  The server-level functions below call the session
 * helpers, and the request handlers call the trigger helpers, before any of
 * them is defined. */
34 struct ovsdb_jsonrpc_session;
/* Session helpers used by the server-level functions. */
36 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
38 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
40 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
41 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
/* Trigger helpers used by the session request handlers. */
43 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
44 struct json *id, struct json *params);
45 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
46 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
47 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
48 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
49 static void ovsdb_jsonrpc_trigger_complete_done(
50 struct ovsdb_jsonrpc_session *);
52 /* JSON-RPC database server. */
/* State of one JSON-RPC database server: the sessions it is running, the
 * limits applied to them, and the passive listeners that accept new
 * connections. */
54 struct ovsdb_jsonrpc_server {
57 struct list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
/* Number of sessions and the cap compared against it before accepting or
 * waiting for new connections. */
58 unsigned int n_sessions, max_sessions;
/* Trigger limit.  NOTE(review): set to 64 at creation; no code visible here
 * reads it -- confirm whether it is enforced anywhere. */
59 unsigned int max_triggers;
/* Array of listening pstreams, grown with x2nrealloc(). */
61 struct pstream **listeners;
/* Entries in use and entries allocated in 'listeners'. */
62 size_t n_listeners, allocated_listeners;
/* Allocates a server for database 'db' with xzalloc() and applies the
 * defaults: at most 64 sessions, a trigger limit of 64, and an empty session
 * list. */
65 struct ovsdb_jsonrpc_server *
66 ovsdb_jsonrpc_server_create(struct ovsdb *db)
68 struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
/* Default limits. */
70 server->max_sessions = 64;
71 server->max_triggers = 64;
72 list_init(&server->sessions);
/* Opens a passive stream named 'name' with pstream_open() and appends it to
 * svr->listeners so that ovsdb_jsonrpc_server_run() can accept connections
 * from it. */
77 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr, const char *name)
79 struct pstream *pstream;
82 error = pstream_open(name, &pstream);
/* Grow the listener array when every allocated slot is already in use. */
87 if (svr->n_listeners >= svr->allocated_listeners) {
88 svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
89 sizeof *svr->listeners);
91 svr->listeners[svr->n_listeners++] = pstream;
/* Adds an active session for 'name' to 'svr' by delegating to
 * ovsdb_jsonrpc_session_create_active(). */
96 ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
99 ovsdb_jsonrpc_session_create_active(svr, name);
/* Runs one iteration of the server: accepts connections from the listeners
 * for as long as the session count stays below svr->max_sessions, then runs
 * every existing session. */
103 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
107 /* Accept new connections. */
/* The for statement has no increment clause: 'i' is not advanced
 * automatically, so the same listener can be polled repeatedly. */
108 for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
109 struct pstream *listener = svr->listeners[i];
110 struct stream *stream;
113 error = pstream_accept(listener, &stream);
/* Each accepted stream becomes a passive session. */
115 ovsdb_jsonrpc_session_create_passive(svr, stream);
116 } else if (error == EAGAIN) {
119 VLOG_WARN("%s: accept failed: %s",
120 pstream_get_name(listener), strerror(error));
/* Drop the failed listener: close it and move the last array entry into its
 * slot so that the array stays dense. */
121 pstream_close(listener);
122 svr->listeners[i] = svr->listeners[--svr->n_listeners];
126 /* Handle each session. */
127 ovsdb_jsonrpc_session_run_all(svr);
/* Registers what 'svr' is waiting for.  Listeners are waited on only while
 * there is room for another session, the same n_sessions < max_sessions
 * condition that ovsdb_jsonrpc_server_run() uses before accepting.  Each
 * session then registers its own waits. */
131 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
133 if (svr->n_sessions < svr->max_sessions) {
136 for (i = 0; i < svr->n_listeners; i++) {
137 pstream_wait(svr->listeners[i]);
141 ovsdb_jsonrpc_session_wait_all(svr);
144 /* JSON-RPC database server session. */
/* One client session of a JSON-RPC database server, together with its
 * outstanding triggers and its connection and reconnection state. */
146 struct ovsdb_jsonrpc_session {
/* Owning server; used to reach the server's database and session count. */
147 struct ovsdb_jsonrpc_server *server;
148 struct list node; /* Element in server's sessions list. */
151 struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
152 struct list completions; /* Completed triggers. */
154 /* Connecting and reconnecting. */
155 struct reconnect *reconnect; /* For back-off. */
156 bool active; /* Active or passive connection? */
158 struct stream *stream; /* Only if active == false and rpc == NULL. */
/* Forward declarations for the per-session helpers defined below. */
161 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
162 static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
163 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
164 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
/* Handlers for the two message types that a session accepts from its peer. */
165 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
166 struct jsonrpc_msg *);
167 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
168 struct jsonrpc_msg *);
/* Allocates a session on 'svr' with xzalloc(), appends it to the server's
 * session list, initializes its trigger table and completion list, and gives
 * it an enabled reconnect object named 'name'.
 *
 * Callers in this file pass 'active' as true from
 * ovsdb_jsonrpc_session_create_active() and false from
 * ovsdb_jsonrpc_session_create_passive(). */
170 static struct ovsdb_jsonrpc_session *
171 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
172 const char *name, bool active)
174 struct ovsdb_jsonrpc_session *s;
176 s = xzalloc(sizeof *s);
178 list_push_back(&svr->sessions, &s->node);
179 hmap_init(&s->triggers);
180 list_init(&s->completions);
/* The reconnect object is created, named, and enabled using the current
 * time_msec() value. */
181 s->reconnect = reconnect_create(time_msec());
182 reconnect_set_name(s->reconnect, name);
183 reconnect_enable(s->reconnect, time_msec());
/* Creates a session on 'svr' for 'name' with the 'active' flag set to true.
 * No stream is opened here. */
192 ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
195 ovsdb_jsonrpc_session_create(svr, name, true);
/* Creates a passive session on 'svr' around 'stream' (an accepted connection
 * when called from ovsdb_jsonrpc_server_run()).  The session takes the
 * stream's name, its reconnect state is marked connected immediately, and the
 * stream is wrapped in a JSON-RPC connection stored in s->rpc. */
199 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
200 struct stream *stream)
202 struct ovsdb_jsonrpc_session *s;
204 s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
205 reconnect_connected(s->reconnect, time_msec());
206 s->rpc = jsonrpc_open(stream);
/* Shuts down session 's': disconnects it, unlinks it from the server's
 * session list, and decrements the server's session count.
 *
 * NOTE(review): nothing here releases 's' itself, s->reconnect, or the
 * 'triggers' hmap.  Confirm that they are freed elsewhere; otherwise every
 * closed session leaks that memory. */
210 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
212 ovsdb_jsonrpc_session_disconnect(s);
213 list_remove(&s->node);
214 s->server->n_sessions--;
/* Drops the transport of session 's' after telling the reconnect object that
 * the connection is gone.  A live JSON-RPC connection has its outstanding
 * triggers completed before it is closed; a stream that is still connecting
 * is simply closed. */
218 ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
220 reconnect_disconnected(s->reconnect, time_msec(), 0);
222 jsonrpc_error(s->rpc, EOF);
/* ovsdb_jsonrpc_trigger_complete() sends a reply only while
 * jsonrpc_get_status() is zero.  The jsonrpc_error() call above presumably
 * makes the status nonzero, so the triggers completed here are torn down
 * without replies -- confirm against jsonrpc_error(). */
223 ovsdb_jsonrpc_trigger_complete_all(s);
224 jsonrpc_close(s->rpc);
226 } else if (s->stream) {
227 stream_close(s->stream);
/* Starts a new connection attempt for 's'.  Any existing transport is dropped
 * first, then a stream is opened to the name held by the reconnect object.
 * The reconnect object is told about the outcome: the attempt failed
 * immediately (with the error code) or it is now in progress. */
233 ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
235 ovsdb_jsonrpc_session_disconnect(s);
237 int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
239 reconnect_connect_failed(s->reconnect, time_msec(), error);
241 reconnect_connecting(s->reconnect, time_msec());
/* Runs one iteration of session 's'.
 *
 * With a JSON-RPC connection: finishes the triggers queued on
 * s->completions, then receives and dispatches a message if no output is
 * backlogged.  With only a connecting stream: polls stream_connect() and
 * switches to JSON-RPC once the connection is up.  Finally acts on whatever
 * reconnect_run() asks for.
 *
 * The final return statement yields 0 while the session is active or has a
 * JSON-RPC connection, and ETIMEDOUT otherwise.
 * ovsdb_jsonrpc_session_run_all() uses the result when deciding whether to
 * close the session. */
247 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
250 struct jsonrpc_msg *msg;
255 ovsdb_jsonrpc_trigger_complete_done(s);
/* A message is read only when jsonrpc_get_backlog() reports nothing queued
 * and jsonrpc_recv() returns 0 (presumably so that a peer that stops reading
 * replies cannot make the backlog grow without bound). */
257 if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
/* Tell the reconnect object that the peer sent something. */
258 reconnect_received(s->reconnect, time_msec());
/* Requests and notifications go to their handlers, which destroy 'msg'. */
259 if (msg->type == JSONRPC_REQUEST) {
260 ovsdb_jsonrpc_session_got_request(s, msg);
261 } else if (msg->type == JSONRPC_NOTIFY) {
262 ovsdb_jsonrpc_session_got_notify(s, msg);
264 VLOG_WARN("%s: received unexpected %s message",
265 jsonrpc_get_name(s->rpc),
266 jsonrpc_msg_type_to_string(msg->type));
/* Flag a protocol error on the connection and discard the message. */
267 jsonrpc_error(s->rpc, EPROTO);
268 jsonrpc_msg_destroy(msg);
/* Check the connection status after handling input. */
272 error = jsonrpc_get_status(s->rpc);
275 ovsdb_jsonrpc_session_disconnect(s);
280 } else if (s->stream) {
/* No JSON-RPC connection yet: keep driving the pending stream connection. */
281 int error = stream_connect(s->stream);
283 reconnect_connected(s->reconnect, time_msec());
284 s->rpc = jsonrpc_open(s->stream);
286 } else if (error != EAGAIN) {
/* Any error other than EAGAIN means the attempt failed: report it and close
 * the stream. */
287 reconnect_connect_failed(s->reconnect, time_msec(), error);
288 stream_close(s->stream);
/* Carry out the action requested by the reconnect object. */
293 switch (reconnect_run(s->reconnect, time_msec())) {
294 case RECONNECT_CONNECT:
295 ovsdb_jsonrpc_session_connect(s);
298 case RECONNECT_DISCONNECT:
299 ovsdb_jsonrpc_session_disconnect(s);
302 case RECONNECT_PROBE:
/* Probe the peer with an "echo" request that carries an empty parameter
 * array. */
304 struct json *params = json_array_create_empty();
305 jsonrpc_send(s->rpc, jsonrpc_create_request("echo", params));
/* '||' binds tighter than '?:', so this is
 * (s->active || s->rpc) ? 0 : ETIMEDOUT. */
309 return s->active || s->rpc ? 0 : ETIMEDOUT;
/* Runs every session on 'svr'.  The _SAFE iterator is needed because
 * ovsdb_jsonrpc_session_close() unlinks the session from the list that is
 * being walked. */
313 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
315 struct ovsdb_jsonrpc_session *s, *next;
317 LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
319 int error = ovsdb_jsonrpc_session_run(s);
321 ovsdb_jsonrpc_session_close(s);
/* Registers what session 's' is waiting for.  With a JSON-RPC connection it
 * calls jsonrpc_wait() and, only when no output is backlogged,
 * jsonrpc_recv_wait(); this mirrors the backlog check that
 * ovsdb_jsonrpc_session_run() makes before receiving.  With a stream that is
 * still connecting it waits for the connection to complete.  It also always
 * waits on the reconnect object. */
327 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
330 jsonrpc_wait(s->rpc);
331 if (!jsonrpc_get_backlog(s->rpc)) {
332 jsonrpc_recv_wait(s->rpc);
334 } else if (s->stream) {
335 stream_connect_wait(s->stream);
337 reconnect_wait(s->reconnect, time_msec());
/* Calls ovsdb_jsonrpc_session_wait() for every session on 'svr'. */
341 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
343 struct ovsdb_jsonrpc_session *s;
345 LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
346 ovsdb_jsonrpc_session_wait(s);
/* Handles a "transact" request by handing the request's id and params to a
 * new trigger on session 's'.
 *
 * NOTE(review): confirm that the jsonrpc_msg structure for 'request' is still
 * destroyed after its params have been detached here. */
350 static struct jsonrpc_msg *
351 execute_transaction(struct ovsdb_jsonrpc_session *s,
352 struct jsonrpc_msg *request)
354 ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
/* ovsdb_jsonrpc_trigger_create() has taken over 'params' (it destroys them
 * itself on its duplicate-ID path), so the pointer is cleared to keep the
 * message from referring to them any longer. */
356 request->params = NULL;
/* Dispatches JSON-RPC request 'request' on session 's' by method name:
 *
 *   - "transact" is passed to execute_transaction().
 *   - "get_schema" is answered with the database schema converted to JSON.
 *   - "echo" is answered with a copy of the request's params.
 *   - Any other method is answered with an "unknown method" error.
 *
 * Every reply is built with the id of the request. */
361 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
362 struct jsonrpc_msg *request)
364 struct jsonrpc_msg *reply;
366 if (!strcmp(request->method, "transact")) {
367 reply = execute_transaction(s, request);
368 } else if (!strcmp(request->method, "get_schema")) {
369 reply = jsonrpc_create_reply(
370 ovsdb_schema_to_json(s->server->db->schema), request->id);
371 } else if (!strcmp(request->method, "echo")) {
372 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
374 reply = jsonrpc_create_error(json_string_create("unknown method"),
/* The request is destroyed before the reply is sent; the reply was already
 * built from it above. */
379 jsonrpc_msg_destroy(request);
380 jsonrpc_send(s->rpc, reply);
/* Handles a "cancel" notification.  When the params array holds exactly one
 * element, that element is treated as a request id: the trigger with that id
 * is looked up on session 's' and completed.
 *
 * Completing a trigger that has no result yet appears to make
 * ovsdb_jsonrpc_trigger_complete() answer with a "canceled" error. */
385 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
387 if (json_array(request->params)->n == 1) {
388 struct ovsdb_jsonrpc_trigger *t;
391 id = request->params->u.array.elems[0];
/* The hash uses the same json_hash(id, 0) basis as the one used when the
 * trigger was inserted by ovsdb_jsonrpc_trigger_create(). */
392 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
394 ovsdb_jsonrpc_trigger_complete(t);
/* Handles JSON-RPC notification 'request' on session 's'.  "cancel" is the
 * only method acted on here.  The message is destroyed afterwards. */
400 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
401 struct jsonrpc_msg *request)
403 if (!strcmp(request->method, "cancel")) {
404 execute_cancel(s, request);
406 jsonrpc_msg_destroy(request);
409 /* JSON-RPC database server triggers.
411 * (Every transaction is treated as a trigger even if it doesn't actually have
412 * any "wait" operations.) */
/* A database trigger that belongs to a JSON-RPC session. */
414 struct ovsdb_jsonrpc_trigger {
/* Embedded generic trigger.  ovsdb_jsonrpc_trigger_complete_done() recovers
 * this structure from the trigger's 'node' member with CONTAINER_OF. */
415 struct ovsdb_trigger trigger;
/* Session that created the trigger and that receives its reply. */
416 struct ovsdb_jsonrpc_session *session;
417 struct hmap_node hmap_node; /* In session's "triggers" hmap. */
/* Creates a trigger on session 's' for the transaction 'params' requested
 * under JSON-RPC id 'id'.
 *
 * If the session already has a trigger with an equal id, a "duplicate request
 * ID" error is sent instead and 'params' is destroyed.  Otherwise the trigger
 * is initialized against the server's database, inserted into s->triggers,
 * and completed at once if it is already complete.
 *
 * NOTE(review): server->max_triggers is not consulted here -- confirm whether
 * a limit on outstanding triggers is meant to be enforced. */
422 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
423 struct json *id, struct json *params)
425 struct ovsdb_jsonrpc_trigger *t;
428 /* Check for duplicate ID. */
/* The hash is computed once and reused for both the lookup and the insert. */
429 hash = json_hash(id, 0);
430 t = ovsdb_jsonrpc_trigger_find(s, id, hash);
432 jsonrpc_send(s->rpc, jsonrpc_create_error(
433 json_string_create("duplicate request ID"), id));
435 json_destroy(params);
439 /* Insert into trigger table. */
440 t = xmalloc(sizeof *t);
/* The session's completion list is passed to the trigger here;
 * ovsdb_jsonrpc_trigger_complete_done() later drains that list. */
441 ovsdb_trigger_init(s->server->db,
442 &t->trigger, params, &s->completions,
446 hmap_insert(&s->triggers, &t->hmap_node, hash);
448 /* Complete early if possible. */
449 if (ovsdb_trigger_is_complete(&t->trigger)) {
450 ovsdb_jsonrpc_trigger_complete(t);
/* Looks among the triggers of session 's' that have hash value 'hash' for
 * one whose id equals 'id' according to json_equal().  Callers in this file
 * pass json_hash(id, 0) as 'hash'. */
454 static struct ovsdb_jsonrpc_trigger *
455 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
456 const struct json *id, size_t hash)
458 struct ovsdb_jsonrpc_trigger *t;
460 HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
/* Entries that share a hash are still compared by value. */
462 if (json_equal(t->id, id)) {
/* Finishes trigger 't'.  A reply is sent only if the session has a JSON-RPC
 * connection whose status is zero.  The reply is built from the result taken
 * from the trigger, or is a "canceled" error.  In every case the trigger is
 * then destroyed and removed from the session's trigger table. */
471 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
473 struct ovsdb_jsonrpc_session *s = t->session;
/* No reply is attempted on a missing or failed connection. */
475 if (s->rpc && !jsonrpc_get_status(s->rpc)) {
476 struct jsonrpc_msg *reply;
/* "Steal" presumably transfers ownership of the result to this function --
 * confirm against ovsdb_trigger_steal_result(). */
479 result = ovsdb_trigger_steal_result(&t->trigger);
481 reply = jsonrpc_create_reply(result, t->id);
483 reply = jsonrpc_create_error(json_string_create("canceled"),
486 jsonrpc_send(s->rpc, reply);
/* Teardown happens whether or not a reply was sent. */
490 ovsdb_trigger_destroy(&t->trigger);
491 hmap_remove(&s->triggers, &t->hmap_node);
/* Completes every trigger on session 's'.  The _SAFE iterator is needed
 * because ovsdb_jsonrpc_trigger_complete() removes each trigger from the hmap
 * that is being walked. */
496 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
498 struct ovsdb_jsonrpc_trigger *t, *next;
499 HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
501 ovsdb_jsonrpc_trigger_complete(t);
/* Completes each trigger queued on s->completions until the list is empty.
 *
 * NOTE(review): the loop ends only if completing a trigger also removes it
 * from s->completions, presumably inside ovsdb_trigger_destroy() -- confirm. */
506 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
508 while (!list_is_empty(&s->completions)) {
/* List entries are the 'node' member of the embedded ovsdb_trigger, so
 * CONTAINER_OF recovers the enclosing ovsdb_jsonrpc_trigger. */
509 struct ovsdb_jsonrpc_trigger *t
510 = CONTAINER_OF(s->completions.next,
511 struct ovsdb_jsonrpc_trigger, trigger.node);
512 ovsdb_jsonrpc_trigger_complete(t);