rconn: Decentralize and improve disconnection logging.
[openvswitch] / lib / vconn-stream.c
1 /*
2  * Copyright (c) 2008, 2009, 2010 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "vconn-stream.h"
19 #include <assert.h>
20 #include <errno.h>
21 #include <poll.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 #include "fatal-signal.h"
27 #include "leak-checker.h"
28 #include "ofpbuf.h"
29 #include "openflow/openflow.h"
30 #include "poll-loop.h"
31 #include "socket-util.h"
32 #include "util.h"
33 #include "vconn-provider.h"
34 #include "vconn.h"
35
36 #include "vlog.h"
37 #define THIS_MODULE VLM_vconn_stream
38
39 /* Active stream socket vconn. */
40
41 struct stream_vconn
42 {
43     struct vconn vconn;
44     int fd;
45     struct ofpbuf *rxbuf;
46     struct ofpbuf *txbuf;
47     struct poll_waiter *tx_waiter;
48     char *unlink_path;
49 };
50
51 static struct vconn_class stream_vconn_class;
52
53 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
54
55 static void stream_clear_txbuf(struct stream_vconn *);
56 static void maybe_unlink_and_free(char *path);
57
58 /* Creates a new vconn named 'name' that will send and receive data on 'fd' and
59  * stores a pointer to the vconn in '*vconnp'.  Initial connection status
60  * 'connect_status' is interpreted as described for vconn_init().
61  *
62  * When '*vconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
63  * fatal_signal_unlink_file_now() and then freed with free().
64  *
65  * Returns 0 if successful, otherwise a positive errno value.  (The current
66  * implementation never fails.) */
67 int
68 new_stream_vconn(const char *name, int fd, int connect_status,
69                  char *unlink_path, struct vconn **vconnp)
70 {
71     struct stream_vconn *s;
72
73     s = xmalloc(sizeof *s);
74     vconn_init(&s->vconn, &stream_vconn_class, connect_status, name);
75     s->fd = fd;
76     s->txbuf = NULL;
77     s->tx_waiter = NULL;
78     s->rxbuf = NULL;
79     s->unlink_path = unlink_path;
80     *vconnp = &s->vconn;
81     return 0;
82 }
83
84 static struct stream_vconn *
85 stream_vconn_cast(struct vconn *vconn)
86 {
87     vconn_assert_class(vconn, &stream_vconn_class);
88     return CONTAINER_OF(vconn, struct stream_vconn, vconn);
89 }
90
91 static void
92 stream_close(struct vconn *vconn)
93 {
94     struct stream_vconn *s = stream_vconn_cast(vconn);
95     poll_cancel(s->tx_waiter);
96     stream_clear_txbuf(s);
97     ofpbuf_delete(s->rxbuf);
98     close(s->fd);
99     maybe_unlink_and_free(s->unlink_path);
100     free(s);
101 }
102
103 static int
104 stream_connect(struct vconn *vconn)
105 {
106     struct stream_vconn *s = stream_vconn_cast(vconn);
107     return check_connection_completion(s->fd);
108 }
109
110 static int
111 stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
112 {
113     struct stream_vconn *s = stream_vconn_cast(vconn);
114     struct ofpbuf *rx;
115     size_t want_bytes;
116     ssize_t retval;
117
118     if (s->rxbuf == NULL) {
119         s->rxbuf = ofpbuf_new(1564);
120     }
121     rx = s->rxbuf;
122
123 again:
124     if (sizeof(struct ofp_header) > rx->size) {
125         want_bytes = sizeof(struct ofp_header) - rx->size;
126     } else {
127         struct ofp_header *oh = rx->data;
128         size_t length = ntohs(oh->length);
129         if (length < sizeof(struct ofp_header)) {
130             VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)",
131                         length);
132             return EPROTO;
133         }
134         want_bytes = length - rx->size;
135         if (!want_bytes) {
136             *bufferp = rx;
137             s->rxbuf = NULL;
138             return 0;
139         }
140     }
141     ofpbuf_prealloc_tailroom(rx, want_bytes);
142
143     retval = read(s->fd, ofpbuf_tail(rx), want_bytes);
144     if (retval > 0) {
145         rx->size += retval;
146         if (retval == want_bytes) {
147             if (rx->size > sizeof(struct ofp_header)) {
148                 *bufferp = rx;
149                 s->rxbuf = NULL;
150                 return 0;
151             } else {
152                 goto again;
153             }
154         }
155         return EAGAIN;
156     } else if (retval == 0) {
157         if (rx->size) {
158             VLOG_ERR_RL(&rl, "connection dropped mid-packet");
159             return EPROTO;
160         } else {
161             return EOF;
162         }
163     } else {
164         return errno;
165     }
166 }
167
168 static void
169 stream_clear_txbuf(struct stream_vconn *s)
170 {
171     ofpbuf_delete(s->txbuf);
172     s->txbuf = NULL;
173     s->tx_waiter = NULL;
174 }
175
176 static void
177 stream_do_tx(int fd OVS_UNUSED, short int revents OVS_UNUSED, void *vconn_)
178 {
179     struct vconn *vconn = vconn_;
180     struct stream_vconn *s = stream_vconn_cast(vconn);
181     ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size);
182     if (n < 0) {
183         if (errno != EAGAIN) {
184             VLOG_ERR_RL(&rl, "send: %s", strerror(errno));
185             stream_clear_txbuf(s);
186             return;
187         }
188     } else if (n > 0) {
189         ofpbuf_pull(s->txbuf, n);
190         if (!s->txbuf->size) {
191             stream_clear_txbuf(s);
192             return;
193         }
194     }
195     s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
196 }
197
198 static int
199 stream_send(struct vconn *vconn, struct ofpbuf *buffer)
200 {
201     struct stream_vconn *s = stream_vconn_cast(vconn);
202     ssize_t retval;
203
204     if (s->txbuf) {
205         return EAGAIN;
206     }
207
208     retval = write(s->fd, buffer->data, buffer->size);
209     if (retval == buffer->size) {
210         ofpbuf_delete(buffer);
211         return 0;
212     } else if (retval >= 0 || errno == EAGAIN) {
213         leak_checker_claim(buffer);
214         s->txbuf = buffer;
215         if (retval > 0) {
216             ofpbuf_pull(buffer, retval);
217         }
218         s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
219         return 0;
220     } else {
221         return errno;
222     }
223 }
224
225 static void
226 stream_wait(struct vconn *vconn, enum vconn_wait_type wait)
227 {
228     struct stream_vconn *s = stream_vconn_cast(vconn);
229     switch (wait) {
230     case WAIT_CONNECT:
231         poll_fd_wait(s->fd, POLLOUT);
232         break;
233
234     case WAIT_SEND:
235         if (!s->txbuf) {
236             poll_fd_wait(s->fd, POLLOUT);
237         } else {
238             /* Nothing to do: need to drain txbuf first. */
239         }
240         break;
241
242     case WAIT_RECV:
243         poll_fd_wait(s->fd, POLLIN);
244         break;
245
246     default:
247         NOT_REACHED();
248     }
249 }
250
251 static struct vconn_class stream_vconn_class = {
252     "stream",                   /* name */
253     NULL,                       /* open */
254     stream_close,               /* close */
255     stream_connect,             /* connect */
256     stream_recv,                /* recv */
257     stream_send,                /* send */
258     stream_wait,                /* wait */
259 };
260 \f
261 /* Passive stream socket vconn. */
262
263 struct pstream_pvconn
264 {
265     struct pvconn pvconn;
266     int fd;
267     int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
268                      struct vconn **);
269     char *unlink_path;
270 };
271
272 static struct pvconn_class pstream_pvconn_class;
273
274 static struct pstream_pvconn *
275 pstream_pvconn_cast(struct pvconn *pvconn)
276 {
277     pvconn_assert_class(pvconn, &pstream_pvconn_class);
278     return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn);
279 }
280
281 /* Creates a new pvconn named 'name' that will accept new socket connections on
282  * 'fd' and stores a pointer to the vconn in '*pvconnp'.
283  *
284  * When a connection has been accepted, 'accept_cb' will be called with the new
285  * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'.
286  * accept_cb must return 0 if the connection is successful, in which case it
287  * must initialize '*vconnp' to the new vconn, or a positive errno value on
288  * error.  In either case accept_cb takes ownership of the 'fd' passed in.
289  *
290  * When '*pvconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
291  * fatal_signal_unlink_file_now() and freed with free().
292  *
293  * Returns 0 if successful, otherwise a positive errno value.  (The current
294  * implementation never fails.) */
295 int
296 new_pstream_pvconn(const char *name, int fd,
297                   int (*accept_cb)(int fd, const struct sockaddr *sa,
298                                    size_t sa_len, struct vconn **vconnp),
299                   char *unlink_path, struct pvconn **pvconnp)
300 {
301     struct pstream_pvconn *ps = xmalloc(sizeof *ps);
302     pvconn_init(&ps->pvconn, &pstream_pvconn_class, name);
303     ps->fd = fd;
304     ps->accept_cb = accept_cb;
305     ps->unlink_path = unlink_path;
306     *pvconnp = &ps->pvconn;
307     return 0;
308 }
309
310 static void
311 pstream_close(struct pvconn *pvconn)
312 {
313     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
314     close(ps->fd);
315     maybe_unlink_and_free(ps->unlink_path);
316     free(ps);
317 }
318
319 static int
320 pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp)
321 {
322     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
323     struct sockaddr_storage ss;
324     socklen_t ss_len = sizeof ss;
325     int new_fd;
326     int retval;
327
328     new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
329     if (new_fd < 0) {
330         int retval = errno;
331         if (retval != EAGAIN) {
332             VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
333         }
334         return retval;
335     }
336
337     retval = set_nonblocking(new_fd);
338     if (retval) {
339         close(new_fd);
340         return retval;
341     }
342
343     return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
344                          new_vconnp);
345 }
346
347 static void
348 pstream_wait(struct pvconn *pvconn)
349 {
350     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
351     poll_fd_wait(ps->fd, POLLIN);
352 }
353
354 static struct pvconn_class pstream_pvconn_class = {
355     "pstream",
356     NULL,
357     pstream_close,
358     pstream_accept,
359     pstream_wait
360 };
361 \f
362 /* Helper functions. */
363 static void
364 maybe_unlink_and_free(char *path)
365 {
366     if (path) {
367         fatal_signal_unlink_file_now(path);
368         free(path);
369     }
370 }