1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 import ovs.socket_util
25 """Bidirectional byte stream. Currently only Unix domain sockets
34 # Kinds of events that one might wait for.
35 W_CONNECT = 0 # Connect complete (success or failure).
36 W_RECV = 1 # Data received.
37 W_SEND = 2 # Send buffer room available.
def is_valid_name(name):
    """Returns True if 'name' is an active stream name of the form
    "TYPE:ARGS" whose TYPE is supported, otherwise False.  The only
    supported prefix at present is "unix:"."""
    supported_prefix = "unix:"
    return name.startswith(supported_prefix)
46 def __init__(self, socket, name, bind_path, status):
49 self.bind_path = bind_path
50 if status == errno.EAGAIN:
51 self.state = Stream.__S_CONNECTING
53 self.state = Stream.__S_CONNECTED
55 self.state = Stream.__S_DISCONNECTED
61 """Attempts to connect a stream to a remote peer. 'name' is a
62 connection name in the form "TYPE:ARGS", where TYPE is an active stream
63 class's name and ARGS are stream class-specific. Currently the only
64 supported TYPE is "unix".
66 Returns (error, stream): on success 'error' is 0 and 'stream' is the
67 new Stream, on failure 'error' is a positive errno value and 'stream'
70 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
71 and a new Stream. The connect() method can be used to check for
72 successful connection completion."""
73 if not Stream.is_valid_name(name):
74 return errno.EAFNOSUPPORT, None
76 Stream.n_unix_sockets += 1
77 bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
78 Stream.n_unix_sockets)
79 connect_path = name[5:]
80 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
86 status = ovs.socket_util.check_connection_completion(sock)
87 return 0, Stream(sock, name, bind_path, status)
90 def open_block((error, stream)):
91 """Blocks until a Stream completes its connection attempt, either
92 succeeding or failing. (error, stream) should be the tuple returned by
93 Stream.open(). Returns a tuple of the same form.
96 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
100 error = stream.connect()
101 if error != errno.EAGAIN:
104 poller = ovs.poller.Poller()
106 stream.connect_wait(poller)
108 assert error != errno.EINPROGRESS
117 if self.bind_path is not None:
118 ovs.fatal_signal.unlink_file_now(self.bind_path)
119 self.bind_path = None
121 def __scs_connecting(self):
122 retval = ovs.socket_util.check_connection_completion(self.socket)
123 assert retval != errno.EINPROGRESS
125 self.state = Stream.__S_CONNECTED
126 elif retval != errno.EAGAIN:
127 self.state = Stream.__S_DISCONNECTED
131 """Tries to complete the connection on this stream. If the connection
132 is complete, returns 0 if the connection was successful or a positive
133 errno value if it failed. If the connection is still in progress,
134 returns errno.EAGAIN."""
135 last_state = -1 # Always differs from initial self.state
136 while self.state != last_state:
137 last_state = self.state
138 if self.state == Stream.__S_CONNECTING:
139 self.__scs_connecting()
140 elif self.state == Stream.__S_CONNECTED:
142 elif self.state == Stream.__S_DISCONNECTED:
146 """Tries to receive up to 'n' bytes from this stream. Returns a
147 (error, string) tuple:
149 - If successful, 'error' is zero and 'string' contains between 1
150 and 'n' bytes of data.
152 - On error, 'error' is a positive errno value.
154 - If the connection has been closed in the normal fashion or if 'n'
155 is 0, the tuple is (0, "").
157 The recv function will not block waiting for data to arrive. If no
158 data have been received, it returns (errno.EAGAIN, "") immediately."""
160 retval = self.connect()
167 return (0, self.socket.recv(n))
168 except socket.error, e:
169 return (ovs.socket_util.get_exception_errno(e), "")
172 """Tries to send 'buf' on this stream.
174 If successful, returns the number of bytes sent, between 1 and
175 len(buf). 0 is only a valid return value if len(buf) is 0.
177 On error, returns a negative errno value.
179 Will not block. If no bytes can be immediately accepted for
180 transmission, returns -errno.EAGAIN immediately."""
182 retval = self.connect()
189 return self.socket.send(buf)
190 except socket.error, e:
191 return -ovs.socket_util.get_exception_errno(e)
196 def run_wait(self, poller):
199 def wait(self, poller, wait):
200 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
202 if self.state == Stream.__S_DISCONNECTED:
203 poller.immediate_wake()
206 if self.state == Stream.__S_CONNECTING:
207 wait = Stream.W_CONNECT
208 if wait == Stream.W_RECV:
209 poller.fd_wait(self.socket, select.POLLIN)
211 poller.fd_wait(self.socket, select.POLLOUT)
def connect_wait(self, poller):
    """Arranges for 'poller' to wake up when this stream's connection
    attempt completes (success or failure).  Shorthand for calling
    wait() with Stream.W_CONNECT."""
    event = Stream.W_CONNECT
    self.wait(poller, event)
