datapath: Fix FLOW_BUFSIZE definition.
[openvswitch] / python / ovs / stream.py
index c1ce422fa43ca71a518a6272b93bdc327780ff95..9c10612d920b32c74482d8daeb6eae4922edbffa 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2010 Nicira Networks
+# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # limitations under the License.
 
 import errno
 # limitations under the License.
 
 import errno
-import logging
 import os
 import select
 import socket
 import os
 import select
 import socket
-import sys
 
 import ovs.poller
 import ovs.socket_util
 
 import ovs.poller
 import ovs.socket_util
+import ovs.vlog
+
+vlog = ovs.vlog.Vlog("stream")
+
+
+def stream_or_pstream_needs_probes(name):
+    """ 1 if the stream or pstream specified by 'name' needs periodic probes to
+    verify connectivity.  For [p]streams which need probes, it can take a long
+    time to notice the connection was dropped.  Returns 0 if probes aren't
+    needed, and -1 if 'name' is invalid"""
+
+    if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
+        # Only unix and punix are supported currently.
+        return 0
+    else:
+        return -1
+
 
 class Stream(object):
     """Bidirectional byte stream.  Currently only Unix domain sockets
     are implemented."""
 
 class Stream(object):
     """Bidirectional byte stream.  Currently only Unix domain sockets
     are implemented."""
-    n_unix_sockets = 0
 
     # States.
     __S_CONNECTING = 0
 
     # States.
     __S_CONNECTING = 0
@@ -44,10 +58,9 @@ class Stream(object):
         False."""
         return name.startswith("unix:")
 
         False."""
         return name.startswith("unix:")
 
-    def __init__(self, socket, name, bind_path, status):
+    def __init__(self, socket, name, status):
         self.socket = socket
         self.name = name
         self.socket = socket
         self.name = name
-        self.bind_path = bind_path
         if status == errno.EAGAIN:
             self.state = Stream.__S_CONNECTING
         elif status == 0:
         if status == errno.EAGAIN:
             self.state = Stream.__S_CONNECTING
         elif status == 0:
@@ -74,29 +87,25 @@ class Stream(object):
         if not Stream.is_valid_name(name):
             return errno.EAFNOSUPPORT, None
 
         if not Stream.is_valid_name(name):
             return errno.EAFNOSUPPORT, None
 
-        Stream.n_unix_sockets += 1
-        bind_path = "/tmp/stream-unix.%ld.%d" % (os.getpid(),
-                                                 Stream.n_unix_sockets)
         connect_path = name[5:]
         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
         connect_path = name[5:]
         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
-                                                       True, bind_path,
+                                                       True, None,
                                                        connect_path)
         if error:
             return error, None
         else:
             status = ovs.socket_util.check_connection_completion(sock)
                                                        connect_path)
         if error:
             return error, None
         else:
             status = ovs.socket_util.check_connection_completion(sock)
-            return 0, Stream(sock, name, bind_path, status)
+            return 0, Stream(sock, name, status)
 
     @staticmethod
 
     @staticmethod
-    def open_block(tuple):
+    def open_block((error, stream)):
         """Blocks until a Stream completes its connection attempt, either
         """Blocks until a Stream completes its connection attempt, either
-        succeeding or failing.  'tuple' should be the tuple returned by
+        succeeding or failing.  (error, stream) should be the tuple returned by
         Stream.open().  Returns a tuple of the same form.
 
         Typical usage:
         Stream.open().  Returns a tuple of the same form.
 
         Typical usage:
