ovsdb-server: Factor out complication by using jsonrpc_session.
[openvswitch] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "column.h"
23 #include "json.h"
24 #include "jsonrpc.h"
25 #include "ovsdb-error.h"
26 #include "ovsdb-parser.h"
27 #include "ovsdb.h"
28 #include "reconnect.h"
29 #include "row.h"
30 #include "stream.h"
31 #include "table.h"
32 #include "timeval.h"
33 #include "transaction.h"
34 #include "trigger.h"
35
36 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
37 #include "vlog.h"
38
39 struct ovsdb_jsonrpc_session;
40
41 /* Sessions. */
42 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
43                                                 const char *name);
44 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
45                                                  struct stream *);
46 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
47 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
48
49 /* Triggers. */
50 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
51                                          struct json *id, struct json *params);
52 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
53     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
54 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
55 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
56 static void ovsdb_jsonrpc_trigger_complete_done(
57     struct ovsdb_jsonrpc_session *);
58
59 /* Monitors. */
60 static struct json *ovsdb_jsonrpc_monitor_create(
61     struct ovsdb_jsonrpc_session *, struct json *params);
62 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
63     struct ovsdb_jsonrpc_session *,
64     struct json_array *params,
65     const struct json *request_id);
66 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
67 \f
68 /* JSON-RPC database server. */
69
70 struct ovsdb_jsonrpc_server {
71     struct ovsdb *db;
72
73     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
74     unsigned int n_sessions, max_sessions;
75
76     struct pstream **listeners;
77     size_t n_listeners, allocated_listeners;
78 };
79
80 struct ovsdb_jsonrpc_server *
81 ovsdb_jsonrpc_server_create(struct ovsdb *db)
82 {
83     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
84     server->db = db;
85     server->max_sessions = 64;
86     list_init(&server->sessions);
87     return server;
88 }
89
90 void
91 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
92                             struct pstream *pstream)
93 {
94     if (svr->n_listeners >= svr->allocated_listeners) {
95         svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
96                                     sizeof *svr->listeners);
97     }
98     svr->listeners[svr->n_listeners++] = pstream;
99 }
100
/* Creates a new active session on 'svr' for the remote called 'name'.
 *
 * This is the public entry point; all of the work is delegated to
 * ovsdb_jsonrpc_session_create_active(). */
void
ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
                             const char *name)
{
    ovsdb_jsonrpc_session_create_active(svr, name);
}
107
108 void
109 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
110 {
111     size_t i;
112
113     /* Accept new connections. */
114     for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
115         struct pstream *listener = svr->listeners[i];
116         struct stream *stream;
117         int error;
118
119         error = pstream_accept(listener, &stream);
120         if (!error) {
121             ovsdb_jsonrpc_session_create_passive(svr, stream);
122         } else if (error == EAGAIN) {
123             i++;
124         } else if (error) {
125             VLOG_WARN("%s: accept failed: %s",
126                       pstream_get_name(listener), strerror(error));
127             pstream_close(listener);
128             svr->listeners[i] = svr->listeners[--svr->n_listeners];
129         }
130     }
131
132     /* Handle each session. */
133     ovsdb_jsonrpc_session_run_all(svr);
134 }
135
136 void
137 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
138 {
139     if (svr->n_sessions < svr->max_sessions) {
140         size_t i;
141
142         for (i = 0; i < svr->n_listeners; i++) {
143             pstream_wait(svr->listeners[i]);
144         }
145     }
146
147     ovsdb_jsonrpc_session_wait_all(svr);
148 }
149 \f
150 /* JSON-RPC database server session. */
151
152 struct ovsdb_jsonrpc_session {
153     struct ovsdb_jsonrpc_server *server;
154     struct list node;           /* Element in server's sessions list. */
155
156     /* Triggers. */
157     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
158     struct list completions;    /* Completed triggers. */
159
160     /* Monitors. */
161     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
162
163     /* Network connectivity. */
164     struct jsonrpc_session *js;  /* JSON-RPC session. */
165     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
166 };
167
168 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
169 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
170 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
171 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
172                                              struct jsonrpc_msg *);
173 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
174                                              struct jsonrpc_msg *);
175
176 static struct ovsdb_jsonrpc_session *
177 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
178                              struct jsonrpc_session *js)
179 {
180     struct ovsdb_jsonrpc_session *s;
181
182     s = xzalloc(sizeof *s);
183     s->server = svr;
184     list_push_back(&svr->sessions, &s->node);
185     hmap_init(&s->triggers);
186     hmap_init(&s->monitors);
187     list_init(&s->completions);
188     s->js = js;
189     s->js_seqno = jsonrpc_session_get_seqno(js);
190
191     svr->n_sessions++;
192
193     return s;
194 }
195
/* Opens a JSON-RPC session to 'name' with jsonrpc_session_open() and wraps it
 * in a new session on 'svr'. */
