1 # Copyright (c) 2010 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 import ovs.socket_util
26 """Bidirectional byte stream. Currently only Unix domain sockets
35 # Kinds of events that one might wait for.
36 W_CONNECT = 0 # Connect complete (success or failure).
37 W_RECV = 1 # Data received.
38 W_SEND = 2 # Send buffer room available.
41 def is_valid_name(name):
42 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
43 TYPE is a supported stream type (currently only "unix:"), otherwise
45 return name.startswith("unix:")
47 def __init__(self, socket, name, bind_path, status):
50 self.bind_path = bind_path
51 if status == errno.EAGAIN:
52 self.state = Stream.__S_CONNECTING
54 self.state = Stream.__S_CONNECTED
56 self.state = Stream.__S_DISCONNECTED
62 """Attempts to connect a stream to a remote peer. 'name' is a
63 connection name in the form "TYPE:ARGS", where TYPE is an active stream
64 class's name and ARGS are stream class-specific. Currently the only
65 supported TYPE is "unix".
67 Returns (error, stream): on success 'error' is 0 and 'stream' is the
68 new Stream, on failure 'error' is a positive errno value and 'stream'
71 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
72 and a new Stream. The connect() method can be used to check for
73 successful connection completion."""
74 if not Stream.is_valid_name(name):
75 return errno.EAFNOSUPPORT, None
77 Stream.n_unix_sockets += 1
78 bind_path = "/tmp/stream-unix.%ld.%d" % (os.getpid(),
79 Stream.n_unix_sockets)
80 connect_path = name[5:]
81 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
87 status = ovs.socket_util.check_connection_completion(sock)
88 return 0, Stream(sock, name, bind_path, status)
91 def open_block(tuple):
92 """Blocks until a Stream completes its connection attempt, either
93 succeeding or failing. 'tuple' should be the tuple returned by
94 Stream.open(). Returns a tuple of the same form.
97 error, stream = Stream.open_block(Stream.open("tcp:1.2.3.4:5"))"""
102 error = stream.connect()
103 if error != errno.EAGAIN:
106 poller = ovs.poller.Poller()
108 stream.connect_wait(poller)
110 assert error != errno.EINPROGRESS
119 if self.bind_path is not None:
120 ovs.fatal_signal.unlink_file_now(self.bind_path)
121 self.bind_path = None
123 def __scs_connecting(self):
124 retval = ovs.socket_util.check_connection_completion(self.socket)
125 assert retval != errno.EINPROGRESS
127 self.state = Stream.__S_CONNECTED
128 elif retval != errno.EAGAIN:
129 self.state = Stream.__S_DISCONNECTED
133 """Tries to complete the connection on this stream. If the connection
134 is complete, returns 0 if the connection was successful or a positive
135 errno value if it failed. If the connection is still in progress,
136 returns errno.EAGAIN."""
137 last_state = -1 # Always differs from initial self.state
138 while self.state != last_state:
139 last_state = self.state
140 if self.state == Stream.__S_CONNECTING:
141 self.__scs_connecting()
142 elif self.state == Stream.__S_CONNECTED:
144 elif self.state == Stream.__S_DISCONNECTED:
148 """Tries to receive up to 'n' bytes from this stream. Returns a
149 (error, string) tuple:
151 - If successful, 'error' is zero and 'string' contains between 1
152 and 'n' bytes of data.
154 - On error, 'error' is a positive errno value.
156 - If the connection has been closed in the normal fashion or if 'n'
157 is 0, the tuple is (0, "").
159 The recv function will not block waiting for data to arrive. If no
160 data have been received, it returns (errno.EAGAIN, "") immediately."""
162 retval = self.connect()
169 return (0, self.socket.recv(n))
170 except socket.error, e:
171 return (ovs.socket_util.get_exception_errno(e), "")
174 """Tries to send 'buf' on this stream.
176 If successful, returns the number of bytes sent, between 1 and
177 len(buf). 0 is only a valid return value if len(buf) is 0.
179 On error, returns a negative errno value.
181 Will not block. If no bytes can be immediately accepted for
182 transmission, returns -errno.EAGAIN immediately."""
184 retval = self.connect()
191 return self.socket.send(buf)
192 except socket.error, e:
193 return -ovs.socket_util.get_exception_errno(e)
198 def run_wait(self, poller):
201 def wait(self, poller, wait):
202 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
204 if self.state == Stream.__S_DISCONNECTED:
205 poller.immediate_wake()
208 if self.state == Stream.__S_CONNECTING:
209 wait = Stream.W_CONNECT
210 if wait in (Stream.W_CONNECT, Stream.W_SEND):
211 poller.fd_wait(self.socket, select.POLLOUT)
213 poller.fd_wait(self.socket, select.POLLIN)
def connect_wait(self, poller):
    """Arranges for 'poller' to wake up for connection completion on this
    stream, by delegating to wait() with the Stream.W_CONNECT event
    kind."""
    event = Stream.W_CONNECT
    self.wait(poller, event)
def recv_wait(self, poller):
    """Arranges for 'poller' to wake up for received data on this stream,
    by delegating to wait() with the Stream.W_RECV event kind."""
    event = Stream.W_RECV
    self.wait(poller, event)
def send_wait(self, poller):
    """Arranges for 'poller' to wake up for send buffer room on this
    stream, by delegating to wait() with the Stream.W_SEND event kind."""
    event = Stream.W_SEND
    self.wait(poller, event)
228 # Don't delete the file: we might have forked.
231 class PassiveStream(object):
def is_valid_name(name):
    """Reports whether 'name' is a usable passive stream name.

    A passive stream name has the form "TYPE:ARGS".  The only TYPE
    accepted is "punix", so this returns True exactly when 'name' begins
    with "punix:" and False otherwise."""
    punix_prefix = "punix:"
    return name.startswith(punix_prefix)
239 def __init__(self, sock, name, bind_path):
242 self.bind_path = bind_path
246 """Attempts to start listening for remote stream connections. 'name'
247 is a connection name in the form "TYPE:ARGS", where TYPE is a passive
248 stream class's name and ARGS are stream class-specific. Currently the
249 only supported TYPE is "punix".
251 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
252 new PassiveStream, on failure 'error' is a positive errno value and
253 'pstream' is None."""
254 if not PassiveStream.is_valid_name(name):
255 return errno.EAFNOSUPPORT, None
258 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
259 True, bind_path, None)
265 except socket.error, e:
266 logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
270 return 0, PassiveStream(sock, name, bind_path)
273 """Closes this PassiveStream."""
275 if self.bind_path is not None:
276 ovs.fatal_signal.unlink_file_now(self.bind_path)
277 self.bind_path = None
280 """Tries to accept a new connection on this passive stream. Returns
281 (error, stream): if successful, 'error' is 0 and 'stream' is the new
282 Stream object, and on failure 'error' is a positive errno value and
285 Will not block waiting for a connection. If no connection is ready to
286 be accepted, returns (errno.EAGAIN, None) immediately."""
290 sock, addr = self.socket.accept()
291 ovs.socket_util.set_nonblocking(sock)
292 return 0, Stream(sock, "unix:%s" % addr, None, 0)
293 except socket.error, e:
294 error = ovs.socket_util.get_exception_errno(e)
295 if error != errno.EAGAIN:
297 logging.debug("accept: %s" % os.strerror(error))
def wait(self, poller):
    """Asks 'poller' to watch this passive stream's socket for the
    select.POLLIN event, by registering self.socket via poller.fd_wait()."""
    wanted_events = select.POLLIN
    poller.fd_wait(self.socket, wanted_events)
304 # Don't delete the file: we might have forked.
307 def usage(name, active, passive, bootstrap):
310 print("Active %s connection methods:" % name)
312 "Unix domain socket named FILE");
315 print("Passive %s connection methods:" % name)
317 "listen on Unix domain socket FILE")