From 539e96f62300e4afab00e5906a28e3b89301d62e Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Wed, 6 Jan 2010 14:26:48 -0800 Subject: [PATCH] stream: Add stream_run(), stream_run_wait() functions. SSL, which will be added in an upcoming commit, requires some background processing, which is best done in a "run" function in our architecture. This commit adds stream_run() and stream_run_wait() and calls to them from the places where they will be required. --- lib/jsonrpc.c | 16 ++++++++++++---- lib/stream-fd.c | 2 ++ lib/stream-provider.h | 12 ++++++++++++ lib/stream-tcp.c | 2 ++ lib/stream-unix.c | 2 ++ lib/stream.c | 29 +++++++++++++++++++++++++++-- lib/stream.h | 3 +++ 7 files changed, 60 insertions(+), 6 deletions(-) diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index 0f535155..ec9108f8 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009 Nicira Networks. + * Copyright (c) 2009, 2010 Nicira Networks. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,6 +88,7 @@ jsonrpc_run(struct jsonrpc *rpc) return; } + stream_run(rpc->stream); while (!queue_is_empty(&rpc->output)) { struct ofpbuf *buf = rpc->output.head; int retval; @@ -113,8 +114,11 @@ jsonrpc_run(struct jsonrpc *rpc) void jsonrpc_wait(struct jsonrpc *rpc) { - if (!rpc->status && !queue_is_empty(&rpc->output)) { - stream_send_wait(rpc->stream); + if (!rpc->status) { + stream_run_wait(rpc->stream); + if (!queue_is_empty(&rpc->output)) { + stream_send_wait(rpc->stream); + } } } @@ -721,7 +725,10 @@ jsonrpc_session_run(struct jsonrpc_session *s) jsonrpc_session_disconnect(s); } } else if (s->stream) { - int error = stream_connect(s->stream); + int error; + + stream_run(s->stream); + error = stream_connect(s->stream); if (!error) { reconnect_connected(s->reconnect, time_msec()); s->rpc = jsonrpc_open(s->stream); @@ -763,6 +770,7 @@ jsonrpc_session_wait(struct jsonrpc_session *s) if (s->rpc) { jsonrpc_wait(s->rpc); } else if (s->stream) { + stream_run_wait(s->stream); stream_connect_wait(s->stream); } reconnect_wait(s->reconnect, time_msec()); diff --git a/lib/stream-fd.c b/lib/stream-fd.c index 46aa8e73..94c84340 100644 --- a/lib/stream-fd.c +++ b/lib/stream-fd.c @@ -139,6 +139,8 @@ static struct stream_class stream_fd_class = { fd_connect, /* connect */ fd_recv, /* recv */ fd_send, /* send */ + NULL, /* run */ + NULL, /* run_wait */ fd_wait, /* wait */ }; diff --git a/lib/stream-provider.h b/lib/stream-provider.h index 6beaab75..872da3c7 100644 --- a/lib/stream-provider.h +++ b/lib/stream-provider.h @@ -109,6 +109,18 @@ struct stream_class { * accepted for transmission, it should return -EAGAIN immediately. */ ssize_t (*send)(struct stream *stream, const void *buffer, size_t n); + /* Allows 'stream' to perform maintenance activities, such as flushing + * output buffers. + * + * May be null if 'stream' doesn't have anything to do here. */ + void (*run)(struct stream *stream); + + /* Arranges for the poll loop to wake up when 'stream' needs to perform + * maintenance activities. + * + * May be null if 'stream' doesn't have anything to do here. */ + void (*run_wait)(struct stream *stream); + /* Arranges for the poll loop to wake up when 'stream' is ready to take an * action of the given 'type'. */ void (*wait)(struct stream *stream, enum stream_wait_type type); diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c index 947be9f1..bfcf35c7 100644 --- a/lib/stream-tcp.c +++ b/lib/stream-tcp.c @@ -90,6 +90,8 @@ struct stream_class tcp_stream_class = { NULL, /* connect */ NULL, /* recv */ NULL, /* send */ + NULL, /* run */ + NULL, /* run_wait */ NULL, /* wait */ }; diff --git a/lib/stream-unix.c b/lib/stream-unix.c index 9046da15..6ce7790b 100644 --- a/lib/stream-unix.c +++ b/lib/stream-unix.c @@ -69,6 +69,8 @@ struct stream_class unix_stream_class = { NULL, /* connect */ NULL, /* recv */ NULL, /* send */ + NULL, /* run */ + NULL, /* run_wait */ NULL, /* wait */ }; diff --git a/lib/stream.c b/lib/stream.c index 337fb5c7..d4f5de29 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009 Nicira Networks. + * Copyright (c) 2008, 2009, 2010 Nicira Networks. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,7 +66,8 @@ check_stream_classes(void) struct stream_class *class = stream_classes[i]; assert(class->name != NULL); assert(class->open != NULL); - if (class->close || class->recv || class->send || class->wait) { + if (class->close || class->recv || class->send || class->run + || class->run_wait || class->wait) { assert(class->close != NULL); assert(class->recv != NULL); assert(class->send != NULL); @@ -166,6 +167,8 @@ stream_open_block(const char *name, struct stream **streamp) error = stream_open(name, &stream); while (error == EAGAIN) { + stream_run(stream); + stream_run_wait(stream); stream_connect_wait(stream); poll_block(); error = stream_connect(stream); @@ -312,6 +315,28 @@ stream_send(struct stream *stream, const void *buffer, size_t n) : (stream->class->send)(stream, buffer, n)); } +/* Allows 'stream' to perform maintenance activities, such as flushing + * output buffers. */ +void +stream_run(struct stream *stream) +{ + if (stream->class->run) { + (stream->class->run)(stream); + } +} + +/* Arranges for the poll loop to wake up when 'stream' needs to perform + * maintenance activities. */ +void +stream_run_wait(struct stream *stream) +{ + if (stream->class->run_wait) { + (stream->class->run_wait)(stream); + } +} + +/* Arranges for the poll loop to wake up when 'stream' is ready to take an + * action of the given 'type'. */ void stream_wait(struct stream *stream, enum stream_wait_type wait) { diff --git a/lib/stream.h b/lib/stream.h index 7a62a5a3..ae30b103 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -42,6 +42,9 @@ int stream_connect(struct stream *); int stream_recv(struct stream *, void *buffer, size_t n); int stream_send(struct stream *, const void *buffer, size_t n); +void stream_run(struct stream *); +void stream_run_wait(struct stream *); + enum stream_wait_type { STREAM_CONNECT, STREAM_RECV, -- 2.30.2