ovsdb: Implement table uniqueness constraints ("indexes").
authorBen Pfaff <blp@nicira.com>
Mon, 6 Jun 2011 16:09:10 +0000 (09:09 -0700)
committerBen Pfaff <blp@nicira.com>
Mon, 6 Jun 2011 16:09:10 +0000 (09:09 -0700)
12 files changed:
ovsdb/SPECS
ovsdb/ovsdb-doc.in
ovsdb/row.c
ovsdb/row.h
ovsdb/table.c
ovsdb/table.h
ovsdb/transaction.c
python/ovs/db/schema.py
tests/ovsdb-execution.at
tests/ovsdb-server.at
tests/ovsdb-table.at
tests/ovsdb-tool.at

index d413c284510f66d29ad77b2860ba7a19bc63a87e..cf0c9fa105c205ee0342821a02d82c849b275304 100644 (file)
@@ -132,6 +132,7 @@ is represented by <database-schema>, as described below.
         "columns": {<id>: <column-schema>, ...}   required
         "maxRows": <integer>                      optional
         "isRoot": <boolean>                       optional
         "columns": {<id>: <column-schema>, ...}   required
         "maxRows": <integer>                      optional
         "isRoot": <boolean>                       optional
+        "indexes": [<column-set>*]                optional
 
     The value of "columns" is a JSON object whose names are column
     names and whose values are <column-schema>s.
 
     The value of "columns" is a JSON object whose names are column
     names and whose values are <column-schema>s.
@@ -176,6 +177,15 @@ is represented by <database-schema>, as described below.
     enforced after unreferenced rows are deleted from tables with a
     false "isRoot".
 
     enforced after unreferenced rows are deleted from tables with a
     false "isRoot".
 
+    If "indexes" is specified, it must be an array of zero or more
+    <column-set>s.  A <column-set> is an array of one or more strings,
+    each of which names a column.  Each <column-set> is a set of
+    columns whose values, taken together within any given row, must be
+    unique within the table.  This is a "deferred" constraint,
+    enforced only at transaction commit time, after unreferenced rows
+    are deleted and dangling weak references are removed.  Ephemeral
+    columns may not be part of indexes.
+
 <column-schema>
 
     A JSON object with the following members:
 <column-schema>
 
     A JSON object with the following members:
index 2577e782b9be57ad322508e6bd336b1dcd5e8ff6..b325b263283bd924bbe7ce497078ca34999991e6 100755 (executable)
@@ -136,6 +136,8 @@ def typeAndConstraintsToNroff(column):
     constraints = column.type.constraintsToEnglish(escapeNroffLiteral)
     if constraints:
         type += ", " + constraints
     constraints = column.type.constraintsToEnglish(escapeNroffLiteral)
     if constraints:
         type += ", " + constraints
+    if column.unique:
+        type += " (must be unique within table)"
     return type
 
 def columnToNroff(columnName, column, node):
     return type
 
 def columnToNroff(columnName, column, node):
index dece90fd9d91665e56b0f3245e3091b25116ef0f..e25f857aeed85b43a3aa21fb30e7aca0dd27a6c4 100644 (file)
@@ -31,8 +31,10 @@ static struct ovsdb_row *
 allocate_row(const struct ovsdb_table *table)
 {
     size_t n_fields = shash_count(&table->schema->columns);
 allocate_row(const struct ovsdb_table *table)
 {
     size_t n_fields = shash_count(&table->schema->columns);
+    size_t n_indexes = table->schema->n_indexes;
     size_t row_size = (offsetof(struct ovsdb_row, fields)
     size_t row_size = (offsetof(struct ovsdb_row, fields)
-                       + sizeof(struct ovsdb_datum) * n_fields);
+                       + sizeof(struct ovsdb_datum) * n_fields
+                       + sizeof(struct hmap_node) * n_indexes);
     struct ovsdb_row *row = xmalloc(row_size);
     row->table = (struct ovsdb_table *) table;
     row->txn_row = NULL;
     struct ovsdb_row *row = xmalloc(row_size);
     row->table = (struct ovsdb_table *) table;
     row->txn_row = NULL;
index ba0f9fbee8b5626a5478671a882011d1653a72b9..2fdc72ef5eb79b22dc019d0727a0f83117240ba3 100644 (file)
@@ -58,7 +58,13 @@ struct ovsdb_row {
      * commit. */
     size_t n_refs;
 
      * commit. */
     size_t n_refs;
 
+    /* One datum for each column (shash_count(&table->schema->columns)
+     * elements). */
     struct ovsdb_datum fields[];
     struct ovsdb_datum fields[];
+
+    /* Followed by table->schema->n_indexes "struct hmap_node"s.  In rows that
+     * have have been committed as part of the database, the hmap_node with
+     * index 'i' is contained in hmap table->indexes[i].  */
 };
 
 struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *);
 };
 
 struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *);
index 44a205e9103d551e9a50ec2bc50cb18a5a515678..e350fc3017851e24ba331db9eb1d506eb32973fb 100644 (file)
@@ -57,6 +57,9 @@ ovsdb_table_schema_create(const char *name, bool mutable,
     add_column(ts, version);
     assert(version->index == OVSDB_COL_VERSION);
 
     add_column(ts, version);
     assert(version->index == OVSDB_COL_VERSION);
 
+    ts->n_indexes = 0;
+    ts->indexes = NULL;
+
     return ts;
 }
 
     return ts;
 }
 
