return NULL;
}
- HMAP_FOR_EACH_WITH_HASH (txn_row, struct ovsdb_txn_row, hmap_node,
+ HMAP_FOR_EACH_WITH_HASH (txn_row, hmap_node,
uuid_hash(uuid), &table->txn_table->txn_rows) {
const struct ovsdb_row *row;
}
static struct ovsdb_error * WARN_UNUSED_RESULT
-ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn,
+ovsdb_txn_adjust_atom_refs(struct ovsdb_txn *txn, const struct ovsdb_row *r,
+ const struct ovsdb_column *c,
const struct ovsdb_base_type *base,
const union ovsdb_atom *atoms, unsigned int n,
int delta)
const struct ovsdb_table *table;
unsigned int i;
- if (base->type != OVSDB_TYPE_UUID || !base->u.uuid.refTable) {
+ if (!ovsdb_base_type_is_strong_ref(base)) {
return NULL;
}
txn_row = ovsdb_txn_row_modify(txn, row)->txn_row;
} else {
return ovsdb_error("referential integrity violation",
- "reference to nonexistent row "
- UUID_FMT, UUID_ARGS(uuid));
+ "Table %s column %s row "UUID_FMT" "
+ "references nonexistent row "UUID_FMT" in "
+ "table %s.",
+ r->table->schema->name, c->name,
+ UUID_ARGS(ovsdb_row_get_uuid(r)),
+ UUID_ARGS(uuid), table->schema->name);
}
}
txn_row->n_refs += delta;
const struct ovsdb_datum *field = &r->fields[column->index];
struct ovsdb_error *error;
- error = ovsdb_txn_adjust_atom_refs(txn, &column->type.key,
+ error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.key,
field->keys, field->n, delta);
if (!error) {
- error = ovsdb_txn_adjust_atom_refs(txn, &column->type.value,
+ error = ovsdb_txn_adjust_atom_refs(txn, r, column, &column->type.value,
field->values, field->n, delta);
}
return error;
return NULL;
}
+static void
+add_weak_ref(struct ovsdb_txn *txn,
+ const struct ovsdb_row *src_, const struct ovsdb_row *dst_)
+{
+ struct ovsdb_row *src = (struct ovsdb_row *) src_;
+ struct ovsdb_row *dst = (struct ovsdb_row *) dst_;
+ struct ovsdb_weak_ref *weak;
+
+ if (src == dst) {
+ return;
+ }
+
+ dst = ovsdb_txn_row_modify(txn, dst);
+
+ if (!list_is_empty(&dst->dst_refs)) {
+ /* Omit duplicates. */
+ weak = CONTAINER_OF(list_back(&dst->dst_refs),
+ struct ovsdb_weak_ref, dst_node);
+ if (weak->src == src) {
+ return;
+ }
+ }
+
+ weak = xmalloc(sizeof *weak);
+ weak->src = src;
+ list_push_back(&dst->dst_refs, &weak->dst_node);
+ list_push_back(&src->src_refs, &weak->src_node);
+}
+
+static struct ovsdb_error * WARN_UNUSED_RESULT
+assess_weak_refs(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
+{
+ struct ovsdb_table *table;
+ struct shash_node *node;
+
+ if (txn_row->old) {
+ /* Mark rows that have weak references to 'txn_row' as modified, so
+ * that their weak references will get reassessed. */
+ struct ovsdb_weak_ref *weak, *next;
+
+ LIST_FOR_EACH_SAFE (weak, next, dst_node, &txn_row->old->dst_refs) {
+ if (!weak->src->txn_row) {
+ ovsdb_txn_row_modify(txn, weak->src);
+ }
+ }
+ }
+
+ if (!txn_row->new) {
+ /* We don't have to do anything about references that originate at
+ * 'txn_row', because ovsdb_row_destroy() will remove those weak
+ * references. */
+ return NULL;
+ }
+
+ table = txn_row->new->table;
+ SHASH_FOR_EACH (node, &table->schema->columns) {
+ const struct ovsdb_column *column = node->data;
+ struct ovsdb_datum *datum = &txn_row->new->fields[column->index];
+ unsigned int orig_n, i;
+ bool zero = false;
+
+ orig_n = datum->n;
+
+ if (ovsdb_base_type_is_weak_ref(&column->type.key)) {
+ for (i = 0; i < datum->n; ) {
+ const struct ovsdb_row *row;
+
+ row = ovsdb_table_get_row(column->type.key.u.uuid.refTable,
+ &datum->keys[i].uuid);
+ if (row) {
+ add_weak_ref(txn, txn_row->new, row);
+ i++;
+ } else {
+ if (uuid_is_zero(&datum->keys[i].uuid)) {
+ zero = true;
+ }
+ ovsdb_datum_remove_unsafe(datum, i, &column->type);
+ }
+ }
+ }
+
+ if (ovsdb_base_type_is_weak_ref(&column->type.value)) {
+ for (i = 0; i < datum->n; ) {
+ const struct ovsdb_row *row;
+
+ row = ovsdb_table_get_row(column->type.value.u.uuid.refTable,
+ &datum->values[i].uuid);
+ if (row) {
+ add_weak_ref(txn, txn_row->new, row);
+ i++;
+ } else {
+ if (uuid_is_zero(&datum->values[i].uuid)) {
+ zero = true;
+ }
+ ovsdb_datum_remove_unsafe(datum, i, &column->type);
+ }
+ }
+ }
+
+ if (datum->n != orig_n) {
+ bitmap_set1(txn_row->changed, column->index);
+ ovsdb_datum_sort_assert(datum, column->type.key.type);
+ if (datum->n < column->type.n_min) {
+ const struct uuid *row_uuid = ovsdb_row_get_uuid(txn_row->new);
+ if (zero && !txn_row->old) {
+ return ovsdb_error(
+ "constraint violation",
+ "Weak reference column \"%s\" in \"%s\" row "UUID_FMT
+ " (inserted within this transaction) contained "
+ "all-zeros UUID (probably as the default value for "
+ "this column) but deleting this value caused a "
+ "constraint volation because this column is not "
+ "allowed to be empty.", column->name,
+ table->schema->name, UUID_ARGS(row_uuid));
+ } else {
+ return ovsdb_error(
+ "constraint violation",
+ "Deletion of %u weak reference(s) to deleted (or "
+ "never-existing) rows from column \"%s\" in \"%s\" "
+ "row "UUID_FMT" %scaused this column to become empty, "
+ "but constraints on this column disallow an "
+ "empty column.",
+ orig_n - datum->n, column->name, table->schema->name,
+ UUID_ARGS(row_uuid),
+ (txn_row->old
+ ? ""
+ : "(inserted within this transaction) "));
+ }
+ }
+ }
+ }
+
+ return NULL;
+}
+
static struct ovsdb_error * WARN_UNUSED_RESULT
determine_changes(struct ovsdb_txn *txn, struct ovsdb_txn_row *txn_row)
{
return NULL;
}
+static struct ovsdb_error * WARN_UNUSED_RESULT
+check_max_rows(struct ovsdb_txn *txn)
+{
+ struct ovsdb_txn_table *t;
+
+ LIST_FOR_EACH (t, node, &txn->txn_tables) {
+ size_t n_rows = hmap_count(&t->table->rows);
+ unsigned int max_rows = t->table->schema->max_rows;
+
+ if (n_rows > max_rows) {
+ return ovsdb_error("constraint violation",
+ "transaction causes \"%s\" table to contain "
+ "%zu rows, greater than the schema-defined "
+ "limit of %u row(s)",
+ t->table->schema->name, n_rows, max_rows);
+ }
+ }
+
+ return NULL;
+}
+
struct ovsdb_error *
ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
{
return NULL;
}
+ /* Check maximum rows table constraints. */
+ error = check_max_rows(txn);
+ if (error) {
+ ovsdb_txn_abort(txn);
+ return error;
+ }
+
/* Update reference counts and check referential integrity. */
error = update_ref_counts(txn);
if (error) {
return error;
}
+ /* Check reference counts and remove bad reference for "weak" referential
+ * integrity. */
+ error = for_each_txn_row(txn, assess_weak_refs);
+ if (error) {
+ ovsdb_txn_abort(txn);
+ return error;
+ }
+
/* Send the commit to each replica. */
- LIST_FOR_EACH (replica, struct ovsdb_replica, node, &txn->db->replicas) {
+ LIST_FOR_EACH (replica, node, &txn->db->replicas) {
error = (replica->class->commit)(replica, txn, durable);
if (error) {
/* We don't support two-phase commit so only the first replica is
struct ovsdb_txn_table *t;
struct ovsdb_txn_row *r;
- LIST_FOR_EACH (t, struct ovsdb_txn_table, node, &txn->txn_tables) {
- HMAP_FOR_EACH (r, struct ovsdb_txn_row, hmap_node, &t->txn_rows) {
+ LIST_FOR_EACH (t, node, &txn->txn_tables) {
+ HMAP_FOR_EACH (r, hmap_node, &t->txn_rows) {
if (!cb(r->old, r->new, r->changed, aux)) {
break;
}
struct ovsdb_txn_table *t, *next_txn_table;
any_work = false;
- LIST_FOR_EACH_SAFE (t, next_txn_table, struct ovsdb_txn_table, node,
- &txn->txn_tables) {
+ LIST_FOR_EACH_SAFE (t, next_txn_table, node, &txn->txn_tables) {
if (t->serial != serial) {
t->serial = serial;
t->n_processed = 0;
while (t->n_processed < hmap_count(&t->txn_rows)) {
struct ovsdb_txn_row *r, *next_txn_row;
- HMAP_FOR_EACH_SAFE (r, next_txn_row,
- struct ovsdb_txn_row, hmap_node,
- &t->txn_rows) {
+ HMAP_FOR_EACH_SAFE (r, next_txn_row, hmap_node, &t->txn_rows) {
if (r->serial != serial) {
struct ovsdb_error *error;