ovsdb: Introduce for_each_txn_row() iterator function for transactions.
authorBen Pfaff <blp@nicira.com>
Wed, 17 Mar 2010 18:16:07 +0000 (11:16 -0700)
committerBen Pfaff <blp@nicira.com>
Wed, 17 Mar 2010 21:24:55 +0000 (14:24 -0700)
A number of places in the transaction code want to iterate over all of
the txn_rows and possibly modify them.  It is getting messy to duplicate
the code to do this everywhere.  This commit introduces a new function that
encapsulates the iteration logic, efficiently handling modifications made
by the callback function.

Upcoming commits will add more uses of this iterator function.

ovsdb/transaction.c

index 154f811b4981ce9095cf035693c0695f2814348e..f7349c936fa1430d1622ee77c5f2a80c65a9b967 100644 (file)
@@ -41,6 +41,10 @@ struct ovsdb_txn_table {
     struct list node;           /* Element in ovsdb_txn's txn_tables list. */
     struct ovsdb_table *table;
     struct hmap txn_rows;       /* Contains "struct ovsdb_txn_row"s. */
+
+    /* Used by for_each_txn_row(). */
+    unsigned int serial;        /* Serial number of in-progress iteration. */
+    unsigned int n_processed;   /* Number of rows processed. */
 };
 
 /* A row modified by the transaction:
@@ -60,8 +64,21 @@ struct ovsdb_txn_row {
     struct hmap_node hmap_node; /* In ovsdb_txn_table's txn_rows hmap. */
     struct ovsdb_row *old;      /* The old row. */
     struct ovsdb_row *new;      /* The new row. */
+
+    /* Used by for_each_txn_row(). */
+    unsigned int serial;        /* Serial number of in-progress commit. */
 };
 
+static void ovsdb_txn_row_prefree(struct ovsdb_txn_row *);
+static struct ovsdb_error * WARN_UNUSED_RESULT
+for_each_txn_row(struct ovsdb_txn *txn,
+                      struct ovsdb_error *(*)(struct ovsdb_txn *,
+                                              struct ovsdb_txn_row *));
+
+/* Used by for_each_txn_row() to track tables and rows that have been
+ * processed.  */
+static unsigned int serial;
+
 struct ovsdb_txn *
 ovsdb_txn_create(struct ovsdb *db)
 {
@@ -73,42 +90,21 @@ ovsdb_txn_create(struct ovsdb *db)
 }
 
 static void
-ovsdb_txn_destroy(struct ovsdb_txn *txn, void (*cb)(struct ovsdb_txn_row *))
+ovsdb_txn_free(struct ovsdb_txn *txn)
 {
-    struct ovsdb_txn_table *txn_table, *next_txn_table;
-
-    LIST_FOR_EACH_SAFE (txn_table, next_txn_table,
-                        struct ovsdb_txn_table, node, &txn->txn_tables) {
-        struct ovsdb_txn_row *txn_row, *next_txn_row;
-
-        HMAP_FOR_EACH_SAFE (txn_row, next_txn_row,
-                            struct ovsdb_txn_row, hmap_node,
-                            &txn_table->txn_rows)
-        {
-            if (txn_row->old) {
-                txn_row->old->txn_row = NULL;
-            }
-            if (txn_row->new) {
-                txn_row->new->txn_row = NULL;
-            }
-            cb(txn_row);
-            free(txn_row);
-        }
-
-        txn_table->table->txn_table = NULL;
-        hmap_destroy(&txn_table->txn_rows);
-        free(txn_table);
-    }
+    assert(list_is_empty(&txn->txn_tables));
     ds_destroy(&txn->comment);
     free(txn);
 }
 
-static void
-ovsdb_txn_row_abort(struct ovsdb_txn_row *txn_row)
+static struct ovsdb_error * WARN_UNUSED_RESULT
+ovsdb_txn_row_abort(struct ovsdb_txn *txn OVS_UNUSED,
+                    struct ovsdb_txn_row *txn_row)
 {
     struct ovsdb_row *old = txn_row->old;
     struct ovsdb_row *new = txn_row->new;
 
+    ovsdb_txn_row_prefree(txn_row);
     if (!old) {
         hmap_remove(&new->table->rows, &new->hmap_node);
     } else if (!new) {
@@ -117,18 +113,16 @@ ovsdb_txn_row_abort(struct ovsdb_txn_row *txn_row)
         hmap_replace(&new->table->rows, &new->hmap_node, &old->hmap_node);
     }
     ovsdb_row_destroy(new);
+    free(txn_row);
+
+    return NULL;
 }
 
 void
 ovsdb_txn_abort(struct ovsdb_txn *txn)
 {
-    ovsdb_txn_destroy(txn, ovsdb_txn_row_abort);
-}
-
-static void
-ovsdb_txn_row_commit(struct ovsdb_txn_row *txn_row)
-{
-    ovsdb_row_destroy(txn_row->old);
+    ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_abort));
+    ovsdb_txn_free(txn);
 }
 
 static struct ovsdb_txn_row *