-        error, stream = Stream.open_block(Stream.open("tcp:1.2.3.4:5"))"""
+        error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
 
 
-        error, stream = tuple
         if not error:
             while True:
                 error = stream.connect()
         if not error:
             while True:
                 error = stream.connect()
@@ -104,11 +113,11 @@ class Stream(object):
                     break
                 stream.run()
                 poller = ovs.poller.Poller()
                     break
                 stream.run()
                 poller = ovs.poller.Poller()
-                stream.run_wait()
+                stream.run_wait(poller)
                 stream.connect_wait(poller)
                 poller.block()
             assert error != errno.EINPROGRESS
                 stream.connect_wait(poller)
                 poller.block()
             assert error != errno.EINPROGRESS
-        
+
         if error and stream:
             stream.close()
             stream = None
         if error and stream:
             stream.close()
             stream = None
@@ -116,9 +125,6 @@ class Stream(object):
 
     def close(self):
         self.socket.close()
 
     def close(self):
         self.socket.close()
-        if self.bind_path is not None:
-            ovs.fatal_signal.unlink_file_now(self.bind_path)
-            self.bind_path = None
 
     def __scs_connecting(self):
         retval = ovs.socket_util.check_connection_completion(self.socket)
 
     def __scs_connecting(self):
         retval = ovs.socket_util.check_connection_completion(self.socket)
@@ -147,7 +153,7 @@ class Stream(object):
     def recv(self, n):
         """Tries to receive up to 'n' bytes from this stream.  Returns a
         (error, string) tuple:
     def recv(self, n):
         """Tries to receive up to 'n' bytes from this stream.  Returns a
         (error, string) tuple:
-        
+
             - If successful, 'error' is zero and 'string' contains between 1
               and 'n' bytes of data.
 
             - If successful, 'error' is zero and 'string' contains between 1
               and 'n' bytes of data.
 
@@ -155,7 +161,7 @@ class Stream(object):
 
             - If the connection has been closed in the normal fashion or if 'n'
               is 0, the tuple is (0, "").
 
             - If the connection has been closed in the normal fashion or if 'n'
               is 0, the tuple is (0, "").
-        
+
         The recv function will not block waiting for data to arrive.  If no
         data have been received, it returns (errno.EAGAIN, "") immediately."""
 
         The recv function will not block waiting for data to arrive.  If no
         data have been received, it returns (errno.EAGAIN, "") immediately."""
 
@@ -207,27 +213,25 @@ class Stream(object):
 
         if self.state == Stream.__S_CONNECTING:
             wait = Stream.W_CONNECT
 
         if self.state == Stream.__S_CONNECTING:
             wait = Stream.W_CONNECT
-        if wait in (Stream.W_CONNECT, Stream.W_SEND):
-            poller.fd_wait(self.socket, select.POLLOUT)
-        else:
+        if wait == Stream.W_RECV:
             poller.fd_wait(self.socket, select.POLLIN)
             poller.fd_wait(self.socket, select.POLLIN)
+        else:
+            poller.fd_wait(self.socket, select.POLLOUT)
 
     def connect_wait(self, poller):
         self.wait(poller, Stream.W_CONNECT)
 
     def connect_wait(self, poller):
         self.wait(poller, Stream.W_CONNECT)
-        
+
     def recv_wait(self, poller):
         self.wait(poller, Stream.W_RECV)
     def recv_wait(self, poller):
         self.wait(poller, Stream.W_RECV)
-        
+
     def send_wait(self, poller):
         self.wait(poller, Stream.W_SEND)
     def send_wait(self, poller):
         self.wait(poller, Stream.W_SEND)
-        
-    def get_name(self):
-        return self.name
-        
+
     def __del__(self):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
     def __del__(self):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
+
 class PassiveStream(object):
     @staticmethod
     def is_valid_name(name):
 class PassiveStream(object):
     @staticmethod
     def is_valid_name(name):
@@ -263,7 +267,7 @@ class PassiveStream(object):
         try:
             sock.listen(10)
         except socket.error, e:
         try:
             sock.listen(10)
         except socket.error, e:
-            logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
+            vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
             sock.close()
             return e.error, None
 
             sock.close()
             return e.error, None
 
@@ -289,12 +293,12 @@ class PassiveStream(object):
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
-                return 0, Stream(sock, "unix:%s" % addr, None, 0)
+                return 0, Stream(sock, "unix:%s" % addr, 0)
             except socket.error, e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
                     # XXX rate-limit
             except socket.error, e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
                     # XXX rate-limit
-                    logging.debug("accept: %s" % os.strerror(error))
+                    vlog.dbg("accept: %s" % os.strerror(error))
                 return error, None
 
     def wait(self, poller):
                 return error, None
 
     def wait(self, poller):
@@ -304,14 +308,11 @@ class PassiveStream(object):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
         # Don't delete the file: we might have forked.
         self.socket.close()
 
-def usage(name, active, passive, bootstrap):
-    print
-    if active:
-        print("Active %s connection methods:" % name)
-        print("  unix:FILE               "
-               "Unix domain socket named FILE");
-
-    if passive:
-        print("Passive %s connection methods:" % name)
-        print("  punix:FILE              "
-              "listen on Unix domain socket FILE")
+
+def usage(name):
+    return """
+Active %s connection methods:
+  unix:FILE               Unix domain socket named FILE
+
+Passive %s connection methods:
+  punix:FILE              Listen on Unix domain socket FILE""" % (name, name)