struct reconnect *reconnect;
struct jsonrpc *rpc;
struct stream *stream;
+ struct pstream *pstream;
unsigned int seqno;
};
-/* Creates and returns a jsonrpc_session that connects and reconnects, with
- * back-off, to 'name', which should be a string acceptable to
- * stream_open(). */
+/* Creates and returns a jsonrpc_session to 'name', which should be a string
+ * acceptable to stream_open() or pstream_open().
+ *
+ * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
+ * jsonrpc_session connects and reconnects, with back-off, to 'name'.
+ *
+ * If 'name' is a passive connection method, e.g. "ptcp:", the new
+ * jsonrpc_session listens for connections to 'name'. It maintains at most one
+ * connection at any given time. Any new connection causes the previous one
+ * (if any) to be dropped. */
struct jsonrpc_session *
jsonrpc_session_open(const char *name)
{
reconnect_enable(s->reconnect, time_msec());
s->rpc = NULL;
s->stream = NULL;
+ s->pstream = NULL;
s->seqno = 0;
+ if (!pstream_verify_name(name)) {
+ reconnect_set_passive(s->reconnect, true, time_msec());
+ }
+
return s;
}
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc;
s->stream = NULL;
+ s->pstream = NULL;
s->seqno = 0;
return s;
jsonrpc_close(s->rpc);
reconnect_destroy(s->reconnect);
stream_close(s->stream);
+ pstream_close(s->pstream);
free(s);
}
}
static void
jsonrpc_session_connect(struct jsonrpc_session *s)
{
+ const char *name = reconnect_get_name(s->reconnect);
int error;
jsonrpc_session_disconnect(s);
- error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
+ if (!reconnect_is_passive(s->reconnect)) {
+ error = stream_open(name, &s->stream);
+ if (!error) {
+ reconnect_connecting(s->reconnect, time_msec());
+ }
+ } else {
+ error = s->pstream ? 0 : pstream_open(name, &s->pstream);
+ if (!error) {
+ reconnect_listening(s->reconnect, time_msec());
+ }
+ }
+
if (error) {
reconnect_connect_failed(s->reconnect, time_msec(), error);
- } else {
- reconnect_connecting(s->reconnect, time_msec());
}
s->seqno++;
}
void
jsonrpc_session_run(struct jsonrpc_session *s)
{
+ if (s->pstream) {
+ struct stream *stream;
+ int error;
+
+ error = pstream_accept(s->pstream, &stream);
+ if (!error) {
+ if (s->rpc || s->stream) {
+ VLOG_INFO_RL(&rl,
+ "%s: new connection replacing active connection",
+ reconnect_get_name(s->reconnect));
+ jsonrpc_session_disconnect(s);
+ }
+ reconnect_connected(s->reconnect, time_msec());
+ s->rpc = jsonrpc_open(stream);
+ } else if (error != EAGAIN) {
+ reconnect_listen_error(s->reconnect, time_msec(), error);
+ pstream_close(s->pstream);
+ s->pstream = NULL;
+ }
+ }
+
if (s->rpc) {
int error;
stream_run_wait(s->stream);
stream_connect_wait(s->stream);
}
+ if (s->pstream) {
+ pstream_wait(s->pstream);
+ }
reconnect_wait(s->reconnect, time_msec());
}