1 /* Copyright (c) 2009 Nicira Networks
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
25 #include "reconnect.h"
31 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
/* A database trigger created on behalf of a JSON-RPC session.  It wraps the
 * generic "struct ovsdb_trigger" and records the owning session so that the
 * result can be reported back over that session's connection. */
34 struct ovsdb_jsonrpc_trigger {
/* Generic trigger state, set up by ovsdb_trigger_init(). */
35 struct ovsdb_trigger trigger;
/* Session whose connection and trigger table this trigger uses. */
36 struct ovsdb_jsonrpc_session *session;
37 struct hmap_node hmap_node; /* Element in session's trigger table. */
41 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
42 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
43 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
/* Per-connection state for the JSON-RPC server: the owning server, the
 * triggers that have finished and await reporting, and the reconnection
 * state machine that drives connecting, probing and back-off. */
45 struct ovsdb_jsonrpc_session {
/* Server that owns this session (used for its database and session count). */
46 struct ovsdb_jsonrpc_server *server;
47 struct list node; /* Element in server's sessions list. */
49 struct list completions; /* Completed triggers. */
51 struct reconnect *reconnect; /* For back-off. */
52 bool active; /* Active or passive connection? */
/* NOTE(review): the only visible assignment to 'stream' is the stream_open()
 * call in ovsdb_jsonrpc_session_connect(); the passive creation path never
 * sets it.  The "active == false" wording below may be inverted -- confirm. */
54 struct stream *stream; /* Only if active == false and rpc == NULL. */
57 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
59 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
61 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
62 static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
63 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
64 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
65 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
66 struct jsonrpc_msg *);
67 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
68 struct jsonrpc_msg *);
/* Top-level JSON-RPC server state: the set of sessions being served and the
 * passive listeners on which new connections are accepted. */
