ovsdb: Simplify referential integrity checking commit logic.
authorBen Pfaff <blp@nicira.com>
Fri, 12 Mar 2010 19:03:44 +0000 (11:03 -0800)
committerBen Pfaff <blp@nicira.com>
Wed, 17 Mar 2010 21:24:55 +0000 (14:24 -0700)
Until now, the commit-time logic for verifying referential integrity
modified row reference counts in-place.  That meant that it had to be
careful to be able to roll back those changes if it did detect a violation.
This commit changes the logic to avoid making any in-place changes.
Instead, the reference counts are tracked outside the rows themselves and
committed only if the transaction as a whole satisfies the constraints.
This eliminates a fair bit of code and paves the way for implementing
weak references as well.

ovsdb/transaction.c

index f7349c936fa1430d1622ee77c5f2a80c65a9b967..ac9b7f3196916c596f3e1649379c1aa45e83b0dd 100644 (file)
@@ -64,6 +64,7 @@ struct ovsdb_txn_row {
     struct hmap_node hmap_node; /* In ovsdb_txn_table's txn_rows hmap. */
     struct ovsdb_row *old;      /* The old row. */
     struct ovsdb_row *new;      /* The new row. */
+    size_t n_refs;              /* Number of remaining references. */
 
     /* Used by for_each_txn_row(). */
     unsigned int serial;        /* Serial number of in-progress commit. */
@@ -147,133 +148,109 @@ find_txn_row(const struct ovsdb_table *table, const struct uuid *uuid)
     return NULL;
 }
 