@@ -65,6 +68,7 @@ ovsdb_table_schema_clone(const struct ovsdb_table_schema *old)
 {
     struct ovsdb_table_schema *new;
     struct shash_node *node;
 {
     struct ovsdb_table_schema *new;
     struct shash_node *node;
+    size_t i;
 
     new = ovsdb_table_schema_create(old->name, old->mutable,
                                     old->max_rows, old->is_root);
 
     new = ovsdb_table_schema_create(old->name, old->mutable,
                                     old->max_rows, old->is_root);
@@ -78,6 +82,24 @@ ovsdb_table_schema_clone(const struct ovsdb_table_schema *old)
 
         add_column(new, ovsdb_column_clone(column));
     }
 
         add_column(new, ovsdb_column_clone(column));
     }
+
+    new->n_indexes = old->n_indexes;
+    new->indexes = xmalloc(new->n_indexes * sizeof *new->indexes);
+    for (i = 0; i < new->n_indexes; i++) {
+        const struct ovsdb_column_set *old_index = &old->indexes[i];
+        struct ovsdb_column_set *new_index = &new->indexes[i];
+        size_t j;
+
+        ovsdb_column_set_init(new_index);
+        for (j = 0; j < old_index->n_columns; j++) {
+            const struct ovsdb_column *old_column = old_index->columns[j];
+            const struct ovsdb_column *new_column;
+
+            new_column = ovsdb_table_schema_get_column(new, old_column->name);
+            ovsdb_column_set_add(new_index, new_column);
+        }
+    }
+
     return new;
 }
 
     return new;
 }
 