static void
ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
                                    const char *name)
{
    struct jsonrpc_session *js = jsonrpc_session_open(name);

    ovsdb_jsonrpc_session_create(svr, js);
}
202
/* Wraps the already-accepted 'stream' in a JSON-RPC connection, opens it with
 * jsonrpc_session_open_unreliably(), and adds the result to 'svr' as a new
 * session. */
static void
ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
                                     struct stream *stream)
{
    struct jsonrpc_session *js;

    js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
    ovsdb_jsonrpc_session_create(svr, js);
}
210
211 static void
212 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
213 {
214     jsonrpc_session_close(s->js);
215     list_remove(&s->node);
216     s->server->n_sessions--;
217 }
218
219 static int
220 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
221 {
222     jsonrpc_session_run(s->js);
223     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
224         s->js_seqno = jsonrpc_session_get_seqno(s->js);
225         ovsdb_jsonrpc_trigger_complete_all(s);
226         ovsdb_jsonrpc_monitor_remove_all(s);
227     }
228
229     ovsdb_jsonrpc_trigger_complete_done(s);
230
231     if (!jsonrpc_session_get_backlog(s->js)) {
232         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
233         if (msg) {
234             if (msg->type == JSONRPC_REQUEST) {
235                 ovsdb_jsonrpc_session_got_request(s, msg);
236             } else if (msg->type == JSONRPC_NOTIFY) {
237                 ovsdb_jsonrpc_session_got_notify(s, msg);
238             } else {
239                 VLOG_WARN("%s: received unexpected %s message",
240                           jsonrpc_session_get_name(s->js),
241                           jsonrpc_msg_type_to_string(msg->type));
242                 jsonrpc_session_force_reconnect(s->js);
243                 jsonrpc_msg_destroy(msg);
244             }
245         }
246     }
247     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
248 }
249
250 static void
251 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
252 {
253     struct ovsdb_jsonrpc_session *s, *next;
254
255     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
256                         &svr->sessions) {
257         int error = ovsdb_jsonrpc_session_run(s);
258         if (error) {
259             ovsdb_jsonrpc_session_close(s);
260         }
261     }
262 }
263
264 static void
265 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
266 {
267     jsonrpc_session_wait(s->js);
268     if (!jsonrpc_session_get_backlog(s->js)) {
269         jsonrpc_session_recv_wait(s->js);
270     }
271 }
272
273 static void
274 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
275 {
276     struct ovsdb_jsonrpc_session *s;
277
278     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
279         ovsdb_jsonrpc_session_wait(s);
280     }
281 }
282
283 static struct jsonrpc_msg *
284 execute_transaction(struct ovsdb_jsonrpc_session *s,
285                     struct jsonrpc_msg *request)
286 {
287     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
288     request->id = NULL;
289     request->params = NULL;
290     return NULL;
291 }
292
293 static void
294 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
295                                   struct jsonrpc_msg *request)
296 {
297     struct jsonrpc_msg *reply;
298
299     if (!strcmp(request->method, "transact")) {
300         reply = execute_transaction(s, request);
301     } else if (!strcmp(request->method, "monitor")) {
302         reply = jsonrpc_create_reply(
303             ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
304     } else if (!strcmp(request->method, "monitor_cancel")) {
305         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
306                                              request->id);
307     } else if (!strcmp(request->method, "get_schema")) {
308         reply = jsonrpc_create_reply(
309             ovsdb_schema_to_json(s->server->db->schema), request->id);
310     } else if (!strcmp(request->method, "echo")) {
311         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
312     } else {
313         reply = jsonrpc_create_error(json_string_create("unknown method"),
314                                      request->id);
315     }
316
317     if (reply) {
318         jsonrpc_msg_destroy(request);
319         jsonrpc_session_send(s->js, reply);
320     }
321 }
322
323 static void
324 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
325 {
326     if (json_array(request->params)->n == 1) {
327         struct ovsdb_jsonrpc_trigger *t;
328         struct json *id;
329
330         id = request->params->u.array.elems[0];
331         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
332         if (t) {
333             ovsdb_jsonrpc_trigger_complete(t);
334         }
335     }
336 }
337
338 static void
339 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
340                                  struct jsonrpc_msg *request)
341 {
342     if (!strcmp(request->method, "cancel")) {
343         execute_cancel(s, request);
344     }
345     jsonrpc_msg_destroy(request);
346 }
347 \f
348 /* JSON-RPC database server triggers.
349  *
350  * (Every transaction is treated as a trigger even if it doesn't actually have
351  * any "wait" operations.) */
352
353 struct ovsdb_jsonrpc_trigger {
354     struct ovsdb_trigger trigger;
355     struct ovsdb_jsonrpc_session *session;
356     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
357     struct json *id;
358 };
359
360 static void
361 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
362                              struct json *id, struct json *params)
363 {
364     struct ovsdb_jsonrpc_trigger *t;
365     size_t hash;
366
367     /* Check for duplicate ID. */
368     hash = json_hash(id, 0);
369     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
370     if (t) {
371         struct jsonrpc_msg *msg;
372
373         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
374                                    id);
375         jsonrpc_session_send(s->js, msg);
376         json_destroy(id);
377         json_destroy(params);
378         return;
379     }
380
381     /* Insert into trigger table. */
382     t = xmalloc(sizeof *t);
383     ovsdb_trigger_init(s->server->db,
384                        &t->trigger, params, &s->completions,
385                        time_msec());
386     t->session = s;
387     t->id = id;
388     hmap_insert(&s->triggers, &t->hmap_node, hash);
389
390     /* Complete early if possible. */
391     if (ovsdb_trigger_is_complete(&t->trigger)) {
392         ovsdb_jsonrpc_trigger_complete(t);
393     }
394 }
395
396 static struct ovsdb_jsonrpc_trigger *
397 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
398                            const struct json *id, size_t hash)
399 {
400     struct ovsdb_jsonrpc_trigger *t;
401
402     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
403                              &s->triggers) {
404         if (json_equal(t->id, id)) {
405             return t;
406         }
407     }
408
409     return NULL;
410 }
411
412 static void
413 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
414 {
415     struct ovsdb_jsonrpc_session *s = t->session;
416
417     if (jsonrpc_session_is_connected(s->js)) {
418         struct jsonrpc_msg *reply;
419         struct json *result;
420
421         result = ovsdb_trigger_steal_result(&t->trigger);
422         if (result) {
423             reply = jsonrpc_create_reply(result, t->id);
424         } else {
425             reply = jsonrpc_create_error(json_string_create("canceled"),
426                                          t->id);
427         }
428         jsonrpc_session_send(s->js, reply);
429     }
430
431     json_destroy(t->id);
432     ovsdb_trigger_destroy(&t->trigger);
433     hmap_remove(&s->triggers, &t->hmap_node);
434     free(t);
435 }
436
437 static void
438 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
439 {
440     struct ovsdb_jsonrpc_trigger *t, *next;
441     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
442                         &s->triggers) {
443         ovsdb_jsonrpc_trigger_complete(t);
444     }
445 }
446
447 static void
448 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
449 {
450     while (!list_is_empty(&s->completions)) {
451         struct ovsdb_jsonrpc_trigger *t
452             = CONTAINER_OF(s->completions.next,
453                            struct ovsdb_jsonrpc_trigger, trigger.node);
454         ovsdb_jsonrpc_trigger_complete(t);
455     }
456 }
457 \f
458 /* JSON-RPC database table monitors. */
459
/* Bit flags naming the kinds of row events that a monitored table reports.
 * Each value is a distinct single bit so that they can be ORed together. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = 0x1,         /* Rows present when the monitor is created. */
    OJMS_INSERT = 0x2,          /* Rows that are added. */
    OJMS_DELETE = 0x4,          /* Rows that are removed. */
    OJMS_MODIFY = 0x8           /* Rows that are changed. */
};
466
467 struct ovsdb_jsonrpc_monitor_table {
468     const struct ovsdb_table *table;
469     enum ovsdb_jsonrpc_monitor_selection select;
470     struct ovsdb_column_set columns;
471 };
472
473 struct ovsdb_jsonrpc_monitor {
474     struct ovsdb_replica replica;
475     struct ovsdb_jsonrpc_session *session;
476     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
477
478     struct json *monitor_id;
479     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
480 };
481
482 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
483
484 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
485     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
486 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
487 static struct json *ovsdb_jsonrpc_monitor_get_initial(
488     const struct ovsdb_jsonrpc_monitor *);
489
490 static bool
491 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
492 {
493     const struct json *json;
494
495     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
496     return json ? json_boolean(json) : default_value;
497 }
498
499 struct ovsdb_jsonrpc_monitor *
500 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
501                            const struct json *monitor_id)
502 {
503     struct ovsdb_jsonrpc_monitor *m;
504
505     HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
506                              json_hash(monitor_id, 0), &s->monitors) {
507         if (json_equal(m->monitor_id, monitor_id)) {
508             return m;
509         }
510     }
511
512     return NULL;
513 }
514
515 static struct json *
516 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
517                              struct json *params)
518 {
519     struct ovsdb_jsonrpc_monitor *m = NULL;
520     struct json *monitor_id, *monitor_requests;
521     struct ovsdb_error *error = NULL;
522     struct shash_node *node;
523     struct json *json;
524
525     if (json_array(params)->n != 2) {
526         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
527         goto error;
528     }
529     monitor_id = params->u.array.elems[0];
530     monitor_requests = params->u.array.elems[1];
531     if (monitor_requests->type != JSON_OBJECT) {
532         error = ovsdb_syntax_error(monitor_requests, NULL,
533                                    "monitor-requests must be object");
534         goto error;
535     }
536
537     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
538         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
539         goto error;
540     }
541
542     m = xzalloc(sizeof *m);
543     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
544     ovsdb_add_replica(s->server->db, &m->replica);
545     m->session = s;
546     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
547     m->monitor_id = json_clone(monitor_id);
548     shash_init(&m->tables);
549
550     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
551         const struct ovsdb_table *table;
552         struct ovsdb_jsonrpc_monitor_table *mt;
553         const struct json *columns_json, *select_json;
554         struct ovsdb_parser parser;
555
556         table = ovsdb_get_table(s->server->db, node->name);
557         if (!table) {
558             error = ovsdb_syntax_error(NULL, NULL,
559                                        "no table named %s", node->name);
560             goto error;
561         }
562
563         mt = xzalloc(sizeof *mt);
564         mt->table = table;
565         mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
566         ovsdb_column_set_init(&mt->columns);
567         shash_add(&m->tables, table->schema->name, mt);
568
569         ovsdb_parser_init(&parser, node->data, "table %s", node->name);
570         columns_json = ovsdb_parser_member(&parser, "columns",
571                                            OP_ARRAY | OP_OPTIONAL);
572         select_json = ovsdb_parser_member(&parser, "select",
573                                           OP_OBJECT | OP_OPTIONAL);
574         error = ovsdb_parser_finish(&parser);
575         if (error) {
576             goto error;
577         }
578
579         if (columns_json) {
580             error = ovsdb_column_set_from_json(columns_json, table,
581                                                &mt->columns);
582             if (error) {
583                 goto error;
584             }
585         } else {
586             struct shash_node *node;
587
588             SHASH_FOR_EACH (node, &table->schema->columns) {
589                 const struct ovsdb_column *column = node->data;
590                 if (column->index != OVSDB_COL_UUID) {
591                     ovsdb_column_set_add(&mt->columns, column);
592                 }
593             }
594         }
595
596         if (select_json) {
597             mt->select = 0;
598             ovsdb_parser_init(&parser, select_json, "table %s select",
599                               table->schema->name);
600             if (parse_bool(&parser, "initial", true)) {
601                 mt->select |= OJMS_INITIAL;
602             }
603             if (parse_bool(&parser, "insert", true)) {
604                 mt->select |= OJMS_INSERT;
605             }
606             if (parse_bool(&parser, "delete", true)) {
607                 mt->select |= OJMS_DELETE;
608             }
609             if (parse_bool(&parser, "modify", true)) {
610                 mt->select |= OJMS_MODIFY;
611             }
612             error = ovsdb_parser_finish(&parser);
613             if (error) {
614                 goto error;
615             }
616         }
617     }
618
619     return ovsdb_jsonrpc_monitor_get_initial(m);
620
621 error:
622     if (m) {
623         ovsdb_remove_replica(s->server->db, &m->replica);
624     }
625
626     json = ovsdb_error_to_json(error);
627     ovsdb_error_destroy(error);
628     return json;
629 }
630
631 static struct jsonrpc_msg *
632 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
633                              struct json_array *params,
634                              const struct json *request_id)
635 {
636     if (params->n != 1) {
637         return jsonrpc_create_error(json_string_create("invalid parameters"),
638                                     request_id);
639     } else {
640         struct ovsdb_jsonrpc_monitor *m;
641
642         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
643         if (!m) {
644             return jsonrpc_create_error(json_string_create("unknown monitor"),
645                                         request_id);
646         } else {
647             ovsdb_remove_replica(s->server->db, &m->replica);
648             return jsonrpc_create_reply(json_object_create(), request_id);
649         }
650     }
651 }
652
653 static void
654 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
655 {
656     struct ovsdb_jsonrpc_monitor *m, *next;
657
658     HMAP_FOR_EACH_SAFE (m, next,
659                         struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
660         ovsdb_remove_replica(s->server->db, &m->replica);
661     }
662 }
663
664 static struct ovsdb_jsonrpc_monitor *
665 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
666 {
667     assert(replica->class == &ovsdb_jsonrpc_replica_class);
668     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
669 }
670
/* Working state passed to ovsdb_jsonrpc_monitor_change_cb() while one update
 * is being composed. */
