1 /* Copyright (c) 2009 Nicira Networks
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
30 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
33 struct ovsdb_jsonrpc_trigger {
34 struct ovsdb_trigger trigger;
35 struct ovsdb_jsonrpc_session *session;
36 struct hmap_node hmap_node; /* Element in session's trigger table. */
40 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
41 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
42 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
44 struct ovsdb_jsonrpc_session {
45 struct ovsdb_jsonrpc_server *server;
46 struct list node; /* Element in server's sessions list. */
49 struct list completions; /* Completed triggers. */
52 static void ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *,
54 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
55 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
56 struct jsonrpc_msg *);
57 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
58 struct jsonrpc_msg *);
60 struct ovsdb_jsonrpc_server {
63 struct list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
64 unsigned int n_sessions, max_sessions;
65 unsigned int max_triggers;
67 struct pstream **listeners;
68 size_t n_listeners, allocated_listeners;
71 static void ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *,
75 ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active,
76 const struct svec *passive,
77 struct ovsdb_jsonrpc_server **serverp)
79 struct ovsdb_jsonrpc_server *server;
84 server = xzalloc(sizeof *server);
86 server->max_sessions = 64;
87 server->max_triggers = 64;
88 list_init(&server->sessions);
90 SVEC_FOR_EACH (i, name, active) {
91 struct stream *stream;
94 error = stream_open(name, &stream);
96 ovsdb_jsonrpc_session_open(server, stream);
98 ovs_error(error, "%s: connection failed", name);
103 SVEC_FOR_EACH (i, name, passive) {
104 struct pstream *pstream;
107 error = pstream_open(name, &pstream);
109 ovsdb_jsonrpc_server_listen(server, pstream);
111 ovs_error(error, "failed to listen on %s", name);
121 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
123 struct ovsdb_jsonrpc_session *s, *next;
126 /* Accept new connections. */
127 for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
128 struct pstream *listener = svr->listeners[i];
129 struct stream *stream;
132 error = pstream_accept(listener, &stream);
134 ovsdb_jsonrpc_session_open(svr, stream);
135 } else if (error == EAGAIN) {
138 VLOG_WARN("%s: accept failed: %s",
139 pstream_get_name(listener), strerror(error));
140 pstream_close(listener);
141 svr->listeners[i] = svr->listeners[--svr->n_listeners];
145 /* Handle each session. */
146 LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
148 struct jsonrpc_msg *msg;
153 while (!list_is_empty(&s->completions)) {
154 struct ovsdb_jsonrpc_trigger *t
155 = CONTAINER_OF(s->completions.next,
156 struct ovsdb_jsonrpc_trigger, trigger.node);
157 ovsdb_jsonrpc_trigger_complete(t);
160 if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
161 if (msg->type == JSONRPC_REQUEST) {
162 ovsdb_jsonrpc_session_got_request(s, msg);
163 } else if (msg->type == JSONRPC_NOTIFY) {
164 ovsdb_jsonrpc_session_got_notify(s, msg);
166 VLOG_WARN("%s: received unexpected %s message",
167 jsonrpc_get_name(s->rpc),
168 jsonrpc_msg_type_to_string(msg->type));
169 jsonrpc_error(s->rpc, EPROTO);
170 jsonrpc_msg_destroy(msg);
174 error = jsonrpc_get_status(s->rpc);
176 ovsdb_jsonrpc_session_close(s);
182 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
184 struct ovsdb_jsonrpc_session *s;
186 if (svr->n_sessions < svr->max_sessions) {
189 for (i = 0; i < svr->n_sessions; i++) {
190 pstream_wait(svr->listeners[i]);
194 LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
195 jsonrpc_wait(s->rpc);
196 if (!jsonrpc_get_backlog(s->rpc)) {
197 jsonrpc_recv_wait(s->rpc);
203 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
204 struct pstream *pstream)
206 if (svr->n_listeners >= svr->allocated_listeners) {
207 svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
208 sizeof *svr->listeners);
210 svr->listeners[svr->n_listeners++] = pstream;
213 static struct ovsdb_jsonrpc_trigger *
214 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
215 const struct json *id, size_t hash)
217 struct ovsdb_jsonrpc_trigger *t;
219 HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
221 if (json_equal(t->id, id)) {
230 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
232 struct ovsdb_jsonrpc_session *s = t->session;
234 if (!jsonrpc_get_status(s->rpc)) {
235 struct jsonrpc_msg *reply;
238 result = ovsdb_trigger_steal_result(&t->trigger);
240 reply = jsonrpc_create_reply(result, t->id);
242 reply = jsonrpc_create_error(json_string_create("canceled"),
245 jsonrpc_send(s->rpc, reply);
249 ovsdb_trigger_destroy(&t->trigger);
250 hmap_remove(&s->triggers, &t->hmap_node);
255 ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *svr,
256 struct stream *stream)
258 struct ovsdb_jsonrpc_session *s;
260 s = xzalloc(sizeof *s);
262 list_push_back(&svr->sessions, &s->node);
263 s->rpc = jsonrpc_open(stream);
264 hmap_init(&s->triggers);
265 list_init(&s->completions);
269 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
271 struct ovsdb_jsonrpc_trigger *t, *next;
273 jsonrpc_error(s->rpc, EOF);
274 HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
276 ovsdb_jsonrpc_trigger_complete(t);
279 jsonrpc_close(s->rpc);
281 list_remove(&s->node);
282 s->server->n_sessions--;
285 static struct jsonrpc_msg *
286 execute_transaction(struct ovsdb_jsonrpc_session *s,
287 struct jsonrpc_msg *request)
289 struct ovsdb_jsonrpc_trigger *t;
292 /* Check for duplicate ID. */
293 hash = json_hash(request->id, 0);
294 t = ovsdb_jsonrpc_trigger_find(s, request->id, hash);
296 return jsonrpc_create_error(
297 json_string_create("duplicate request ID"), request->id);
300 /* Insert into trigger table. */
301 t = xmalloc(sizeof *t);
302 ovsdb_trigger_init(s->server->db,
303 &t->trigger, request->params, &s->completions,
307 hmap_insert(&s->triggers, &t->hmap_node, hash);
310 request->params = NULL;
312 /* Complete early if possible. */
313 if (ovsdb_trigger_is_complete(&t->trigger)) {
314 ovsdb_jsonrpc_trigger_complete(t);
321 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
322 struct jsonrpc_msg *request)
324 struct jsonrpc_msg *reply;
326 if (!strcmp(request->method, "transact")) {
327 reply = execute_transaction(s, request);
328 } else if (!strcmp(request->method, "get_schema")) {
329 reply = jsonrpc_create_reply(
330 ovsdb_schema_to_json(s->server->db->schema), request->id);
332 reply = jsonrpc_create_error(json_string_create("unknown method"),
337 jsonrpc_msg_destroy(request);
338 jsonrpc_send(s->rpc, reply);
343 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
345 size_t hash = json_hash(request->id, 0);
346 struct ovsdb_jsonrpc_trigger *t;
348 t = ovsdb_jsonrpc_trigger_find(s, request->params, hash);
350 ovsdb_jsonrpc_trigger_complete(t);
355 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
356 struct jsonrpc_msg *request)
358 if (!strcmp(request->method, "cancel")) {
359 execute_cancel(s, request);
361 jsonrpc_msg_destroy(request);