-static void
-ovsdb_txn_adjust_atom_refs(const union ovsdb_atom *atoms, unsigned int n,
-                           const struct ovsdb_table *table,
-                           int delta, struct ovsdb_error **errorp)
+static struct ovsdb_error * WARN_UNUSED_RESULT
+ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn,
+                           const struct ovsdb_base_type *base,
+                           const union ovsdb_atom *atoms, unsigned int n,
+                           int delta)
 {
+    const struct ovsdb_table *table;
     unsigned int i;
 
+    if (base->type != OVSDB_TYPE_UUID || !base->u.uuid.refTable) {
+        return NULL;
+    }
+
+    table = base->u.uuid.refTable;
     for (i = 0; i < n; i++) {
         const struct uuid *uuid = &atoms[i].uuid;
         struct ovsdb_txn_row *txn_row = find_txn_row(table, uuid);
-        if (txn_row) {
-            if (txn_row->old) {
-                txn_row->old->n_refs += delta;
-            }
-            if (txn_row->new) {
-                txn_row->new->n_refs += delta;
-            }
-        } else {
-            const struct ovsdb_row *row_ = ovsdb_table_get_row(table, uuid);
-            if (row_) {
-                struct ovsdb_row *row = (struct ovsdb_row *) row_;
-                row->n_refs += delta;
-            } else if (errorp) {
-                if (!*errorp) {
-                    *errorp = ovsdb_error("referential integrity violation",
-                                          "reference to nonexistent row "
-                                          UUID_FMT, UUID_ARGS(uuid));
-                }
+        if (!txn_row) {
+            const struct ovsdb_row *row = ovsdb_table_get_row(table, uuid);
+            if (row) {
+                txn_row = ovsdb_txn_row_modify(txn, row)->txn_row;
             } else {
-                NOT_REACHED();
+                return ovsdb_error("referential integrity violation",
+                                   "reference to nonexistent row "
+                                   UUID_FMT, UUID_ARGS(uuid));
             }
         }
+        txn_row->n_refs += delta;
     }
+
+    return NULL;
 }
 
-static void
-ovsdb_txn_adjust_row_refs(const struct ovsdb_row *r,
-                          const struct ovsdb_column *column, int delta,
-                          struct ovsdb_error **errorp)
+static struct ovsdb_error * WARN_UNUSED_RESULT
+ovsdb_txn_adjust_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
+                          const struct ovsdb_column *column, int delta)
 {
     const struct ovsdb_datum *field = &r->fields[column->index];
-    const struct ovsdb_type *type = &column->type;
+    struct ovsdb_error *error;
 
-    if (type->key.type == OVSDB_TYPE_UUID && type->key.u.uuid.refTable) {
-        ovsdb_txn_adjust_atom_refs(field->keys, field->n,
-                                   type->key.u.uuid.refTable, delta, errorp);
-    }
-    if (type->value.type == OVSDB_TYPE_UUID && type->value.u.uuid.refTable) {
-        ovsdb_txn_adjust_atom_refs(field->values, field->n,
-                                   type->value.u.uuid.refTable, delta, errorp);
+    error = ovsdb_txn_adjust_atom_refs(txn, &column->type.key,
+                                       field->keys, field->n, delta);
+    if (!error) {
+        error = ovsdb_txn_adjust_atom_refs(txn, &column->type.value,
+                                           field->values, field->n, delta);
     }
+    return error;
 }
 
 static struct ovsdb_error * WARN_UNUSED_RESULT
-ovsdb_txn_adjust_ref_counts__(struct ovsdb_txn *txn, int delta)
+update_row_ref_count(struct ovsdb_txn *txn, struct ovsdb_txn_row *r)
 {
-    struct ovsdb_txn_table *t;
-    struct ovsdb_error *error;
-
-    error = NULL;
-    LIST_FOR_EACH (t, struct ovsdb_txn_table, node, &txn->txn_tables) {
-        struct ovsdb_table *table = t->table;
-        struct ovsdb_txn_row *r;
-
-        HMAP_FOR_EACH (r, struct ovsdb_txn_row, hmap_node, &t->txn_rows) {
-            struct shash_node *node;
-
-            SHASH_FOR_EACH (node, &table->schema->columns) {
-                const struct ovsdb_column *column = node->data;
-
-                if (r->old) {
-                    ovsdb_txn_adjust_row_refs(r->old, column, -delta, NULL);
-                }
-                if (r->new) {
-                    ovsdb_txn_adjust_row_refs(r->new, column, delta, &error);
-                }
+    struct ovsdb_table *table = r->old ? r->old->table : r->new->table;
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &table->schema->columns) {
+        const struct ovsdb_column *column = node->data;
+        struct ovsdb_error *error;
+
+        if (r->old) {
+            error = ovsdb_txn_adjust_row_refs(txn, r->old, column, -1);
+            if (error) {
+                ovsdb_error_destroy(error);
+                return OVSDB_BUG("error decreasing refcount");
+            }
+        }
+        if (r->new) {
+            error = ovsdb_txn_adjust_row_refs(txn, r->new, column, 1);
+            if (error) {
+                return error;
             }
         }
     }
-    return error;
-}
 
-static void
-ovsdb_txn_rollback_counts(struct ovsdb_txn *txn)
-{
-    ovsdb_error_destroy(ovsdb_txn_adjust_ref_counts__(txn, -1));
+    return NULL;
 }
 
 static struct ovsdb_error * WARN_UNUSED_RESULT
-ovsdb_txn_commit_ref_counts(struct ovsdb_txn *txn)
+check_ref_count(struct ovsdb_txn *txn OVS_UNUSED, struct ovsdb_txn_row *r)
 {
-    struct ovsdb_error *error = ovsdb_txn_adjust_ref_counts__(txn, 1);
-    if (error) {
-        ovsdb_txn_rollback_counts(txn);
+    if (r->new || !r->n_refs) {
+        return NULL;
+    } else {
+        return ovsdb_error("referential integrity violation",
+                           "cannot delete %s row "UUID_FMT" because "
+                           "of %zu remaining reference(s)",
+                           r->old->table->schema->name,
+                           UUID_ARGS(ovsdb_row_get_uuid(r->old)),
+                           r->n_refs);
     }
-    return error;
 }
 
 static struct ovsdb_error * WARN_UNUSED_RESULT
 update_ref_counts(struct ovsdb_txn *txn)
 {
     struct ovsdb_error *error;
-    struct ovsdb_txn_table *t;
 
-    error = ovsdb_txn_commit_ref_counts(txn);
+    error = for_each_txn_row(txn, update_row_ref_count);
     if (error) {
         return error;
     }
 
-    LIST_FOR_EACH (t, struct ovsdb_txn_table, node, &txn->txn_tables) {
-        struct ovsdb_txn_row *r;
-
-        HMAP_FOR_EACH (r, struct ovsdb_txn_row, hmap_node, &t->txn_rows) {
-            if (!r->new && r->old->n_refs) {
-                error = ovsdb_error("referential integrity violation",
-                                    "cannot delete %s row "UUID_FMT" because "
-                                    "of %zu remaining reference(s)",
-                                    t->table->schema->name,
-                                    UUID_ARGS(ovsdb_row_get_uuid(r->old)),
-                                    r->old->n_refs);
-                ovsdb_txn_rollback_counts(txn);
-                return error;
-            }
-        }
-    }
-
-    return NULL;
+    return for_each_txn_row(txn, check_ref_count);
 }
 
 static struct ovsdb_error * WARN_UNUSED_RESULT
@@ -281,6 +258,9 @@ ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
                      struct ovsdb_txn_row *txn_row)
 {
     ovsdb_txn_row_prefree(txn_row);
+    if (txn_row->new) {
+        txn_row->new->n_refs = txn_row->n_refs;
+    }
     ovsdb_row_destroy(txn_row->old);
     free(txn_row);
 
@@ -361,6 +341,7 @@ ovsdb_txn_row_create(struct ovsdb_txn *txn, struct ovsdb_table *table,
     txn_row = xmalloc(sizeof *txn_row);
     txn_row->old = old;
     txn_row->new = new;
+    txn_row->n_refs = old ? old->n_refs : 0;
     txn_row->serial = serial - 1;
 
     if (old) {