struct ovsdb_jsonrpc_monitor_aux {
    bool initial;               /* True while dumping initial contents. */
    const struct ovsdb_jsonrpc_monitor *monitor;  /* Monitor being served. */
    struct json *json;          /* Whole update, or NULL if still empty. */

    /* Table most recently visited. */
    struct ovsdb_jsonrpc_monitor_table *mt;  /* NULL if none or unwatched. */
    struct json *table_json;    /* That table's part of 'json', or NULL. */
};
680
681 static bool
682 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
683                                 const struct ovsdb_row *new,
684                                 void *aux_)
685 {
686     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
687     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
688     struct ovsdb_table *table = new ? new->table : old->table;
689     enum ovsdb_jsonrpc_monitor_selection type;
690     struct json *old_json, *new_json;
691     struct json *row_json;
692     char uuid[UUID_LEN + 1];
693     int n_changed;
694     size_t i;
695
696     if (!aux->mt || table != aux->mt->table) {
697         aux->mt = shash_find_data(&m->tables, table->schema->name);
698         aux->table_json = NULL;
699         if (!aux->mt) {
700             /* We don't care about rows in this table at all.  Tell the caller
701              * to skip it.  */
702             return false;
703         }
704     }
705
706     type = (aux->initial ? OJMS_INITIAL
707             : !old ? OJMS_INSERT
708             : !new ? OJMS_DELETE
709             : OJMS_MODIFY);
710     if (!(aux->mt->select & type)) {
711         /* We don't care about this type of change (but do want to be called
712          * back for changes to other rows in the same table). */
713         return true;
714     }
715
716     old_json = new_json = NULL;
717     n_changed = 0;
718     for (i = 0; i < aux->mt->columns.n_columns; i++) {
719         const struct ovsdb_column *column = aux->mt->columns.columns[i];
720         unsigned int idx = column->index;
721         bool changed = false;
722
723         if (type == OJMS_MODIFY) {
724             changed = !ovsdb_datum_equals(&old->fields[idx],
725                                           &new->fields[idx], &column->type);
726             n_changed += changed;
727         }
728         if (changed || type == OJMS_DELETE) {
729             if (!old_json) {
730                 old_json = json_object_create();
731             }
732             json_object_put(old_json, column->name,
733                             ovsdb_datum_to_json(&old->fields[idx],
734                                                 &column->type));
735         }
736         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
737             if (!new_json) {
738                 new_json = json_object_create();
739             }
740             json_object_put(new_json, column->name,
741                             ovsdb_datum_to_json(&new->fields[idx],
742                                                 &column->type));
743         }
744     }
745     if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
746         /* No reportable changes. */
747         json_destroy(old_json);
748         json_destroy(new_json);
749         return true;
750     }
751
752     /* Create JSON object for transaction overall. */
753     if (!aux->json) {
754         aux->json = json_object_create();
755     }
756
757     /* Create JSON object for transaction on this table. */
758     if (!aux->table_json) {
759         aux->table_json = json_object_create();
760         json_object_put(aux->json, aux->mt->table->schema->name,
761                         aux->table_json);
762     }
763
764     /* Create JSON object for transaction on this row. */
765     row_json = json_object_create();
766     if (old_json) {
767         json_object_put(row_json, "old", old_json);
768     }
769     if (new_json) {
770         json_object_put(row_json, "new", new_json);
771     }
772
773     /* Add JSON row to JSON table. */
774     snprintf(uuid, sizeof uuid,
775              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
776     json_object_put(aux->table_json, uuid, row_json);
777
778     return true;
779 }
780
781 static void
782 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
783                                const struct ovsdb_jsonrpc_monitor *m,
784                                bool initial)
785 {
786     aux->initial = initial;
787     aux->monitor = m;
788     aux->json = NULL;
789     aux->mt = NULL;
790     aux->table_json = NULL;
791 }
792
793 static struct ovsdb_error *
794 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
795                              const struct ovsdb_txn *txn, bool durable UNUSED)
796 {
797     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
798     struct ovsdb_jsonrpc_monitor_aux aux;
799
800     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
801     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
802     if (aux.json) {
803         struct jsonrpc_msg *msg;
804         struct json *params;
805
806         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
807                                      aux.json);
808         msg = jsonrpc_create_notify("update", params);
809         jsonrpc_session_send(aux.monitor->session->js, msg);
810     }
811
812     return NULL;
813 }
814
815 static struct json *
816 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
817 {
818     struct ovsdb_jsonrpc_monitor_aux aux;
819     struct shash_node *node;
820
821     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
822     SHASH_FOR_EACH (node, &m->tables) {
823         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
824
825         if (mt->select & OJMS_INITIAL) {
826             struct ovsdb_row *row;
827
828             HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
829                            &mt->table->rows) {
830                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
831             }
832         }
833     }
834     return aux.json ? aux.json : json_object_create();
835 }
836
837 static void
838 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
839 {
840     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
841     struct shash_node *node;
842
843     json_destroy(m->monitor_id);
844     SHASH_FOR_EACH (node, &m->tables) {
845         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
846         ovsdb_column_set_destroy(&mt->columns);
847         free(mt);
848     }
849     shash_destroy(&m->tables);
850     hmap_remove(&m->session->monitors, &m->node);
851     free(m);
852 }
853
/* Replica class that ties monitors into the database.  The initializer is
 * positional, so the order of the two callbacks must match the member order
 * of "struct ovsdb_replica_class", which is declared elsewhere. */
static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
    ovsdb_jsonrpc_monitor_commit,
    ovsdb_jsonrpc_monitor_destroy
};