From a8425c53c5785856cabe80295f0cea0135febdb6 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Mon, 16 Nov 2009 10:38:14 -0800 Subject: [PATCH] ovsdb: Monitor support. --- ovsdb/SPECS | 128 ++++++++++++ ovsdb/column.c | 13 ++ ovsdb/column.h | 1 + ovsdb/jsonrpc-server.c | 426 ++++++++++++++++++++++++++++++++++++++++ ovsdb/ovsdb-client.1.in | 24 ++- ovsdb/ovsdb-client.c | 222 ++++++++++++++++++++- tests/automake.mk | 1 + tests/ovsdb-monitor.at | 46 +++++ tests/ovsdb-server.at | 2 +- tests/ovsdb.at | 1 + 10 files changed, 857 insertions(+), 7 deletions(-) create mode 100644 tests/ovsdb-monitor.at diff --git a/ovsdb/SPECS b/ovsdb/SPECS index ae4d649b..97c9a780 100644 --- a/ovsdb/SPECS +++ b/ovsdb/SPECS @@ -281,6 +281,134 @@ form: The "cancel" notification itself has no reply. +monitor +....... + +Request object members: + + "method": "monitor" required + "params": [, ] required + "id": any JSON value except null required + + is an object that maps from a table name to a +. + +Each is an object with the following members: + + "columns": [*] optional + "select": optional + + is an object with the following members: + + "initial": optional + "insert": optional + "delete": optional + "modify": optional + +Response object members: + + "result": + "error": null + "id": same "id" as request + +This JSON-RPC request enables a client to replicate tables or subsets +of tables. Each specifies a table to be replicated. +The JSON-RPC response to the "monitor" includes the initial contents +of each table. Afterward, when changes to those tables are committed, +the changes are automatically sent to the client using the "update" +monitor notification. This monitoring persists until the JSON-RPC +session terminates or until the client sends a "monitor_cancel" +JSON-RPC request. + +Each describes how to monitor a table: + + The circumstances in which an "update" notification is sent for a + row within the table are determined by : + + If "initial" is omitted or true, every row in the table is + sent as part of the reply to the "monitor" request. + + If "insert" is omitted or true, "update" notifications are + sent for rows newly inserted into the table. + + If "delete" is omitted or true, "update" notifications are + sent for rows deleted from the table. + + If "modify" is omitted or true, "update" notifications are + sent whenever when a row in the table is modified. + + The "columns" member specifies the columns whose values are + monitored. If "columns" is omitted, all columns in the table, + except for "_uuid", are monitored. + +The "result" in the JSON-RPC response to the "monitor" request is a + object (see below) that contains the contents of the +tables for which "initial" rows are selected. If no tables' initial +contents are requested, then "result" is an empty object. + +update +...... + +Notification object members: + + "method": "update" + "params": [, ] + "id": null + +The in "params" is the same as the value passed as the +in "params" for the "monitor" request. + + is an object that maps from a table name to a +. + +A is an object that maps from the row's UUID (as a +36-byte string) to a object. + +A is an object with the following members: + + "old": present for "delete" and "modify" updates + "new": present for "initial", "insert", and "modify" updates + +This JSON-RPC notification is sent from the server to the client to +tell it about changes to a monitored table (or the initial state of a +modified table). Each table in which one or more rows has changed (or +whose initial view is being presented) is represented in "updates". +Each row that has changed (or whose initial view is being presented) +is represented in its as a member with its name taken +from the row's _uuid member. The corresponding value is a +: + + The "old" member is present for "delete" and "modify" updates. + For "delete" updates, each monitored column is included. For + "modify" updates, the prior value of each monitored column whose + value has changed is included (monitored columns that have not + changed are represented in "new"). + + The "new" member is present for "initial", "insert", and "modify" + updates. For "initial" and "insert" updates, each monitored + column is included. For "modify" updates, the new value of each + monitored column is included. + +monitor_cancel +.............. + +Request object members: + + "method": "monitor_cancel" required + "params": [] required + "id": any JSON value except null required + +Response object members: + + "result": {} + "error": null + "id": the request "id" member + +Cancels the ongoing table monitor request, identified by the +in "params" matching the in "params" for an ongoing "monitor" +request. No more "update" messages will be sent for this table +monitor. + echo .... diff --git a/ovsdb/column.c b/ovsdb/column.c index 1e8a2d09..fc21cdc9 100644 --- a/ovsdb/column.c +++ b/ovsdb/column.c @@ -174,6 +174,19 @@ error: "array of distinct column names expected"); } +struct json * +ovsdb_column_set_to_json(const struct ovsdb_column_set *set) +{ + struct json *json; + size_t i; + + json = json_array_create_empty(); + for (i = 0; i < set->n_columns; i++) { + json_array_add(json, json_string_create(set->columns[i]->name)); + } + return json; +} + void ovsdb_column_set_add(struct ovsdb_column_set *set, const struct ovsdb_column *column) diff --git a/ovsdb/column.h b/ovsdb/column.h index 59421510..5fd39ae1 100644 --- a/ovsdb/column.h +++ b/ovsdb/column.h @@ -71,6 +71,7 @@ void ovsdb_column_set_clone(struct ovsdb_column_set *, struct ovsdb_error *ovsdb_column_set_from_json(const struct json *, const struct ovsdb_table *, struct ovsdb_column_set *); +struct json *ovsdb_column_set_to_json(const struct ovsdb_column_set *); void ovsdb_column_set_add(struct ovsdb_column_set *, const struct ovsdb_column *); diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index 977d3024..fc8b194e 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -22,10 +22,15 @@ #include "column.h" #include "json.h" #include "jsonrpc.h" +#include "ovsdb-error.h" +#include "ovsdb-parser.h" #include "ovsdb.h" #include "reconnect.h" +#include "row.h" #include "stream.h" +#include "table.h" #include "timeval.h" +#include "transaction.h" #include "trigger.h" #define THIS_MODULE VLM_ovsdb_jsonrpc_server @@ -33,6 +38,7 @@ struct ovsdb_jsonrpc_session; +/* Sessions. */ static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *, const char *name); static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *, @@ -40,6 +46,7 @@ static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *, static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *); static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *); +/* Triggers. */ static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *, struct json *id, struct json *params); static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find( @@ -48,6 +55,15 @@ static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *); static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *); static void ovsdb_jsonrpc_trigger_complete_done( struct ovsdb_jsonrpc_session *); + +/* Monitors. */ +static struct json *ovsdb_jsonrpc_monitor_create( + struct ovsdb_jsonrpc_session *, struct json *params); +static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel( + struct ovsdb_jsonrpc_session *, + struct json_array *params, + const struct json *request_id); +static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *); /* JSON-RPC database server. */ @@ -151,6 +167,9 @@ struct ovsdb_jsonrpc_session { struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */ struct list completions; /* Completed triggers. */ + /* Monitors. */ + struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */ + /* Connecting and reconnecting. */ struct reconnect *reconnect; /* For back-off. */ bool active; /* Active or passive connection? */ @@ -177,6 +196,7 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr, s->server = svr; list_push_back(&svr->sessions, &s->node); hmap_init(&s->triggers); + hmap_init(&s->monitors); list_init(&s->completions); s->reconnect = reconnect_create(time_msec()); reconnect_set_name(s->reconnect, name); @@ -221,6 +241,7 @@ ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s) if (s->rpc) { jsonrpc_error(s->rpc, EOF); ovsdb_jsonrpc_trigger_complete_all(s); + ovsdb_jsonrpc_monitor_remove_all(s); jsonrpc_close(s->rpc); s->rpc = NULL; } else if (s->stream) { @@ -375,6 +396,12 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s, if (!strcmp(request->method, "transact")) { reply = execute_transaction(s, request); + } else if (!strcmp(request->method, "monitor")) { + reply = jsonrpc_create_reply( + ovsdb_jsonrpc_monitor_create(s, request->params), request->id); + } else if (!strcmp(request->method, "monitor_cancel")) { + reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params), + request->id); } else if (!strcmp(request->method, "get_schema")) { reply = jsonrpc_create_reply( ovsdb_schema_to_json(s->server->db->schema), request->id); @@ -522,3 +549,402 @@ ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s) ovsdb_jsonrpc_trigger_complete(t); } } + +/* JSON-RPC database table monitors. */ + +enum ovsdb_jsonrpc_monitor_selection { + OJMS_INITIAL = 1 << 0, /* All rows when monitor is created. */ + OJMS_INSERT = 1 << 1, /* New rows. */ + OJMS_DELETE = 1 << 2, /* Deleted rows. */ + OJMS_MODIFY = 1 << 3 /* Modified rows. */ +}; + +struct ovsdb_jsonrpc_monitor_table { + const struct ovsdb_table *table; + enum ovsdb_jsonrpc_monitor_selection select; + struct ovsdb_column_set columns; +}; + +struct ovsdb_jsonrpc_monitor { + struct ovsdb_replica replica; + struct ovsdb_jsonrpc_session *session; + struct hmap_node node; /* In ovsdb_jsonrpc_session's "monitors". */ + + struct json *monitor_id; + struct shash tables; /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */ +}; + +static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class; + +struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find( + struct ovsdb_jsonrpc_session *, const struct json *monitor_id); +static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *); +static struct json *ovsdb_jsonrpc_monitor_get_initial( + const struct ovsdb_jsonrpc_monitor *); + +static bool +parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value) +{ + const struct json *json; + + json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL); + return json ? json_boolean(json) : default_value; +} + +struct ovsdb_jsonrpc_monitor * +ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s, + const struct json *monitor_id) +{ + struct ovsdb_jsonrpc_monitor *m; + + HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node, + json_hash(monitor_id, 0), &s->monitors) { + if (json_equal(m->monitor_id, monitor_id)) { + return m; + } + } + + return NULL; +} + +static struct json * +ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, + struct json *params) +{ + struct ovsdb_jsonrpc_monitor *m = NULL; + struct json *monitor_id, *monitor_requests; + struct ovsdb_error *error = NULL; + struct shash_node *node; + struct json *json; + + if (json_array(params)->n != 2) { + error = ovsdb_syntax_error(params, NULL, "invalid parameters"); + goto error; + } + monitor_id = params->u.array.elems[0]; + monitor_requests = params->u.array.elems[1]; + if (monitor_requests->type != JSON_OBJECT) { + error = ovsdb_syntax_error(monitor_requests, NULL, + "monitor-requests must be object"); + goto error; + } + + if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) { + error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID"); + goto error; + } + + m = xzalloc(sizeof *m); + ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class); + ovsdb_add_replica(s->server->db, &m->replica); + m->session = s; + hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0)); + m->monitor_id = json_clone(monitor_id); + shash_init(&m->tables); + + SHASH_FOR_EACH (node, json_object(monitor_requests)) { + const struct ovsdb_table *table; + struct ovsdb_jsonrpc_monitor_table *mt; + const struct json *columns_json, *select_json; + struct ovsdb_parser parser; + + table = ovsdb_get_table(s->server->db, node->name); + if (!table) { + error = ovsdb_syntax_error(NULL, NULL, + "no table named %s", node->name); + goto error; + } + + mt = xzalloc(sizeof *mt); + mt->table = table; + mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY; + ovsdb_column_set_init(&mt->columns); + shash_add(&m->tables, table->schema->name, mt); + + ovsdb_parser_init(&parser, node->data, "table %s", node->name); + columns_json = ovsdb_parser_member(&parser, "columns", + OP_ARRAY | OP_OPTIONAL); + select_json = ovsdb_parser_member(&parser, "select", + OP_OBJECT | OP_OPTIONAL); + error = ovsdb_parser_finish(&parser); + if (error) { + goto error; + } + + if (columns_json) { + error = ovsdb_column_set_from_json(columns_json, table, + &mt->columns); + if (error) { + goto error; + } + } else { + struct shash_node *node; + + SHASH_FOR_EACH (node, &table->schema->columns) { + const struct ovsdb_column *column = node->data; + if (column->index != OVSDB_COL_UUID) { + ovsdb_column_set_add(&mt->columns, column); + } + } + } + + if (select_json) { + mt->select = 0; + ovsdb_parser_init(&parser, select_json, "table %s select", + table->schema->name); + if (parse_bool(&parser, "initial", true)) { + mt->select |= OJMS_INITIAL; + } + if (parse_bool(&parser, "insert", true)) { + mt->select |= OJMS_INSERT; + } + if (parse_bool(&parser, "delete", true)) { + mt->select |= OJMS_DELETE; + } + if (parse_bool(&parser, "modify", true)) { + mt->select |= OJMS_MODIFY; + } + error = ovsdb_parser_finish(&parser); + if (error) { + goto error; + } + } + } + + return ovsdb_jsonrpc_monitor_get_initial(m); + +error: + ovsdb_remove_replica(s->server->db, &m->replica); + + json = ovsdb_error_to_json(error); + ovsdb_error_destroy(error); + return json; +} + +static struct jsonrpc_msg * +ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s, + struct json_array *params, + const struct json *request_id) +{ + if (params->n != 1) { + return jsonrpc_create_error(json_string_create("invalid parameters"), + request_id); + } else { + struct ovsdb_jsonrpc_monitor *m; + + m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]); + if (!m) { + return jsonrpc_create_error(json_string_create("unknown monitor"), + request_id); + } else { + ovsdb_remove_replica(s->server->db, &m->replica); + return jsonrpc_create_reply(json_object_create(), request_id); + } + } +} + +static void +ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s) +{ + struct ovsdb_jsonrpc_monitor *m, *next; + + HMAP_FOR_EACH_SAFE (m, next, + struct ovsdb_jsonrpc_monitor, node, &s->monitors) { + ovsdb_remove_replica(s->server->db, &m->replica); + } +} + +static struct ovsdb_jsonrpc_monitor * +ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica) +{ + assert(replica->class == &ovsdb_jsonrpc_replica_class); + return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica); +} + +struct ovsdb_jsonrpc_monitor_aux { + bool initial; /* Sending initial contents of table? */ + const struct ovsdb_jsonrpc_monitor *monitor; + struct json *json; /* JSON for the whole transaction. */ + + /* Current table. */ + struct ovsdb_jsonrpc_monitor_table *mt; + struct json *table_json; /* JSON for table's transaction. */ +}; + +static bool +ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old, + const struct ovsdb_row *new, + void *aux_) +{ + struct ovsdb_jsonrpc_monitor_aux *aux = aux_; + const struct ovsdb_jsonrpc_monitor *m = aux->monitor; + struct ovsdb_table *table = new ? new->table : old->table; + enum ovsdb_jsonrpc_monitor_selection type; + struct json *old_json, *new_json; + struct json *row_json; + char uuid[UUID_LEN + 1]; + int n_changed; + size_t i; + + if (!aux->mt || table != aux->mt->table) { + aux->mt = shash_find_data(&m->tables, table->schema->name); + aux->table_json = NULL; + if (!aux->mt) { + /* We don't care about rows in this table at all. Tell the caller + * to skip it. */ + return false; + } + } + + type = (aux->initial ? OJMS_INITIAL + : !old ? OJMS_INSERT + : !new ? OJMS_DELETE + : OJMS_MODIFY); + if (!(aux->mt->select & type)) { + /* We don't care about this type of change (but do want to be called + * back for changes to other rows in the same table). */ + return true; + } + + old_json = new_json = NULL; + n_changed = 0; + for (i = 0; i < aux->mt->columns.n_columns; i++) { + const struct ovsdb_column *column = aux->mt->columns.columns[i]; + unsigned int idx = column->index; + bool changed = false; + + if (type == OJMS_MODIFY) { + changed = !ovsdb_datum_equals(&old->fields[idx], + &new->fields[idx], &column->type); + n_changed += changed; + } + if (changed || type == OJMS_DELETE) { + if (!old_json) { + old_json = json_object_create(); + } + json_object_put(old_json, column->name, + ovsdb_datum_to_json(&old->fields[idx], + &column->type)); + } + if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) { + if (!new_json) { + new_json = json_object_create(); + } + json_object_put(new_json, column->name, + ovsdb_datum_to_json(&new->fields[idx], + &column->type)); + } + } + if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) { + /* No reportable changes. */ + json_destroy(old_json); + json_destroy(new_json); + return true; + } + + /* Create JSON object for transaction overall. */ + if (!aux->json) { + aux->json = json_object_create(); + } + + /* Create JSON object for transaction on this table. */ + if (!aux->table_json) { + aux->table_json = json_object_create(); + json_object_put(aux->json, aux->mt->table->schema->name, + aux->table_json); + } + + /* Create JSON object for transaction on this row. */ + row_json = json_object_create(); + if (old_json) { + json_object_put(row_json, "old", old_json); + } + if (new_json) { + json_object_put(row_json, "new", new_json); + } + + /* Add JSON row to JSON table. */ + snprintf(uuid, sizeof uuid, + UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old))); + json_object_put(aux->table_json, uuid, row_json); + + return true; +} + +static void +ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux, + const struct ovsdb_jsonrpc_monitor *m, + bool initial) +{ + aux->initial = initial; + aux->monitor = m; + aux->json = NULL; + aux->mt = NULL; + aux->table_json = NULL; +} + +static struct ovsdb_error * +ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica, + const struct ovsdb_txn *txn, bool durable UNUSED) +{ + struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica); + struct ovsdb_jsonrpc_monitor_aux aux; + + ovsdb_jsonrpc_monitor_init_aux(&aux, m, false); + ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux); + if (aux.json) { + struct jsonrpc_msg *msg; + struct json *params; + + params = json_array_create_2(json_clone(aux.monitor->monitor_id), + aux.json); + msg = jsonrpc_create_notify("update", params); + jsonrpc_send(aux.monitor->session->rpc, msg); + } + + return NULL; +} + +static struct json * +ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m) +{ + struct ovsdb_jsonrpc_monitor_aux aux; + struct shash_node *node; + + ovsdb_jsonrpc_monitor_init_aux(&aux, m, true); + SHASH_FOR_EACH (node, &m->tables) { + struct ovsdb_jsonrpc_monitor_table *mt = node->data; + + if (mt->select & OJMS_INITIAL) { + struct ovsdb_row *row; + + HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node, + &mt->table->rows) { + ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux); + } + } + } + return aux.json ? aux.json : json_object_create(); +} + +static void +ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica) +{ + struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica); + struct shash_node *node; + + json_destroy(m->monitor_id); + SHASH_FOR_EACH (node, &m->tables) { + struct ovsdb_jsonrpc_monitor_table *mt = node->data; + ovsdb_column_set_destroy(&mt->columns); + free(mt); + } + shash_destroy(&m->tables); + hmap_remove(&m->session->monitors, &m->node); + free(m); +} + +static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = { + ovsdb_jsonrpc_monitor_commit, + ovsdb_jsonrpc_monitor_destroy +}; diff --git a/ovsdb/ovsdb-client.1.in b/ovsdb/ovsdb-client.1.in index 0337c3dc..9825d327 100644 --- a/ovsdb/ovsdb-client.1.in +++ b/ovsdb/ovsdb-client.1.in @@ -20,6 +20,10 @@ ovsdb\-client \- command-line interface to \fBovsdb-server\fR(1) .br \fBovsdb\-client \fR[\fIoptions\fR] \fBtransact\fI server transaction\fR .br +\fBovsdb\-client \fR[\fIoptions\fR] \fBmonitor\fI server table\fR +[\fIcolumn\fR[\fB,\fIcolumn\fR]...] +[\fIselect\fR[\fB,\fIselect\fR]...] +.br \fBovsdb\-client help\fR .IP "Output formatting options:" [\fB--format=\fIformat\fR] @@ -62,10 +66,28 @@ Connects to \fIserver\fR, retrieves the database schema, and prints a table listing the names, type, and comment (if any) on each column. If \fItable\fR is specified, only columns in that table are listed; otherwise, the tables include columns in all tables. -.IP "\fBovsdb\-client \fR[\fIoptions\fR] \fBtransact\fI server transaction\fR" +. +.IP "\fBtransact\fI server transaction\fR" Connects to \fIserver\fR, sends it the specified \fItransaction\fR, which must be a JSON array containing one or more valid OVSDB operations, and prints the received reply on stdout. +. +.IP "\fBmonitor\fI server table\fR [\fIcolumn\fR[\fB,\fIcolumn\fR]...] [\fIselect\fR[\fB,\fIselect\fR]...]" +Connects to \fIserver\fR and monitors the contents of \fItable\fR. By +default, the initial contents of \fItable\fR are printed, followed by +each change as it occurs. If at least one \fIcolumn\fR is specified, +only those columns are monitored. If at least one \fIselect\fR is +specified, they are interpreted as follows: +.RS +.IP "\fBinitial\fR" +Print the initial contents of the specified columns. +.IP "\fBinsert\fR" +Print newly inserted rows. +.IP "\fBdelete\fR" +Print deleted rows. +.IP "\fBmodify\fR" +Print old and new values of modified rows. +.RE .SH OPTIONS .SS "Output Formatting Options" Much of the output from \fBovsdb\-client\fR is in the form of tables. diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c index 249cafe5..6e006812 100644 --- a/ovsdb/ovsdb-client.c +++ b/ovsdb/ovsdb-client.c @@ -149,7 +149,10 @@ usage(void) " list columns in TABLE (or all tables) on SERVER\n" "\n transact SERVER TRANSACTION\n" " run TRANSACTION (a JSON array of operations) on SERVER\n" - " and print the results as JSON on stdout\n", + " and print the results as JSON on stdout\n" + "\n monitor SERVER TABLE [COLUMN,...] [SELECT,...]\n" + " monitor contents of (COLUMNs in) TABLE on SERVER\n" + " Valid SELECTs are: initial, insert, delete, modify\n", program_name, program_name); stream_usage("SERVER", true, true); printf("\nOutput formatting options:\n" @@ -227,14 +230,12 @@ check_ovsdb_error(struct ovsdb_error *error) } static struct ovsdb_schema * -fetch_schema(const char *server) +fetch_schema_from_rpc(struct jsonrpc *rpc) { struct jsonrpc_msg *request, *reply; struct ovsdb_schema *schema; - struct jsonrpc *rpc; int error; - rpc = open_jsonrpc(server); request = jsonrpc_create_request("get_schema", json_array_create_empty()); error = jsonrpc_transact_block(rpc, request, &reply); if (error) { @@ -242,6 +243,18 @@ fetch_schema(const char *server) } check_ovsdb_error(ovsdb_schema_from_json(reply->result, &schema)); jsonrpc_msg_destroy(reply); + + return schema; +} + +static struct ovsdb_schema * +fetch_schema(const char *server) +{ + struct ovsdb_schema *schema; + struct jsonrpc *rpc; + + rpc = open_jsonrpc(server); + schema = fetch_schema_from_rpc(rpc); jsonrpc_close(rpc); return schema; @@ -266,6 +279,22 @@ table_init(struct table *table) memset(table, 0, sizeof *table); } +static void +table_destroy(struct table *table) +{ + size_t i; + + for (i = 0; i < table->n_columns; i++) { + free(table->columns[i].heading); + } + free(table->columns); + + for (i = 0; i < table->n_columns * table->n_rows; i++) { + free(table->cells[i]); + } + free(table->cells); +} + static void table_add_column(struct table *table, const char *heading, ...) PRINTF_FORMAT(2, 3); @@ -590,7 +619,7 @@ do_list_columns(int argc UNUSED, char *argv[]) } static void -do_transact(int argc UNUSED, char *argv[] UNUSED) +do_transact(int argc UNUSED, char *argv[]) { struct jsonrpc_msg *request, *reply; struct json *transaction; @@ -615,6 +644,188 @@ do_transact(int argc UNUSED, char *argv[] UNUSED) jsonrpc_close(rpc); } +static void +monitor_print_row(struct json *row, const char *type, const char *uuid, + const struct ovsdb_column_set *columns, struct table *t) +{ + size_t i; + + if (!row) { + ovs_error(0, "missing %s row", type); + return; + } else if (row->type != JSON_OBJECT) { + ovs_error(0, " is not object"); + return; + } + + table_add_row(t); + table_add_cell(t, uuid); + table_add_cell(t, type); + for (i = 0; i < columns->n_columns; i++) { + const struct ovsdb_column *column = columns->columns[i]; + struct json *value = shash_find_data(json_object(row), column->name); + if (value) { + table_add_cell_nocopy(t, json_to_string(value, JSSF_SORT)); + } else { + table_add_cell(t, ""); + } + } +} + +static void +monitor_print(struct json *table_updates, + const struct ovsdb_table_schema *table, + const struct ovsdb_column_set *columns, bool initial) +{ + struct json *table_update; + struct shash_node *node; + struct table t; + size_t i; + + table_init(&t); + + if (table_updates->type != JSON_OBJECT) { + ovs_error(0, " is not object"); + return; + } + table_update = shash_find_data(json_object(table_updates), table->name); + if (!table_update) { + return; + } + if (table_update->type != JSON_OBJECT) { + ovs_error(0, " is not object"); + return; + } + + table_add_column(&t, "row"); + table_add_column(&t, "action"); + for (i = 0; i < columns->n_columns; i++) { + table_add_column(&t, "%s", columns->columns[i]->name); + } + SHASH_FOR_EACH (node, json_object(table_update)) { + struct json *row_update = node->data; + struct json *old, *new; + + if (row_update->type != JSON_OBJECT) { + ovs_error(0, " is not object"); + continue; + } + old = shash_find_data(json_object(row_update), "old"); + new = shash_find_data(json_object(row_update), "new"); + if (initial) { + monitor_print_row(new, "initial", node->name, columns, &t); + } else if (!old) { + monitor_print_row(new, "insert", node->name, columns, &t); + } else if (!new) { + monitor_print_row(old, "delete", node->name, columns, &t); + } else { + monitor_print_row(old, "old", node->name, columns, &t); + monitor_print_row(new, "new", "", columns, &t); + } + } + table_print(&t); + table_destroy(&t); +} + +static void +do_monitor(int argc, char *argv[]) +{ + struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER; + struct ovsdb_table_schema *table; + struct ovsdb_schema *schema; + struct jsonrpc_msg *request; + struct jsonrpc *rpc; + struct json *select, *monitor, *monitor_request, *monitor_requests, + *request_id; + + rpc = open_jsonrpc(argv[1]); + + schema = fetch_schema_from_rpc(rpc); + table = shash_find_data(&schema->tables, argv[2]); + if (!table) { + ovs_fatal(0, "%s: no table named \"%s\"", argv[1], argv[2]); + } + + if (argc >= 4 && *argv[3] != '\0') { + char *save_ptr = NULL; + char *token; + + for (token = strtok_r(argv[3], ",", &save_ptr); token != NULL; + token = strtok_r(NULL, ",", &save_ptr)) { + const struct ovsdb_column *column; + column = ovsdb_table_schema_get_column(table, token); + if (!column) { + ovs_fatal(0, "%s: table \"%s\" does not have a " + "column named \"%s\"", argv[1], argv[2], token); + } + ovsdb_column_set_add(&columns, column); + } + } else { + struct shash_node *node; + + SHASH_FOR_EACH (node, &table->columns) { + const struct ovsdb_column *column = node->data; + if (column->index != OVSDB_COL_UUID) { + ovsdb_column_set_add(&columns, column); + } + } + } + + if (argc >= 5 && *argv[4] != '\0') { + char *save_ptr = NULL; + char *token; + + select = json_object_create(); + for (token = strtok_r(argv[4], ",", &save_ptr); token != NULL; + token = strtok_r(NULL, ",", &save_ptr)) { + json_object_put(select, token, json_boolean_create(true)); + } + } else { + select = NULL; + } + + monitor_request = json_object_create(); + json_object_put(monitor_request, + "columns", ovsdb_column_set_to_json(&columns)); + if (select) { + json_object_put(monitor_request, "select", select); + } + + monitor_requests = json_object_create(); + json_object_put(monitor_requests, argv[2], monitor_request); + + monitor = json_array_create_2(json_null_create(), monitor_requests); + request = jsonrpc_create_request("monitor", monitor); + request_id = json_clone(request->id); + jsonrpc_send(rpc, request); + for (;;) { + struct jsonrpc_msg *msg; + int error; + + error = jsonrpc_recv_block(rpc, &msg); + if (error) { + ovs_fatal(error, "%s: receive failed", argv[1]); + } + + if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) { + jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params), + msg->id)); + } else if (msg->type == JSONRPC_REPLY + && json_equal(msg->id, request_id)) { + monitor_print(msg->result, table, &columns, true); + } else if (msg->type == JSONRPC_NOTIFY + && !strcmp(msg->method, "update")) { + struct json *params = msg->params; + if (params->type == JSON_ARRAY + && params->u.array.n == 2 + && params->u.array.elems[0]->type == JSON_NULL) { + monitor_print(params->u.array.elems[1], + table, &columns, false); + } + } + } +} + static void do_help(int argc UNUSED, char *argv[] UNUSED) { @@ -626,6 +837,7 @@ static const struct command all_commands[] = { { "list-tables", 1, 1, do_list_tables }, { "list-columns", 1, 2, do_list_columns }, { "transact", 2, 2, do_transact }, + { "monitor", 2, 4, do_monitor }, { "help", 0, INT_MAX, do_help }, { NULL, 0, 0, NULL }, }; diff --git a/tests/automake.mk b/tests/automake.mk index ba2d95fd..b9d74836 100644 --- a/tests/automake.mk +++ b/tests/automake.mk @@ -30,6 +30,7 @@ TESTSUITE_AT = \ tests/ovsdb-trigger.at \ tests/ovsdb-file.at \ tests/ovsdb-server.at \ + tests/ovsdb-monitor.at \ tests/stp.at \ tests/ovs-vsctl.at \ tests/lcov-post.at diff --git a/tests/ovsdb-monitor.at b/tests/ovsdb-monitor.at new file mode 100644 index 00000000..cd62ca86 --- /dev/null +++ b/tests/ovsdb-monitor.at @@ -0,0 +1,46 @@ +AT_BANNER([OVSDB -- ovsdb-server monitors]) + +# OVSDB_CHECK_MONITOR(TITLE, SCHEMA, [PRE-MONITOR-TXN], MONITOR-ARGS, +# TRANSACTIONS, OUTPUT, [KEYWORDS]) +# +# Creates a database with the given SCHEMA, starts an ovsdb-server on +# that database, and runs each of the TRANSACTIONS (which should be a +# quoted list of quoted strings) against it with ovsdb-client one at a +# time. +# +# Checks that the overall output is OUTPUT, but UUIDs in the output +# are replaced by markers of the form where N is a number. The +# first unique UUID is replaced by <0>, the next by <1>, and so on. +# If a given UUID appears more than once it is always replaced by the +# same marker. +# +# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS. +m4_define([OVSDB_CHECK_MONITOR], + [AT_SETUP([$1]) + AT_KEYWORDS([ovsdb server monitor positive $7]) + AT_DATA([schema], [$2 +]) + OVS_CHECK_LCOV([ovsdb-tool create db schema], [0], [stdout], [ignore]) + m4_if([$3], [], [], + [OVS_CHECK_LCOV([ovsdb-tool transact db '$2'], [0], [ignore], [ignore])]) + AT_CHECK([ovsdb-server --detach --pidfile=$PWD/server-pid --listen=punix:socket --unixctl=$PWD/unixctl db]) + AT_CHECK([ovsdb-client monitor --format=csv unix:socket $4 > output & echo $! > monitor-pid], + [0], [ignore], [ignore], [kill `cat server-pid`]) + m4_foreach([txn], [$5], + [OVS_CHECK_LCOV([ovsdb-client transact unix:socket 'txn'], [0], + [ignore], [ignore], [kill `cat server-pid monitor-pid`])]) + AT_CHECK([ovs-appctl -t $PWD/unixctl -e exit], [0], [ignore], [ignore]) + wait + AT_CHECK([perl $srcdir/uuidfilt.pl output], [0], [$6]) + AT_CLEANUP]) + +OVSDB_CHECK_MONITOR([monitor initially empty table], + [ORDINAL_SCHEMA], + [], + [ordinals], + [[[[{"op": "insert", + "table": "ordinals", + "row": {"number": 0, "name": "zero"}}]]]], + [[row,action,name,number,_version +<0>,insert,"""zero""",0,"[""uuid"",""<1>""]" +]]) diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at index 95f0e907..5d5eebdd 100644 --- a/tests/ovsdb-server.at +++ b/tests/ovsdb-server.at @@ -20,7 +20,7 @@ m4_define([OVSDB_CHECK_EXECUTION], AT_DATA([schema], [$2 ]) OVS_CHECK_LCOV([ovsdb-tool create db schema], [0], [stdout], [ignore]) - AT_CHECK([ovsdb-server --verbose --detach --pidfile=$PWD/pid --listen=punix:socket db]) + AT_CHECK([ovsdb-server --detach --pidfile=$PWD/pid --listen=punix:socket db]) m4_foreach([txn], [$3], [OVS_CHECK_LCOV([ovsdb-client transact unix:socket 'txn'], [0], [stdout], [ignore], [test ! -e pid || kill `cat pid`]) diff --git a/tests/ovsdb.at b/tests/ovsdb.at index bdd15f92..690e9987 100644 --- a/tests/ovsdb.at +++ b/tests/ovsdb.at @@ -48,3 +48,4 @@ m4_include([tests/ovsdb-execution.at]) m4_include([tests/ovsdb-trigger.at]) m4_include([tests/ovsdb-file.at]) m4_include([tests/ovsdb-server.at]) +m4_include([tests/ovsdb-monitor.at]) -- 2.30.2