Make the "exit" unixctl command reliable in ovsdb-server, ovs-vswitchd.
[openvswitch] / ovsdb / transaction.c
index 8a10d1ed05aa018de7b7701fbf588440f4cc25b6..b26705a3ab2304d16c5260cb780760866c1e5690 100644 (file)
@@ -138,7 +138,7 @@ find_txn_row(const struct ovsdb_table *table, const struct uuid *uuid)
         return NULL;
     }
 
-    HMAP_FOR_EACH_WITH_HASH (txn_row, struct ovsdb_txn_row, hmap_node,
+    HMAP_FOR_EACH_WITH_HASH (txn_row, hmap_node,
                              uuid_hash(uuid), &table->txn_table->txn_rows) {
         const struct ovsdb_row *row;
 
@@ -152,7 +152,8 @@ find_txn_row(const struct ovsdb_table *table, const struct uuid *uuid)
 }
 
 static struct ovsdb_error * WARN_UNUSED_RESULT
-ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn,
+ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
+                           const struct ovsdb_column *c,
                            const struct ovsdb_base_type *base,
                            const union ovsdb_atom *atoms, unsigned int n,
                            int delta)
@@ -160,7 +161,7 @@ ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn,
     const struct ovsdb_table *table;
     unsigned int i;
 
-    if (base->type != OVSDB_TYPE_UUID || !base->u.uuid.refTable) {
+    if (!ovsdb_base_type_is_strong_ref(base)) {
         return NULL;
     }
 
@@ -174,8 +175,12 @@ ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn,
                 txn_row = ovsdb_txn_row_modify(txn, row)->txn_row;
             } else {
                 return ovsdb_error("referential integrity violation",
-                                   "reference to nonexistent row "
-                                   UUID_FMT, UUID_ARGS(uuid));
+                                   "Table %s column %s row "UUID_FMT" "
+                                   "references nonexistent row "UUID_FMT" in "
+                                   "table %s.",
+                                   r->table->schema->name, c->name,
+                                   UUID_ARGS(ovsdb_row_get_uuid(r)),
+                                   UUID_ARGS(uuid), table->schema->name);
             }
         }
         txn_row->n_refs += delta;
@@ -191,10 +196,10 @@ ovsdb_txn_adjust_row_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
     const struct ovsdb_datum *field = &r->fields[column->index];
     struct ovsdb_error *error;
 
-    error = ovsdb_txn_adjust_atom_refs(txn, &column->type.key,
+    error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.key,
                                        field->keys, field->n, delta);
     if (!error) {
-        error = ovsdb_txn_adjust_atom_refs(txn, &column->type.value,
+        error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.value,
                                            field->values, field->n, delta);
     }
     return error;
@@ -270,6 +275,141 @@ ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
     return NULL;
 }
 
