python: Change 'clone' function names to 'copy'.
[openvswitch] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18 import select
19 import socket
20
21 import ovs.poller
22 import ovs.socket_util
23
24 class Stream(object):
25     """Bidirectional byte stream.  Currently only Unix domain sockets
26     are implemented."""
27     n_unix_sockets = 0
28
29     # States.
30     __S_CONNECTING = 0
31     __S_CONNECTED = 1
32     __S_DISCONNECTED = 2
33
34     # Kinds of events that one might wait for.
35     W_CONNECT = 0               # Connect complete (success or failure).
36     W_RECV = 1                  # Data received.
37     W_SEND = 2                  # Send buffer room available.
38
39     @staticmethod
40     def is_valid_name(name):
41         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
42         TYPE is a supported stream type (currently only "unix:"), otherwise
43         False."""
44         return name.startswith("unix:")
45
46     def __init__(self, socket, name, bind_path, status):
47         self.socket = socket
48         self.name = name
49         self.bind_path = bind_path
50         if status == errno.EAGAIN:
51             self.state = Stream.__S_CONNECTING
52         elif status == 0:
53             self.state = Stream.__S_CONNECTED
54         else:
55             self.state = Stream.__S_DISCONNECTED
56
57         self.error = 0
58
59     @staticmethod
60     def open(name):
61         """Attempts to connect a stream to a remote peer.  'name' is a
62         connection name in the form "TYPE:ARGS", where TYPE is an active stream
63         class's name and ARGS are stream class-specific.  Currently the only
64         supported TYPE is "unix".
65
66         Returns (error, stream): on success 'error' is 0 and 'stream' is the
67         new Stream, on failure 'error' is a positive errno value and 'stream'
68         is None.
69
70         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
71         and a new Stream.  The connect() method can be used to check for
72         successful connection completion."""
73         if not Stream.is_valid_name(name):
74             return errno.EAFNOSUPPORT, None
75
76         Stream.n_unix_sockets += 1
77         bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
78                                                 Stream.n_unix_sockets)
79         connect_path = name[5:]
80         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
81                                                        True, bind_path,
82                                                        connect_path)
83         if error:
84             return error, None
85         else:
86             status = ovs.socket_util.check_connection_completion(sock)
87             return 0, Stream(sock, name, bind_path, status)
88
89     @staticmethod
90     def open_block((error, stream)):
91         """Blocks until a Stream completes its connection attempt, either
92         succeeding or failing.  (error, stream) should be the tuple returned by
93         Stream.open().  Returns a tuple of the same form.
94
95         Typical usage:
96         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
97
98         if not error:
99             while True:
100                 error = stream.connect()
101                 if error != errno.EAGAIN:
102                     break
103                 stream.run()
104                 poller = ovs.poller.Poller()
105                 stream.run_wait()
106                 stream.connect_wait(poller)
107                 poller.block()
108             assert error != errno.EINPROGRESS
109         
110         if error and stream:
111             stream.close()
112             stream = None
113         return error, stream
114
115     def close(self):
116         self.socket.close()
117         if self.bind_path is not None:
118             ovs.fatal_signal.unlink_file_now(self.bind_path)
119             self.bind_path = None
120
121     def __scs_connecting(self):
122         retval = ovs.socket_util.check_connection_completion(self.socket)
123         assert retval != errno.EINPROGRESS
124         if retval == 0:
125             self.state = Stream.__S_CONNECTED
126         elif retval != errno.EAGAIN:
127             self.state = Stream.__S_DISCONNECTED
128             self.error = retval
129
130     def connect(self):
131         """Tries to complete the connection on this stream.  If the connection
132         is complete, returns 0 if the connection was successful or a positive
133         errno value if it failed.  If the connection is still in progress,
134         returns errno.EAGAIN."""
135         last_state = -1         # Always differs from initial self.state
136         while self.state != last_state:
137             last_state = self.state
138             if self.state == Stream.__S_CONNECTING:
139                 self.__scs_connecting()
140             elif self.state == Stream.__S_CONNECTED:
141                 return 0
142             elif self.state == Stream.__S_DISCONNECTED:
143                 return self.error
144
145     def recv(self, n):
146         """Tries to receive up to 'n' bytes from this stream.  Returns a
147         (error, string) tuple:
148         
149             - If successful, 'error' is zero and 'string' contains between 1
150               and 'n' bytes of data.
151
152             - On error, 'error' is a positive errno value.
153
154             - If the connection has been closed in the normal fashion or if 'n'
155               is 0, the tuple is (0, "").
156         
157         The recv function will not block waiting for data to arrive.  If no
158         data have been received, it returns (errno.EAGAIN, "") immediately."""
159
160         retval = self.connect()
161         if retval != 0:
162             return (retval, "")
163         elif n == 0:
164             return (0, "")
165
166         try:
167             return (0, self.socket.recv(n))
168         except socket.error, e:
169             return (ovs.socket_util.get_exception_errno(e), "")
170
171     def send(self, buf):
172         """Tries to send 'buf' on this stream.
173
174         If successful, returns the number of bytes sent, between 1 and
175         len(buf).  0 is only a valid return value if len(buf) is 0.
176
177         On error, returns a negative errno value.
178
179         Will not block.  If no bytes can be immediately accepted for
180         transmission, returns -errno.EAGAIN immediately."""
181
182         retval = self.connect()
183         if retval != 0:
184             return -retval
185         elif len(buf) == 0:
186             return 0
187
188         try:
189             return self.socket.send(buf)
190         except socket.error, e:
191             return -ovs.socket_util.get_exception_errno(e)
192
193     def run(self):
194         pass
195
196     def run_wait(self, poller):
197         pass
198
199     def wait(self, poller, wait):
200         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
201
202         if self.state == Stream.__S_DISCONNECTED:
203             poller.immediate_wake()
204             return
205
206         if self.state == Stream.__S_CONNECTING:
207             wait = Stream.W_CONNECT
208         if wait == Stream.W_RECV:
209             poller.fd_wait(self.socket, select.POLLIN)
210         else:
211             poller.fd_wait(self.socket, select.POLLOUT)
212
213     def connect_wait(self, poller):
214         self.wait(poller, Stream.W_CONNECT)
215         
216     def recv_wait(self, poller):
217         self.wait(poller, Stream.W_RECV)
218         
219     def send_wait(self, poller):
220         self.wait(poller, Stream.W_SEND)
221         
222     def __del__(self):
223         # Don't delete the file: we might have forked.
224         self.socket.close()
225
226 class PassiveStream(object):
227     @staticmethod
228     def is_valid_name(name):
229         """Returns True if 'name' is a passive stream name in the form
230         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
231         "punix:"), otherwise False."""
232         return name.startswith("punix:")
233
234     def __init__(self, sock, name, bind_path):
235         self.name = name
236         self.socket = sock
237         self.bind_path = bind_path
238
239     @staticmethod
240     def open(name):
241         """Attempts to start listening for remote stream connections.  'name'
242         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
243         stream class's name and ARGS are stream class-specific.  Currently the
244         only supported TYPE is "punix".
245
246         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
247         new PassiveStream, on failure 'error' is a positive errno value and
248         'pstream' is None."""
249         if not PassiveStream.is_valid_name(name):
250             return errno.EAFNOSUPPORT, None
251
252         bind_path = name[6:]
253         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
254                                                        True, bind_path, None)
255         if error:
256             return error, None
257
258         try:
259             sock.listen(10)
260         except socket.error, e:
261             logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
262             sock.close()
263             return e.error, None
264
265         return 0, PassiveStream(sock, name, bind_path)
266
267     def close(self):
268         """Closes this PassiveStream."""
269         self.socket.close()
270         if self.bind_path is not None:
271             ovs.fatal_signal.unlink_file_now(self.bind_path)
272             self.bind_path = None
273
274     def accept(self):
275         """Tries to accept a new connection on this passive stream.  Returns
276         (error, stream): if successful, 'error' is 0 and 'stream' is the new
277         Stream object, and on failure 'error' is a positive errno value and
278         'stream' is None.
279
280         Will not block waiting for a connection.  If no connection is ready to
281         be accepted, returns (errno.EAGAIN, None) immediately."""
282
283         while True:
284             try:
285                 sock, addr = self.socket.accept()
286                 ovs.socket_util.set_nonblocking(sock)
287                 return 0, Stream(sock, "unix:%s" % addr, None, 0)
288             except socket.error, e:
289                 error = ovs.socket_util.get_exception_errno(e)
290                 if error != errno.EAGAIN:
291                     # XXX rate-limit
292                     logging.debug("accept: %s" % os.strerror(error))
293                 return error, None
294
295     def wait(self, poller):
296         poller.fd_wait(self.socket, select.POLLIN)
297
298     def __del__(self):
299         # Don't delete the file: we might have forked.
300         self.socket.close()
301
302 def usage(name, active, passive):
303     print
304     if active:
305         print("Active %s connection methods:" % name)
306         print("  unix:FILE               "
307                "Unix domain socket named FILE");
308
309     if passive:
310         print("Passive %s connection methods:" % name)
311         print("  punix:FILE              "
312               "listen on Unix domain socket FILE")