70 struct ovsdb_jsonrpc_server {
73 struct list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
/* Current session count and the limit above which no new connections are
 * accepted. */
74 unsigned int n_sessions, max_sessions;
/* Trigger limit; initialized in ovsdb_jsonrpc_server_create(). */
75 unsigned int max_triggers;
/* Dynamically grown array of listening pstreams. */
77 struct pstream **listeners;
/* Number of entries in use, and number allocated, in 'listeners'. */
78 size_t n_listeners, allocated_listeners;
81 static void ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *,
/* Creates a JSON-RPC server.
 *
 * The server structure is allocated zero-filled, its session and trigger
 * limits are both set to 64, and its session list is initialized.  An active
 * session is then created for every name in 'active', and a passive listener
 * is opened for every name in 'passive'.  A listener that cannot be opened is
 * reported with ovs_error() using the error from pstream_open(). */
85 ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active,
86 const struct svec *passive,
87 struct ovsdb_jsonrpc_server **serverp)
89 struct ovsdb_jsonrpc_server *server;
/* xzalloc() zero-fills, so counters and the listener array start empty. */
94 server = xzalloc(sizeof *server);
96 server->max_sessions = 64;
97 server->max_triggers = 64;
98 list_init(&server->sessions);
/* Outgoing connections: one session per configured remote name. */
100 SVEC_FOR_EACH (i, name, active) {
101 ovsdb_jsonrpc_session_create_active(server, name);
/* Incoming connections: one listening pstream per configured name. */
104 SVEC_FOR_EACH (i, name, passive) {
105 struct pstream *pstream;
108 error = pstream_open(name, &pstream);
110 ovsdb_jsonrpc_server_listen(server, pstream);
112 ovs_error(error, "failed to listen on %s", name);
/* Performs one round of server processing: accepts new connections on the
 * listeners while the session limit has not been reached, then runs every
 * session and closes sessions based on the value returned by
 * ovsdb_jsonrpc_session_run(). */
122 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
/* 'next' lets the session loop survive removal of the current session. */
124 struct ovsdb_jsonrpc_session *s, *next;
127 /* Accept new connections. */
/* The loop has no increment expression and stops as soon as the session
 * limit is reached. */
128 for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
129 struct pstream *listener = svr->listeners[i];
130 struct stream *stream;
133 error = pstream_accept(listener, &stream);
135 ovsdb_jsonrpc_session_create_passive(svr, stream);
136 } else if (error == EAGAIN) {
/* Any other accept error: log it, close the listener, and fill its slot
 * with the last listener in the array. */
139 VLOG_WARN("%s: accept failed: %s",
140 pstream_get_name(listener), strerror(error));
141 pstream_close(listener);
142 svr->listeners[i] = svr->listeners[--svr->n_listeners];
146 /* Handle each session. */
147 LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
149 int error = ovsdb_jsonrpc_session_run(s);
151 ovsdb_jsonrpc_session_close(s);
/* Registers the events that should wake the caller for the next call to
 * ovsdb_jsonrpc_server_run(): activity on the listeners and on every
 * session. */
157 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
159 struct ovsdb_jsonrpc_session *s;
/* Mirrors the accept loop's limit check in ovsdb_jsonrpc_server_run(). */
161 if (svr->n_sessions < svr->max_sessions) {
164 for (i = 0; i < svr->n_listeners; i++) {
165 pstream_wait(svr->listeners[i]);
169 LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
170 ovsdb_jsonrpc_session_wait(s);
/* Appends 'pstream' to the server's array of listeners, growing the array
 * first when it is full. */
175 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
176 struct pstream *pstream)
178 if (svr->n_listeners >= svr->allocated_listeners) {
/* x2nrealloc() is given the address of 'allocated_listeners' along with the
 * array and its element size. */
179 svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
180 sizeof *svr->listeners);
182 svr->listeners[svr->n_listeners++] = pstream;
/* Looks in session 's' for a trigger whose request ID equals 'id'.  Only
 * entries with hash value 'hash' are examined, and each candidate's ID is
 * compared to 'id' with json_equal(). */
185 static struct ovsdb_jsonrpc_trigger *
186 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
187 const struct json *id, size_t hash)
189 struct ovsdb_jsonrpc_trigger *t;
191 HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
/* Equal hashes do not guarantee equal IDs, so compare the JSON values. */
193 if (json_equal(t->id, id)) {
/* Finishes trigger 't'.  If the owning session has a JSON-RPC connection
 * whose status is zero, a message is sent for the request's ID: either a
 * reply carrying the trigger's result or a "canceled" error.  The trigger is
 * then destroyed and removed from the session's trigger table. */
202 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
204 struct ovsdb_jsonrpc_session *s = t->session;
/* Nothing is sent when there is no connection or it already has an error. */
206 if (s->rpc && !jsonrpc_get_status(s->rpc)) {
207 struct jsonrpc_msg *reply;
/* Take the result out of the trigger for use in the reply. */
210 result = ovsdb_trigger_steal_result(&t->trigger);
212 reply = jsonrpc_create_reply(result, t->id);
214 reply = jsonrpc_create_error(json_string_create("canceled"),
217 jsonrpc_send(s->rpc, reply);
221 ovsdb_trigger_destroy(&t->trigger);
222 hmap_remove(&s->triggers, &t->hmap_node);
/* Common constructor for active and passive sessions.  Allocates a
 * zero-filled session, appends it to the server's session list, initializes
 * its trigger table and completion list, and creates and enables a
 * reconnection state machine named 'name'. */
226 static struct ovsdb_jsonrpc_session *
227 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
228 const char *name, bool active)
230 struct ovsdb_jsonrpc_session *s;
232 s = xzalloc(sizeof *s);
234 list_push_back(&svr->sessions, &s->node);
235 hmap_init(&s->triggers);
236 list_init(&s->completions);
/* The reconnect object is created, named, and enabled using the current
 * time from time_msec(). */
237 s->reconnect = reconnect_create(time_msec());
238 reconnect_set_name(s->reconnect, name);
239 reconnect_enable(s->reconnect, time_msec());
/* Creates a session on 'svr' with the "active" flag set to true. */
248 ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
251 ovsdb_jsonrpc_session_create(svr, name, true);
/* Creates a session on 'svr' for the already-accepted connection 'stream'.
 * The session is named after the stream, created with "active" false, marked
 * as connected in its reconnect state machine, and given a JSON-RPC
 * connection opened on 'stream'. */
255 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
256 struct stream *stream)
258 struct ovsdb_jsonrpc_session *s;
260 s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
/* The stream is already established, so report it as connected right away. */
261 reconnect_connected(s->reconnect, time_msec());
262 s->rpc = jsonrpc_open(stream);
/* Tears down session 's': disconnects it, unlinks it from the server's
 * session list, and decrements the server's session count. */
266 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
268 ovsdb_jsonrpc_session_disconnect(s);
269 list_remove(&s->node);
270 s->server->n_sessions--;
/* Drops whatever connection 's' currently has.  The reconnect state machine
 * is told about the disconnection.  For a JSON-RPC connection, the connection
 * is flagged with EOF, every trigger in the session is completed, and the
 * connection is closed.  Otherwise, a stream that exists is closed. */
274 ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
276 reconnect_disconnected(s->reconnect, time_msec(), 0);
278 struct ovsdb_jsonrpc_trigger *t, *next;
/* The error is set before completing triggers; ovsdb_jsonrpc_trigger_complete()
 * checks the connection status before sending anything. */
280 jsonrpc_error(s->rpc, EOF);
/* _SAFE iteration: completing a trigger removes it from the table. */
281 HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
283 ovsdb_jsonrpc_trigger_complete(t);
286 jsonrpc_close(s->rpc);
288 } else if (s->stream) {
289 stream_close(s->stream);
/* Starts a new connection attempt for 's'.  Any existing connection is
 * dropped first; then a stream is opened to the name held by the reconnect
 * state machine, which is informed of the outcome (attempt failed, or
 * connection in progress). */
295 ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
297 ovsdb_jsonrpc_session_disconnect(s);
299 int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
301 reconnect_connect_failed(s->reconnect, time_msec(), error);
303 reconnect_connecting(s->reconnect, time_msec());
/* Performs one round of processing for session 's': reports completed
 * triggers, receives and dispatches at most one incoming message, advances an
 * in-progress connection attempt, and carries out whatever action the
 * reconnect state machine requests.
 *
 * Returns 0 if the session is active or has a JSON-RPC connection, otherwise
 * ETIMEDOUT. */
309 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
312 struct jsonrpc_msg *msg;
/* Finish every trigger that is on the completions list. */
317 while (!list_is_empty(&s->completions)) {
318 struct ovsdb_jsonrpc_trigger *t
319 = CONTAINER_OF(s->completions.next,
320 struct ovsdb_jsonrpc_trigger, trigger.node);
321 ovsdb_jsonrpc_trigger_complete(t);
/* Receive only when jsonrpc_get_backlog() reports zero; a zero return from
 * jsonrpc_recv() means a message was obtained. */
324 if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
325 reconnect_received(s->reconnect, time_msec());
326 if (msg->type == JSONRPC_REQUEST) {
327 ovsdb_jsonrpc_session_got_request(s, msg);
328 } else if (msg->type == JSONRPC_NOTIFY) {
329 ovsdb_jsonrpc_session_got_notify(s, msg);
/* Any other message type is a protocol violation: warn, flag the connection
 * with EPROTO, and discard the message. */
331 VLOG_WARN("%s: received unexpected %s message",
332 jsonrpc_get_name(s->rpc),
333 jsonrpc_msg_type_to_string(msg->type));
334 jsonrpc_error(s->rpc, EPROTO);
335 jsonrpc_msg_destroy(msg);
339 error = jsonrpc_get_status(s->rpc);
342 ovsdb_jsonrpc_session_disconnect(s);
/* No JSON-RPC connection yet, but a stream exists: advance the connection
 * attempt. */
347 } else if (s->stream) {
348 int error = stream_connect(s->stream);
350 reconnect_connected(s->reconnect, time_msec());
351 s->rpc = jsonrpc_open(s->stream);
/* EAGAIN is not treated as a failure; other errors abandon the attempt. */
353 } else if (error != EAGAIN) {
354 reconnect_connect_failed(s->reconnect, time_msec(), error);
355 stream_close(s->stream);
/* Act on the reconnect state machine's request for this moment in time. */
360 switch (reconnect_run(s->reconnect, time_msec())) {
361 case RECONNECT_CONNECT:
362 ovsdb_jsonrpc_session_connect(s);
365 case RECONNECT_DISCONNECT:
366 ovsdb_jsonrpc_session_disconnect(s);
369 case RECONNECT_PROBE:
/* A probe is an "echo" request with an empty parameter array. */
371 struct json *params = json_array_create_empty();
372 jsonrpc_send(s->rpc, jsonrpc_create_request("echo", params));
/* A session that is neither active nor connected yields ETIMEDOUT. */
376 return s->active || s->rpc ? 0 : ETIMEDOUT;
/* Registers the events that should wake the caller for the next call to
 * ovsdb_jsonrpc_session_run() on 's': JSON-RPC activity, progress of an
 * in-progress stream connection, and reconnect timers. */
381 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
384 jsonrpc_wait(s->rpc);
/* Matches the receive condition in ovsdb_jsonrpc_session_run(): wait for
 * input only when the backlog is zero. */
385 if (!jsonrpc_get_backlog(s->rpc)) {
386 jsonrpc_recv_wait(s->rpc);
388 } else if (s->stream) {
389 stream_connect_wait(s->stream);
391 reconnect_wait(s->reconnect, time_msec());
/* Handles a "transact" request on session 's'.
 *
 * A request whose ID duplicates that of an existing trigger in the session
 * gets a "duplicate request ID" error reply.  Otherwise a new trigger is
 * created from the request's parameters, inserted into the session's trigger
 * table under the hash of the request ID, and completed immediately if it is
 * already complete. */
394 static struct jsonrpc_msg *
395 execute_transaction(struct ovsdb_jsonrpc_session *s,
396 struct jsonrpc_msg *request)
398 struct ovsdb_jsonrpc_trigger *t;
401 /* Check for duplicate ID. */
/* The same hash is reused below when inserting the new trigger. */
402 hash = json_hash(request->id, 0);
403 t = ovsdb_jsonrpc_trigger_find(s, request->id, hash);
405 return jsonrpc_create_error(
406 json_string_create("duplicate request ID"), request->id);
409 /* Insert into trigger table. */
410 t = xmalloc(sizeof *t);
/* The trigger is initialized with the server's database, the request
 * parameters, and the session's completion list. */
411 ovsdb_trigger_init(s->server->db,
412 &t->trigger, request->params, &s->completions,
416 hmap_insert(&s->triggers, &t->hmap_node, hash);
/* The parameters were handed to the trigger above; clear the request's
 * pointer to them. */
419 request->params = NULL;
421 /* Complete early if possible. */
422 if (ovsdb_trigger_is_complete(&t->trigger)) {
423 ovsdb_jsonrpc_trigger_complete(t);
/* Dispatches JSON-RPC request 'request' received on session 's' by method
 * name:
 *
 *   - "transact":   handled by execute_transaction().
 *   - "get_schema": replies with the database schema converted to JSON.
 *   - "echo":       replies with a copy of the request's parameters.
 *   - otherwise:    replies with an "unknown method" error.
 *
 * The request is destroyed and the reply is sent on the session's
 * connection. */
430 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
431 struct jsonrpc_msg *request)
433 struct jsonrpc_msg *reply;
435 if (!strcmp(request->method, "transact")) {
436 reply = execute_transaction(s, request);
437 } else if (!strcmp(request->method, "get_schema")) {
438 reply = jsonrpc_create_reply(
439 ovsdb_schema_to_json(s->server->db->schema), request->id);
440 } else if (!strcmp(request->method, "echo")) {
/* The parameters are cloned because the request is destroyed below. */
441 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
443 reply = jsonrpc_create_error(json_string_create("unknown method"),
448 jsonrpc_msg_destroy(request);
449 jsonrpc_send(s->rpc, reply);
/* Handles a "cancel" notification on session 's'.  When the parameters are an
 * array of exactly one element, that element is taken as a request ID and
 * the session's trigger table is searched for it; the trigger located this
 * way is completed. */
454 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
/* Parameter arrays of any other length are not acted on here. */
456 if (json_array(request->params)->n == 1) {
457 struct ovsdb_jsonrpc_trigger *t;
460 id = request->params->u.array.elems[0];
461 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
463 ovsdb_jsonrpc_trigger_complete(t);
/* Handles JSON-RPC notification 'request' received on session 's'.  The
 * "cancel" method is passed to execute_cancel(); the message is then
 * destroyed. */
469 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
470 struct jsonrpc_msg *request)
472 if (!strcmp(request->method, "cancel")) {
473 execute_cancel(s, request);
475 jsonrpc_msg_destroy(request);