1 /* Copyright (c) 2009 Nicira Networks
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
25 #include "ovsdb-error.h"
26 #include "ovsdb-parser.h"
28 #include "reconnect.h"
33 #include "transaction.h"
36 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
39 struct ovsdb_jsonrpc_session;
42 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
44 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
46 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
47 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
50 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
51 struct json *id, struct json *params);
52 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
53 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
54 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
55 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
56 static void ovsdb_jsonrpc_trigger_complete_done(
57 struct ovsdb_jsonrpc_session *);
60 static struct json *ovsdb_jsonrpc_monitor_create(
61 struct ovsdb_jsonrpc_session *, struct json *params);
62 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
63 struct ovsdb_jsonrpc_session *,
64 struct json_array *params,
65 const struct json *request_id);
66 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
68 /* JSON-RPC database server. */
70 struct ovsdb_jsonrpc_server {
73 struct list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
74 unsigned int n_sessions, max_sessions;
75 unsigned int max_triggers;
77 struct pstream **listeners;
78 size_t n_listeners, allocated_listeners;
81 struct ovsdb_jsonrpc_server *
82 ovsdb_jsonrpc_server_create(struct ovsdb *db)
84 struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
86 server->max_sessions = 64;
87 server->max_triggers = 64;
88 list_init(&server->sessions);
93 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
94 struct pstream *pstream)
96 if (svr->n_listeners >= svr->allocated_listeners) {
97 svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
98 sizeof *svr->listeners);
100 svr->listeners[svr->n_listeners++] = pstream;
104 ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
107 ovsdb_jsonrpc_session_create_active(svr, name);
111 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
115 /* Accept new connections. */
116 for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
117 struct pstream *listener = svr->listeners[i];
118 struct stream *stream;
121 error = pstream_accept(listener, &stream);
123 ovsdb_jsonrpc_session_create_passive(svr, stream);
124 } else if (error == EAGAIN) {
127 VLOG_WARN("%s: accept failed: %s",
128 pstream_get_name(listener), strerror(error));
129 pstream_close(listener);
130 svr->listeners[i] = svr->listeners[--svr->n_listeners];
134 /* Handle each session. */
135 ovsdb_jsonrpc_session_run_all(svr);
139 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
141 if (svr->n_sessions < svr->max_sessions) {
144 for (i = 0; i < svr->n_listeners; i++) {
145 pstream_wait(svr->listeners[i]);
149 ovsdb_jsonrpc_session_wait_all(svr);
152 /* JSON-RPC database server session. */
154 struct ovsdb_jsonrpc_session {
155 struct ovsdb_jsonrpc_server *server;
156 struct list node; /* Element in server's sessions list. */
159 struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
160 struct list completions; /* Completed triggers. */
163 struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
165 /* Connecting and reconnecting. */
166 struct reconnect *reconnect; /* For back-off. */
167 bool active; /* Active or passive connection? */
169 struct stream *stream; /* Only if active == false and rpc == NULL. */
172 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
173 static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
174 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
175 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
176 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
177 struct jsonrpc_msg *);
178 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
179 struct jsonrpc_msg *);
181 static struct ovsdb_jsonrpc_session *
182 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
183 const char *name, bool active)
185 struct ovsdb_jsonrpc_session *s;
187 s = xzalloc(sizeof *s);
189 list_push_back(&svr->sessions, &s->node);
190 hmap_init(&s->triggers);
191 hmap_init(&s->monitors);
192 list_init(&s->completions);
193 s->reconnect = reconnect_create(time_msec());
194 reconnect_set_name(s->reconnect, name);
195 reconnect_enable(s->reconnect, time_msec());
204 ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
207 ovsdb_jsonrpc_session_create(svr, name, true);
211 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
212 struct stream *stream)
214 struct ovsdb_jsonrpc_session *s;
216 s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
217 reconnect_connected(s->reconnect, time_msec());
218 s->rpc = jsonrpc_open(stream);
222 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
224 ovsdb_jsonrpc_session_disconnect(s);
225 list_remove(&s->node);
226 s->server->n_sessions--;
230 ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
232 reconnect_disconnected(s->reconnect, time_msec(), 0);
234 jsonrpc_error(s->rpc, EOF);
235 ovsdb_jsonrpc_trigger_complete_all(s);
236 ovsdb_jsonrpc_monitor_remove_all(s);
237 jsonrpc_close(s->rpc);
239 } else if (s->stream) {
240 stream_close(s->stream);
246 ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
248 ovsdb_jsonrpc_session_disconnect(s);
250 int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
252 reconnect_connect_failed(s->reconnect, time_msec(), error);
254 reconnect_connecting(s->reconnect, time_msec());
260 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
263 struct jsonrpc_msg *msg;
268 ovsdb_jsonrpc_trigger_complete_done(s);
270 if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
271 reconnect_received(s->reconnect, time_msec());
272 if (msg->type == JSONRPC_REQUEST) {
273 ovsdb_jsonrpc_session_got_request(s, msg);
274 } else if (msg->type == JSONRPC_NOTIFY) {
275 ovsdb_jsonrpc_session_got_notify(s, msg);
276 } else if (msg->type == JSONRPC_REPLY
277 && msg->id && msg->id->type == JSON_STRING
278 && !strcmp(msg->id->u.string, "echo")) {
279 /* It's a reply to our echo request. Ignore it. */
281 VLOG_WARN("%s: received unexpected %s message",
282 jsonrpc_get_name(s->rpc),
283 jsonrpc_msg_type_to_string(msg->type));
284 jsonrpc_error(s->rpc, EPROTO);
285 jsonrpc_msg_destroy(msg);
289 error = jsonrpc_get_status(s->rpc);
292 ovsdb_jsonrpc_session_disconnect(s);
297 } else if (s->stream) {
298 int error = stream_connect(s->stream);
300 reconnect_connected(s->reconnect, time_msec());
301 s->rpc = jsonrpc_open(s->stream);
303 } else if (error != EAGAIN) {
304 reconnect_connect_failed(s->reconnect, time_msec(), error);
305 stream_close(s->stream);
310 switch (reconnect_run(s->reconnect, time_msec())) {
311 case RECONNECT_CONNECT:
312 ovsdb_jsonrpc_session_connect(s);
315 case RECONNECT_DISCONNECT:
316 ovsdb_jsonrpc_session_disconnect(s);
319 case RECONNECT_PROBE:
322 struct jsonrpc_msg *request;
324 params = json_array_create_empty();
325 request = jsonrpc_create_request("echo", params, NULL);
326 json_destroy(request->id);
327 request->id = json_string_create("echo");
328 jsonrpc_send(s->rpc, request);
332 return s->active || s->rpc ? 0 : ETIMEDOUT;
336 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
338 struct ovsdb_jsonrpc_session *s, *next;
340 LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
342 int error = ovsdb_jsonrpc_session_run(s);
344 ovsdb_jsonrpc_session_close(s);
350 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
353 jsonrpc_wait(s->rpc);
354 if (!jsonrpc_get_backlog(s->rpc)) {
355 jsonrpc_recv_wait(s->rpc);
357 } else if (s->stream) {
358 stream_connect_wait(s->stream);
360 reconnect_wait(s->reconnect, time_msec());
364 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
366 struct ovsdb_jsonrpc_session *s;
368 LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
369 ovsdb_jsonrpc_session_wait(s);
373 static struct jsonrpc_msg *
374 execute_transaction(struct ovsdb_jsonrpc_session *s,
375 struct jsonrpc_msg *request)
377 ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
379 request->params = NULL;
384 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
385 struct jsonrpc_msg *request)
387 struct jsonrpc_msg *reply;
389 if (!strcmp(request->method, "transact")) {
390 reply = execute_transaction(s, request);
391 } else if (!strcmp(request->method, "monitor")) {
392 reply = jsonrpc_create_reply(
393 ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
394 } else if (!strcmp(request->method, "monitor_cancel")) {
395 reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
397 } else if (!strcmp(request->method, "get_schema")) {
398 reply = jsonrpc_create_reply(
399 ovsdb_schema_to_json(s->server->db->schema), request->id);
400 } else if (!strcmp(request->method, "echo")) {
401 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
403 reply = jsonrpc_create_error(json_string_create("unknown method"),
408 jsonrpc_msg_destroy(request);
409 jsonrpc_send(s->rpc, reply);
414 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
416 if (json_array(request->params)->n == 1) {
417 struct ovsdb_jsonrpc_trigger *t;
420 id = request->params->u.array.elems[0];
421 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
423 ovsdb_jsonrpc_trigger_complete(t);
429 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
430 struct jsonrpc_msg *request)
432 if (!strcmp(request->method, "cancel")) {
433 execute_cancel(s, request);
435 jsonrpc_msg_destroy(request);
438 /* JSON-RPC database server triggers.
440 * (Every transaction is treated as a trigger even if it doesn't actually have
441 * any "wait" operations.) */
443 struct ovsdb_jsonrpc_trigger {
444 struct ovsdb_trigger trigger;
445 struct ovsdb_jsonrpc_session *session;
446 struct hmap_node hmap_node; /* In session's "triggers" hmap. */
451 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
452 struct json *id, struct json *params)
454 struct ovsdb_jsonrpc_trigger *t;
457 /* Check for duplicate ID. */
458 hash = json_hash(id, 0);
459 t = ovsdb_jsonrpc_trigger_find(s, id, hash);
461 jsonrpc_send(s->rpc, jsonrpc_create_error(
462 json_string_create("duplicate request ID"), id));
464 json_destroy(params);
468 /* Insert into trigger table. */
469 t = xmalloc(sizeof *t);
470 ovsdb_trigger_init(s->server->db,
471 &t->trigger, params, &s->completions,
475 hmap_insert(&s->triggers, &t->hmap_node, hash);
477 /* Complete early if possible. */
478 if (ovsdb_trigger_is_complete(&t->trigger)) {
479 ovsdb_jsonrpc_trigger_complete(t);
483 static struct ovsdb_jsonrpc_trigger *
484 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
485 const struct json *id, size_t hash)
487 struct ovsdb_jsonrpc_trigger *t;
489 HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
491 if (json_equal(t->id, id)) {
500 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
502 struct ovsdb_jsonrpc_session *s = t->session;
504 if (s->rpc && !jsonrpc_get_status(s->rpc)) {
505 struct jsonrpc_msg *reply;
508 result = ovsdb_trigger_steal_result(&t->trigger);
510 reply = jsonrpc_create_reply(result, t->id);
512 reply = jsonrpc_create_error(json_string_create("canceled"),
515 jsonrpc_send(s->rpc, reply);
519 ovsdb_trigger_destroy(&t->trigger);
520 hmap_remove(&s->triggers, &t->hmap_node);
525 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
527 struct ovsdb_jsonrpc_trigger *t, *next;
528 HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
530 ovsdb_jsonrpc_trigger_complete(t);
535 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
537 while (!list_is_empty(&s->completions)) {
538 struct ovsdb_jsonrpc_trigger *t
539 = CONTAINER_OF(s->completions.next,
540 struct ovsdb_jsonrpc_trigger, trigger.node);
541 ovsdb_jsonrpc_trigger_complete(t);
545 /* JSON-RPC database table monitors. */
547 enum ovsdb_jsonrpc_monitor_selection {
548 OJMS_INITIAL = 1 << 0, /* All rows when monitor is created. */
549 OJMS_INSERT = 1 << 1, /* New rows. */
550 OJMS_DELETE = 1 << 2, /* Deleted rows. */
551 OJMS_MODIFY = 1 << 3 /* Modified rows. */
554 struct ovsdb_jsonrpc_monitor_table {
555 const struct ovsdb_table *table;
556 enum ovsdb_jsonrpc_monitor_selection select;
557 struct ovsdb_column_set columns;
560 struct ovsdb_jsonrpc_monitor {
561 struct ovsdb_replica replica;
562 struct ovsdb_jsonrpc_session *session;
563 struct hmap_node node; /* In ovsdb_jsonrpc_session's "monitors". */
565 struct json *monitor_id;
566 struct shash tables; /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
569 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
571 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
572 struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
573 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
574 static struct json *ovsdb_jsonrpc_monitor_get_initial(
575 const struct ovsdb_jsonrpc_monitor *);
578 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
580 const struct json *json;
582 json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
583 return json ? json_boolean(json) : default_value;
586 struct ovsdb_jsonrpc_monitor *
587 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
588 const struct json *monitor_id)
590 struct ovsdb_jsonrpc_monitor *m;
592 HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
593 json_hash(monitor_id, 0), &s->monitors) {
594 if (json_equal(m->monitor_id, monitor_id)) {
603 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
606 struct ovsdb_jsonrpc_monitor *m = NULL;
607 struct json *monitor_id, *monitor_requests;
608 struct ovsdb_error *error = NULL;
609 struct shash_node *node;
612 if (json_array(params)->n != 2) {
613 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
616 monitor_id = params->u.array.elems[0];
617 monitor_requests = params->u.array.elems[1];
618 if (monitor_requests->type != JSON_OBJECT) {
619 error = ovsdb_syntax_error(monitor_requests, NULL,
620 "monitor-requests must be object");
624 if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
625 error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
629 m = xzalloc(sizeof *m);
630 ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
631 ovsdb_add_replica(s->server->db, &m->replica);
633 hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
634 m->monitor_id = json_clone(monitor_id);
635 shash_init(&m->tables);
637 SHASH_FOR_EACH (node, json_object(monitor_requests)) {
638 const struct ovsdb_table *table;
639 struct ovsdb_jsonrpc_monitor_table *mt;
640 const struct json *columns_json, *select_json;
641 struct ovsdb_parser parser;
643 table = ovsdb_get_table(s->server->db, node->name);
645 error = ovsdb_syntax_error(NULL, NULL,
646 "no table named %s", node->name);
650 mt = xzalloc(sizeof *mt);
652 mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
653 ovsdb_column_set_init(&mt->columns);
654 shash_add(&m->tables, table->schema->name, mt);
656 ovsdb_parser_init(&parser, node->data, "table %s", node->name);
657 columns_json = ovsdb_parser_member(&parser, "columns",
658 OP_ARRAY | OP_OPTIONAL);
659 select_json = ovsdb_parser_member(&parser, "select",
660 OP_OBJECT | OP_OPTIONAL);
661 error = ovsdb_parser_finish(&parser);
667 error = ovsdb_column_set_from_json(columns_json, table,
673 struct shash_node *node;
675 SHASH_FOR_EACH (node, &table->schema->columns) {
676 const struct ovsdb_column *column = node->data;
677 if (column->index != OVSDB_COL_UUID) {
678 ovsdb_column_set_add(&mt->columns, column);
685 ovsdb_parser_init(&parser, select_json, "table %s select",
686 table->schema->name);
687 if (parse_bool(&parser, "initial", true)) {
688 mt->select |= OJMS_INITIAL;
690 if (parse_bool(&parser, "insert", true)) {
691 mt->select |= OJMS_INSERT;
693 if (parse_bool(&parser, "delete", true)) {
694 mt->select |= OJMS_DELETE;
696 if (parse_bool(&parser, "modify", true)) {
697 mt->select |= OJMS_MODIFY;
699 error = ovsdb_parser_finish(&parser);
706 return ovsdb_jsonrpc_monitor_get_initial(m);
710 ovsdb_remove_replica(s->server->db, &m->replica);
713 json = ovsdb_error_to_json(error);
714 ovsdb_error_destroy(error);
718 static struct jsonrpc_msg *
719 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
720 struct json_array *params,
721 const struct json *request_id)
723 if (params->n != 1) {
724 return jsonrpc_create_error(json_string_create("invalid parameters"),
727 struct ovsdb_jsonrpc_monitor *m;
729 m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
731 return jsonrpc_create_error(json_string_create("unknown monitor"),
734 ovsdb_remove_replica(s->server->db, &m->replica);
735 return jsonrpc_create_reply(json_object_create(), request_id);
741 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
743 struct ovsdb_jsonrpc_monitor *m, *next;
745 HMAP_FOR_EACH_SAFE (m, next,
746 struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
747 ovsdb_remove_replica(s->server->db, &m->replica);
751 static struct ovsdb_jsonrpc_monitor *
752 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
754 assert(replica->class == &ovsdb_jsonrpc_replica_class);
755 return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
758 struct ovsdb_jsonrpc_monitor_aux {
759 bool initial; /* Sending initial contents of table? */
760 const struct ovsdb_jsonrpc_monitor *monitor;
761 struct json *json; /* JSON for the whole transaction. */
764 struct ovsdb_jsonrpc_monitor_table *mt;
765 struct json *table_json; /* JSON for table's transaction. */
769 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
770 const struct ovsdb_row *new,
773 struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
774 const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
775 struct ovsdb_table *table = new ? new->table : old->table;
776 enum ovsdb_jsonrpc_monitor_selection type;
777 struct json *old_json, *new_json;
778 struct json *row_json;
779 char uuid[UUID_LEN + 1];
783 if (!aux->mt || table != aux->mt->table) {
784 aux->mt = shash_find_data(&m->tables, table->schema->name);
785 aux->table_json = NULL;
787 /* We don't care about rows in this table at all. Tell the caller
793 type = (aux->initial ? OJMS_INITIAL
797 if (!(aux->mt->select & type)) {
798 /* We don't care about this type of change (but do want to be called
799 * back for changes to other rows in the same table). */
803 old_json = new_json = NULL;
805 for (i = 0; i < aux->mt->columns.n_columns; i++) {
806 const struct ovsdb_column *column = aux->mt->columns.columns[i];
807 unsigned int idx = column->index;
808 bool changed = false;
810 if (type == OJMS_MODIFY) {
811 changed = !ovsdb_datum_equals(&old->fields[idx],
812 &new->fields[idx], &column->type);
813 n_changed += changed;
815 if (changed || type == OJMS_DELETE) {
817 old_json = json_object_create();
819 json_object_put(old_json, column->name,
820 ovsdb_datum_to_json(&old->fields[idx],
823 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
825 new_json = json_object_create();
827 json_object_put(new_json, column->name,
828 ovsdb_datum_to_json(&new->fields[idx],
832 if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
833 /* No reportable changes. */
834 json_destroy(old_json);
835 json_destroy(new_json);
839 /* Create JSON object for transaction overall. */
841 aux->json = json_object_create();
844 /* Create JSON object for transaction on this table. */
845 if (!aux->table_json) {
846 aux->table_json = json_object_create();
847 json_object_put(aux->json, aux->mt->table->schema->name,
851 /* Create JSON object for transaction on this row. */
852 row_json = json_object_create();
854 json_object_put(row_json, "old", old_json);
857 json_object_put(row_json, "new", new_json);
860 /* Add JSON row to JSON table. */
861 snprintf(uuid, sizeof uuid,
862 UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
863 json_object_put(aux->table_json, uuid, row_json);
869 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
870 const struct ovsdb_jsonrpc_monitor *m,
873 aux->initial = initial;
877 aux->table_json = NULL;
880 static struct ovsdb_error *
881 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
882 const struct ovsdb_txn *txn, bool durable UNUSED)
884 struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
885 struct ovsdb_jsonrpc_monitor_aux aux;
887 ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
888 ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
890 struct jsonrpc_msg *msg;
893 params = json_array_create_2(json_clone(aux.monitor->monitor_id),
895 msg = jsonrpc_create_notify("update", params);
896 jsonrpc_send(aux.monitor->session->rpc, msg);
903 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
905 struct ovsdb_jsonrpc_monitor_aux aux;
906 struct shash_node *node;
908 ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
909 SHASH_FOR_EACH (node, &m->tables) {
910 struct ovsdb_jsonrpc_monitor_table *mt = node->data;
912 if (mt->select & OJMS_INITIAL) {
913 struct ovsdb_row *row;
915 HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
917 ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
921 return aux.json ? aux.json : json_object_create();
925 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
927 struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
928 struct shash_node *node;
930 json_destroy(m->monitor_id);
931 SHASH_FOR_EACH (node, &m->tables) {
932 struct ovsdb_jsonrpc_monitor_table *mt = node->data;
933 ovsdb_column_set_destroy(&mt->columns);
936 shash_destroy(&m->tables);
937 hmap_remove(&m->session->monitors, &m->node);
941 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
942 ovsdb_jsonrpc_monitor_commit,
943 ovsdb_jsonrpc_monitor_destroy