1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 import ovs.socket_util
26 """Bidirectional byte stream. Currently only Unix domain sockets
# Event kinds that a caller can ask to wait for:
#   W_CONNECT - connect complete (success or failure)
#   W_RECV    - data received
#   W_SEND    - send buffer room available
W_CONNECT, W_RECV, W_SEND = 0, 1, 2
def is_valid_name(name):
    """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
    TYPE is a supported stream type (currently only "unix:"), otherwise
    False."""
    # Only the "unix:" prefix is checked; the ARGS part is not validated here.
    return name.startswith("unix:")
# Constructor: records 'bind_path' and derives the initial connection state
# from 'status'.  errno.EAGAIN selects the CONNECTING state.  The CONNECTED
# and DISCONNECTED assignments below belong to further branches whose
# conditions are not visible in this extract (the embedded line numbers jump
# 52 -> 54 -> 56).
# NOTE(review): presumably status == 0 means CONNECTED and any other value
# means DISCONNECTED -- confirm against the complete file.
# NOTE(review): the 'socket' parameter shadows the 'socket' module (used
# elsewhere in this file as socket.SOCK_STREAM) inside this method.
47 def __init__(self, socket, name, bind_path, status):
50 self.bind_path = bind_path
51 if status == errno.EAGAIN:
52 self.state = Stream.__S_CONNECTING
54 self.state = Stream.__S_CONNECTED
56 self.state = Stream.__S_DISCONNECTED
# Body of the active-open entry point (referred to as Stream.open() in the
# open_block docstring).  Its 'def' line and part of the docstring are not
# visible in this extract.
62 """Attempts to connect a stream to a remote peer. 'name' is a
63 connection name in the form "TYPE:ARGS", where TYPE is an active stream
64 class's name and ARGS are stream class-specific. Currently the only
65 supported TYPE is "unix".
67 Returns (error, stream): on success 'error' is 0 and 'stream' is the
68 new Stream, on failure 'error' is a positive errno value and 'stream'
71 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
72 and a new Stream. The connect() method can be used to check for
73 successful connection completion."""
# Names that do not start with "unix:" are rejected with EAFNOSUPPORT.
74 if not Stream.is_valid_name(name):
75 return errno.EAFNOSUPPORT, None
# A class-wide counter plus the pid makes each local bind path distinct
# within this process.
# NOTE(review): the resulting name under /tmp is predictable; check whether
# that is acceptable where this runs.
77 Stream.n_unix_sockets += 1
78 bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
79 Stream.n_unix_sockets)
# name[5:] drops the five-character "unix:" prefix validated above.
80 connect_path = name[5:]
# NOTE(review): the remaining arguments of this call and the handling of
# 'error' are not visible in this extract.
81 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
# The completion status is handed to the Stream constructor, which maps it
# to an initial state; this function itself reports 0.
87 status = ovs.socket_util.check_connection_completion(sock)
88 return 0, Stream(sock, name, bind_path, status)
# Takes the (error, stream) pair produced by Stream.open() and drives
# stream.connect() until it stops reporting errno.EAGAIN, waiting via an
# ovs.poller.Poller registered with stream.connect_wait().
# NOTE(review): the looping construct, the poller's blocking call and the
# return statements are not visible in this extract.
# NOTE(review): the tuple parameter "(error, stream)" is Python 2-only
# syntax; Python 3 removed it (PEP 3113).
91 def open_block((error, stream)):
92 """Blocks until a Stream completes its connection attempt, either
93 succeeding or failing. (error, stream) should be the tuple returned by
94 Stream.open(). Returns a tuple of the same form.
97 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
101 error = stream.connect()
102 if error != errno.EAGAIN:
105 poller = ovs.poller.Poller()
107 stream.connect_wait(poller)
# connect() is documented to report EAGAIN, never EINPROGRESS.
109 assert error != errno.EINPROGRESS
# Tail of what appears to be the stream's close path (its 'def' line is not
# visible in this extract -- TODO confirm).  Removes the bind-path file via
# ovs.fatal_signal.unlink_file_now() and then clears 'bind_path', so a second
# pass through this code skips the unlink.
118 if self.bind_path is not None:
119 ovs.fatal_signal.unlink_file_now(self.bind_path)
120 self.bind_path = None
# State handler for CONNECTING: asks ovs.socket_util whether the pending
# connect on self.socket has finished and advances self.state accordingly.
# A result of errno.EAGAIN leaves the state unchanged; any other value that
# reaches the 'elif' moves the stream to DISCONNECTED.
# NOTE(review): the 'if' that guards the CONNECTED assignment (embedded line
# 125) is not visible in this extract; presumably it tests retval == 0.
122 def __scs_connecting(self):
123 retval = ovs.socket_util.check_connection_completion(self.socket)
124 assert retval != errno.EINPROGRESS
126 self.state = Stream.__S_CONNECTED
127 elif retval != errno.EAGAIN:
128 self.state = Stream.__S_DISCONNECTED
# Body of the connect() method; its 'def' line and the return statements of
# the CONNECTED / DISCONNECTED branches are not visible in this extract.
132 """Tries to complete the connection on this stream. If the connection
133 is complete, returns 0 if the connection was successful or a positive
134 errno value if it failed. If the connection is still in progress,
135 returns errno.EAGAIN."""
# The loop re-dispatches on self.state until one pass leaves the state
# unchanged, so a transition made by __scs_connecting() is handled in the
# same call.
136 last_state = -1 # Always differs from initial self.state
137 while self.state != last_state:
138 last_state = self.state
139 if self.state == Stream.__S_CONNECTING:
140 self.__scs_connecting()
141 elif self.state == Stream.__S_CONNECTED:
143 elif self.state == Stream.__S_DISCONNECTED:
# Body of the recv method; its 'def' line, the check on 'retval' and the
# opening 'try:' are not visible in this extract.
147 """Tries to receive up to 'n' bytes from this stream. Returns a
148 (error, string) tuple:
150 - If successful, 'error' is zero and 'string' contains between 1
151 and 'n' bytes of data.
153 - On error, 'error' is a positive errno value.
155 - If the connection has been closed in the normal fashion or if 'n'
156 is 0, the tuple is (0, "").
158 The recv function will not block waiting for data to arrive. If no
159 data have been received, it returns (errno.EAGAIN, "") immediately."""
# The connection is driven forward first; the handling of a non-zero
# 'retval' is on lines not shown here.
161 retval = self.connect()
168 return (0, self.socket.recv(n))
# Socket failures are translated to a positive errno with an empty string.
# NOTE(review): "except X, e" is Python 2-only; "except X as e" is accepted
# from Python 2.6 onwards.
169 except socket.error, e:
170 return (ovs.socket_util.get_exception_errno(e), "")
# Body of the send method; its 'def' line, the check on 'retval' and the
# opening 'try:' are not visible in this extract.
173 """Tries to send 'buf' on this stream.
175 If successful, returns the number of bytes sent, between 1 and
176 len(buf). 0 is only a valid return value if len(buf) is 0.
178 On error, returns a negative errno value.
180 Will not block. If no bytes can be immediately accepted for
181 transmission, returns -errno.EAGAIN immediately."""
# The connection is driven forward first; the handling of a non-zero
# 'retval' is on lines not shown here.
183 retval = self.connect()
190 return self.socket.send(buf)
# Unlike recv, errors are reported as a NEGATIVE errno so they cannot be
# confused with a byte count.
# NOTE(review): "except X, e" is Python 2-only; "except X as e" is accepted
# from Python 2.6 onwards.
191 except socket.error, e:
192 return -ovs.socket_util.get_exception_errno(e)
# NOTE(review): only the signature is visible in this extract; the body of
# run_wait is on lines that are not shown, so its behaviour cannot be
# described from here.
197 def run_wait(self, poller):
# Registers this stream with 'poller' for the event kind 'wait', which must
# be one of Stream.W_CONNECT, Stream.W_RECV or Stream.W_SEND.
#  - A DISCONNECTED stream asks the poller to wake immediately.
#  - A stream that is still CONNECTING is always treated as W_CONNECT,
#    whatever the caller asked for.
#  - W_CONNECT and W_SEND poll the socket for POLLOUT.
# NOTE(review): embedded lines 205 and 211 are not visible in this extract;
# presumably they are a 'return' after immediate_wake() and an 'else:' in
# front of the POLLIN registration (the W_RECV case) -- confirm.
200 def wait(self, poller, wait):
201 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
203 if self.state == Stream.__S_DISCONNECTED:
204 poller.immediate_wake()
207 if self.state == Stream.__S_CONNECTING:
208 wait = Stream.W_CONNECT
209 if wait in (Stream.W_CONNECT, Stream.W_SEND):
210 poller.fd_wait(self.socket, select.POLLOUT)
212 poller.fd_wait(self.socket, select.POLLIN)
def connect_wait(self, poller):
    """Registers with 'poller' to wait for the W_CONNECT event, i.e. for
    the connection attempt on this stream to complete."""
    event = Stream.W_CONNECT
    self.wait(poller, event)
