ovsdb-idl: Make it possible to write data through the IDL.
[openvswitch] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "bitmap.h"
26 #include "json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-data.h"
29 #include "ovsdb-error.h"
30 #include "ovsdb-idl-provider.h"
31 #include "shash.h"
32 #include "util.h"
33
34 #define THIS_MODULE VLM_ovsdb_idl
35 #include "vlog.h"
36
37 /* An arc from one idl_row to another.  When row A contains a UUID that
38  * references row B, this is represented by an arc from A (the source) to B
39  * (the destination).
40  *
41  * Arcs from a row to itself are omitted, that is, src and dst are always
42  * different.
43  *
44  * Arcs are never duplicated, that is, even if there are multiple references
45  * from A to B, there is only a single arc from A to B.
46  *
47  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
48  * A.  Both an arc and its converse may both be present, if each row refers
49  * to the other circularly.
50  *
51  * The source and destination row may be in the same table or in different
52  * tables.
53  */
54 struct ovsdb_idl_arc {
55     struct list src_node;       /* In src->src_arcs list. */
56     struct list dst_node;       /* In dst->dst_arcs list. */
57     struct ovsdb_idl_row *src;  /* Source row. */
58     struct ovsdb_idl_row *dst;  /* Destination row. */
59 };
60
61 struct ovsdb_idl {
62     const struct ovsdb_idl_class *class;
63     struct jsonrpc_session *session;
64     struct shash table_by_name;
65     struct ovsdb_idl_table *tables;
66     struct json *monitor_request_id;
67     unsigned int last_monitor_request_seqno;
68     unsigned int change_seqno;
69
70     /* Transaction support. */
71     struct ovsdb_idl_txn *txn;
72     struct hmap outstanding_txns;
73 };
74
75 struct ovsdb_idl_txn {
76     struct hmap_node hmap_node;
77     struct json *request_id;
78     struct ovsdb_idl *idl;
79     struct hmap txn_rows;
80     enum ovsdb_idl_txn_status status;
81 };
82
83 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
84 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
85
86 static void ovsdb_idl_clear(struct ovsdb_idl *);
87 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
88 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
89 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
90                                                     const struct json *);
91 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
92                                      const struct uuid *,
93                                      const struct json *old,
94                                      const struct json *new);
95 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
96 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
97 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
98
99 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
100 static struct ovsdb_idl_row *ovsdb_idl_row_create__(
101     const struct ovsdb_idl_table_class *);
102 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
103                                                   const struct uuid *);
104 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
105
106 static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *);
107 static void ovsdb_idl_row_clear_new(struct ovsdb_idl_row *);
108
109 static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
110 static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
111                                         const struct jsonrpc_msg *msg);
112
113 struct ovsdb_idl *
114 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
115 {
116     struct ovsdb_idl *idl;
117     size_t i;
118
119     idl = xzalloc(sizeof *idl);
120     idl->class = class;
121     idl->session = jsonrpc_session_open(remote);
122     shash_init(&idl->table_by_name);
123     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
124     for (i = 0; i < class->n_tables; i++) {
125         const struct ovsdb_idl_table_class *tc = &class->tables[i];
126         struct ovsdb_idl_table *table = &idl->tables[i];
127         size_t j;
128
129         assert(!shash_find(&idl->table_by_name, tc->name));
130         shash_add(&idl->table_by_name, tc->name, table);
131         table->class = tc;
132         shash_init(&table->columns);
133         for (j = 0; j < tc->n_columns; j++) {
134             const struct ovsdb_idl_column *column = &tc->columns[j];
135
136             assert(!shash_find(&table->columns, column->name));
137             shash_add(&table->columns, column->name, column);
138         }
139         hmap_init(&table->rows);
140         table->idl = idl;
141     }
142     idl->last_monitor_request_seqno = UINT_MAX;
143     hmap_init(&idl->outstanding_txns);
144
145     return idl;
146 }
147
148 void
149 ovsdb_idl_destroy(struct ovsdb_idl *idl)
150 {
151     if (idl) {
152         size_t i;
153
154         assert(!idl->txn);
155         ovsdb_idl_clear(idl);
156         jsonrpc_session_close(idl->session);
157
158         for (i = 0; i < idl->class->n_tables; i++) {
159             struct ovsdb_idl_table *table = &idl->tables[i];
160             shash_destroy(&table->columns);
161             hmap_destroy(&table->rows);
162         }
163         shash_destroy(&idl->table_by_name);
164         free(idl->tables);
165         json_destroy(idl->monitor_request_id);
166         free(idl);
167     }
168 }
169
170 static void
171 ovsdb_idl_clear(struct ovsdb_idl *idl)
172 {
173     bool changed = false;
174     size_t i;
175
176     for (i = 0; i < idl->class->n_tables; i++) {
177         struct ovsdb_idl_table *table = &idl->tables[i];
178         struct ovsdb_idl_row *row, *next_row;
179
180         if (hmap_is_empty(&table->rows)) {
181             continue;
182         }
183
184         changed = true;
185         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
186                             &table->rows) {
187             struct ovsdb_idl_arc *arc, *next_arc;
188
189             if (!ovsdb_idl_row_is_orphan(row)) {
190                 (row->table->class->unparse)(row);
191                 ovsdb_idl_row_clear_old(row);
192             }
193             hmap_remove(&table->rows, &row->hmap_node);
194             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
195                                 &row->src_arcs) {
196                 free(arc);
197             }
198             /* No need to do anything with dst_arcs: some node has those arcs
199              * as forward arcs and will destroy them itself. */
200
201             free(row);
202         }
203     }
204
205     if (changed) {
206         idl->change_seqno++;
207     }
208 }
209
210 void
211 ovsdb_idl_run(struct ovsdb_idl *idl)
212 {
213     int i;
214
215     jsonrpc_session_run(idl->session);
216     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
217         struct jsonrpc_msg *msg, *reply;
218         unsigned int seqno;
219
220         seqno = jsonrpc_session_get_seqno(idl->session);
221         if (idl->last_monitor_request_seqno != seqno) {
222             idl->last_monitor_request_seqno = seqno;
223             ovsdb_idl_txn_abort_all(idl);
224             ovsdb_idl_send_monitor_request(idl);
225             break;
226         }
227
228         msg = jsonrpc_session_recv(idl->session);
229         if (!msg) {
230             break;
231         }
232
233         reply = NULL;
234         if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
235             reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
236         } else if (msg->type == JSONRPC_NOTIFY
237                    && !strcmp(msg->method, "update")
238                    && msg->params->type == JSON_ARRAY
239                    && msg->params->u.array.n == 2
240                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
241             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
242         } else if (msg->type == JSONRPC_REPLY
243                    && idl->monitor_request_id
244                    && json_equal(idl->monitor_request_id, msg->id)) {
245             json_destroy(idl->monitor_request_id);
246             idl->monitor_request_id = NULL;
247             ovsdb_idl_clear(idl);
248             ovsdb_idl_parse_update(idl, msg->result);
249         } else if (msg->type == JSONRPC_REPLY
250                    && msg->id && msg->id->type == JSON_STRING
251                    && !strcmp(msg->id->u.string, "echo")) {
252             /* It's a reply to our echo request.  Ignore it. */
253         } else if ((msg->type == JSONRPC_ERROR
254                     || msg->type == JSONRPC_REPLY)
255                    && ovsdb_idl_txn_process_reply(idl, msg)) {
256             /* ovsdb_idl_txn_process_reply() did everything needful. */
257         } else {
258             VLOG_WARN("%s: received unexpected %s message",
259                       jsonrpc_session_get_name(idl->session),
260                       jsonrpc_msg_type_to_string(msg->type));
261             jsonrpc_session_force_reconnect(idl->session);
262         }
263         if (reply) {
264             jsonrpc_session_send(idl->session, reply);
265         }
266         jsonrpc_msg_destroy(msg);
267     }
268 }
269
270 void
271 ovsdb_idl_wait(struct ovsdb_idl *idl)
272 {
273     jsonrpc_session_wait(idl->session);
274     jsonrpc_session_recv_wait(idl->session);
275 }
276
277 unsigned int
278 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
279 {
280     return idl->change_seqno;
281 }
282
283 void
284 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
285 {
286     jsonrpc_session_force_reconnect(idl->session);
287 }
288 \f
289 static void
290 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
291 {
292     struct json *monitor_requests;
293     struct jsonrpc_msg *msg;
294     size_t i;
295
296     monitor_requests = json_object_create();
297     for (i = 0; i < idl->class->n_tables; i++) {
298         const struct ovsdb_idl_table *table = &idl->tables[i];
299         const struct ovsdb_idl_table_class *tc = table->class;
300         struct json *monitor_request, *columns;
301         size_t i;
302
303         monitor_request = json_object_create();
304         columns = json_array_create_empty();
305         for (i = 0; i < tc->n_columns; i++) {
306             const struct ovsdb_idl_column *column = &tc->columns[i];
307             json_array_add(columns, json_string_create(column->name));
308         }
309         json_object_put(monitor_request, "columns", columns);
310         json_object_put(monitor_requests, tc->name, monitor_request);
311     }
312
313     json_destroy(idl->monitor_request_id);
314     msg = jsonrpc_create_request(
315         "monitor", json_array_create_2(json_null_create(), monitor_requests),
316         &idl->monitor_request_id);
317     jsonrpc_session_send(idl->session, msg);
318 }
319
320 static void
321 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
322 {
323     struct ovsdb_error *error;
324
325     idl->change_seqno++;
326
327     error = ovsdb_idl_parse_update__(idl, table_updates);
328     if (error) {
329         if (!VLOG_DROP_WARN(&syntax_rl)) {
330             char *s = ovsdb_error_to_string(error);
331             VLOG_WARN_RL(&syntax_rl, "%s", s);
332             free(s);
333         }
334         ovsdb_error_destroy(error);
335     }
336 }
337
338 static struct ovsdb_error *
339 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
340                          const struct json *table_updates)
341 {
342     const struct shash_node *tables_node;
343
344     if (table_updates->type != JSON_OBJECT) {
345         return ovsdb_syntax_error(table_updates, NULL,
346                                   "<table-updates> is not an object");
347     }
348     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
349         const struct json *table_update = tables_node->data;
350         const struct shash_node *table_node;
351         struct ovsdb_idl_table *table;
352
353         table = shash_find_data(&idl->table_by_name, tables_node->name);
354         if (!table) {
355             return ovsdb_syntax_error(
356                 table_updates, NULL,
357                 "<table-updates> includes unknown table \"%s\"",
358                 tables_node->name);
359         }
360
361         if (table_update->type != JSON_OBJECT) {
362             return ovsdb_syntax_error(table_update, NULL,
363                                       "<table-update> for table \"%s\" is "
364                                       "not an object", table->class->name);
365         }
366         SHASH_FOR_EACH (table_node, json_object(table_update)) {
367             const struct json *row_update = table_node->data;
368             const struct json *old_json, *new_json;
369             struct uuid uuid;
370
371             if (!uuid_from_string(&uuid, table_node->name)) {
372                 return ovsdb_syntax_error(table_update, NULL,
373                                           "<table-update> for table \"%s\" "
374                                           "contains bad UUID "
375                                           "\"%s\" as member name",
376                                           table->class->name,
377                                           table_node->name);
378             }
379             if (row_update->type != JSON_OBJECT) {
380                 return ovsdb_syntax_error(row_update, NULL,
381                                           "<table-update> for table \"%s\" "
382                                           "contains <row-update> for %s that "
383                                           "is not an object",
384                                           table->class->name,
385                                           table_node->name);
386             }
387
388             old_json = shash_find_data(json_object(row_update), "old");
389             new_json = shash_find_data(json_object(row_update), "new");
390             if (old_json && old_json->type != JSON_OBJECT) {
391                 return ovsdb_syntax_error(old_json, NULL,
392                                           "\"old\" <row> is not object");
393             } else if (new_json && new_json->type != JSON_OBJECT) {
394                 return ovsdb_syntax_error(new_json, NULL,
395                                           "\"new\" <row> is not object");
396             } else if ((old_json != NULL) + (new_json != NULL)
397                        != shash_count(json_object(row_update))) {
398                 return ovsdb_syntax_error(row_update, NULL,
399                                           "<row-update> contains unexpected "
400                                           "member");
401             } else if (!old_json && !new_json) {
402                 return ovsdb_syntax_error(row_update, NULL,
403                                           "<row-update> missing \"old\" "
404                                           "and \"new\" members");
405             }
406
407             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
408         }
409     }
410
411     return NULL;
412 }
413
414 static struct ovsdb_idl_row *
415 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
416 {
417     struct ovsdb_idl_row *row;
418
419     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
420                              uuid_hash(uuid), &table->rows) {
421         if (uuid_equals(&row->uuid, uuid)) {
422             return row;
423         }
424     }
425     return NULL;
426 }
427
428 static void
429 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
430                          const struct uuid *uuid, const struct json *old,
431                          const struct json *new)
432 {
433     struct ovsdb_idl_row *row;
434
435     row = ovsdb_idl_get_row(table, uuid);
436     if (!new) {
437         /* Delete row. */
438         if (row && !ovsdb_idl_row_is_orphan(row)) {
439             /* XXX perhaps we should check the 'old' values? */
440             ovsdb_idl_delete_row(row);
441         } else {
442             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
443                          "from table %s",
444                          UUID_ARGS(uuid), table->class->name);
445         }
446     } else if (!old) {
447         /* Insert row. */
448         if (!row) {
449             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
450         } else if (ovsdb_idl_row_is_orphan(row)) {
451             ovsdb_idl_insert_row(row, new);
452         } else {
453             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
454                          "table %s", UUID_ARGS(uuid), table->class->name);
455             ovsdb_idl_modify_row(row, new);
456         }
457     } else {
458         /* Modify row. */
459         if (row) {
460             /* XXX perhaps we should check the 'old' values? */
461             if (!ovsdb_idl_row_is_orphan(row)) {
462                 ovsdb_idl_modify_row(row, new);
463             } else {
464                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
465                              "referenced row "UUID_FMT" in table %s",
466                              UUID_ARGS(uuid), table->class->name);
467                 ovsdb_idl_insert_row(row, new);
468             }
469         } else {
470             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
471                          "in table %s", UUID_ARGS(uuid), table->class->name);
472             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
473         }
474     }
475 }
476
477 static void
478 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
479 {
480     struct ovsdb_idl_table *table = row->table;
481     struct shash_node *node;
482
483     SHASH_FOR_EACH (node, json_object(row_json)) {
484         const char *column_name = node->name;
485         const struct ovsdb_idl_column *column;
486         struct ovsdb_datum datum;
487         struct ovsdb_error *error;
488
489         column = shash_find_data(&table->columns, column_name);
490         if (!column) {
491             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
492                          column_name, UUID_ARGS(&row->uuid));
493             continue;
494         }
495
496         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
497         if (!error) {
498             ovsdb_datum_swap(&row->old[column - table->class->columns],
499                              &datum);
500             ovsdb_datum_destroy(&datum, &column->type);
501         } else {
502             char *s = ovsdb_error_to_string(error);
503             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
504                          " in table %s: %s", column_name,
505                          UUID_ARGS(&row->uuid), table->class->name, s);
506             free(s);
507             ovsdb_error_destroy(error);
508         }
509     }
510 }
511
512 static bool
513 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
514 {
515     return !row->old;
516 }
517
518 static void
519 ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row)
520 {
521     assert(row->old == row->new);
522     if (!ovsdb_idl_row_is_orphan(row)) {
523         const struct ovsdb_idl_table_class *class = row->table->class;
524         size_t i;
525
526         for (i = 0; i < class->n_columns; i++) {
527             ovsdb_datum_destroy(&row->old[i], &class->columns[i].type);
528         }
529         free(row->old);
530         row->old = row->new = NULL;
531     }
532 }
533
534 static void
535 ovsdb_idl_row_clear_new(struct ovsdb_idl_row *row)
536 {
537     if (row->old != row->new) {
538         if (row->new) {
539             const struct ovsdb_idl_table_class *class = row->table->class;
540             size_t i;
541
542             BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) {
543                 ovsdb_datum_destroy(&row->new[i], &class->columns[i].type);
544             }
545             free(row->new);
546             free(row->written);
547             row->written = NULL;
548         }
549         row->new = row->old;
550     }
551 }
552
553 static void
554 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
555 {
556     struct ovsdb_idl_arc *arc, *next;
557
558     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
559      * that this causes to be unreferenced. */
560     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
561                         &row->src_arcs) {
562         list_remove(&arc->dst_node);
563         if (destroy_dsts
564             && ovsdb_idl_row_is_orphan(arc->dst)
565             && list_is_empty(&arc->dst->dst_arcs)) {
566             ovsdb_idl_row_destroy(arc->dst);
567         }
568         free(arc);
569     }
570     list_init(&row->src_arcs);
571 }
572
573 /* Force nodes that reference 'row' to reparse. */
574 static void
575 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row, bool destroy_dsts)
576 {
577     struct ovsdb_idl_arc *arc, *next;
578
579     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
580      * 'arc', so we need to use the "safe" variant of list traversal.  However,
581      * calling ref->table->class->parse will add an arc equivalent to 'arc' to
582      * row->arcs.  That could be a problem for traversal, but it adds it at the
583      * beginning of the list to prevent us from stumbling upon it again.
584      *
585      * (If duplicate arcs were possible then we would need to make sure that
586      * 'next' didn't also point into 'arc''s destination, but we forbid
587      * duplicate arcs.) */
588     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
589                         &row->dst_arcs) {
590         struct ovsdb_idl_row *ref = arc->src;
591
592         (ref->table->class->unparse)(ref);
593         ovsdb_idl_row_clear_arcs(ref, destroy_dsts);
594         (ref->table->class->parse)(ref);
595     }
596 }
597
598 static struct ovsdb_idl_row *
599 ovsdb_idl_row_create__(const struct ovsdb_idl_table_class *class)
600 {
601     struct ovsdb_idl_row *row = xmalloc(class->allocation_size);
602     memset(row, 0, sizeof *row);
603     list_init(&row->src_arcs);
604     list_init(&row->dst_arcs);
605     hmap_node_nullify(&row->txn_node);
606     return row;
607 }
608
609 static struct ovsdb_idl_row *
610 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
611 {
612     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(table->class);
613     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
614     row->uuid = *uuid;
615     row->table = table;
616     return row;
617 }
618
619 static void
620 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
621 {
622     if (row) {
623         ovsdb_idl_row_clear_old(row);
624         hmap_remove(&row->table->rows, &row->hmap_node);
625         free(row);
626     }
627 }
628
629 static void
630 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
631 {
632     const struct ovsdb_idl_table_class *class = row->table->class;
633     size_t i;
634
635     assert(!row->old && !row->new);
636     row->old = row->new = xmalloc(class->n_columns * sizeof *row->old);
637     for (i = 0; i < class->n_columns; i++) {
638         ovsdb_datum_init_default(&row->old[i], &class->columns[i].type);
639     }
640     ovsdb_idl_row_update(row, row_json);
641     (class->parse)(row);
642
643     ovsdb_idl_row_reparse_backrefs(row, false);
644 }
645
646 static void
647 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
648 {
649     (row->table->class->unparse)(row);
650     ovsdb_idl_row_clear_arcs(row, true);
651     ovsdb_idl_row_clear_old(row);
652     if (list_is_empty(&row->dst_arcs)) {
653         ovsdb_idl_row_destroy(row);
654     } else {
655         ovsdb_idl_row_reparse_backrefs(row, true);
656     }
657 }
658
659 static void
660 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
661 {
662     (row->table->class->unparse)(row);
663     ovsdb_idl_row_clear_arcs(row, true);
664     ovsdb_idl_row_update(row, row_json);
665     (row->table->class->parse)(row);
666 }
667
668 static bool
669 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
670 {
671     const struct ovsdb_idl_arc *arc;
672
673     /* No self-arcs. */
674     if (src == dst) {
675         return false;
676     }
677
678     /* No duplicate arcs.
679      *
680      * We only need to test whether the first arc in dst->dst_arcs originates
681      * at 'src', since we add all of the arcs from a given source in a clump
682      * (in a single call to a row's ->parse function) and new arcs are always
683      * added at the front of the dst_arcs list. */
684     if (list_is_empty(&dst->dst_arcs)) {
685         return true;
686     }
687     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
688     return arc->src != src;
689 }
690
691 static struct ovsdb_idl_table *
692 ovsdb_idl_table_from_class(const struct ovsdb_idl *idl,
693                            const struct ovsdb_idl_table_class *table_class)
694 {
695     return &idl->tables[table_class - idl->class->tables];
696 }
697
698 struct ovsdb_idl_row *
699 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
700                       struct ovsdb_idl_table_class *dst_table_class,
701                       const struct uuid *dst_uuid)
702 {
703     struct ovsdb_idl *idl = src->table->idl;
704     struct ovsdb_idl_table *dst_table;
705     struct ovsdb_idl_arc *arc;
706     struct ovsdb_idl_row *dst;
707
708     dst_table = ovsdb_idl_table_from_class(idl, dst_table_class);
709     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
710     if (!dst) {
711         dst = ovsdb_idl_row_create(dst_table, dst_uuid);
712     }
713
714     /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
715     if (may_add_arc(src, dst)) {
716         /* The arc *must* be added at the front of the dst_arcs list.  See
717          * ovsdb_idl_row_reparse_backrefs() for details. */
718         arc = xmalloc(sizeof *arc);
719         list_push_front(&src->src_arcs, &arc->src_node);
720         list_push_front(&dst->dst_arcs, &arc->dst_node);
721         arc->src = src;
722         arc->dst = dst;
723     }
724
725     return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
726 }
727
728 static struct ovsdb_idl_row *
729 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
730 {
731     for (; node; node = hmap_next(&table->rows, node)) {
732         struct ovsdb_idl_row *row;
733
734         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
735         if (!ovsdb_idl_row_is_orphan(row)) {
736             return row;
737         }
738     }
739     return NULL;
740 }
741
742 struct ovsdb_idl_row *
743 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
744                     const struct ovsdb_idl_table_class *table_class)
745 {
746     struct ovsdb_idl_table *table
747         = ovsdb_idl_table_from_class(idl, table_class);
748     return next_real_row(table, hmap_first(&table->rows));
749 }
750
751 struct ovsdb_idl_row *
752 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
753 {
754     struct ovsdb_idl_table *table = row->table;
755
756     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
757 }
758 \f
759 /* Transactions. */
760
761 static void ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
762                                    enum ovsdb_idl_txn_status);
763
764 const char *
765 ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
766 {
767     switch (status) {
768     case TXN_INCOMPLETE:
769         return "incomplete";
770     case TXN_ABORTED:
771         return "aborted";
772     case TXN_SUCCESS:
773         return "success";
774     case TXN_TRY_AGAIN:
775         return "try again";
776     case TXN_ERROR:
777         return "error";
778     }
779     return "<unknown>";
780 }
781
782 struct ovsdb_idl_txn *
783 ovsdb_idl_txn_create(struct ovsdb_idl *idl)
784 {
785     struct ovsdb_idl_txn *txn;
786
787     assert(!idl->txn);
788     idl->txn = txn = xmalloc(sizeof *txn);
789     txn->idl = idl;
790     txn->status = TXN_INCOMPLETE;
791     hmap_init(&txn->txn_rows);
792     return txn;
793 }
794
795 void
796 ovsdb_idl_txn_destroy(struct ovsdb_idl_txn *txn)
797 {
798     ovsdb_idl_txn_abort(txn);
799     free(txn);
800 }
801
802 static struct json *
803 where_uuid_equals(const struct uuid *uuid)
804 {
805     return
806         json_array_create_1(
807             json_array_create_3(
808                 json_string_create("_uuid"),
809                 json_string_create("=="),
810                 json_array_create_2(
811                     json_string_create("uuid"),
812                     json_string_create_nocopy(
813                         xasprintf(UUID_FMT, UUID_ARGS(uuid))))));
814 }
815
816 static char *
817 uuid_name_from_uuid(const struct uuid *uuid)
818 {
819     char *name;
820     char *p;
821
822     name = xasprintf("row"UUID_FMT, UUID_ARGS(uuid));
823     for (p = name; *p != '\0'; p++) {
824         if (*p == '-') {
825             *p = '_';
826         }
827     }
828
829     return name;
830 }
831
832 static const struct ovsdb_idl_row *
833 ovsdb_idl_txn_get_row(const struct ovsdb_idl_txn *txn, const struct uuid *uuid)
834 {
835     const struct ovsdb_idl_row *row;
836
837     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, txn_node,
838                              uuid_hash(uuid), &txn->txn_rows) {
839         if (uuid_equals(&row->uuid, uuid)) {
840             return row;
841         }
842     }
843     return NULL;
844 }
845
846 /* XXX there must be a cleaner way to do this */
847 static struct json *
848 substitute_uuids(struct json *json, const struct ovsdb_idl_txn *txn)
849 {
850     if (json->type == JSON_ARRAY) {
851         struct uuid uuid;
852         size_t i;
853
854         if (json->u.array.n == 2
855             && json->u.array.elems[0]->type == JSON_STRING
856             && json->u.array.elems[1]->type == JSON_STRING
857             && !strcmp(json->u.array.elems[0]->u.string, "uuid")
858             && uuid_from_string(&uuid, json->u.array.elems[1]->u.string)) {
859             const struct ovsdb_idl_row *row;
860
861             row = ovsdb_idl_txn_get_row(txn, &uuid);
862             if (row && !row->old && row->new) {
863                 json_destroy(json);
864
865                 return json_array_create_2(
866                     json_string_create("named-uuid"),
867                     json_string_create_nocopy(uuid_name_from_uuid(&uuid)));
868             }
869         }
870
871         for (i = 0; i < json->u.array.n; i++) {
872             json->u.array.elems[i] = substitute_uuids(json->u.array.elems[i],
873                                                       txn);
874         }
875     } else if (json->type == JSON_OBJECT) {
876         struct shash_node *node;
877
878         SHASH_FOR_EACH (node, json_object(json)) {
879             node->data = substitute_uuids(node->data, txn);
880         }
881     }
882     return json;
883 }
884
885 static void
886 ovsdb_idl_txn_disassemble(struct ovsdb_idl_txn *txn)
887 {
888     struct ovsdb_idl_row *row, *next;
889
890     HMAP_FOR_EACH_SAFE (row, next, struct ovsdb_idl_row, txn_node,
891                         &txn->txn_rows) {
892         ovsdb_idl_row_clear_new(row);
893
894         free(row->prereqs);
895         row->prereqs = NULL;
896
897         free(row->written);
898         row->written = NULL;
899
900         hmap_remove(&txn->txn_rows, &row->txn_node);
901         hmap_node_nullify(&row->txn_node);
902     }
903     hmap_destroy(&txn->txn_rows);
904     hmap_init(&txn->txn_rows);
905 }
906
907 enum ovsdb_idl_txn_status
908 ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
909 {
910     struct ovsdb_idl_row *row;
911     struct json *operations;
912     bool any_updates;
913     enum ovsdb_idl_txn_status status;
914
915     if (txn != txn->idl->txn) {
916         return txn->status;
917     }
918
919     operations = json_array_create_empty();
920
921     /* Add prerequisites and declarations of new rows. */
922     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
923         /* XXX check that deleted rows exist even if no prereqs? */
924         if (row->prereqs) {
925             const struct ovsdb_idl_table_class *class = row->table->class;
926             size_t n_columns = class->n_columns;
927             struct json *op, *columns, *row_json;
928             size_t idx;
929
930             op = json_object_create();
931             json_array_add(operations, op);
932             json_object_put_string(op, "op", "wait");
933             json_object_put_string(op, "table", class->name);
934             json_object_put(op, "timeout", json_integer_create(0));
935             json_object_put(op, "where", where_uuid_equals(&row->uuid));
936             json_object_put_string(op, "until", "==");
937             columns = json_array_create_empty();
938             json_object_put(op, "columns", columns);
939             row_json = json_object_create();
940             json_object_put(op, "rows", json_array_create_1(row_json));
941
942             BITMAP_FOR_EACH_1 (idx, n_columns, row->prereqs) {
943                 const struct ovsdb_idl_column *column = &class->columns[idx];
944                 json_array_add(columns, json_string_create(column->name));
945                 json_object_put(row_json, column->name,
946                                 ovsdb_datum_to_json(&row->old[idx],
947                                                     &column->type));
948             }
949         }
950         if (row->new && !row->old) {
951             struct json *op;
952
953             op = json_object_create();
954             json_array_add(operations, op);
955             json_object_put_string(op, "op", "declare");
956             json_object_put(op, "uuid-name",
957                             json_string_create_nocopy(
958                                 uuid_name_from_uuid(&row->uuid)));
959         }
960     }
961
962     /* Add updates. */
963     any_updates = false;
964     HMAP_FOR_EACH (row, struct ovsdb_idl_row, txn_node, &txn->txn_rows) {
965         const struct ovsdb_idl_table_class *class = row->table->class;
966         size_t n_columns = class->n_columns;
967         struct json *row_json;
968         size_t idx;
969
970         if (row->old == row->new) {
971             continue;
972         } else if (!row->new) {
973             struct json *op = json_object_create();
974             json_object_put_string(op, "op", "delete");
975             json_object_put_string(op, "table", class->name);
976             json_object_put(op, "where", where_uuid_equals(&row->uuid));
977             json_array_add(operations, op);
978         } else {
979             row_json = NULL;
980             BITMAP_FOR_EACH_1 (idx, n_columns, row->written) {
981                 const struct ovsdb_idl_column *column = &class->columns[idx];
982
983                 if (row->old
984                     && ovsdb_datum_equals(&row->old[idx], &row->new[idx],
985                                           &column->type)) {
986                     continue;
987                 }
988                 if (!row_json) {
989                     struct json *op = json_object_create();
990                     json_array_add(operations, op);
991                     json_object_put_string(op, "op",
992                                            row->old ? "update" : "insert");
993                     json_object_put_string(op, "table", class->name);
994                     if (row->old) {
995                         json_object_put(op, "where",
996                                         where_uuid_equals(&row->uuid));
997                     } else {
998                         json_object_put(op, "uuid-name",
999                                         json_string_create_nocopy(
1000                                             uuid_name_from_uuid(&row->uuid)));
1001                     }
1002                     row_json = json_object_create();
1003                     json_object_put(op, "row", row_json);
1004                     any_updates = true;
1005                 }
1006                 json_object_put(row_json, column->name,
1007                                 substitute_uuids(
1008                                     ovsdb_datum_to_json(&row->new[idx],
1009                                                         &column->type),
1010                                     txn));
1011             }
1012         }
1013     }
1014
1015     status = (!any_updates ? TXN_SUCCESS
1016               : jsonrpc_session_send(
1017                   txn->idl->session,
1018                   jsonrpc_create_request(
1019                       "transact", operations, &txn->request_id))
1020               ? TXN_TRY_AGAIN
1021               : TXN_INCOMPLETE);
1022
1023     hmap_insert(&txn->idl->outstanding_txns, &txn->hmap_node,
1024                 json_hash(txn->request_id, 0));
1025     txn->idl->txn = NULL;
1026
1027     ovsdb_idl_txn_disassemble(txn);
1028     if (status != TXN_INCOMPLETE) {
1029         ovsdb_idl_txn_complete(txn, status);
1030     }
1031     return txn->status;
1032 }
1033
1034 void
1035 ovsdb_idl_txn_abort(struct ovsdb_idl_txn *txn)
1036 {
1037     ovsdb_idl_txn_disassemble(txn);
1038     if (txn->status == TXN_INCOMPLETE) {
1039         txn->status = TXN_ABORTED;
1040     }
1041 }
1042
1043 static void
1044 ovsdb_idl_txn_complete(struct ovsdb_idl_txn *txn,
1045                        enum ovsdb_idl_txn_status status)
1046 {
1047     txn->status = status;
1048     hmap_remove(&txn->idl->outstanding_txns, &txn->hmap_node);
1049 }
1050
1051 void
1052 ovsdb_idl_txn_write(struct ovsdb_idl_row *row,
1053                     const struct ovsdb_idl_column *column,
1054                     struct ovsdb_datum *datum)
1055 {
1056     const struct ovsdb_idl_table_class *class = row->table->class;
1057     size_t column_idx = column - class->columns;
1058
1059     assert(row->new);
1060     if (hmap_node_is_null(&row->txn_node)) {
1061         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1062                     uuid_hash(&row->uuid));
1063     }
1064     if (row->old == row->new) {
1065         row->new = xmalloc(class->n_columns * sizeof *row->new);
1066     }
1067     if (!row->written) {
1068         row->written = bitmap_allocate(class->n_columns);
1069     }
1070     if (bitmap_is_set(row->written, column_idx)) {
1071         ovsdb_datum_destroy(&row->new[column_idx], &column->type);
1072     } else {
1073         bitmap_set1(row->written, column_idx);
1074     }
1075     row->new[column_idx] = *datum;
1076 }
1077
1078 void
1079 ovsdb_idl_txn_verify(const struct ovsdb_idl_row *row_,
1080                      const struct ovsdb_idl_column *column)
1081 {
1082     struct ovsdb_idl_row *row = (struct ovsdb_idl_row *) row_;
1083     const struct ovsdb_idl_table_class *class = row->table->class;
1084     size_t column_idx = column - class->columns;
1085
1086     assert(row->new);
1087     if (!row->old
1088         || (row->written && bitmap_is_set(row->written, column_idx))) {
1089         return;
1090     }
1091
1092     if (hmap_node_is_null(&row->txn_node)) {
1093         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1094                     uuid_hash(&row->uuid));
1095     }
1096     if (!row->prereqs) {
1097         row->prereqs = bitmap_allocate(class->n_columns);
1098     }
1099     bitmap_set1(row->prereqs, column_idx);
1100 }
1101
1102 void
1103 ovsdb_idl_txn_delete(struct ovsdb_idl_row *row)
1104 {
1105     assert(row->new);
1106     if (!row->old) {
1107         ovsdb_idl_row_clear_new(row);
1108         assert(!row->prereqs);
1109         hmap_remove(&row->table->idl->txn->txn_rows, &row->txn_node);
1110         free(row);
1111     }
1112     if (hmap_node_is_null(&row->txn_node)) {
1113         hmap_insert(&row->table->idl->txn->txn_rows, &row->txn_node,
1114                     uuid_hash(&row->uuid));
1115     }
1116     if (row->new == row->old) {
1117         row->new = NULL;
1118     } else {
1119         ovsdb_idl_row_clear_new(row);
1120     }
1121 }
1122
1123 struct ovsdb_idl_row *
1124 ovsdb_idl_txn_insert(struct ovsdb_idl_txn *txn,
1125                      const struct ovsdb_idl_table_class *class)
1126 {
1127     struct ovsdb_idl_row *row = ovsdb_idl_row_create__(class);
1128     uuid_generate(&row->uuid);
1129     row->table = ovsdb_idl_table_from_class(txn->idl, class);
1130     row->new = xmalloc(class->n_columns * sizeof *row->new);
1131     row->written = bitmap_allocate(class->n_columns);
1132     hmap_insert(&txn->txn_rows, &row->txn_node, uuid_hash(&row->uuid));
1133     return row;
1134 }
1135
1136 static void
1137 ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl)
1138 {
1139     struct ovsdb_idl_txn *txn;
1140
1141     HMAP_FOR_EACH (txn, struct ovsdb_idl_txn, hmap_node,
1142                    &idl->outstanding_txns) {
1143         ovsdb_idl_txn_complete(txn, TXN_TRY_AGAIN);
1144     }
1145 }
1146
1147 static struct ovsdb_idl_txn *
1148 ovsdb_idl_txn_find(struct ovsdb_idl *idl, const struct json *id)
1149 {
1150     struct ovsdb_idl_txn *txn;
1151
1152     HMAP_FOR_EACH_WITH_HASH (txn, struct ovsdb_idl_txn, hmap_node,
1153                              json_hash(id, 0), &idl->outstanding_txns) {
1154         if (json_equal(id, txn->request_id)) {
1155             return txn;
1156         }
1157     }
1158     return NULL;
1159 }
1160
1161 static bool
1162 ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
1163                             const struct jsonrpc_msg *msg)
1164 {
1165     struct ovsdb_idl_txn *txn;
1166     enum ovsdb_idl_txn_status status;
1167
1168     txn = ovsdb_idl_txn_find(idl, msg->id);
1169     if (!txn) {
1170         return false;
1171     }
1172
1173     if (msg->type == JSONRPC_ERROR) {
1174         status = TXN_ERROR;
1175     } else if (msg->result->type != JSON_ARRAY) {
1176         VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array");
1177         status = TXN_ERROR;
1178     } else {
1179         int hard_errors = 0;
1180         int soft_errors = 0;
1181         size_t i;
1182
1183         for (i = 0; i < msg->result->u.array.n; i++) {
1184             struct json *json = msg->result->u.array.elems[i];
1185
1186             if (json->type == JSON_NULL) {
1187                 /* This isn't an error in itself but indicates that some prior
1188                  * operation failed, so make sure that we know about it. */
1189                 soft_errors++;
1190             } else if (json->type == JSON_OBJECT) {
1191                 struct json *error;
1192
1193                 error = shash_find_data(json_object(json), "error");
1194                 if (error) {
1195                     if (error->type == JSON_STRING) {
1196                         if (!strcmp(error->u.string, "timed out")) {
1197                             soft_errors++;
1198                         } else {
1199                             hard_errors++;
1200                         }
1201                     } else {
1202                         hard_errors++;
1203                         VLOG_WARN_RL(&syntax_rl,
1204                                      "\"error\" in reply is not JSON string");
1205                     }
1206                 }
1207             } else {
1208                 hard_errors++;
1209                 VLOG_WARN_RL(&syntax_rl,
1210                              "operation reply is not JSON null or object");
1211             }
1212         }
1213
1214         status = (hard_errors ? TXN_ERROR
1215                   : soft_errors ? TXN_TRY_AGAIN
1216                   : TXN_SUCCESS);
1217     }
1218
1219     ovsdb_idl_txn_complete(txn, status);
1220     return true;
1221 }