@@ -85,6 +107,12 @@ void
 ovsdb_table_schema_destroy(struct ovsdb_table_schema *ts)
 {
     struct shash_node *node;
 ovsdb_table_schema_destroy(struct ovsdb_table_schema *ts)
 {
     struct shash_node *node;
+    size_t i;
+
+    for (i = 0; i < ts->n_indexes; i++) {
+        ovsdb_column_set_destroy(&ts->indexes[i]);
+    }
+    free(ts->indexes);
 
     SHASH_FOR_EACH (node, &ts->columns) {
         ovsdb_column_destroy(node->data);
 
     SHASH_FOR_EACH (node, &ts->columns) {
         ovsdb_column_destroy(node->data);
@@ -99,7 +127,7 @@ ovsdb_table_schema_from_json(const struct json *json, const char *name,
                              struct ovsdb_table_schema **tsp)
 {
     struct ovsdb_table_schema *ts;
                              struct ovsdb_table_schema **tsp)
 {
     struct ovsdb_table_schema *ts;
-    const struct json *columns, *mutable, *max_rows, *is_root;
+    const struct json *columns, *mutable, *max_rows, *is_root, *indexes;
     struct shash_node *node;
     struct ovsdb_parser parser;
     struct ovsdb_error *error;
     struct shash_node *node;
     struct ovsdb_parser parser;
     struct ovsdb_error *error;
@@ -114,6 +142,7 @@ ovsdb_table_schema_from_json(const struct json *json, const char *name,
     max_rows = ovsdb_parser_member(&parser, "maxRows",
                                    OP_INTEGER | OP_OPTIONAL);
     is_root = ovsdb_parser_member(&parser, "isRoot", OP_BOOLEAN | OP_OPTIONAL);
     max_rows = ovsdb_parser_member(&parser, "maxRows",
                                    OP_INTEGER | OP_OPTIONAL);
     is_root = ovsdb_parser_member(&parser, "isRoot", OP_BOOLEAN | OP_OPTIONAL);
+    indexes = ovsdb_parser_member(&parser, "indexes", OP_ARRAY | OP_OPTIONAL);
     error = ovsdb_parser_finish(&parser);
     if (error) {
         return error;
     error = ovsdb_parser_finish(&parser);
     if (error) {
         return error;
@@ -150,14 +179,51 @@ ovsdb_table_schema_from_json(const struct json *json, const char *name,
             error = ovsdb_column_from_json(node->data, node->name, &column);
         }
         if (error) {
             error = ovsdb_column_from_json(node->data, node->name, &column);
         }
         if (error) {
-            ovsdb_table_schema_destroy(ts);
-            return error;
+            goto error;
         }
 
         add_column(ts, column);
     }
         }
 
         add_column(ts, column);
     }
+
+    if (indexes) {
+        size_t i;
+
+        ts->indexes = xmalloc(indexes->u.array.n * sizeof *ts->indexes);
+        for (i = 0; i < indexes->u.array.n; i++) {
+            struct ovsdb_column_set *index = &ts->indexes[i];
+            size_t j;
+
+            error = ovsdb_column_set_from_json(indexes->u.array.elems[i],
+                                               ts, index);
+            if (error) {
+                goto error;
+            }
+            if (index->n_columns == 0) {
+                error = ovsdb_syntax_error(json, NULL, "index must have "
+                                           "at least one column");
+                goto error;
+            }
+            ts->n_indexes++;
+
+            for (j = 0; j < index->n_columns; j++) {
+                const struct ovsdb_column *column = index->columns[j];
+
+                if (!column->persistent) {
+                    error = ovsdb_syntax_error(json, NULL, "ephemeral columns "
+                                               "(such as %s) may not be "
+                                               "indexed", column->name);
+                    goto error;
+                }
+            }
+        }
+    }
+
     *tsp = ts;
     return NULL;
     *tsp = ts;
     return NULL;
+
+error:
+    ovsdb_table_schema_destroy(ts);
+    return error;
 }
 
 /* Returns table schema 'ts' serialized into JSON.
 }
 
 /* Returns table schema 'ts' serialized into JSON.
@@ -199,6 +265,18 @@ ovsdb_table_schema_to_json(const struct ovsdb_table_schema *ts,
         json_object_put(json, "maxRows", json_integer_create(ts->max_rows));
     }
 
         json_object_put(json, "maxRows", json_integer_create(ts->max_rows));
     }
 
+    if (ts->n_indexes) {
+        struct json **indexes;
+        size_t i;
+
+        indexes = xmalloc(ts->n_indexes * sizeof *indexes);
+        for (i = 0; i < ts->n_indexes; i++) {
+            indexes[i] = ovsdb_column_set_to_json(&ts->indexes[i]);
+        }
+        json_object_put(json, "indexes",
+                        json_array_create(indexes, ts->n_indexes));
+    }
+
     return json;
 }
 
     return json;
 }
 
@@ -213,10 +291,15 @@ struct ovsdb_table *
 ovsdb_table_create(struct ovsdb_table_schema *ts)
 {
     struct ovsdb_table *table;
 ovsdb_table_create(struct ovsdb_table_schema *ts)
 {
     struct ovsdb_table *table;
+    size_t i;
 
     table = xmalloc(sizeof *table);
     table->schema = ts;
     table->txn_table = NULL;
 
     table = xmalloc(sizeof *table);
     table->schema = ts;
     table->txn_table = NULL;
+    table->indexes = xmalloc(ts->n_indexes * sizeof *table->indexes);
+    for (i = 0; i < ts->n_indexes; i++) {
+        hmap_init(&table->indexes[i]);
+    }
     hmap_init(&table->rows);
 
     return table;
     hmap_init(&table->rows);
 
     return table;
@@ -227,12 +310,18 @@ ovsdb_table_destroy(struct ovsdb_table *table)
 {
     if (table) {
         struct ovsdb_row *row, *next;
 {
     if (table) {
         struct ovsdb_row *row, *next;
+        size_t i;
 
         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
             ovsdb_row_destroy(row);
         }
         hmap_destroy(&table->rows);
 
 
         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
             ovsdb_row_destroy(row);
         }
         hmap_destroy(&table->rows);
 
+        for (i = 0; i < table->schema->n_indexes; i++) {
+            hmap_destroy(&table->indexes[i]);
+        }
+        free(table->indexes);
+
         ovsdb_table_schema_destroy(table->schema);
         free(table);
     }
         ovsdb_table_schema_destroy(table->schema);
         free(table);
     }
index 8e933438613561f6a9a99d8f0962b6f17922c776..a2f2aa84e39154a1b49fb16ba7c8c235776a499a 100644 (file)
@@ -31,6 +31,8 @@ struct ovsdb_table_schema {
     struct shash columns;       /* Contains "struct ovsdb_column *"s. */
     unsigned int max_rows;      /* Maximum number of rows. */
     bool is_root;               /* Part of garbage collection root set? */
     struct shash columns;       /* Contains "struct ovsdb_column *"s. */
     unsigned int max_rows;      /* Maximum number of rows. */
     bool is_root;               /* Part of garbage collection root set? */
+    struct ovsdb_column_set *indexes;
+    size_t n_indexes;
 };
 
 struct ovsdb_table_schema *ovsdb_table_schema_create(
 };
 
 struct ovsdb_table_schema *ovsdb_table_schema_create(
@@ -55,6 +57,11 @@ struct ovsdb_table {
     struct ovsdb_table_schema *schema;
     struct ovsdb_txn_table *txn_table; /* Only if table is in a transaction. */
     struct hmap rows;           /* Contains "struct ovsdb_row"s. */
     struct ovsdb_table_schema *schema;
     struct ovsdb_txn_table *txn_table; /* Only if table is in a transaction. */
     struct hmap rows;           /* Contains "struct ovsdb_row"s. */
+
+    /* An array of schema->n_indexes hmaps, each of which contains "struct
+     * ovsdb_row"s.  Each of the hmap_nodes in indexes[i] are at index 'i' at
+     * the end of struct ovsdb_row, following the 'fields' member. */
+    struct hmap *indexes;
 };
 
 struct ovsdb_table *ovsdb_table_create(struct ovsdb_table_schema *);
 };
 
 struct ovsdb_table *ovsdb_table_create(struct ovsdb_table_schema *);
index c07541ed3a049579d4db1325a98f48d8a77120d5..23d5eebe184259fda228ea1dd71b4b1558a3e4c6 100644 (file)
@@ -43,6 +43,11 @@ struct ovsdb_txn_table {
     struct ovsdb_table *table;
     struct hmap txn_rows;       /* Contains "struct ovsdb_txn_row"s. */
 
     struct ovsdb_table *table;
     struct hmap txn_rows;       /* Contains "struct ovsdb_txn_row"s. */
 
+    /* This has the same form as the 'indexes' member of struct ovsdb_table,
+     * but it is only used or updated at transaction commit time, from
+     * check_index_uniqueness(). */
+    struct hmap *txn_indexes;
+
     /* Used by for_each_txn_row(). */
     unsigned int serial;        /* Serial number of in-progress iteration. */
     unsigned int n_processed;   /* Number of rows processed. */
     /* Used by for_each_txn_row(). */
     unsigned int serial;        /* Serial number of in-progress iteration. */
     unsigned int n_processed;   /* Number of rows processed. */
@@ -134,6 +139,33 @@ ovsdb_txn_row_abort(struct ovsdb_txn *txn OVS_UNUSED,
     return NULL;
 }
 
     return NULL;
 }
 
+/* Returns the offset in bytes from the start of an ovsdb_row for 'table' to
+ * the hmap_node for the index numbered 'i'. */
+static size_t
+ovsdb_row_index_offset__(const struct ovsdb_table *table, size_t i)
+{
+    size_t n_fields = shash_count(&table->schema->columns);
+    return (offsetof(struct ovsdb_row, fields)
+            + n_fields * sizeof(struct ovsdb_datum)
+            + i * sizeof(struct hmap_node));
+}
+
+/* Returns the hmap_node in 'row' for the index numbered 'i'. */
+static struct hmap_node *
+ovsdb_row_get_index_node(struct ovsdb_row *row, size_t i)
+{
+    return (void *) ((char *) row + ovsdb_row_index_offset__(row->table, i));
+}
+
+/* Returns the ovsdb_row given 'index_node', which is a pointer to that row's
+ * hmap_node for the index numbered 'i' within 'table'. */
+static struct ovsdb_row *
+ovsdb_row_from_index_node(struct hmap_node *index_node,
+                          const struct ovsdb_table *table, size_t i)
+{
+    return (void *) ((char *) index_node - ovsdb_row_index_offset__(table, i));
+}
+
 void
 ovsdb_txn_abort(struct ovsdb_txn *txn)
 {
 void
 ovsdb_txn_abort(struct ovsdb_txn *txn)
 {
@@ -374,6 +406,25 @@ static struct ovsdb_error *
 ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
                      struct ovsdb_txn_row *txn_row)
 {
 ovsdb_txn_row_commit(struct ovsdb_txn *txn OVS_UNUSED,
                      struct ovsdb_txn_row *txn_row)
 {
+    size_t n_indexes = txn_row->table->schema->n_indexes;
+
+    if (txn_row->old) {
+        size_t i;
+
+        for (i = 0; i < n_indexes; i++) {
+            struct hmap_node *node = ovsdb_row_get_index_node(txn_row->old, i);
+            hmap_remove(&txn_row->table->indexes[i], node);
+        }
+    }
+    if (txn_row->new) {
+        size_t i;
+
+        for (i = 0; i < n_indexes; i++) {
+            struct hmap_node *node = ovsdb_row_get_index_node(txn_row->new, i);
+            hmap_insert(&txn_row->table->indexes[i], node, node->hash);
+        }
+    }
+
     ovsdb_txn_row_prefree(txn_row);
     if (txn_row->new) {
         txn_row->new->n_refs = txn_row->n_refs;
     ovsdb_txn_row_prefree(txn_row);
     if (txn_row->new) {
         txn_row->new->n_refs = txn_row->n_refs;
@@ -574,6 +625,129 @@ check_max_rows(struct ovsdb_txn *txn)
     return NULL;
 }
 
     return NULL;
 }
 
+static struct ovsdb_row *
+ovsdb_index_search(struct hmap *index, struct ovsdb_row *row, size_t i,
+                   uint32_t hash)
+{
+    const struct ovsdb_table *table = row->table;
+    const struct ovsdb_column_set *columns = &table->schema->indexes[i];
+    struct hmap_node *node;
+
+    for (node = hmap_first_with_hash(index, hash); node;
+         node = hmap_next_with_hash(node)) {
+        struct ovsdb_row *irow = ovsdb_row_from_index_node(node, table, i);
+        if (ovsdb_row_equal_columns(row, irow, columns)) {
+            return irow;
+        }
+    }
+
+    return NULL;
+}
+
+static void
+duplicate_index_row__(const struct ovsdb_column_set *index,
+                      const struct ovsdb_row *row,
+                      const char *title,
+                      struct ds *out)
+{
+    size_t n_columns = shash_count(&row->table->schema->columns);
+
+    ds_put_format(out, "%s row, with UUID "UUID_FMT", ",
+                  title, UUID_ARGS(ovsdb_row_get_uuid(row)));
+    if (!row->txn_row
+        || bitmap_scan(row->txn_row->changed, 0, n_columns) == n_columns) {
+        ds_put_cstr(out, "existed in the database before this "
+                    "transaction and was not modified by the transaction.");
+    } else if (!row->txn_row->old) {
+        ds_put_cstr(out, "was inserted by this transaction.");
+    } else if (ovsdb_row_equal_columns(row->txn_row->old,
+                                       row->txn_row->new, index)) {
+        ds_put_cstr(out, "existed in the database before this "
+                    "transaction, which modified some of the row's columns "
+                    "but not any columns in this index.");
+    } else {
+        ds_put_cstr(out, "had the following index values before the "
+                    "transaction: ");
+        ovsdb_row_columns_to_string(row->txn_row->old, index, out);
+        ds_put_char(out, '.');
+    }
+}
+
+static struct ovsdb_error * WARN_UNUSED_RESULT
+duplicate_index_row(const struct ovsdb_column_set *index,
+                    const struct ovsdb_row *a,
+                    const struct ovsdb_row *b)
+{
+    struct ovsdb_column_set all_columns;
+    struct ovsdb_error *error;
+    char *index_s;
+    struct ds s;
+
+    /* Put 'a' and 'b' in a predictable order to make error messages
+     * reproducible for testing. */
+    ovsdb_column_set_init(&all_columns);
+    ovsdb_column_set_add_all(&all_columns, a->table);
+    if (ovsdb_row_compare_columns_3way(a, b, &all_columns) < 0) {
+        const struct ovsdb_row *tmp = a;
+        a = b;
+        b = tmp;
+    }
+    ovsdb_column_set_destroy(&all_columns);
+
+    index_s = ovsdb_column_set_to_string(index);
+
+    ds_init(&s);
+    ds_put_format(&s, "Transaction causes multiple rows in \"%s\" table to "
+                  "have identical values (", a->table->schema->name);
+    ovsdb_row_columns_to_string(a, index, &s);
+    ds_put_format(&s, ") for index on %s.  ", index_s);
+    duplicate_index_row__(index, a, "First", &s);
+    ds_put_cstr(&s, "  ");
+    duplicate_index_row__(index, b, "Second", &s);
+
+    free(index_s);
+
+    error = ovsdb_error("constraint violation", "%s", ds_cstr(&s));
+    ds_destroy(&s);
+    return error;
+}
+
+static struct ovsdb_error * WARN_UNUSED_RESULT
+check_index_uniqueness(struct ovsdb_txn *txn OVS_UNUSED,
+                       struct ovsdb_txn_row *txn_row)
+{
+    struct ovsdb_txn_table *txn_table = txn_row->table->txn_table;
+    struct ovsdb_table *table = txn_row->table;
+    struct ovsdb_row *row = txn_row->new;
+    size_t i;
+
+    if (!row) {
+        return NULL;
+    }
+
+    for (i = 0; i < table->schema->n_indexes; i++) {
+        const struct ovsdb_column_set *index = &table->schema->indexes[i];
+        struct ovsdb_row *irow;
+        uint32_t hash;
+
+        hash = ovsdb_row_hash_columns(row, index, 0);
+        irow = ovsdb_index_search(&txn_table->txn_indexes[i], row, i, hash);
+        if (irow) {
+            return duplicate_index_row(index, irow, row);
+        }
+
+        irow = ovsdb_index_search(&table->indexes[i], row, i, hash);
+        if (irow && !irow->txn_row) {
+            return duplicate_index_row(index, irow, row);
+        }
+
+        hmap_insert(&txn_table->txn_indexes[i],
+                    ovsdb_row_get_index_node(row, i), hash);
+    }
+
+    return NULL;
+}
+
 struct ovsdb_error *
 ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
 {
 struct ovsdb_error *
 ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
 {
@@ -612,7 +786,7 @@ ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
         return error;
     }
 
         return error;
     }
 
-    /* Check reference counts and remove bad reference for "weak" referential
+    /* Check reference counts and remove bad references for "weak" referential
      * integrity. */
     error = for_each_txn_row(txn, assess_weak_refs);
     if (error) {
      * integrity. */
     error = for_each_txn_row(txn, assess_weak_refs);
     if (error) {
@@ -620,6 +794,13 @@ ovsdb_txn_commit(struct ovsdb_txn *txn, bool durable)
         return error;
     }
 
         return error;
     }
 
+    /* Verify that the indexes will still be unique post-transaction. */
+    error = for_each_txn_row(txn, check_index_uniqueness);
+    if (error) {
+        ovsdb_txn_abort(txn);
+        return error;
+    }
+
     /* Send the commit to each replica. */
     LIST_FOR_EACH (replica, node, &txn->db->replicas) {
         error = (replica->class->commit)(replica, txn, durable);
     /* Send the commit to each replica. */
     LIST_FOR_EACH (replica, node, &txn->db->replicas) {
         error = (replica->class->commit)(replica, txn, durable);
@@ -662,11 +843,17 @@ ovsdb_txn_create_txn_table(struct ovsdb_txn *txn, struct ovsdb_table *table)
 {
     if (!table->txn_table) {
         struct ovsdb_txn_table *txn_table;
 {
     if (!table->txn_table) {
         struct ovsdb_txn_table *txn_table;
+        size_t i;
 
         table->txn_table = txn_table = xmalloc(sizeof *table->txn_table);
         txn_table->table = table;
         hmap_init(&txn_table->txn_rows);
         txn_table->serial = serial - 1;
 
         table->txn_table = txn_table = xmalloc(sizeof *table->txn_table);
         txn_table->table = table;
         hmap_init(&txn_table->txn_rows);
         txn_table->serial = serial - 1;
+        txn_table->txn_indexes = xmalloc(table->schema->n_indexes
+                                         * sizeof *txn_table->txn_indexes);
+        for (i = 0; i < table->schema->n_indexes; i++) {
+            hmap_init(&txn_table->txn_indexes[i]);
+        }
         list_push_back(&txn->txn_tables, &txn_table->node);
     }
     return table->txn_table;
         list_push_back(&txn->txn_tables, &txn_table->node);
     }
     return table->txn_table;
@@ -798,7 +985,14 @@ ovsdb_txn_row_prefree(struct ovsdb_txn_row *txn_row)
 static void
 ovsdb_txn_table_destroy(struct ovsdb_txn_table *txn_table)
 {
 static void
 ovsdb_txn_table_destroy(struct ovsdb_txn_table *txn_table)
 {
+    size_t i;
+
     assert(hmap_is_empty(&txn_table->txn_rows));
     assert(hmap_is_empty(&txn_table->txn_rows));
+
+    for (i = 0; i < txn_table->table->schema->n_indexes; i++) {
+        hmap_destroy(&txn_table->txn_indexes[i]);
+    }
+
     txn_table->table->txn_table = NULL;
     hmap_destroy(&txn_table->txn_rows);
     list_remove(&txn_table->node);
     txn_table->table->txn_table = NULL;
     hmap_destroy(&txn_table->txn_rows);
     list_remove(&txn_table->node);
index 9883adc2d69262d926cae5374489e6502239bb69..9c453df2bdb7267ef4d67ddd14612f0e8bd93e2a 100644 (file)
@@ -134,14 +134,33 @@ class IdlSchema(DbSchema):
         return IdlSchema(schema.name, schema.version, schema.tables,
                          idlPrefix, idlHeader)
 
         return IdlSchema(schema.name, schema.version, schema.tables,
                          idlPrefix, idlHeader)
 
+def column_set_from_json(json, columns):
+    if json is None:
+        return tuple(columns)
+    elif type(json) != list:
+        raise error.Error("array of distinct column names expected", json)
+    else:
+        for column_name in json:
+            if type(column_name) not in [str, unicode]:
+                raise error.Error("array of distinct column names expected",
+                                  json)
+            elif column_name not in columns:
+                raise error.Error("%s is not a valid column name"
+                                  % column_name, json)
+        if len(set(json)) != len(json):
+            # Duplicate.
+            raise error.Error("array of distinct column names expected", json)
+        return tuple([columns[column_name] for column_name in json])
+
 class TableSchema(object):
     def __init__(self, name, columns, mutable=True, max_rows=sys.maxint,
 class TableSchema(object):
     def __init__(self, name, columns, mutable=True, max_rows=sys.maxint,
-                 is_root=True):
+                 is_root=True, indexes=[]):
         self.name = name
         self.columns = columns
         self.mutable = mutable
         self.max_rows = max_rows
         self.is_root = is_root
         self.name = name
         self.columns = columns
         self.mutable = mutable
         self.max_rows = max_rows
         self.is_root = is_root
+        self.indexes = indexes
 
     @staticmethod
     def from_json(json, name):
 
     @staticmethod
     def from_json(json, name):
@@ -150,6 +169,7 @@ class TableSchema(object):
         mutable = parser.get_optional("mutable", [bool], True)
         max_rows = parser.get_optional("maxRows", [int])
         is_root = parser.get_optional("isRoot", [bool], False)
         mutable = parser.get_optional("mutable", [bool], True)
         max_rows = parser.get_optional("maxRows", [int])
         is_root = parser.get_optional("isRoot", [bool], False)
+        indexes_json = parser.get_optional("indexes", [list], [])
         parser.finish()
 
         if max_rows == None:
         parser.finish()
 
         if max_rows == None:
@@ -170,7 +190,20 @@ class TableSchema(object):
             columns[columnName] = ColumnSchema.from_json(columnJson,
                                                          columnName)
 
             columns[columnName] = ColumnSchema.from_json(columnJson,
                                                          columnName)
 
-        return TableSchema(name, columns, mutable, max_rows, is_root)
+        indexes = []
+        for index_json in indexes_json:
+            index = column_set_from_json(index_json, columns)
+            if not index:
+                raise error.Error("index must have at least one column", json)
+            elif len(index) == 1:
+                index[0].unique = True
+            for column in index:
+                if not column.persistent:
+                    raise error.Error("ephemeral columns (such as %s) may "
+                                      "not be indexed" % column.name, json)
+            indexes.append(index)
+
+        return TableSchema(name, columns, mutable, max_rows, is_root, indexes)
 
     def to_json(self, default_is_root=False):
         """Returns this table schema serialized into JSON.
 
     def to_json(self, default_is_root=False):
         """Returns this table schema serialized into JSON.
@@ -198,6 +231,11 @@ class TableSchema(object):
         if self.max_rows != sys.maxint:
             json["maxRows"] = self.max_rows
 
         if self.max_rows != sys.maxint:
             json["maxRows"] = self.max_rows
 
+        if self.indexes:
+            json["indexes"] = []
+            for index in self.indexes:
+                json["indexes"].append([column.name for column in index])
+
         return json
 
 class ColumnSchema(object):
         return json
 
 class ColumnSchema(object):
@@ -206,6 +244,7 @@ class ColumnSchema(object):
         self.mutable = mutable
         self.persistent = persistent
         self.type = type
         self.mutable = mutable
         self.persistent = persistent
         self.type = type
+        self.unique = False
 
     @staticmethod
     def from_json(json, name):
 
     @staticmethod
     def from_json(json, name):
index 54ff0aeb2e597d542255eeadc042bfbb3bd82b53..90f20ff6caa79bba68e3075706d2905a13513ebe 100644 (file)
@@ -6,7 +6,8 @@ m4_define([ORDINAL_SCHEMA],
        "ordinals": {
          "columns": {
            "number": {"type": "integer"},
        "ordinals": {
          "columns": {
            "number": {"type": "integer"},
-           "name": {"type": "string"}}}},
+           "name": {"type": "string"}},
+         "indexes": [["number"]]}},
      "version": "5.1.3",
      "cksum": "12345678 9"}]])
 
      "version": "5.1.3",
      "cksum": "12345678 9"}]])
 
@@ -553,6 +554,88 @@ OVSDB_CHECK_EXECUTION([insert and update constraints],
 [{"uuid":["uuid","<1>"]},{"details":"transaction causes \"constrained\" table to contain 2 rows, greater than the schema-defined limit of 1 row(s)","error":"constraint violation"}]
 ]])
 
 [{"uuid":["uuid","<1>"]},{"details":"transaction causes \"constrained\" table to contain 2 rows, greater than the schema-defined limit of 1 row(s)","error":"constraint violation"}]
 ]])
 
