ovsdb: Implement table uniqueness constraints ("indexes").
[openvswitch] / ovsdb / row.c
1 /* Copyright (c) 2009, 2010, 2011 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "row.h"
19
20 #include <assert.h>
21 #include <stddef.h>
22
23 #include "dynamic-string.h"
24 #include "json.h"
25 #include "ovsdb-error.h"
26 #include "shash.h"
27 #include "sort.h"
28 #include "table.h"
29
30 static struct ovsdb_row *
31 allocate_row(const struct ovsdb_table *table)
32 {
33     size_t n_fields = shash_count(&table->schema->columns);
34     size_t n_indexes = table->schema->n_indexes;
35     size_t row_size = (offsetof(struct ovsdb_row, fields)
36                        + sizeof(struct ovsdb_datum) * n_fields
37                        + sizeof(struct hmap_node) * n_indexes);
38     struct ovsdb_row *row = xmalloc(row_size);
39     row->table = (struct ovsdb_table *) table;
40     row->txn_row = NULL;
41     list_init(&row->src_refs);
42     list_init(&row->dst_refs);
43     row->n_refs = 0;
44     return row;
45 }
46
47 struct ovsdb_row *
48 ovsdb_row_create(const struct ovsdb_table *table)
49 {
50     struct shash_node *node;
51     struct ovsdb_row *row;
52
53     row = allocate_row(table);
54     SHASH_FOR_EACH (node, &table->schema->columns) {
55         const struct ovsdb_column *column = node->data;
56         ovsdb_datum_init_default(&row->fields[column->index], &column->type);
57     }
58     return row;
59 }
60
61 struct ovsdb_row *
62 ovsdb_row_clone(const struct ovsdb_row *old)
63 {
64     const struct ovsdb_table *table = old->table;
65     const struct shash_node *node;
66     struct ovsdb_row *new;
67
68     new = allocate_row(table);
69     SHASH_FOR_EACH (node, &table->schema->columns) {
70         const struct ovsdb_column *column = node->data;
71         ovsdb_datum_clone(&new->fields[column->index],
72                           &old->fields[column->index],
73                           &column->type);
74     }
75     return new;
76 }
77
78 /* The caller is responsible for ensuring that 'row' has been removed from its
79  * table and that it is not participating in a transaction. */
80 void
81 ovsdb_row_destroy(struct ovsdb_row *row)
82 {
83     if (row) {
84         const struct ovsdb_table *table = row->table;
85         struct ovsdb_weak_ref *weak, *next;
86         const struct shash_node *node;
87
88         LIST_FOR_EACH_SAFE (weak, next, dst_node, &row->dst_refs) {
89             list_remove(&weak->src_node);
90             list_remove(&weak->dst_node);
91             free(weak);
92         }
93
94         LIST_FOR_EACH_SAFE (weak, next, src_node, &row->src_refs) {
95             list_remove(&weak->src_node);
96             list_remove(&weak->dst_node);
97             free(weak);
98         }
99
100         SHASH_FOR_EACH (node, &table->schema->columns) {
101             const struct ovsdb_column *column = node->data;
102             ovsdb_datum_destroy(&row->fields[column->index], &column->type);
103         }
104         free(row);
105     }
106 }
107
108 uint32_t
109 ovsdb_row_hash_columns(const struct ovsdb_row *row,
110                        const struct ovsdb_column_set *columns,
111                        uint32_t basis)
112 {
113     size_t i;
114
115     for (i = 0; i < columns->n_columns; i++) {
116         const struct ovsdb_column *column = columns->columns[i];
117         basis = ovsdb_datum_hash(&row->fields[column->index], &column->type,
118                                  basis);
119     }
120
121     return basis;
122 }
123
124 int
125 ovsdb_row_compare_columns_3way(const struct ovsdb_row *a,
126                                const struct ovsdb_row *b,
127                                const struct ovsdb_column_set *columns)
128 {
129     size_t i;
130
131     for (i = 0; i < columns->n_columns; i++) {
132         const struct ovsdb_column *column = columns->columns[i];
133         int cmp = ovsdb_datum_compare_3way(&a->fields[column->index],
134                                            &b->fields[column->index],
135                                            &column->type);
136         if (cmp) {
137             return cmp;
138         }
139     }
140
141     return 0;
142 }
143
144 bool
145 ovsdb_row_equal_columns(const struct ovsdb_row *a,
146                         const struct ovsdb_row *b,
147                         const struct ovsdb_column_set *columns)
148 {
149     size_t i;
150
151     for (i = 0; i < columns->n_columns; i++) {
152         const struct ovsdb_column *column = columns->columns[i];
153         if (!ovsdb_datum_equals(&a->fields[column->index],
154                                 &b->fields[column->index],
155                                 &column->type)) {
156             return false;
157         }
158     }
159
160     return true;
161 }
162
163 void
164 ovsdb_row_update_columns(struct ovsdb_row *dst,
165                          const struct ovsdb_row *src,
166                          const struct ovsdb_column_set *columns)
167 {
168     size_t i;
169
170     for (i = 0; i < columns->n_columns; i++) {
171         const struct ovsdb_column *column = columns->columns[i];
172         ovsdb_datum_destroy(&dst->fields[column->index], &column->type);
173         ovsdb_datum_clone(&dst->fields[column->index],
174                           &src->fields[column->index],
175                           &column->type);
176     }
177 }
178
179 /* Appends the string form of the value in 'row' of each of the columns in
180  * 'columns' to 'out', e.g. "1, \"xyz\", and [1, 2, 3]". */
181 void
182 ovsdb_row_columns_to_string(const struct ovsdb_row *row,
183                             const struct ovsdb_column_set *columns,
184                             struct ds *out)
185 {
186     size_t i;
187
188     for (i = 0; i < columns->n_columns; i++) {
189         const struct ovsdb_column *column = columns->columns[i];
190
191         ds_put_cstr(out, english_list_delimiter(i, columns->n_columns));
192         ovsdb_datum_to_string(&row->fields[column->index], &column->type, out);
193     }
194 }
195
196 struct ovsdb_error *
197 ovsdb_row_from_json(struct ovsdb_row *row, const struct json *json,
198                     struct ovsdb_symbol_table *symtab,
199                     struct ovsdb_column_set *included)
200 {
201     struct ovsdb_table_schema *schema = row->table->schema;
202     struct ovsdb_error *error;
203     struct shash_node *node;
204
205     if (json->type != JSON_OBJECT) {
206         return ovsdb_syntax_error(json, NULL, "row must be JSON object");
207     }
208
209     SHASH_FOR_EACH (node, json_object(json)) {
210         const char *column_name = node->name;
211         const struct ovsdb_column *column;
212         struct ovsdb_datum datum;
213
214         column = ovsdb_table_schema_get_column(schema, column_name);
215         if (!column) {
216             return ovsdb_syntax_error(json, "unknown column",
217                                       "No column %s in table %s.",
218                                       column_name, schema->name);
219         }
220
221         error = ovsdb_datum_from_json(&datum, &column->type, node->data,
222                                       symtab);
223         if (error) {
224             return error;
225         }
226         ovsdb_datum_swap(&row->fields[column->index], &datum);
227         ovsdb_datum_destroy(&datum, &column->type);
228         if (included) {
229             ovsdb_column_set_add(included, column);
230         }
231     }
232
233     return NULL;
234 }
235
236 static void
237 put_json_column(struct json *object, const struct ovsdb_row *row,
238                 const struct ovsdb_column *column)
239 {
240     json_object_put(object, column->name,
241                     ovsdb_datum_to_json(&row->fields[column->index],
242                                         &column->type));
243 }
244
245 struct json *
246 ovsdb_row_to_json(const struct ovsdb_row *row,
247                   const struct ovsdb_column_set *columns)
248 {
249     struct json *json;
250     size_t i;
251
252     json = json_object_create();
253     for (i = 0; i < columns->n_columns; i++) {
254         put_json_column(json, row, columns->columns[i]);
255     }
256     return json;
257 }
258 \f
259 void
260 ovsdb_row_set_init(struct ovsdb_row_set *set)
261 {
262     set->rows = NULL;
263     set->n_rows = set->allocated_rows = 0;
264 }
265
266 void
267 ovsdb_row_set_destroy(struct ovsdb_row_set *set)
268 {
269     free(set->rows);
270 }
271
272 void
273 ovsdb_row_set_add_row(struct ovsdb_row_set *set, const struct ovsdb_row *row)
274 {
275     if (set->n_rows >= set->allocated_rows) {
276         set->rows = x2nrealloc(set->rows, &set->allocated_rows,
277                                sizeof *set->rows);
278     }
279     set->rows[set->n_rows++] = row;
280 }
281
282 struct json *
283 ovsdb_row_set_to_json(const struct ovsdb_row_set *rows,
284                       const struct ovsdb_column_set *columns)
285 {
286     struct json **json_rows;
287     size_t i;
288
289     json_rows = xmalloc(rows->n_rows * sizeof *json_rows);
290     for (i = 0; i < rows->n_rows; i++) {
291         json_rows[i] = ovsdb_row_to_json(rows->rows[i], columns);
292     }
293     return json_array_create(json_rows, rows->n_rows);
294 }
295
296 struct ovsdb_row_set_sort_cbdata {
297     struct ovsdb_row_set *set;
298     const struct ovsdb_column_set *columns;
299 };
300
301 static int
302 ovsdb_row_set_sort_compare_cb(size_t a, size_t b, void *cbdata_)
303 {
304     struct ovsdb_row_set_sort_cbdata *cbdata = cbdata_;
305     return ovsdb_row_compare_columns_3way(cbdata->set->rows[a],
306                                           cbdata->set->rows[b],
307                                           cbdata->columns);
308 }
309
310 static void
311 ovsdb_row_set_sort_swap_cb(size_t a, size_t b, void *cbdata_)
312 {
313     struct ovsdb_row_set_sort_cbdata *cbdata = cbdata_;
314     const struct ovsdb_row *tmp = cbdata->set->rows[a];
315     cbdata->set->rows[a] = cbdata->set->rows[b];
316     cbdata->set->rows[b] = tmp;
317 }
318
319 void
320 ovsdb_row_set_sort(struct ovsdb_row_set *set,
321                    const struct ovsdb_column_set *columns)
322 {
323     if (columns && columns->n_columns && set->n_rows > 1) {
324         struct ovsdb_row_set_sort_cbdata cbdata;
325         cbdata.set = set;
326         cbdata.columns = columns;
327         sort(set->n_rows,
328              ovsdb_row_set_sort_compare_cb,
329              ovsdb_row_set_sort_swap_cb,
330              &cbdata);
331     }
332 }
333 \f
334 void
335 ovsdb_row_hash_init(struct ovsdb_row_hash *rh,
336                     const struct ovsdb_column_set *columns)
337 {
338     hmap_init(&rh->rows);
339     ovsdb_column_set_clone(&rh->columns, columns);
340 }
341
342 void
343 ovsdb_row_hash_destroy(struct ovsdb_row_hash *rh, bool destroy_rows)
344 {
345     struct ovsdb_row_hash_node *node, *next;
346
347     HMAP_FOR_EACH_SAFE (node, next, hmap_node, &rh->rows) {
348         hmap_remove(&rh->rows, &node->hmap_node);
349         if (destroy_rows) {
350             ovsdb_row_destroy((struct ovsdb_row *) node->row);
351         }
352         free(node);
353     }
354     hmap_destroy(&rh->rows);
355     ovsdb_column_set_destroy(&rh->columns);
356 }
357
358 size_t
359 ovsdb_row_hash_count(const struct ovsdb_row_hash *rh)
360 {
361     return hmap_count(&rh->rows);
362 }
363
364 bool
365 ovsdb_row_hash_contains(const struct ovsdb_row_hash *rh,
366                         const struct ovsdb_row *row)
367 {
368     size_t hash = ovsdb_row_hash_columns(row, &rh->columns, 0);
369     return ovsdb_row_hash_contains__(rh, row, hash);
370 }
371
372 /* Returns true if every row in 'b' has an equal row in 'a'. */
373 bool
374 ovsdb_row_hash_contains_all(const struct ovsdb_row_hash *a,
375                             const struct ovsdb_row_hash *b)
376 {
377     struct ovsdb_row_hash_node *node;
378
379     assert(ovsdb_column_set_equals(&a->columns, &b->columns));
380     HMAP_FOR_EACH (node, hmap_node, &b->rows) {
381         if (!ovsdb_row_hash_contains__(a, node->row, node->hmap_node.hash)) {
382             return false;
383         }
384     }
385     return true;
386 }
387
388 bool
389 ovsdb_row_hash_insert(struct ovsdb_row_hash *rh, const struct ovsdb_row *row)
390 {
391     size_t hash = ovsdb_row_hash_columns(row, &rh->columns, 0);
392     return ovsdb_row_hash_insert__(rh, row, hash);
393 }
394
395 bool
396 ovsdb_row_hash_contains__(const struct ovsdb_row_hash *rh,
397                           const struct ovsdb_row *row, size_t hash)
398 {
399     struct ovsdb_row_hash_node *node;
400     HMAP_FOR_EACH_WITH_HASH (node, hmap_node, hash, &rh->rows) {
401         if (ovsdb_row_equal_columns(row, node->row, &rh->columns)) {
402             return true;
403         }
404     }
405     return false;
406 }
407
408 bool
409 ovsdb_row_hash_insert__(struct ovsdb_row_hash *rh, const struct ovsdb_row *row,
410                         size_t hash)
411 {
412     if (!ovsdb_row_hash_contains__(rh, row, hash)) {
413         struct ovsdb_row_hash_node *node = xmalloc(sizeof *node);
414         node->row = row;
415         hmap_insert(&rh->rows, &node->hmap_node, hash);
416         return true;
417     } else {
418         return false;
419     }
420 }