@@ -282,6 +276,17 @@ update_ref_counts(struct ovsdb_txn *txn)
     return NULL;
 }
 
+static struct ovsdb_error * WARN_UNUSED_RESULT
+ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
+                     struct ovsdb_txn_row *txn_row)
+{
+    ovsdb_txn_row_prefree(txn_row);
+    ovsdb_row_destroy(txn_row->old);
+    free(txn_row);
+
+    return NULL;
+}
+
 struct ovsdb_error *
 ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
 {
@@ -306,8 +311,11 @@ ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
         }
     }
 
+    /* Finalize commit. */
     txn->db->run_triggers = true;
-    ovsdb_txn_destroy(txn, ovsdb_txn_row_commit);
+    ovsdb_error_assert(for_each_txn_row(txn, ovsdb_txn_row_commit));
+    ovsdb_txn_free(txn);
+
     return NULL;
 }
 
@@ -336,6 +344,7 @@ ovsdb_txn_create_txn_table(struct ovsdb_txn *txn, struct ovsdb_table *table)
         table->txn_table = txn_table = xmalloc(sizeof *table->txn_table);
         txn_table->table = table;
         hmap_init(&txn_table->txn_rows);
+        txn_table->serial = serial - 1;
         list_push_back(&txn->txn_tables, &txn_table->node);
     }
     return table->txn_table;
@@ -352,6 +361,7 @@ ovsdb_txn_row_create(struct ovsdb_txn *txn, struct ovsdb_table *table,
     txn_row = xmalloc(sizeof *txn_row);
     txn_row->old = old;
     txn_row->new = new;
+    txn_row->serial = serial - 1;
 
     if (old) {
         old->txn_row = txn_row;
@@ -440,3 +450,91 @@ ovsdb_txn_get_comment(const struct ovsdb_txn *txn)
 {
     return txn->comment.length ? ds_cstr_ro(&txn->comment) : NULL;
 }
+\f
+static void
+ovsdb_txn_row_prefree(struct ovsdb_txn_row *txn_row)
+{
+    struct ovsdb_row *row = txn_row->old ? txn_row->old : txn_row->new;
+    struct ovsdb_txn_table *txn_table = row->table->txn_table;
+
+    txn_table->n_processed--;
+    hmap_remove(&txn_table->txn_rows, &txn_row->hmap_node);
+
+    if (txn_row->old) {
+        txn_row->old->txn_row = NULL;
+    }
+    if (txn_row->new) {
+        txn_row->new->txn_row = NULL;
+    }
+}
+
+static void
+ovsdb_txn_table_destroy(struct ovsdb_txn_table *txn_table)
+{
+    assert(hmap_is_empty(&txn_table->txn_rows));
+    txn_table->table->txn_table = NULL;
+    hmap_destroy(&txn_table->txn_rows);
+    list_remove(&txn_table->node);
+    free(txn_table);
+}
+
+/* Calls 'cb' for every txn_row within 'txn'.  If 'cb' returns nonnull, this
+ * aborts the iteration and for_each_txn_row() passes the error up.  Otherwise,
+ * returns a null pointer after iteration is complete.
+ *
+ * 'cb' may insert new txn_rows and new txn_tables into 'txn'.  It may delete
+ * the txn_row that it is passed in, or txn_rows in txn_tables other than the
+ * one passed to 'cb'.  It may *not* delete txn_rows other than the one passed
+ * in within the same txn_table.  It may *not* delete any txn_tables.  As long
+ * as these rules are followed, 'cb' will be called exactly once for each
+ * txn_row in 'txn', even those added by 'cb'.
+ */
+static struct ovsdb_error * WARN_UNUSED_RESULT
+for_each_txn_row(struct ovsdb_txn *txn,
+                 struct ovsdb_error *(*cb)(struct ovsdb_txn *,
+                                           struct ovsdb_txn_row *))
+{
+    bool any_work;
+
+    serial++;
+
+    do {
+        struct ovsdb_txn_table *t, *next_txn_table;
+
+        any_work = false;
+        LIST_FOR_EACH_SAFE (t, next_txn_table, struct ovsdb_txn_table, node,
+                            &txn->txn_tables) {
+            if (t->serial != serial) {
+                t->serial = serial;
+                t->n_processed = 0;
+            }
+
+            while (t->n_processed < hmap_count(&t->txn_rows)) {
+                struct ovsdb_txn_row *r, *next_txn_row;
+
+                HMAP_FOR_EACH_SAFE (r, next_txn_row,
+                                    struct ovsdb_txn_row, hmap_node,
+                                    &t->txn_rows) {
+                    if (r->serial != serial) {
+                        struct ovsdb_error *error;
+
+                        r->serial = serial;
+                        t->n_processed++;
+                        any_work = true;
+
+                        error = cb(txn, r);
+                        if (error) {
+                            return error;
+                        }
+                    }
+                }
+            }
+            if (hmap_is_empty(&t->txn_rows)) {
+                /* Table is empty.  Drop it. */
+                ovsdb_txn_table_destroy(t);
+            }
+        }
+    } while (any_work);
+
+    return NULL;
+}