2 * Copyright (c) 2008, 2009 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include "vconn-stream.h"
24 #include <sys/types.h>
26 #include "fatal-signal.h"
27 #include "leak-checker.h"
29 #include "openflow/openflow.h"
30 #include "poll-loop.h"
31 #include "socket-util.h"
33 #include "vconn-provider.h"
37 #define THIS_MODULE VLM_vconn_stream
39 /* Active stream socket vconn. */
50 static struct vconn_class stream_vconn_class;
52 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
54 static void stream_clear_txbuf(struct stream_vconn *);
55 static void maybe_unlink_and_free(char *path);
57 /* Creates a new vconn named 'name' that will send and receive data on 'fd' and
58 * stores a pointer to the vconn in '*vconnp'. Initial connection status
59 * 'connect_status' is interpreted as described for vconn_init().
61 * When '*vconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
62 * fatal_signal_unlink_file_now() and then freed with free().
64 * Returns 0 if successful, otherwise a positive errno value. (The current
65 * implementation never fails.) */
67 new_stream_vconn(const char *name, int fd, int connect_status,
68 char *unlink_path, struct vconn **vconnp)
70 struct stream_vconn *s;
72 s = xmalloc(sizeof *s);
73 vconn_init(&s->vconn, &stream_vconn_class, connect_status, name);
77 s->unlink_path = unlink_path;
82 static struct stream_vconn *
83 stream_vconn_cast(struct vconn *vconn)
85 vconn_assert_class(vconn, &stream_vconn_class);
86 return CONTAINER_OF(vconn, struct stream_vconn, vconn);
90 stream_close(struct vconn *vconn)
92 struct stream_vconn *s = stream_vconn_cast(vconn);
93 stream_clear_txbuf(s);
94 ofpbuf_delete(s->rxbuf);
96 maybe_unlink_and_free(s->unlink_path);
101 stream_connect(struct vconn *vconn)
103 struct stream_vconn *s = stream_vconn_cast(vconn);
104 return check_connection_completion(s->fd);
108 stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
110 struct stream_vconn *s = stream_vconn_cast(vconn);
115 if (s->rxbuf == NULL) {
116 s->rxbuf = ofpbuf_new(1564);
121 if (sizeof(struct ofp_header) > rx->size) {
122 want_bytes = sizeof(struct ofp_header) - rx->size;
124 struct ofp_header *oh = rx->data;
125 size_t length = ntohs(oh->length);
126 if (length < sizeof(struct ofp_header)) {
127 VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)",
131 want_bytes = length - rx->size;
138 ofpbuf_prealloc_tailroom(rx, want_bytes);
140 retval = read(s->fd, ofpbuf_tail(rx), want_bytes);
143 if (retval == want_bytes) {
144 if (rx->size > sizeof(struct ofp_header)) {
153 } else if (retval == 0) {
155 VLOG_ERR_RL(&rl, "connection dropped mid-packet");
166 stream_clear_txbuf(struct stream_vconn *s)
168 ofpbuf_delete(s->txbuf);
173 stream_send(struct vconn *vconn, struct ofpbuf *buffer)
175 struct stream_vconn *s = stream_vconn_cast(vconn);
182 retval = write(s->fd, buffer->data, buffer->size);
183 if (retval == buffer->size) {
184 ofpbuf_delete(buffer);
186 } else if (retval >= 0 || errno == EAGAIN) {
187 leak_checker_claim(buffer);
190 ofpbuf_pull(buffer, retval);
199 stream_run(struct vconn *vconn)
201 struct stream_vconn *s = stream_vconn_cast(vconn);
208 n = write(s->fd, s->txbuf->data, s->txbuf->size);
210 if (errno != EAGAIN) {
211 VLOG_ERR_RL(&rl, "send: %s", strerror(errno));
212 stream_clear_txbuf(s);
216 ofpbuf_pull(s->txbuf, n);
217 if (!s->txbuf->size) {
218 stream_clear_txbuf(s);
225 stream_run_wait(struct vconn *vconn)
227 struct stream_vconn *s = stream_vconn_cast(vconn);
230 poll_fd_wait(s->fd, POLLOUT);
235 stream_wait(struct vconn *vconn, enum vconn_wait_type wait)
237 struct stream_vconn *s = stream_vconn_cast(vconn);
240 poll_fd_wait(s->fd, POLLOUT);
245 poll_fd_wait(s->fd, POLLOUT);
247 /* Nothing to do: need to drain txbuf first. stream_run_wait()
248 * will arrange to wake up when there room to send data, so there's
249 * no point in calling poll_fd_wait() redundantly here. */
254 poll_fd_wait(s->fd, POLLIN);
262 static struct vconn_class stream_vconn_class = {
265 stream_close, /* close */
266 stream_connect, /* connect */
267 stream_recv, /* recv */
268 stream_send, /* send */
269 stream_run, /* run */
270 stream_run_wait, /* run_wait */
271 stream_wait, /* wait */
274 /* Passive stream socket vconn. */
276 struct pstream_pvconn
278 struct pvconn pvconn;
280 int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
285 static struct pvconn_class pstream_pvconn_class;
287 static struct pstream_pvconn *
288 pstream_pvconn_cast(struct pvconn *pvconn)
290 pvconn_assert_class(pvconn, &pstream_pvconn_class);
291 return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn);
294 /* Creates a new pvconn named 'name' that will accept new socket connections on
295 * 'fd' and stores a pointer to the vconn in '*pvconnp'.
297 * When a connection has been accepted, 'accept_cb' will be called with the new
298 * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'.
299 * accept_cb must return 0 if the connection is successful, in which case it
300 * must initialize '*vconnp' to the new vconn, or a positive errno value on
301 * error. In either case accept_cb takes ownership of the 'fd' passed in.
303 * When '*pvconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
304 * fatal_signal_unlink_file_now() and freed with free().
306 * Returns 0 if successful, otherwise a positive errno value. (The current
307 * implementation never fails.) */
309 new_pstream_pvconn(const char *name, int fd,
310 int (*accept_cb)(int fd, const struct sockaddr *sa,
311 size_t sa_len, struct vconn **vconnp),
312 char *unlink_path, struct pvconn **pvconnp)
314 struct pstream_pvconn *ps = xmalloc(sizeof *ps);
315 pvconn_init(&ps->pvconn, &pstream_pvconn_class, name);
317 ps->accept_cb = accept_cb;
318 ps->unlink_path = unlink_path;
319 *pvconnp = &ps->pvconn;
324 pstream_close(struct pvconn *pvconn)
326 struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
328 maybe_unlink_and_free(ps->unlink_path);
333 pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp)
335 struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
336 struct sockaddr_storage ss;
337 socklen_t ss_len = sizeof ss;
341 new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
344 if (retval != EAGAIN) {
345 VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
350 retval = set_nonblocking(new_fd);
356 return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
361 pstream_wait(struct pvconn *pvconn)
363 struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
364 poll_fd_wait(ps->fd, POLLIN);
367 static struct pvconn_class pstream_pvconn_class = {
375 /* Helper functions. */
377 maybe_unlink_and_free(char *path)
380 fatal_signal_unlink_file_now(path);