1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 import ovs.socket_util
24 vlog = ovs.vlog.Vlog("stream")
27 def stream_or_pstream_needs_probes(name):
28 """ 1 if the stream or pstream specified by 'name' needs periodic probes to
29 verify connectivity. For [p]streams which need probes, it can take a long
30 time to notice the connection was dropped. Returns 0 if probes aren't
31 needed, and -1 if 'name' is invalid"""
33 if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
34 # Only unix and punix are supported currently.
41 """Bidirectional byte stream. Currently only Unix domain sockets
49 # Kinds of events that one might wait for.
50 W_CONNECT = 0 # Connect complete (success or failure).
51 W_RECV = 1 # Data received.
52 W_SEND = 2 # Send buffer room available.
57 def register_method(method, cls):
58 Stream._SOCKET_METHODS[method + ":"] = cls
61 def _find_method(name):
62 for method, cls in Stream._SOCKET_METHODS.items():
63 if name.startswith(method):
68 def is_valid_name(name):
69 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
70 TYPE is a supported stream type (currently only "unix:" and "tcp:"),
72 return bool(Stream._find_method(name))
74 def __init__(self, socket, name, status):
77 if status == errno.EAGAIN:
78 self.state = Stream.__S_CONNECTING
80 self.state = Stream.__S_CONNECTED
82 self.state = Stream.__S_DISCONNECTED
86 # Default value of dscp bits for connection between controller and manager.
87 # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
88 # in <netinet/ip.h> is used.
89 IPTOS_PREC_INTERNETCONTROL = 0xc0
90 DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
93 def open(name, dscp=DSCP_DEFAULT):
94 """Attempts to connect a stream to a remote peer. 'name' is a
95 connection name in the form "TYPE:ARGS", where TYPE is an active stream
96 class's name and ARGS are stream class-specific. Currently the only
97 supported TYPEs are "unix" and "tcp".
99 Returns (error, stream): on success 'error' is 0 and 'stream' is the
100 new Stream, on failure 'error' is a positive errno value and 'stream'
103 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
104 and a new Stream. The connect() method can be used to check for
105 successful connection completion."""
106 cls = Stream._find_method(name)
108 return errno.EAFNOSUPPORT, None
110 suffix = name.split(":", 1)[1]
111 error, sock = cls._open(suffix, dscp)
115 status = ovs.socket_util.check_connection_completion(sock)
116 return 0, Stream(sock, name, status)
119 def _open(suffix, dscp):
120 raise NotImplementedError("This method must be overrided by subclass")
123 def open_block((error, stream)):
124 """Blocks until a Stream completes its connection attempt, either
125 succeeding or failing. (error, stream) should be the tuple returned by
126 Stream.open(). Returns a tuple of the same form.
129 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
133 error = stream.connect()
134 if error != errno.EAGAIN:
137 poller = ovs.poller.Poller()
138 stream.run_wait(poller)
139 stream.connect_wait(poller)
141 assert error != errno.EINPROGRESS
151 def __scs_connecting(self):
152 retval = ovs.socket_util.check_connection_completion(self.socket)
153 assert retval != errno.EINPROGRESS
155 self.state = Stream.__S_CONNECTED
156 elif retval != errno.EAGAIN:
157 self.state = Stream.__S_DISCONNECTED
161 """Tries to complete the connection on this stream. If the connection
162 is complete, returns 0 if the connection was successful or a positive
163 errno value if it failed. If the connection is still in progress,
164 returns errno.EAGAIN."""
165 last_state = -1 # Always differs from initial self.state
166 while self.state != last_state:
167 last_state = self.state
168 if self.state == Stream.__S_CONNECTING:
169 self.__scs_connecting()
170 elif self.state == Stream.__S_CONNECTED:
172 elif self.state == Stream.__S_DISCONNECTED:
176 """Tries to receive up to 'n' bytes from this stream. Returns a
177 (error, string) tuple:
179 - If successful, 'error' is zero and 'string' contains between 1
180 and 'n' bytes of data.
182 - On error, 'error' is a positive errno value.
184 - If the connection has been closed in the normal fashion or if 'n'
185 is 0, the tuple is (0, "").
187 The recv function will not block waiting for data to arrive. If no
188 data have been received, it returns (errno.EAGAIN, "") immediately."""
190 retval = self.connect()
197 return (0, self.socket.recv(n))
198 except socket.error, e:
199 return (ovs.socket_util.get_exception_errno(e), "")
202 """Tries to send 'buf' on this stream.
204 If successful, returns the number of bytes sent, between 1 and
205 len(buf). 0 is only a valid return value if len(buf) is 0.
207 On error, returns a negative errno value.
209 Will not block. If no bytes can be immediately accepted for
210 transmission, returns -errno.EAGAIN immediately."""
212 retval = self.connect()
219 return self.socket.send(buf)
220 except socket.error, e:
221 return -ovs.socket_util.get_exception_errno(e)
226 def run_wait(self, poller):
229 def wait(self, poller, wait):
230 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
232 if self.state == Stream.__S_DISCONNECTED:
233 poller.immediate_wake()
236 if self.state == Stream.__S_CONNECTING:
237 wait = Stream.W_CONNECT
238 if wait == Stream.W_RECV:
239 poller.fd_wait(self.socket, select.POLLIN)
241 poller.fd_wait(self.socket, select.POLLOUT)
243 def connect_wait(self, poller):
244 self.wait(poller, Stream.W_CONNECT)
246 def recv_wait(self, poller):
247 self.wait(poller, Stream.W_RECV)
249 def send_wait(self, poller):
250 self.wait(poller, Stream.W_SEND)
253 # Don't delete the file: we might have forked.
257 class PassiveStream(object):
259 def is_valid_name(name):
260 """Returns True if 'name' is a passive stream name in the form
261 "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
262 "punix:"), otherwise False."""
263 return name.startswith("punix:")
265 def __init__(self, sock, name, bind_path):
268 self.bind_path = bind_path
272 """Attempts to start listening for remote stream connections. 'name'
273 is a connection name in the form "TYPE:ARGS", where TYPE is an passive
274 stream class's name and ARGS are stream class-specific. Currently the
275 only supported TYPE is "punix".
277 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
278 new PassiveStream, on failure 'error' is a positive errno value and
279 'pstream' is None."""
280 if not PassiveStream.is_valid_name(name):
281 return errno.EAFNOSUPPORT, None
284 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
285 True, bind_path, None)
291 except socket.error, e:
292 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
296 return 0, PassiveStream(sock, name, bind_path)
299 """Closes this PassiveStream."""
301 if self.bind_path is not None:
302 ovs.fatal_signal.unlink_file_now(self.bind_path)
303 self.bind_path = None
306 """Tries to accept a new connection on this passive stream. Returns
307 (error, stream): if successful, 'error' is 0 and 'stream' is the new
308 Stream object, and on failure 'error' is a positive errno value and
311 Will not block waiting for a connection. If no connection is ready to
312 be accepted, returns (errno.EAGAIN, None) immediately."""
316 sock, addr = self.socket.accept()
317 ovs.socket_util.set_nonblocking(sock)
318 return 0, Stream(sock, "unix:%s" % addr, 0)
319 except socket.error, e:
320 error = ovs.socket_util.get_exception_errno(e)
321 if error != errno.EAGAIN:
323 vlog.dbg("accept: %s" % os.strerror(error))
326 def wait(self, poller):
327 poller.fd_wait(self.socket, select.POLLIN)
330 # Don't delete the file: we might have forked.
336 Active %s connection methods:
337 unix:FILE Unix domain socket named FILE
338 tcp:IP:PORT TCP socket to IP with port no of PORT
340 Passive %s connection methods:
341 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
344 class UnixStream(Stream):
346 def _open(suffix, dscp):
347 connect_path = suffix
348 return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
349 True, None, connect_path)
350 Stream.register_method("unix", UnixStream)
353 class TCPStream(Stream):
355 def _open(suffix, dscp):
356 error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
359 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
361 Stream.register_method("tcp", TCPStream)