unsigned int last_monitor_request_seqno;
unsigned int change_seqno;
+ /* Database locking. */
+ char *lock_name; /* Name of lock we need, NULL if none. */
+ bool has_lock; /* Has db server told us we have the lock? */
+ bool is_lock_contended; /* Has db server told us we can't get lock? */
+ struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
+
/* Transaction support. */
struct ovsdb_idl_txn *txn;
struct hmap outstanding_txns;
static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
const struct jsonrpc_msg *msg);
+static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
+static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
+static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
+ const struct json *);
+static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
+ const struct json *params,
+ bool new_has_lock);
+
/* Creates and returns a connection to database 'remote', which should be in a
* form acceptable to jsonrpc_session_open(). The connection will maintain an
* in-memory replica of the remote database whose schema is described by
shash_destroy(&idl->table_by_name);
free(idl->tables);
json_destroy(idl->monitor_request_id);
+ free(idl->lock_name);
+ json_destroy(idl->lock_request_id);
free(idl);
}
}
/* Processes a batch of messages from the database server on 'idl'. Returns
* true if the database as seen through 'idl' changed, false if it did not
* change. The initial fetch of the entire contents of the remote database is
- * considered to be one kind of change.
+ * considered to be one kind of change. If 'idl' has been configured to
+ * acquire a database lock (with ovsdb_idl_set_lock()), then successfully
+ * acquiring the lock is also considered to be a change.
*
* When this function returns false, the client may continue to use any data
* structures it obtained from 'idl' in the past. But when it returns true,
idl->last_monitor_request_seqno = seqno;
ovsdb_idl_txn_abort_all(idl);
ovsdb_idl_send_monitor_request(idl);
+ if (idl->lock_name) {
+ ovsdb_idl_send_lock_request(idl);
+ }
break;
}
&& msg->params->type == JSON_ARRAY
&& msg->params->u.array.n == 2
&& msg->params->u.array.elems[0]->type == JSON_NULL) {
+ /* Database contents changed. */
ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
} else if (msg->type == JSONRPC_REPLY
&& idl->monitor_request_id
&& json_equal(idl->monitor_request_id, msg->id)) {
+ /* Reply to our "monitor" request. */
idl->change_seqno++;
json_destroy(idl->monitor_request_id);
idl->monitor_request_id = NULL;
ovsdb_idl_clear(idl);
ovsdb_idl_parse_update(idl, msg->result);
+ } else if (msg->type == JSONRPC_REPLY
+ && idl->lock_request_id
+ && json_equal(idl->lock_request_id, msg->id)) {
+ /* Reply to our "lock" request. */
+ ovsdb_idl_parse_lock_reply(idl, msg->result);
+ } else if (msg->type == JSONRPC_NOTIFY
+ && !strcmp(msg->method, "locked")) {
+ /* We got our lock. */
+ ovsdb_idl_parse_lock_notify(idl, msg->params, true);
+ } else if (msg->type == JSONRPC_NOTIFY
+ && !strcmp(msg->method, "stolen")) {
+ /* Someone else stole our lock. */
+ ovsdb_idl_parse_lock_notify(idl, msg->params, false);
} else if (msg->type == JSONRPC_REPLY && msg->id->type == JSON_STRING
&& !strcmp(msg->id->u.string, "echo")) {
- /* It's a reply to our echo request. Ignore it. */
+ /* Reply to our echo request. Ignore it. */
} else if ((msg->type == JSONRPC_ERROR
|| msg->type == JSONRPC_REPLY)
&& ovsdb_idl_txn_process_reply(idl, msg)) {
return "success";
case TXN_TRY_AGAIN:
return "try again";
+ case TXN_NOT_LOCKED:
+ return "not locked";
case TXN_ERROR:
return "error";
}
return txn->status;
}
+ /* If we need a lock but don't have it, give up quickly. */
+ if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
+ txn->status = TXN_NOT_LOCKED;
+ ovsdb_idl_txn_disassemble(txn);
+ return txn->status;
+ }
+
operations = json_array_create_1(
json_string_create(txn->idl->class->database));
+ /* Assert that we have the required lock (avoiding a race). */
+ if (txn->idl->lock_name) {
+ struct json *op = json_object_create();
+ json_array_add(operations, op);
+ json_object_put_string(op, "op", "assert");
+ json_object_put_string(op, "lock", txn->idl->lock_name);
+ }
+
/* Add prerequisites and declarations of new rows. */
HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
/* XXX check that deleted rows exist even if no prereqs? */
struct json_array *ops = &msg->result->u.array;
int hard_errors = 0;
int soft_errors = 0;
+ int lock_errors = 0;
size_t i;
for (i = 0; i < ops->n; i++) {
if (error->type == JSON_STRING) {
if (!strcmp(error->u.string, "timed out")) {
soft_errors++;
+ } else if (!strcmp(error->u.string, "not owner")) {
+ lock_errors++;
} else if (strcmp(error->u.string, "aborted")) {
hard_errors++;
ovsdb_idl_txn_set_error_json(txn, op);
}
}
- if (!soft_errors && !hard_errors) {
+ if (!soft_errors && !hard_errors && !lock_errors) {
struct ovsdb_idl_txn_insert *insert;
if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
}
status = (hard_errors ? TXN_ERROR
+ : lock_errors ? TXN_NOT_LOCKED
: soft_errors ? TXN_TRY_AGAIN
: TXN_SUCCESS);
}
{
return txn->idl;
}
+\f
+/* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
+ * the database server and to avoid modifying the database when the lock cannot
+ * be acquired (that is, when another client has the same lock).
+ *
+ * If 'lock_name' is NULL, drops the locking requirement and releases the
+ * lock. */
+void
+ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
+{
+ assert(!idl->txn);
+ assert(hmap_is_empty(&idl->outstanding_txns));
+
+ if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
+ /* Release previous lock. */
+ ovsdb_idl_send_unlock_request(idl);
+ free(idl->lock_name);
+ idl->lock_name = NULL;
+ idl->is_lock_contended = false;
+ }
+
+ if (lock_name && !idl->lock_name) {
+ /* Acquire new lock. */
+ idl->lock_name = xstrdup(lock_name);
+ ovsdb_idl_send_lock_request(idl);
+ }
+}
+
+/* Returns true if 'idl' is configured to obtain a lock and owns that lock.
+ *
+ * Locking and unlocking happens asynchronously from the database client's
+ * point of view, so the information is only useful for optimization (e.g. if
+ * the client doesn't have the lock then there's no point in trying to write to
+ * the database). */
+bool
+ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
+{
+ return idl->has_lock;
+}
+
+/* Returns true if 'idl' is configured to obtain a lock but the database server
+ * has indicated that some other client already owns the requested lock. */
+bool
+ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
+{
+ return idl->is_lock_contended;
+}
+
+static void
+ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
+{
+ if (new_has_lock && !idl->has_lock) {
+ if (!idl->monitor_request_id) {
+ idl->change_seqno++;
+ } else {
+ /* We're waiting for a monitor reply, so don't signal that the
+ * database changed. The monitor reply will increment change_seqno
+ * anyhow. */
+ }
+ idl->is_lock_contended = false;
+ }
+ idl->has_lock = new_has_lock;
+}
+
+static void
+ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
+ struct json **idp)
+{
+ ovsdb_idl_update_has_lock(idl, false);
+
+ json_destroy(idl->lock_request_id);
+ idl->lock_request_id = NULL;
+
+ if (jsonrpc_session_is_connected(idl->session)) {
+ struct json *params;
+
+ params = json_array_create_1(json_string_create(idl->lock_name));
+ jsonrpc_session_send(idl->session,
+ jsonrpc_create_request(method, params, idp));
+ }
+}
+static void
+ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
+{
+ ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
+}
+
+static void
+ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
+{
+ ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
+}
+
+static void
+ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
+{
+ bool got_lock;
+
+ json_destroy(idl->lock_request_id);
+ idl->lock_request_id = NULL;
+
+ if (result->type == JSON_OBJECT) {
+ const struct json *locked;
+
+ locked = shash_find_data(json_object(result), "locked");
+ got_lock = locked && locked->type == JSON_TRUE;
+ } else {
+ got_lock = false;
+ }
+
+ ovsdb_idl_update_has_lock(idl, got_lock);
+ if (!got_lock) {
+ idl->is_lock_contended = true;
+ }
+}
+
+static void
+ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
+ const struct json *params,
+ bool new_has_lock)
+{
+ if (idl->lock_name
+ && params->type == JSON_ARRAY
+ && json_array(params)->n > 0
+ && json_array(params)->elems[0]->type == JSON_STRING) {
+ const char *lock_name = json_string(json_array(params)->elems[0]);
+
+ if (!strcmp(idl->lock_name, lock_name)) {
+ ovsdb_idl_update_has_lock(idl, new_has_lock);
+ if (!new_has_lock) {
+ idl->is_lock_contended = true;
+ }
+ }
+ }
+}
{
/* Create connection to database. */
idl = ovsdb_idl_create(remote, &ovsrec_idl_class, true);
+ ovsdb_idl_set_lock(idl, "ovs_vswitchd");
ovsdb_idl_omit_alert(idl, &ovsrec_open_vswitch_col_cur_cfg);
ovsdb_idl_omit_alert(idl, &ovsrec_open_vswitch_col_statistics);
bool database_changed;
struct bridge *br;
+ /* (Re)configure if necessary. */
+ database_changed = ovsdb_idl_run(idl);
+ if (ovsdb_idl_is_lock_contended(idl)) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+ struct bridge *br, *next_br;
+
+ VLOG_ERR_RL(&rl, "another ovs-vswitchd process is running, "
+ "disabling this process until it goes away");
+
+ HMAP_FOR_EACH_SAFE (br, next_br, node, &all_bridges) {
+ bridge_destroy(br);
+ }
+ return;
+ } else if (!ovsdb_idl_has_lock(idl)) {
+ return;
+ }
+ cfg = ovsrec_open_vswitch_first(idl);
+
/* Let each bridge do the work that it needs to do. */
datapath_destroyed = false;
HMAP_FOR_EACH (br, node, &all_bridges) {
}
}
- /* (Re)configure if necessary. */
- database_changed = ovsdb_idl_run(idl);
- cfg = ovsrec_open_vswitch_first(idl);
-
/* Re-configure SSL. We do this on every trip through the main loop,
* instead of just when the database changes, because the contents of the
* key and certificate files can change without the database changing.
void
bridge_wait(void)
{
- struct bridge *br;
-
- HMAP_FOR_EACH (br, node, &all_bridges) {
- ofproto_wait(br->ofproto);
- }
ovsdb_idl_wait(idl);
- poll_timer_wait_until(stats_timer);
+ if (!hmap_is_empty(&all_bridges)) {
+ struct bridge *br;
- if (db_limiter > time_msec()) {
- poll_timer_wait_until(db_limiter);
+ HMAP_FOR_EACH (br, node, &all_bridges) {
+ ofproto_wait(br->ofproto);
+ }
+ poll_timer_wait_until(stats_timer);
+
+ if (db_limiter > time_msec()) {
+ poll_timer_wait_until(db_limiter);
+ }
}
}
\f