+static void
+add_weak_ref(struct ovsdb_txn *txn,
+             const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
+{
+    struct ovsdb_row *src = (struct ovsdb_row *) src_;
+    struct ovsdb_row *dst = (struct ovsdb_row *) dst_;
+    struct ovsdb_weak_ref *weak;
+
+    if (src == dst) {
+        return;
+    }
+
+    dst = ovsdb_txn_row_modify(txn, dst);
+
+    if (!list_is_empty(&dst->dst_refs)) {
+        /* Omit duplicates. */
+        weak = CONTAINER_OF(list_back(&dst->dst_refs),
+                            struct ovsdb_weak_ref, dst_node);
+        if (weak->src == src) {
+            return;
+        }
+    }
+
+    weak = xmalloc(sizeof *weak);
+    weak->src = src;
+    list_push_back(&dst->dst_refs, &weak->dst_node);
+    list_push_back(&src->src_refs, &weak->src_node);
+}
+
+static struct ovsdb_error * WARN_UNUSED_RESULT
+assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
+{
+    struct ovsdb_table *table;
+    struct shash_node *node;
+
+    if (txn_row->old) {
+        /* Mark rows that have weak references to 'txn_row' as modified, so
+         * that their weak references will get reassessed. */
+        struct ovsdb_weak_ref *weak, *next;
+
+        LIST_FOR_EACH_SAFE (weak, next, dst_node, &txn_row->old->dst_refs) {
+            if (!weak->src->txn_row) {
+                ovsdb_txn_row_modify(txn, weak->src);
+            }
+        }
+    }
+
+    if (!txn_row->new) {
+        /* We don't have to do anything about references that originate at
+         * 'txn_row', because ovsdb_row_destroy() will remove those weak
+         * references. */
+        return NULL;
+    }
+
+    table = txn_row->new->table;
+    SHASH_FOR_EACH (node, &table->schema->columns) {
+        const struct ovsdb_column *column = node->data;
+        struct ovsdb_datum *datum = &txn_row->new->fields[column->index];
+        unsigned int orig_n, i;
+        bool zero = false;
+
+        orig_n = datum->n;
+
+        if (ovsdb_base_type_is_weak_ref(&column->type.key)) {
+            for (i = 0; i < datum->n; ) {
+                const struct ovsdb_row *row;
+
+                row = ovsdb_table_get_row(column->type.key.u.uuid.refTable,
+                                          &datum->keys[i].uuid);
+                if (row) {
+                    add_weak_ref(txn, txn_row->new, row);
+                    i++;
+                } else {
+                    if (uuid_is_zero(&datum->keys[i].uuid)) {
+                        zero = true;
+                    }
+                    ovsdb_datum_remove_unsafe(datum, i, &column->type);
+                }
+            }
+        }
+
+        if (ovsdb_base_type_is_weak_ref(&column->type.value)) {
+            for (i = 0; i < datum->n; ) {
+                const struct ovsdb_row *row;
+
+                row = ovsdb_table_get_row(column->type.value.u.uuid.refTable,
+                                          &datum->values[i].uuid);
+                if (row) {
+                    add_weak_ref(txn, txn_row->new, row);
+                    i++;
+                } else {
+                    if (uuid_is_zero(&datum->values[i].uuid)) {
+                        zero = true;
+                    }
+                    ovsdb_datum_remove_unsafe(datum, i, &column->type);
+                }
+            }
+        }
+
+        if (datum->n != orig_n) {
+            bitmap_set1(txn_row->changed, column->index);
+            ovsdb_datum_sort_assert(datum, column->type.key.type);
+            if (datum->n < column->type.n_min) {
+                const struct uuid *row_uuid = ovsdb_row_get_uuid(txn_row->new);
+                if (zero && !txn_row->old) {
+                    return ovsdb_error(
+                        "constraint violation",
+                        "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
+                        " (inserted within this transaction) contained "
+                        "all-zeros UUID (probably as the default value for "
+                        "this column) but deleting this value caused a "
+                        "constraint volation because this column is not "
+                        "allowed to be empty.", column->name,
+                        table->schema->name, UUID_ARGS(row_uuid));
+                } else {
+                    return ovsdb_error(
+                        "constraint violation",
+                        "Deletion of %u weak reference(s) to deleted (or "
+                        "never-existing) rows from column \"%s\" in \"%s\" "
+                        "row "UUID_FMT" %scaused this column to become empty, "
+                        "but constraints on this column disallow an "
+                        "empty column.",
+                        orig_n - datum->n, column->name, table->schema->name,
+                        UUID_ARGS(row_uuid),
+                        (txn_row->old
+                         ? ""
+                         : "(inserted within this transaction) "));
+                }
+            }
+        }
+    }
+
+    return NULL;
+}
+
 static struct ovsdb_error * WARN_UNUSED_RESULT
 determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
 {
@@ -305,6 +445,27 @@ determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
     return NULL;
 }
 
+static struct ovsdb_error * WARN_UNUSED_RESULT
+check_max_rows(struct ovsdb_txn *txn)
+{
+    struct ovsdb_txn_table *t;
+
+    LIST_FOR_EACH (t, node, &txn->txn_tables) {
+        size_t n_rows = hmap_count(&t->table->rows);
+        unsigned int max_rows = t->table->schema->max_rows;
+
+        if (n_rows > max_rows) {
+            return ovsdb_error("constraint violation",
+                               "transaction causes \"%s\" table to contain "
+                               "%zu rows, greater than the schema-defined "
+                               "limit of %u row(s)",
+                               t->table->schema->name, n_rows, max_rows);
+        }
+    }
+
+    return NULL;
+}
+
 struct ovsdb_error *
 ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
 {
@@ -323,6 +484,13 @@ ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
         return NULL;
     }
 
+    /* Check maximum rows table constraints. */
+    error = check_max_rows(txn);
+    if (error) {
+        ovsdb_txn_abort(txn);
+        return error;
+    }
+
     /* Update reference counts and check referential integrity. */
     error = update_ref_counts(txn);
     if (error) {
@@ -330,8 +498,16 @@ ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
         return error;
     }
 
+    /* Check reference counts and remove bad reference for "weak" referential
+     * integrity. */
+    error = for_each_txn_row(txn, assess_weak_refs);
+    if (error) {
+        ovsdb_txn_abort(txn);
+        return error;
+    }
+
     /* Send the commit to each replica. */
-    LIST_FOR_EACH (replica, struct ovsdb_replica, node, &txn->db->replicas) {
+    LIST_FOR_EACH (replica, node, &txn->db->replicas) {
         error = (replica->class->commit)(replica, txn, durable);
         if (error) {
             /* We don't support two-phase commit so only the first replica is
@@ -358,8 +534,8 @@ ovsdb_txn_for_each_change(const struct ovsdb_txn *txn,
     struct ovsdb_txn_table *t;
     struct ovsdb_txn_row *r;
 
-    LIST_FOR_EACH (t, struct ovsdb_txn_table, node, &txn->txn_tables) {
-        HMAP_FOR_EACH (r, struct ovsdb_txn_row, hmap_node, &t->txn_rows) {
+    LIST_FOR_EACH (t, node, &txn->txn_tables) {
+        HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
             if (!cb(r->old, r->new, r->changed, aux)) {
                 break;
             }
@@ -537,8 +713,7 @@ for_each_txn_row(struct ovsdb_txn *txn,
         struct ovsdb_txn_table *t, *next_txn_table;
 
         any_work = false;
-        LIST_FOR_EACH_SAFE (t, next_txn_table, struct ovsdb_txn_table, node,
-                            &txn->txn_tables) {
+        LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
             if (t->serial != serial) {
                 t->serial = serial;
                 t->n_processed = 0;
@@ -547,9 +722,7 @@ for_each_txn_row(struct ovsdb_txn *txn,
             while (t->n_processed < hmap_count(&t->txn_rows)) {
                 struct ovsdb_txn_row *r, *next_txn_row;
 
-                HMAP_FOR_EACH_SAFE (r, next_txn_row,
-                                    struct ovsdb_txn_row, hmap_node,
-                                    &t->txn_rows) {
+                HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
                     if (r->serial != serial) {
                         struct ovsdb_error *error;