def recv_wait(self, poller):
    """Registers with 'poller' to wait for the W_RECV event, i.e. for
    data to be received on this stream."""
    event = Stream.W_RECV
    self.wait(poller, event)
def send_wait(self, poller):
    """Registers with 'poller' to wait for the W_SEND event, i.e. for
    room to become available in the send buffer."""
    event = Stream.W_SEND
    self.wait(poller, event)
224 # Don't delete the file: we might have forked.
227 class PassiveStream(object):
def is_valid_name(name):
    """Reports whether 'name' names a passive stream.

    A passive stream name has the form "TYPE:ARGS".  The only TYPE
    accepted at present is "punix", so the result is True exactly when
    'name' begins with "punix:" and False otherwise."""
    prefix = "punix:"
    return name.startswith(prefix)
# Constructor for the passive (listening) stream: stores 'bind_path', which
# the close path later unlinks.
# NOTE(review): the lines that store 'sock' and 'name' (embedded lines
# 236-237) are not visible in this extract; other methods read self.socket,
# so presumably 'sock' is saved under that attribute -- confirm.
235 def __init__(self, sock, name, bind_path):
238 self.bind_path = bind_path
# Body of the passive-open entry point; its 'def' line, the assignment of
# 'bind_path', the check on 'error' and the 'try:' around the listen call
# are not visible in this extract.
242 """Attempts to start listening for remote stream connections. 'name'
243 is a connection name in the form "TYPE:ARGS", where TYPE is an passive
244 stream class's name and ARGS are stream class-specific. Currently the
245 only supported TYPE is "punix".
247 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
248 new PassiveStream, on failure 'error' is a positive errno value and
249 'pstream' is None."""
# Names that do not start with "punix:" are rejected with EAFNOSUPPORT.
250 if not PassiveStream.is_valid_name(name):
251 return errno.EAFNOSUPPORT, None
# The socket is created bound to 'bind_path' with no connect path (None).
254 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
255 True, bind_path, None)
# NOTE(review): socket.error instances expose 'errno', not 'error', so
# 'e.error' below would raise AttributeError inside this handler.  The other
# handlers in this file use ovs.socket_util.get_exception_errno(e); this one
# should match.
# NOTE(review): "except X, e" is Python 2-only; "except X as e" is accepted
# from Python 2.6 onwards.
261 except socket.error, e:
262 logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
266 return 0, PassiveStream(sock, name, bind_path)
# Body of PassiveStream's close method (its 'def' line is not visible in
# this extract).  Removes the bind-path file via
# ovs.fatal_signal.unlink_file_now() and then clears 'bind_path', so a second
# call skips the unlink.
# NOTE(review): embedded line 270 is not shown; presumably it closes the
# listening socket -- confirm.
269 """Closes this PassiveStream."""
271 if self.bind_path is not None:
272 ovs.fatal_signal.unlink_file_now(self.bind_path)
273 self.bind_path = None
# Body of the accept method; its 'def' line, the enclosing loop/'try:' and
# the error-return statements are not visible in this extract.
276 """Tries to accept a new connection on this passive stream. Returns
277 (error, stream): if successful, 'error' is 0 and 'stream' is the new
278 Stream object, and on failure 'error' is a positive errno value and
281 Will not block waiting for a connection. If no connection is ready to
282 be accepted, returns (errno.EAGAIN, None) immediately."""
# The accepted socket is switched to non-blocking mode and wrapped in a
# Stream with no bind path and status 0.
286 sock, addr = self.socket.accept()
287 ovs.socket_util.set_nonblocking(sock)
288 return 0, Stream(sock, "unix:%s" % addr, None, 0)
# NOTE(review): "except X, e" is Python 2-only; "except X as e" is accepted
# from Python 2.6 onwards.
289 except socket.error, e:
290 error = ovs.socket_util.get_exception_errno(e)
# Only failures other than EAGAIN reach the debug log below; EAGAIN is the
# documented "nothing to accept yet" result.
291 if error != errno.EAGAIN:
293 logging.debug("accept: %s" % os.strerror(error))
def wait(self, poller):
    """Arranges for 'poller' to wake up when this passive stream's socket
    becomes readable (POLLIN)."""
    listen_fd = self.socket
    poller.fd_wait(listen_fd, select.POLLIN)
300 # Don't delete the file: we might have forked.
# Prints help text listing the connection methods for 'name': an "Active"
# section followed by a "Passive" section.
# NOTE(review): the conditions that use 'active' and 'passive', any use of
# 'bootstrap', and the first halves of the two continued print calls are not
# visible in this extract.
303 def usage(name, active, passive, bootstrap):
306 print("Active %s connection methods:" % name)
# NOTE(review): the trailing ';' on the next line is legal but unnecessary.
308 "Unix domain socket named FILE");
311 print("Passive %s connection methods:" % name)
313 "listen on Unix domain socket FILE")