1 # Copyright (c) 2010, 2011, 2012 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 import ovs.socket_util
24 vlog = ovs.vlog.Vlog("stream")
28 """Bidirectional byte stream. Currently only Unix domain sockets
36 # Kinds of events that one might wait for.
37 W_CONNECT = 0 # Connect complete (success or failure).
38 W_RECV = 1 # Data received.
39 W_SEND = 2 # Send buffer room available.
def is_valid_name(name):
    """Reports whether 'name' is an active stream name.

    A stream name has the form "TYPE:ARGS".  The only TYPE accepted here is
    "unix", so the result is True when 'name' begins with "unix:" and False
    otherwise."""
    active_prefix = "unix:"
    return name.startswith(active_prefix)
48 def __init__(self, socket, name, status):
51 if status == errno.EAGAIN:
52 self.state = Stream.__S_CONNECTING
54 self.state = Stream.__S_CONNECTED
56 self.state = Stream.__S_DISCONNECTED
62 """Attempts to connect a stream to a remote peer. 'name' is a
63 connection name in the form "TYPE:ARGS", where TYPE is an active stream
64 class's name and ARGS are stream class-specific. Currently the only
65 supported TYPE is "unix".
67 Returns (error, stream): on success 'error' is 0 and 'stream' is the
68 new Stream, on failure 'error' is a positive errno value and 'stream'
71 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
72 and a new Stream. The connect() method can be used to check for
73 successful connection completion."""
74 if not Stream.is_valid_name(name):
75 return errno.EAFNOSUPPORT, None
77 connect_path = name[5:]
78 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
84 status = ovs.socket_util.check_connection_completion(sock)
85 return 0, Stream(sock, name, status)
88 def open_block((error, stream)):
89 """Blocks until a Stream completes its connection attempt, either
90 succeeding or failing. (error, stream) should be the tuple returned by
91 Stream.open(). Returns a tuple of the same form.
94 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
98 error = stream.connect()
99 if error != errno.EAGAIN:
102 poller = ovs.poller.Poller()
104 stream.connect_wait(poller)
106 assert error != errno.EINPROGRESS
116 def __scs_connecting(self):
117 retval = ovs.socket_util.check_connection_completion(self.socket)
118 assert retval != errno.EINPROGRESS
120 self.state = Stream.__S_CONNECTED
121 elif retval != errno.EAGAIN:
122 self.state = Stream.__S_DISCONNECTED
126 """Tries to complete the connection on this stream. If the connection
127 is complete, returns 0 if the connection was successful or a positive
128 errno value if it failed. If the connection is still in progress,
129 returns errno.EAGAIN."""
130 last_state = -1 # Always differs from initial self.state
131 while self.state != last_state:
132 last_state = self.state
133 if self.state == Stream.__S_CONNECTING:
134 self.__scs_connecting()
135 elif self.state == Stream.__S_CONNECTED:
137 elif self.state == Stream.__S_DISCONNECTED:
141 """Tries to receive up to 'n' bytes from this stream. Returns a
142 (error, string) tuple:
144 - If successful, 'error' is zero and 'string' contains between 1
145 and 'n' bytes of data.
147 - On error, 'error' is a positive errno value.
149 - If the connection has been closed in the normal fashion or if 'n'
150 is 0, the tuple is (0, "").
152 The recv function will not block waiting for data to arrive. If no
153 data have been received, it returns (errno.EAGAIN, "") immediately."""
155 retval = self.connect()
162 return (0, self.socket.recv(n))
163 except socket.error, e:
164 return (ovs.socket_util.get_exception_errno(e), "")
167 """Tries to send 'buf' on this stream.
169 If successful, returns the number of bytes sent, between 1 and
170 len(buf). 0 is only a valid return value if len(buf) is 0.
172 On error, returns a negative errno value.
174 Will not block. If no bytes can be immediately accepted for
175 transmission, returns -errno.EAGAIN immediately."""
177 retval = self.connect()
184 return self.socket.send(buf)
185 except socket.error, e:
186 return -ovs.socket_util.get_exception_errno(e)
191 def run_wait(self, poller):
194 def wait(self, poller, wait):
195 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
197 if self.state == Stream.__S_DISCONNECTED:
198 poller.immediate_wake()
201 if self.state == Stream.__S_CONNECTING:
202 wait = Stream.W_CONNECT
203 if wait == Stream.W_RECV:
204 poller.fd_wait(self.socket, select.POLLIN)
206 poller.fd_wait(self.socket, select.POLLOUT)
def connect_wait(self, poller):
    """Asks wait() to register 'poller' for the Stream.W_CONNECT event
    (connect complete, whether it succeeded or failed)."""
    event = Stream.W_CONNECT
    self.wait(poller, event)
def recv_wait(self, poller):
    """Asks wait() to register 'poller' for the Stream.W_RECV event (data
    received)."""
    event = Stream.W_RECV
    self.wait(poller, event)
def send_wait(self, poller):
    """Asks wait() to register 'poller' for the Stream.W_SEND event (send
    buffer room available)."""
    event = Stream.W_SEND
    self.wait(poller, event)
218 # Don't delete the file: we might have forked.
222 class PassiveStream(object):
def is_valid_name(name):
    """Reports whether 'name' is a passive stream name.

    A passive stream name has the form "TYPE:ARGS".  The only TYPE accepted
    here is "punix", so the result is True when 'name' begins with "punix:"
    and False otherwise."""
    passive_prefix = "punix:"
    return name.startswith(passive_prefix)
230 def __init__(self, sock, name, bind_path):
233 self.bind_path = bind_path
237 """Attempts to start listening for remote stream connections. 'name'
is a connection name in the form "TYPE:ARGS", where TYPE is a passive
239 stream class's name and ARGS are stream class-specific. Currently the
240 only supported TYPE is "punix".
242 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
243 new PassiveStream, on failure 'error' is a positive errno value and
244 'pstream' is None."""
245 if not PassiveStream.is_valid_name(name):
246 return errno.EAFNOSUPPORT, None
249 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
250 True, bind_path, None)
256 except socket.error, e:
257 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
261 return 0, PassiveStream(sock, name, bind_path)
264 """Closes this PassiveStream."""
266 if self.bind_path is not None:
267 ovs.fatal_signal.unlink_file_now(self.bind_path)
268 self.bind_path = None
271 """Tries to accept a new connection on this passive stream. Returns
272 (error, stream): if successful, 'error' is 0 and 'stream' is the new
273 Stream object, and on failure 'error' is a positive errno value and
276 Will not block waiting for a connection. If no connection is ready to
277 be accepted, returns (errno.EAGAIN, None) immediately."""
281 sock, addr = self.socket.accept()
282 ovs.socket_util.set_nonblocking(sock)
283 return 0, Stream(sock, "unix:%s" % addr, 0)
284 except socket.error, e:
285 error = ovs.socket_util.get_exception_errno(e)
286 if error != errno.EAGAIN:
288 vlog.dbg("accept: %s" % os.strerror(error))
def wait(self, poller):
    """Registers this passive stream's socket with 'poller' for the
    select.POLLIN event."""
    listener = self.socket
    poller.fd_wait(listener, select.POLLIN)
295 # Don't delete the file: we might have forked.
301 Active %s connection methods:
302 unix:FILE Unix domain socket named FILE
304 Passive %s connection methods:
305 punix:FILE Listen on Unix domain socket FILE""" % (name, name)