+OVSDB_CHECK_EXECUTION([index uniqueness checking],
+  [ORDINAL_SCHEMA],
+dnl Insert initial row.
+  [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "one"}}]]],
+dnl Try to insert row with identical value (fails).
+   [[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "another one"}}]]],
+dnl Remove initial row and insert new row with identical value in a single
+dnl transaction (succeeds).
+   [[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "another one"}},
+      {"op": "delete",
+       "table": "ordinals",
+       "where": [["name", "==", "one"]]}]]],
+dnl Remove row and insert two new rows with identical value in a single
+dnl transaction (fails).
+   [[["ordinals",
+      {"op": "delete",
+       "table": "ordinals",
+       "where": []},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "one"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "still another one"}}]]],
+dnl Add new row with different value (succeeds).
+   [[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 2, "name": "two"}}]]],
+dnl Change rows so values collide (fails).
+   [[["ordinals",
+      {"op": "update",
+       "table": "ordinals",
+       "where": [],
+       "row": {"number": 3}}]]],
+dnl Swap rows' values (succeeds).
+   [[["ordinals",
+      {"op": "update",
+       "table": "ordinals",
+       "where": [["number", "==", 1]],
+       "row": {"number": 2, "name": "old two"}},
+      {"op": "update",
+       "table": "ordinals",
+       "where": [["name", "==", "two"]],
+       "row": {"number": 1, "name": "old one"}}]]],
+dnl Change all rows' values to values not used before and insert values that
+dnl collide (only) with their previous values (succeeds).
+   [[["ordinals",
+      {"op": "mutate",
+       "table": "ordinals",
+       "where": [],
+       "mutations": [["number", "*=", 10]]},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "new one"}},
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 2, "name": "new two"}},
+      {"op": "select",
+       "table": "ordinals",
+       "where": [],
+       "columns": ["number", "name"],
+       "sort": ["number"]}]]]],
+  [[[{"uuid":["uuid","<0>"]}]
+[{"uuid":["uuid","<1>"]},{"details":"Transaction causes multiple rows in \"ordinals\" table to have identical values (1) for index on column \"number\".  First row, with UUID <0>, existed in the database before this transaction and was not modified by the transaction.  Second row, with UUID <1>, was inserted by this transaction.","error":"constraint violation"}]
+[{"uuid":["uuid","<2>"]},{"count":1}]
+[{"count":1},{"uuid":["uuid","<3>"]},{"uuid":["uuid","<4>"]},{"details":"Transaction causes multiple rows in \"ordinals\" table to have identical values (1) for index on column \"number\".  First row, with UUID <4>, was inserted by this transaction.  Second row, with UUID <3>, was inserted by this transaction.","error":"constraint violation"}]
+[{"uuid":["uuid","<5>"]}]
+[{"count":2},{"details":"Transaction causes multiple rows in \"ordinals\" table to have identical values (3) for index on column \"number\".  First row, with UUID <5>, had the following index values before the transaction: 2.  Second row, with UUID <2>, had the following index values before the transaction: 1.","error":"constraint violation"}]
+[{"count":1},{"count":1}]
+[{"count":2},{"uuid":["uuid","<6>"]},{"uuid":["uuid","<7>"]},{"rows":[{"name":"new one","number":1},{"name":"new two","number":2},{"name":"old one","number":10},{"name":"old two","number":20}]}]
+]])
+
 OVSDB_CHECK_EXECUTION([referential integrity -- simple],
   [CONSTRAINT_SCHEMA],
   [[[["constraints",
 OVSDB_CHECK_EXECUTION([referential integrity -- simple],
   [CONSTRAINT_SCHEMA],
   [[[["constraints",
index d7c0335efa2ec7a38d65716b38e2a71f124361ae..a7a2829e26f8280a57e3985ada6a459986ca39cc 100644 (file)
@@ -287,7 +287,7 @@ AT_CHECK(
   [0], [stdout], [ignore], [test ! -e pid || kill `cat pid`])
 dnl Check that all the crap is in fact in the database log.
 AT_CHECK([[perl $srcdir/uuidfilt.pl db | grep -v ^OVSDB | sed 's/"_date":[0-9]*/"_date":0/' | test-json --multiple -]], [0],
   [0], [stdout], [ignore], [test ! -e pid || kill `cat pid`])
 dnl Check that all the crap is in fact in the database log.
 AT_CHECK([[perl $srcdir/uuidfilt.pl db | grep -v ^OVSDB | sed 's/"_date":[0-9]*/"_date":0/' | test-json --multiple -]], [0],
-  [[{"cksum":"12345678 9","name":"ordinals","tables":{"ordinals":{"columns":{"name":{"type":"string"},"number":{"type":"integer"}}}},"version":"5.1.3"}
+  [[{"cksum":"12345678 9","name":"ordinals","tables":{"ordinals":{"columns":{"name":{"type":"string"},"number":{"type":"integer"}},"indexes":[["number"]]}},"version":"5.1.3"}
 {"_comment":"add row for zero 0","_date":0,"ordinals":{"<0>":{"name":"zero"}}}
 {"_comment":"delete row for 0","_date":0,"ordinals":{"<0>":null}}
 {"_comment":"add back row for zero 0","_date":0,"ordinals":{"<1>":{"name":"zero"}}}
 {"_comment":"add row for zero 0","_date":0,"ordinals":{"<0>":{"name":"zero"}}}
 {"_comment":"delete row for 0","_date":0,"ordinals":{"<0>":null}}
 {"_comment":"add back row for zero 0","_date":0,"ordinals":{"<1>":{"name":"zero"}}}
index cf206c4237b38d6216727181d24987ac8af08e8f..26ff90e7a28ef790bda06ebb7bf060f5591cfa96 100644 (file)
@@ -31,6 +31,31 @@ OVSDB_CHECK_POSITIVE_CPY([table with maxRows of 2],
                           "maxRows": 2}']],
   [[{"columns":{"name":{"type":"string"}},"maxRows":2}]])
 
                           "maxRows": 2}']],
   [[{"columns":{"name":{"type":"string"}},"maxRows":2}]])
 
+OVSDB_CHECK_POSITIVE_CPY([table with index],
+  [[parse-table mytable '{"columns": {"a": {"type": "integer"},
+                                      "b": {"type": "string"}},
+                          "indexes": [["b", "a"]]}']],
+  [[{"columns":{"a":{"type":"integer"},"b":{"type":"string"}},"indexes":[["b","a"]]}]])
+
+OVSDB_CHECK_NEGATIVE_CPY([table with syntax error in index],
+  [[parse-table mytable '{"columns": {"a": {"type": "integer"},
+                                      "b": {"type": "string"}},
+                          "indexes": [["b", "a"], [0]]}']],
+  [[array of distinct column names expected]])
+
+OVSDB_CHECK_NEGATIVE_CPY([table with empty index],
+  [[parse-table mytable '{"columns": {"a": {"type": "integer"},
+                                      "b": {"type": "string"}},
+                          "indexes": [[]]}']],
+  [[index must have at least one column]])
+
+OVSDB_CHECK_NEGATIVE_CPY([table with index of ephemeral column],
+  [[parse-table mytable '{"columns": {"a": {"type": "integer",
+                                            "ephemeral": true},
+                                      "b": {"type": "string"}},
+                          "indexes": [["b", "a"]]}']],
+  [[ephemeral columns (such as a) may not be indexed]])
+
 OVSDB_CHECK_NEGATIVE_CPY([column names may not begin with _],
   [[parse-table mytable \
     '{"columns": {"_column": {"type": "integer"}}}']],
 OVSDB_CHECK_NEGATIVE_CPY([column names may not begin with _],
   [[parse-table mytable \
     '{"columns": {"_column": {"type": "integer"}}}']],
index 989159dd582286260afc84f9d591cb6c7080129a..d48a0c1ec2bc89f4363ecc54be4f341407d89fb2 100644 (file)
@@ -83,7 +83,7 @@ AT_CHECK(
   [0], [stdout], [ignore])
 dnl Check that all the crap is in fact in the database log.
 AT_CHECK([[perl $srcdir/uuidfilt.pl db | grep -v ^OVSDB | sed 's/"_date":[0-9]*/"_date":0/' | test-json --multiple -]], [0],
   [0], [stdout], [ignore])
 dnl Check that all the crap is in fact in the database log.
 AT_CHECK([[perl $srcdir/uuidfilt.pl db | grep -v ^OVSDB | sed 's/"_date":[0-9]*/"_date":0/' | test-json --multiple -]], [0],
-  [[{"cksum":"12345678 9","name":"ordinals","tables":{"ordinals":{"columns":{"name":{"type":"string"},"number":{"type":"integer"}}}},"version":"5.1.3"}
+  [[{"cksum":"12345678 9","name":"ordinals","tables":{"ordinals":{"columns":{"name":{"type":"string"},"number":{"type":"integer"}},"indexes":[["number"]]}},"version":"5.1.3"}
 {"_comment":"add row for zero 0","_date":0,"ordinals":{"<0>":{"name":"zero"}}}
 {"_comment":"delete row for 0","_date":0,"ordinals":{"<0>":null}}
 {"_comment":"add back row for zero 0","_date":0,"ordinals":{"<1>":{"name":"zero"}}}
 {"_comment":"add row for zero 0","_date":0,"ordinals":{"<0>":{"name":"zero"}}}
 {"_comment":"delete row for 0","_date":0,"ordinals":{"<0>":null}}
 {"_comment":"add back row for zero 0","_date":0,"ordinals":{"<1>":{"name":"zero"}}}