ovsdb-idl: Optimize lookup of struct idl_table from struct idl_table_class.
[openvswitch] / lib / ovsdb-idl.c
1 /* Copyright (c) 2009 Nicira Networks.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "ovsdb-idl.h"
19
20 #include <assert.h>
21 #include <limits.h>
22 #include <stdlib.h>
23
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-data.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb-idl-provider.h"
29 #include "shash.h"
30 #include "util.h"
31
32 #define THIS_MODULE VLM_ovsdb_idl
33 #include "vlog.h"
34
35 /* An arc from one idl_row to another.  When row A contains a UUID that
36  * references row B, this is represented by an arc from A (the source) to B
37  * (the destination).
38  *
39  * Arcs from a row to itself are omitted, that is, src and dst are always
40  * different.
41  *
42  * Arcs are never duplicated, that is, even if there are multiple references
43  * from A to B, there is only a single arc from A to B.
44  *
45  * Arcs are directed: an arc from A to B is the converse of an an arc from B to
46  * A.  Both an arc and its converse may both be present, if each row refers
47  * to the other circularly.
48  *
49  * The source and destination row may be in the same table or in different
50  * tables.
51  */
52 struct ovsdb_idl_arc {
53     struct list src_node;       /* In src->src_arcs list. */
54     struct list dst_node;       /* In dst->dst_arcs list. */
55     struct ovsdb_idl_row *src;  /* Source row. */
56     struct ovsdb_idl_row *dst;  /* Destination row. */
57 };
58
59 struct ovsdb_idl {
60     const struct ovsdb_idl_class *class;
61     struct jsonrpc_session *session;
62     struct shash table_by_name;
63     struct ovsdb_idl_table *tables;
64     struct json *monitor_request_id;
65     unsigned int last_monitor_request_seqno;
66     unsigned int change_seqno;
67 };
68
69 static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
70 static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
71
72 static void ovsdb_idl_clear(struct ovsdb_idl *);
73 static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
74 static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
75 static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
76                                                     const struct json *);
77 static void ovsdb_idl_process_update(struct ovsdb_idl_table *,
78                                      const struct uuid *,
79                                      const struct json *old,
80                                      const struct json *new);
81 static void ovsdb_idl_insert_row(struct ovsdb_idl_row *, const struct json *);
82 static void ovsdb_idl_delete_row(struct ovsdb_idl_row *);
83 static void ovsdb_idl_modify_row(struct ovsdb_idl_row *, const struct json *);
84
85 static bool ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *);
86 static struct ovsdb_idl_row *ovsdb_idl_row_create(struct ovsdb_idl_table *,
87                                                   const struct uuid *);
88 static void ovsdb_idl_row_destroy(struct ovsdb_idl_row *);
89
90 static void ovsdb_idl_row_clear_fields(struct ovsdb_idl_row *);
91
92 struct ovsdb_idl *
93 ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class)
94 {
95     struct ovsdb_idl *idl;
96     size_t i;
97
98     idl = xzalloc(sizeof *idl);
99     idl->class = class;
100     idl->session = jsonrpc_session_open(remote);
101     shash_init(&idl->table_by_name);
102     idl->tables = xmalloc(class->n_tables * sizeof *idl->tables);
103     for (i = 0; i < class->n_tables; i++) {
104         const struct ovsdb_idl_table_class *tc = &class->tables[i];
105         struct ovsdb_idl_table *table = &idl->tables[i];
106         size_t j;
107
108         assert(!shash_find(&idl->table_by_name, tc->name));
109         shash_add(&idl->table_by_name, tc->name, table);
110         table->class = tc;
111         shash_init(&table->columns);
112         for (j = 0; j < tc->n_columns; j++) {
113             const struct ovsdb_idl_column *column = &tc->columns[j];
114
115             assert(!shash_find(&table->columns, column->name));
116             shash_add(&table->columns, column->name, column);
117         }
118         hmap_init(&table->rows);
119         table->idl = idl;
120     }
121     idl->last_monitor_request_seqno = UINT_MAX;
122
123     return idl;
124 }
125
126 void
127 ovsdb_idl_destroy(struct ovsdb_idl *idl)
128 {
129     if (idl) {
130         size_t i;
131
132         ovsdb_idl_clear(idl);
133         jsonrpc_session_close(idl->session);
134
135         for (i = 0; i < idl->class->n_tables; i++) {
136             struct ovsdb_idl_table *table = &idl->tables[i];
137             shash_destroy(&table->columns);
138             hmap_destroy(&table->rows);
139         }
140         shash_destroy(&idl->table_by_name);
141         free(idl->tables);
142         json_destroy(idl->monitor_request_id);
143         free(idl);
144     }
145 }
146
147 static void
148 ovsdb_idl_clear(struct ovsdb_idl *idl)
149 {
150     bool changed = false;
151     size_t i;
152
153     for (i = 0; i < idl->class->n_tables; i++) {
154         struct ovsdb_idl_table *table = &idl->tables[i];
155         struct ovsdb_idl_row *row, *next_row;
156
157         if (hmap_is_empty(&table->rows)) {
158             continue;
159         }
160
161         changed = true;
162         HMAP_FOR_EACH_SAFE (row, next_row, struct ovsdb_idl_row, hmap_node,
163                             &table->rows) {
164             struct ovsdb_idl_arc *arc, *next_arc;
165
166             if (!ovsdb_idl_row_is_orphan(row)) {
167                 (row->table->class->unparse)(row);
168                 ovsdb_idl_row_clear_fields(row);
169             }
170             hmap_remove(&table->rows, &row->hmap_node);
171             LIST_FOR_EACH_SAFE (arc, next_arc, struct ovsdb_idl_arc, src_node,
172                                 &row->src_arcs) {
173                 free(arc);
174             }
175             /* No need to do anything with dst_arcs: some node has those arcs
176              * as forward arcs and will destroy them itself. */
177
178             free(row);
179         }
180     }
181
182     if (changed) {
183         idl->change_seqno++;
184     }
185 }
186
187 void
188 ovsdb_idl_run(struct ovsdb_idl *idl)
189 {
190     int i;
191
192     jsonrpc_session_run(idl->session);
193     for (i = 0; jsonrpc_session_is_connected(idl->session) && i < 50; i++) {
194         struct jsonrpc_msg *msg, *reply;
195         unsigned int seqno;
196
197         seqno = jsonrpc_session_get_seqno(idl->session);
198         if (idl->last_monitor_request_seqno != seqno) {
199             idl->last_monitor_request_seqno = seqno;
200             ovsdb_idl_send_monitor_request(idl);
201             break;
202         }
203
204         msg = jsonrpc_session_recv(idl->session);
205         if (!msg) {
206             break;
207         }
208
209         reply = NULL;
210         if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
211             reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
212         } else if (msg->type == JSONRPC_NOTIFY
213                    && !strcmp(msg->method, "update")
214                    && msg->params->type == JSON_ARRAY
215                    && msg->params->u.array.n == 2
216                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
217             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
218         } else if (msg->type == JSONRPC_REPLY
219                    && idl->monitor_request_id
220                    && json_equal(idl->monitor_request_id, msg->id)) {
221             json_destroy(idl->monitor_request_id);
222             idl->monitor_request_id = NULL;
223             ovsdb_idl_clear(idl);
224             ovsdb_idl_parse_update(idl, msg->result);
225         } else if (msg->type == JSONRPC_REPLY
226                    && msg->id && msg->id->type == JSON_STRING
227                    && !strcmp(msg->id->u.string, "echo")) {
228             /* It's a reply to our echo request.  Ignore it. */
229         } else {
230             VLOG_WARN("%s: received unexpected %s message",
231                       jsonrpc_session_get_name(idl->session),
232                       jsonrpc_msg_type_to_string(msg->type));
233             jsonrpc_session_force_reconnect(idl->session);
234         }
235         if (reply) {
236             jsonrpc_session_send(idl->session, reply);
237         }
238         jsonrpc_msg_destroy(msg);
239     }
240 }
241
242 void
243 ovsdb_idl_wait(struct ovsdb_idl *idl)
244 {
245     jsonrpc_session_wait(idl->session);
246     jsonrpc_session_recv_wait(idl->session);
247 }
248
249 unsigned int
250 ovsdb_idl_get_seqno(const struct ovsdb_idl *idl)
251 {
252     return idl->change_seqno;
253 }
254
255 void
256 ovsdb_idl_force_reconnect(struct ovsdb_idl *idl)
257 {
258     jsonrpc_session_force_reconnect(idl->session);
259 }
260 \f
261 static void
262 ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
263 {
264     struct json *monitor_requests;
265     struct jsonrpc_msg *msg;
266     size_t i;
267
268     monitor_requests = json_object_create();
269     for (i = 0; i < idl->class->n_tables; i++) {
270         const struct ovsdb_idl_table *table = &idl->tables[i];
271         const struct ovsdb_idl_table_class *tc = table->class;
272         struct json *monitor_request, *columns;
273         size_t i;
274
275         monitor_request = json_object_create();
276         columns = json_array_create_empty();
277         for (i = 0; i < tc->n_columns; i++) {
278             const struct ovsdb_idl_column *column = &tc->columns[i];
279             json_array_add(columns, json_string_create(column->name));
280         }
281         json_object_put(monitor_request, "columns", columns);
282         json_object_put(monitor_requests, tc->name, monitor_request);
283     }
284
285     json_destroy(idl->monitor_request_id);
286     msg = jsonrpc_create_request(
287         "monitor", json_array_create_2(json_null_create(), monitor_requests),
288         &idl->monitor_request_id);
289     jsonrpc_session_send(idl->session, msg);
290 }
291
292 static void
293 ovsdb_idl_parse_update(struct ovsdb_idl *idl, const struct json *table_updates)
294 {
295     struct ovsdb_error *error;
296
297     idl->change_seqno++;
298
299     error = ovsdb_idl_parse_update__(idl, table_updates);
300     if (error) {
301         if (!VLOG_DROP_WARN(&syntax_rl)) {
302             char *s = ovsdb_error_to_string(error);
303             VLOG_WARN_RL(&syntax_rl, "%s", s);
304             free(s);
305         }
306         ovsdb_error_destroy(error);
307     }
308 }
309
310 static struct ovsdb_error *
311 ovsdb_idl_parse_update__(struct ovsdb_idl *idl,
312                          const struct json *table_updates)
313 {
314     const struct shash_node *tables_node;
315
316     if (table_updates->type != JSON_OBJECT) {
317         return ovsdb_syntax_error(table_updates, NULL,
318                                   "<table-updates> is not an object");
319     }
320     SHASH_FOR_EACH (tables_node, json_object(table_updates)) {
321         const struct json *table_update = tables_node->data;
322         const struct shash_node *table_node;
323         struct ovsdb_idl_table *table;
324
325         table = shash_find_data(&idl->table_by_name, tables_node->name);
326         if (!table) {
327             return ovsdb_syntax_error(
328                 table_updates, NULL,
329                 "<table-updates> includes unknown table \"%s\"",
330                 tables_node->name);
331         }
332
333         if (table_update->type != JSON_OBJECT) {
334             return ovsdb_syntax_error(table_update, NULL,
335                                       "<table-update> for table \"%s\" is "
336                                       "not an object", table->class->name);
337         }
338         SHASH_FOR_EACH (table_node, json_object(table_update)) {
339             const struct json *row_update = table_node->data;
340             const struct json *old_json, *new_json;
341             struct uuid uuid;
342
343             if (!uuid_from_string(&uuid, table_node->name)) {
344                 return ovsdb_syntax_error(table_update, NULL,
345                                           "<table-update> for table \"%s\" "
346                                           "contains bad UUID "
347                                           "\"%s\" as member name",
348                                           table->class->name,
349                                           table_node->name);
350             }
351             if (row_update->type != JSON_OBJECT) {
352                 return ovsdb_syntax_error(row_update, NULL,
353                                           "<table-update> for table \"%s\" "
354                                           "contains <row-update> for %s that "
355                                           "is not an object",
356                                           table->class->name,
357                                           table_node->name);
358             }
359
360             old_json = shash_find_data(json_object(row_update), "old");
361             new_json = shash_find_data(json_object(row_update), "new");
362             if (old_json && old_json->type != JSON_OBJECT) {
363                 return ovsdb_syntax_error(old_json, NULL,
364                                           "\"old\" <row> is not object");
365             } else if (new_json && new_json->type != JSON_OBJECT) {
366                 return ovsdb_syntax_error(new_json, NULL,
367                                           "\"new\" <row> is not object");
368             } else if ((old_json != NULL) + (new_json != NULL)
369                        != shash_count(json_object(row_update))) {
370                 return ovsdb_syntax_error(row_update, NULL,
371                                           "<row-update> contains unexpected "
372                                           "member");
373             } else if (!old_json && !new_json) {
374                 return ovsdb_syntax_error(row_update, NULL,
375                                           "<row-update> missing \"old\" "
376                                           "and \"new\" members");
377             }
378
379             ovsdb_idl_process_update(table, &uuid, old_json, new_json);
380         }
381     }
382
383     return NULL;
384 }
385
386 static struct ovsdb_idl_row *
387 ovsdb_idl_get_row(struct ovsdb_idl_table *table, const struct uuid *uuid)
388 {
389     struct ovsdb_idl_row *row;
390
391     HMAP_FOR_EACH_WITH_HASH (row, struct ovsdb_idl_row, hmap_node,
392                              uuid_hash(uuid), &table->rows) {
393         if (uuid_equals(&row->uuid, uuid)) {
394             return row;
395         }
396     }
397     return NULL;
398 }
399
400 static void
401 ovsdb_idl_process_update(struct ovsdb_idl_table *table,
402                          const struct uuid *uuid, const struct json *old,
403                          const struct json *new)
404 {
405     struct ovsdb_idl_row *row;
406
407     row = ovsdb_idl_get_row(table, uuid);
408     if (!new) {
409         /* Delete row. */
410         if (row && !ovsdb_idl_row_is_orphan(row)) {
411             /* XXX perhaps we should check the 'old' values? */
412             ovsdb_idl_delete_row(row);
413         } else {
414             VLOG_WARN_RL(&semantic_rl, "cannot delete missing row "UUID_FMT" "
415                          "from table %s",
416                          UUID_ARGS(uuid), table->class->name);
417         }
418     } else if (!old) {
419         /* Insert row. */
420         if (!row) {
421             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
422         } else if (ovsdb_idl_row_is_orphan(row)) {
423             ovsdb_idl_insert_row(row, new);
424         } else {
425             VLOG_WARN_RL(&semantic_rl, "cannot add existing row "UUID_FMT" to "
426                          "table %s", UUID_ARGS(uuid), table->class->name);
427             ovsdb_idl_modify_row(row, new);
428         }
429     } else {
430         /* Modify row. */
431         if (row) {
432             /* XXX perhaps we should check the 'old' values? */
433             if (!ovsdb_idl_row_is_orphan(row)) {
434                 ovsdb_idl_modify_row(row, new);
435             } else {
436                 VLOG_WARN_RL(&semantic_rl, "cannot modify missing but "
437                              "referenced row "UUID_FMT" in table %s",
438                              UUID_ARGS(uuid), table->class->name);
439                 ovsdb_idl_insert_row(row, new);
440             }
441         } else {
442             VLOG_WARN_RL(&semantic_rl, "cannot modify missing row "UUID_FMT" "
443                          "in table %s", UUID_ARGS(uuid), table->class->name);
444             ovsdb_idl_insert_row(ovsdb_idl_row_create(table, uuid), new);
445         }
446     }
447 }
448
449 static void
450 ovsdb_idl_row_update(struct ovsdb_idl_row *row, const struct json *row_json)
451 {
452     struct ovsdb_idl_table *table = row->table;
453     struct shash_node *node;
454
455     SHASH_FOR_EACH (node, json_object(row_json)) {
456         const char *column_name = node->name;
457         const struct ovsdb_idl_column *column;
458         struct ovsdb_datum datum;
459         struct ovsdb_error *error;
460
461         column = shash_find_data(&table->columns, column_name);
462         if (!column) {
463             VLOG_WARN_RL(&syntax_rl, "unknown column %s updating row "UUID_FMT,
464                          column_name, UUID_ARGS(&row->uuid));
465             continue;
466         }
467
468         error = ovsdb_datum_from_json(&datum, &column->type, node->data, NULL);
469         if (!error) {
470             ovsdb_datum_swap(&row->fields[column - table->class->columns],
471                              &datum);
472             ovsdb_datum_destroy(&datum, &column->type);
473         } else {
474             char *s = ovsdb_error_to_string(error);
475             VLOG_WARN_RL(&syntax_rl, "error parsing column %s in row "UUID_FMT
476                          " in table %s: %s", column_name,
477                          UUID_ARGS(&row->uuid), table->class->name, s);
478             free(s);
479             ovsdb_error_destroy(error);
480         }
481     }
482 }
483
484 static bool
485 ovsdb_idl_row_is_orphan(const struct ovsdb_idl_row *row)
486 {
487     return !row->fields;
488 }
489
490 static void
491 ovsdb_idl_row_clear_fields(struct ovsdb_idl_row *row)
492 {
493     if (!ovsdb_idl_row_is_orphan(row)) {
494         const struct ovsdb_idl_table_class *class = row->table->class;
495         size_t i;
496
497         for (i = 0; i < class->n_columns; i++) {
498             ovsdb_datum_destroy(&row->fields[i], &class->columns[i].type);
499         }
500         row->fields = NULL;
501     }
502 }
503
504 static void
505 ovsdb_idl_row_clear_arcs(struct ovsdb_idl_row *row, bool destroy_dsts)
506 {
507     struct ovsdb_idl_arc *arc, *next;
508
509     /* Delete all forward arcs.  If 'destroy_dsts', destroy any orphaned rows
510      * that this causes to be unreferenced. */
511     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, src_node,
512                         &row->src_arcs) {
513         list_remove(&arc->dst_node);
514         if (destroy_dsts
515             && ovsdb_idl_row_is_orphan(arc->dst)
516             && list_is_empty(&arc->dst->dst_arcs)) {
517             ovsdb_idl_row_destroy(arc->dst);
518         }
519         free(arc);
520     }
521     list_init(&row->src_arcs);
522 }
523
524 /* Force nodes that reference 'row' to reparse. */
525 static void
526 ovsdb_idl_row_reparse_backrefs(struct ovsdb_idl_row *row, bool destroy_dsts)
527 {
528     struct ovsdb_idl_arc *arc, *next;
529
530     /* This is trickier than it looks.  ovsdb_idl_row_clear_arcs() will destroy
531      * 'arc', so we need to use the "safe" variant of list traversal.  However,
532      * calling ref->table->class->parse will add an arc equivalent to 'arc' to
533      * row->arcs.  That could be a problem for traversal, but it adds it at the
534      * beginning of the list to prevent us from stumbling upon it again.
535      *
536      * (If duplicate arcs were possible then we would need to make sure that
537      * 'next' didn't also point into 'arc''s destination, but we forbid
538      * duplicate arcs.) */
539     LIST_FOR_EACH_SAFE (arc, next, struct ovsdb_idl_arc, dst_node,
540                         &row->dst_arcs) {
541         struct ovsdb_idl_row *ref = arc->src;
542
543         (ref->table->class->unparse)(ref);
544         ovsdb_idl_row_clear_arcs(ref, destroy_dsts);
545         (ref->table->class->parse)(ref);
546     }
547 }
548
549 static struct ovsdb_idl_row *
550 ovsdb_idl_row_create(struct ovsdb_idl_table *table, const struct uuid *uuid)
551 {
552     struct ovsdb_idl_row *row = xmalloc(table->class->allocation_size);
553     hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
554     row->uuid = *uuid;
555     list_init(&row->src_arcs);
556     list_init(&row->dst_arcs);
557     row->table = table;
558     row->fields = NULL;
559     return row;
560 }
561
562 static void
563 ovsdb_idl_row_destroy(struct ovsdb_idl_row *row)
564 {
565     if (row) {
566         ovsdb_idl_row_clear_fields(row);
567         hmap_remove(&row->table->rows, &row->hmap_node);
568         free(row);
569     }
570 }
571
572 static void
573 ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json)
574 {
575     const struct ovsdb_idl_table_class *class = row->table->class;
576     size_t i;
577
578     assert(!row->fields);
579     row->fields = xmalloc(class->n_columns * sizeof *row->fields);
580     for (i = 0; i < class->n_columns; i++) {
581         ovsdb_datum_init_default(&row->fields[i], &class->columns[i].type);
582     }
583     ovsdb_idl_row_update(row, row_json);
584     (class->parse)(row);
585
586     ovsdb_idl_row_reparse_backrefs(row, false);
587 }
588
589 static void
590 ovsdb_idl_delete_row(struct ovsdb_idl_row *row)
591 {
592     (row->table->class->unparse)(row);
593     ovsdb_idl_row_clear_arcs(row, true);
594     ovsdb_idl_row_clear_fields(row);
595     if (list_is_empty(&row->dst_arcs)) {
596         ovsdb_idl_row_destroy(row);
597     } else {
598         ovsdb_idl_row_reparse_backrefs(row, true);
599     }
600 }
601
602 static void
603 ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json)
604 {
605     (row->table->class->unparse)(row);
606     ovsdb_idl_row_clear_arcs(row, true);
607     ovsdb_idl_row_update(row, row_json);
608     (row->table->class->parse)(row);
609 }
610
611 static bool
612 may_add_arc(const struct ovsdb_idl_row *src, const struct ovsdb_idl_row *dst)
613 {
614     const struct ovsdb_idl_arc *arc;
615
616     /* No self-arcs. */
617     if (src == dst) {
618         return false;
619     }
620
621     /* No duplicate arcs.
622      *
623      * We only need to test whether the first arc in dst->dst_arcs originates
624      * at 'src', since we add all of the arcs from a given source in a clump
625      * (in a single call to a row's ->parse function) and new arcs are always
626      * added at the front of the dst_arcs list. */
627     if (list_is_empty(&dst->dst_arcs)) {
628         return true;
629     }
630     arc = CONTAINER_OF(dst->dst_arcs.next, struct ovsdb_idl_arc, dst_node);
631     return arc->src != src;
632 }
633
634 static struct ovsdb_idl_table *
635 ovsdb_table_from_class(const struct ovsdb_idl *idl,
636                        const struct ovsdb_idl_table_class *table_class)
637 {
638     return &idl->tables[table_class - idl->class->tables];
639 }
640
641 struct ovsdb_idl_row *
642 ovsdb_idl_get_row_arc(struct ovsdb_idl_row *src,
643                       struct ovsdb_idl_table_class *dst_table_class,
644                       const struct uuid *dst_uuid)
645 {
646     struct ovsdb_idl *idl = src->table->idl;
647     struct ovsdb_idl_table *dst_table;
648     struct ovsdb_idl_arc *arc;
649     struct ovsdb_idl_row *dst;
650
651     dst_table = ovsdb_table_from_class(idl, dst_table_class);
652     dst = ovsdb_idl_get_row(dst_table, dst_uuid);
653     if (!dst) {
654         dst = ovsdb_idl_row_create(dst_table, dst_uuid);
655     }
656
657     /* Add a new arc, if it wouldn't be a self-arc or a duplicate arc. */
658     if (may_add_arc(src, dst)) {
659         /* The arc *must* be added at the front of the dst_arcs list.  See
660          * ovsdb_idl_row_reparse_backrefs() for details. */
661         arc = xmalloc(sizeof *arc);
662         list_push_front(&src->src_arcs, &arc->src_node);
663         list_push_front(&dst->dst_arcs, &arc->dst_node);
664         arc->src = src;
665         arc->dst = dst;
666     }
667
668     return !ovsdb_idl_row_is_orphan(dst) ? dst : NULL;
669 }
670
671 static struct ovsdb_idl_row *
672 next_real_row(struct ovsdb_idl_table *table, struct hmap_node *node)
673 {
674     for (; node; node = hmap_next(&table->rows, node)) {
675         struct ovsdb_idl_row *row;
676
677         row = CONTAINER_OF(node, struct ovsdb_idl_row, hmap_node);
678         if (!ovsdb_idl_row_is_orphan(row)) {
679             return row;
680         }
681     }
682     return NULL;
683 }
684
685 struct ovsdb_idl_row *
686 ovsdb_idl_first_row(const struct ovsdb_idl *idl,
687                     const struct ovsdb_idl_table_class *table_class)
688 {
689     struct ovsdb_idl_table *table = ovsdb_table_from_class(idl, table_class);
690     return next_real_row(table, hmap_first(&table->rows));
691 }
692
693 struct ovsdb_idl_row *
694 ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
695 {
696     struct ovsdb_idl_table *table = row->table;
697
698     return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
699 }