1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 import ovs.socket_util
24 vlog = ovs.vlog.Vlog("stream")
28 """Bidirectional byte stream. Currently only Unix domain sockets
37 # Kinds of events that one might wait for.
38 W_CONNECT = 0 # Connect complete (success or failure).
39 W_RECV = 1 # Data received.
40 W_SEND = 2 # Send buffer room available.
43 def is_valid_name(name):
44 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
45 TYPE is a supported stream type (currently only "unix:"), otherwise
47 return name.startswith("unix:")
49 def __init__(self, socket, name, bind_path, status):
52 self.bind_path = bind_path
53 if status == errno.EAGAIN:
54 self.state = Stream.__S_CONNECTING
56 self.state = Stream.__S_CONNECTED
58 self.state = Stream.__S_DISCONNECTED
64 """Attempts to connect a stream to a remote peer. 'name' is a
65 connection name in the form "TYPE:ARGS", where TYPE is an active stream
66 class's name and ARGS are stream class-specific. Currently the only
67 supported TYPE is "unix".
69 Returns (error, stream): on success 'error' is 0 and 'stream' is the
70 new Stream, on failure 'error' is a positive errno value and 'stream'
73 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
74 and a new Stream. The connect() method can be used to check for
75 successful connection completion."""
76 if not Stream.is_valid_name(name):
77 return errno.EAFNOSUPPORT, None
79 Stream.n_unix_sockets += 1
80 bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
81 Stream.n_unix_sockets)
82 connect_path = name[5:]
83 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
89 status = ovs.socket_util.check_connection_completion(sock)
90 return 0, Stream(sock, name, bind_path, status)
93 def open_block((error, stream)):
94 """Blocks until a Stream completes its connection attempt, either
95 succeeding or failing. (error, stream) should be the tuple returned by
96 Stream.open(). Returns a tuple of the same form.
99 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
103 error = stream.connect()
104 if error != errno.EAGAIN:
107 poller = ovs.poller.Poller()
109 stream.connect_wait(poller)
111 assert error != errno.EINPROGRESS
120 if self.bind_path is not None:
121 ovs.fatal_signal.unlink_file_now(self.bind_path)
122 self.bind_path = None
124 def __scs_connecting(self):
125 retval = ovs.socket_util.check_connection_completion(self.socket)
126 assert retval != errno.EINPROGRESS
128 self.state = Stream.__S_CONNECTED
129 elif retval != errno.EAGAIN:
130 self.state = Stream.__S_DISCONNECTED
134 """Tries to complete the connection on this stream. If the connection
135 is complete, returns 0 if the connection was successful or a positive
136 errno value if it failed. If the connection is still in progress,
137 returns errno.EAGAIN."""
138 last_state = -1 # Always differs from initial self.state
139 while self.state != last_state:
140 last_state = self.state
141 if self.state == Stream.__S_CONNECTING:
142 self.__scs_connecting()
143 elif self.state == Stream.__S_CONNECTED:
145 elif self.state == Stream.__S_DISCONNECTED:
149 """Tries to receive up to 'n' bytes from this stream. Returns a
150 (error, string) tuple:
152 - If successful, 'error' is zero and 'string' contains between 1
153 and 'n' bytes of data.
155 - On error, 'error' is a positive errno value.
157 - If the connection has been closed in the normal fashion or if 'n'
158 is 0, the tuple is (0, "").
160 The recv function will not block waiting for data to arrive. If no
161 data have been received, it returns (errno.EAGAIN, "") immediately."""
163 retval = self.connect()
170 return (0, self.socket.recv(n))
171 except socket.error, e:
172 return (ovs.socket_util.get_exception_errno(e), "")
175 """Tries to send 'buf' on this stream.
177 If successful, returns the number of bytes sent, between 1 and
178 len(buf). 0 is only a valid return value if len(buf) is 0.
180 On error, returns a negative errno value.
182 Will not block. If no bytes can be immediately accepted for
183 transmission, returns -errno.EAGAIN immediately."""
185 retval = self.connect()
192 return self.socket.send(buf)
193 except socket.error, e:
194 return -ovs.socket_util.get_exception_errno(e)
199 def run_wait(self, poller):
202 def wait(self, poller, wait):
203 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
205 if self.state == Stream.__S_DISCONNECTED:
206 poller.immediate_wake()
209 if self.state == Stream.__S_CONNECTING:
210 wait = Stream.W_CONNECT
211 if wait == Stream.W_RECV:
212 poller.fd_wait(self.socket, select.POLLIN)
214 poller.fd_wait(self.socket, select.POLLOUT)
216 def connect_wait(self, poller):
217 self.wait(poller, Stream.W_CONNECT)
219 def recv_wait(self, poller):
220 self.wait(poller, Stream.W_RECV)
222 def send_wait(self, poller):
223 self.wait(poller, Stream.W_SEND)
226 # Don't delete the file: we might have forked.
230 class PassiveStream(object):
232 def is_valid_name(name):
233 """Returns True if 'name' is a passive stream name in the form
234 "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
235 "punix:"), otherwise False."""
236 return name.startswith("punix:")
238 def __init__(self, sock, name, bind_path):
241 self.bind_path = bind_path
245 """Attempts to start listening for remote stream connections. 'name'
246 is a connection name in the form "TYPE:ARGS", where TYPE is an passive
247 stream class's name and ARGS are stream class-specific. Currently the
248 only supported TYPE is "punix".
250 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
251 new PassiveStream, on failure 'error' is a positive errno value and
252 'pstream' is None."""
253 if not PassiveStream.is_valid_name(name):
254 return errno.EAFNOSUPPORT, None
257 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
258 True, bind_path, None)
264 except socket.error, e:
265 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
269 return 0, PassiveStream(sock, name, bind_path)
272 """Closes this PassiveStream."""
274 if self.bind_path is not None:
275 ovs.fatal_signal.unlink_file_now(self.bind_path)
276 self.bind_path = None
279 """Tries to accept a new connection on this passive stream. Returns
280 (error, stream): if successful, 'error' is 0 and 'stream' is the new
281 Stream object, and on failure 'error' is a positive errno value and
284 Will not block waiting for a connection. If no connection is ready to
285 be accepted, returns (errno.EAGAIN, None) immediately."""
289 sock, addr = self.socket.accept()
290 ovs.socket_util.set_nonblocking(sock)
291 return 0, Stream(sock, "unix:%s" % addr, None, 0)
292 except socket.error, e:
293 error = ovs.socket_util.get_exception_errno(e)
294 if error != errno.EAGAIN:
296 vlog.dbg("accept: %s" % os.strerror(error))
299 def wait(self, poller):
300 poller.fd_wait(self.socket, select.POLLIN)
303 # Don't delete the file: we might have forked.
309 Active %s connection methods:
310 unix:FILE Unix domain socket named FILE
312 Passive %s connection methods:
313 punix:FILE Listen on Unix domain socket FILE""" % (name, name)