def recv_wait(self, poller):
    """Arranges for 'poller' to wake up when data has been received on
    this stream.  Shorthand for calling wait() with Stream.W_RECV."""
    event = Stream.W_RECV
    self.wait(poller, event)
def send_wait(self, poller):
    """Arranges for 'poller' to wake up when send buffer room is
    available on this stream.  Shorthand for calling wait() with
    Stream.W_SEND."""
    event = Stream.W_SEND
    self.wait(poller, event)
223 # Don't delete the file: we might have forked.
226 class PassiveStream(object):
def is_valid_name(name):
    """Returns True if 'name' is a passive stream name of the form
    "TYPE:ARGS" whose TYPE is supported, otherwise False.  The only
    supported prefix at present is "punix:"."""
    supported_prefix = "punix:"
    return name.startswith(supported_prefix)
234 def __init__(self, sock, name, bind_path):
237 self.bind_path = bind_path
241 """Attempts to start listening for remote stream connections. 'name'
242 is a connection name in the form "TYPE:ARGS", where TYPE is a passive
243 stream class's name and ARGS are stream class-specific. Currently the
244 only supported TYPE is "punix".
246 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
247 new PassiveStream, on failure 'error' is a positive errno value and
248 'pstream' is None."""
249 if not PassiveStream.is_valid_name(name):
250 return errno.EAFNOSUPPORT, None
253 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
254 True, bind_path, None)
260 except socket.error, e:
261 logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
265 return 0, PassiveStream(sock, name, bind_path)
268 """Closes this PassiveStream."""
270 if self.bind_path is not None:
271 ovs.fatal_signal.unlink_file_now(self.bind_path)
272 self.bind_path = None
275 """Tries to accept a new connection on this passive stream. Returns
276 (error, stream): if successful, 'error' is 0 and 'stream' is the new
277 Stream object, and on failure 'error' is a positive errno value and
280 Will not block waiting for a connection. If no connection is ready to
281 be accepted, returns (errno.EAGAIN, None) immediately."""
285 sock, addr = self.socket.accept()
286 ovs.socket_util.set_nonblocking(sock)
287 return 0, Stream(sock, "unix:%s" % addr, None, 0)
288 except socket.error, e:
289 error = ovs.socket_util.get_exception_errno(e)
290 if error != errno.EAGAIN:
292 logging.debug("accept: %s" % os.strerror(error))
def wait(self, poller):
    """Registers this passive stream's socket with 'poller' for
    select.POLLIN events."""
    # NOTE(review): presumably POLLIN on the listening socket signals a
    # connection ready for accept() -- confirm against the poller docs.
    listening_socket = self.socket
    poller.fd_wait(listening_socket, select.POLLIN)
299 # Don't delete the file: we might have forked.
302 def usage(name, active, passive):
305 print("Active %s connection methods:" % name)
307 "Unix domain socket named FILE");
310 print("Passive %s connection methods:" % name)
312 "listen on Unix domain socket FILE")