ofp-util: Allow decoding of Open Flow 1.2 Packet In Messages
[openvswitch] / python / ovs / stream.py
index 5954c3148a76768e02483b71d49b553e1dd70db3..9c10612d920b32c74482d8daeb6eae4922edbffa 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2010, 2011 Nicira Networks
+# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # limitations under the License.
 
 import errno
 # limitations under the License.
 
 import errno
-import logging
 import os
 import select
 import socket
 import os
 import select
 import socket
-import sys
 
 import ovs.poller
 import ovs.socket_util
 
 import ovs.poller
 import ovs.socket_util
+import ovs.vlog
+
+vlog = ovs.vlog.Vlog("stream")
+
+
+def stream_or_pstream_needs_probes(name):
+    """ 1 if the stream or pstream specified by 'name' needs periodic probes to
+    verify connectivity.  For [p]streams which need probes, it can take a long
+    time to notice the connection was dropped.  Returns 0 if probes aren't
+    needed, and -1 if 'name' is invalid"""
+
+    if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
+        # Only unix and punix are supported currently.
+        return 0
+    else:
+        return -1
+
 
 class Stream(object):
     """Bidirectional byte stream.  Currently only Unix domain sockets
     are implemented."""
 
 class Stream(object):
     """Bidirectional byte stream.  Currently only Unix domain sockets
     are implemented."""
-    n_unix_sockets = 0
 
     # States.
     __S_CONNECTING = 0
 
     # States.
     __S_CONNECTING = 0
@@ -44,10 +58,9 @@ class Stream(object):
         False."""
         return name.startswith("unix:")
 
         False."""
         return name.startswith("unix:")
 
-    def __init__(self, socket, name, bind_path, status):
+    def __init__(self, socket, name, status):
         self.socket = socket
         self.name = name
         self.socket = socket
         self.name = name
-        self.bind_path = bind_path
         if status == errno.EAGAIN:
             self.state = Stream.__S_CONNECTING
         elif status == 0:
         if status == errno.EAGAIN:
             self.state = Stream.__S_CONNECTING
         elif status == 0:
@@ -74,18 +87,15 @@ class Stream(object):
         if not Stream.is_valid_name(name):
             return errno.EAFNOSUPPORT, None
 
         if not Stream.is_valid_name(name):
             return errno.EAFNOSUPPORT, None
 
-        Stream.n_unix_sockets += 1
-        bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
-                                                Stream.n_unix_sockets)
         connect_path = name[5:]
         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
         connect_path = name[5:]
         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
-                                                       True, bind_path,
+                                                       True, None,
                                                        connect_path)
         if error:
             return error, None
         else:
             status = ovs.socket_util.check_connection_completion(sock)
                                                        connect_path)
         if error:
             return error, None
         else:
             status = ovs.socket_util.check_connection_completion(sock)
-            return 0, Stream(sock, name, bind_path, status)
+            return 0, Stream(sock, name, status)
 
     @staticmethod
     def open_block((error, stream)):
 
     @staticmethod
     def open_block((error, stream)):
@@ -103,11 +113,11 @@ class Stream(object):
                     break
                 stream.run()
                 poller = ovs.poller.Poller()
                     break
                 stream.run()
                 poller = ovs.poller.Poller()
-                stream.run_wait()
+                stream.run_wait(poller)
                 stream.connect_wait(poller)
                 poller.block()
             assert error != errno.EINPROGRESS
                 stream.connect_wait(poller)
                 poller.block()
             assert error != errno.EINPROGRESS
-        
+
         if error and stream:
             stream.close()
             stream = None
         if error and stream:
             stream.close()
             stream = None
@@ -115,9 +125,6 @@ class Stream(object):
 
     def close(self):
         self.socket.close()
 
     def close(self):
         self.socket.close()
-        if self.bind_path is not None:
-            ovs.fatal_signal.unlink_file_now(self.bind_path)
-            self.bind_path = None
 
     def __scs_connecting(self):
         retval = ovs.socket_util.check_connection_completion(self.socket)
 
     def __scs_connecting(self):
         retval = ovs.socket_util.check_connection_completion(self.socket)
@@ -146,7 +153,7 @@ class Stream(object):
     def recv(self, n):
         """Tries to receive up to 'n' bytes from this stream.  Returns a
         (error, string) tuple:
     def recv(self, n):
         """Tries to receive up to 'n' bytes from this stream.  Returns a
         (error, string) tuple:
-        
+
             - If successful, 'error' is zero and 'string' contains between 1
               and 'n' bytes of data.
 
             - If successful, 'error' is zero and 'string' contains between 1
               and 'n' bytes of data.
 
@@ -154,7 +161,7 @@ class Stream(object):
 
             - If the connection has been closed in the normal fashion or if 'n'
               is 0, the tuple is (0, "").
 
             - If the connection has been closed in the normal fashion or if 'n'
               is 0, the tuple is (0, "").
-        
+
         The recv function will not block waiting for data to arrive.  If no
         data have been received, it returns (errno.EAGAIN, "") immediately."""
 
         The recv function will not block waiting for data to arrive.  If no
         data have been received, it returns (errno.EAGAIN, "") immediately."""
 
@@ -206,24 +213,25 @@ class Stream(object):
 
         if self.state == Stream.__S_CONNECTING:
             wait = Stream.W_CONNECT
 
         if self.state == Stream.__S_CONNECTING:
             wait = Stream.W_CONNECT
-        if wait in (Stream.W_CONNECT, Stream.W_SEND):
-            poller.fd_wait(self.socket, select.POLLOUT)
-        else:
+        if wait == Stream.W_RECV:
             poller.fd_wait(self.socket, select.POLLIN)
             poller.fd_wait(self.socket, select.POLLIN)
+        else:
+            poller.fd_wait(self.socket, select.POLLOUT)
 
     def connect_wait(self, poller):
         self.wait(poller, Stream.W_CONNECT)
 
     def connect_wait(self, poller):
         self.wait(poller, Stream.W_CONNECT)
-        
+
     def recv_wait(self, poller):
         self.wait(poller, Stream.W_RECV)
     def recv_wait(self, poller):
         self.wait(poller, Stream.W_RECV)
-        
+
     def send_wait(self, poller):
         self.wait(poller, Stream.W_SEND)
     def send_wait(self, poller):
         self.wait(poller, Stream.W_SEND)
-        
+
     def __del__(self):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
     def __del__(self):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
+
 class PassiveStream(object):
     @staticmethod
     def is_valid_name(name):
 class PassiveStream(object):
     @staticmethod
     def is_valid_name(name):
@@ -259,7 +267,7 @@ class PassiveStream(object):
         try:
             sock.listen(10)
         except socket.error, e:
         try:
             sock.listen(10)
         except socket.error, e:
-            logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
+            vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
             sock.close()
             return e.error, None
 
             sock.close()
             return e.error, None
 
@@ -285,12 +293,12 @@ class PassiveStream(object):
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
-                return 0, Stream(sock, "unix:%s" % addr, None, 0)
+                return 0, Stream(sock, "unix:%s" % addr, 0)
             except socket.error, e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
                     # XXX rate-limit
             except socket.error, e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
                     # XXX rate-limit
-                    logging.debug("accept: %s" % os.strerror(error))
+                    vlog.dbg("accept: %s" % os.strerror(error))
                 return error, None
 
     def wait(self, poller):
                 return error, None
 
     def wait(self, poller):
@@ -300,14 +308,11 @@ class PassiveStream(object):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
         # Don't delete the file: we might have forked.
         self.socket.close()
 
-def usage(name, active, passive, bootstrap):
-    print
-    if active:
-        print("Active %s connection methods:" % name)
-        print("  unix:FILE               "
-               "Unix domain socket named FILE");
-
-    if passive:
-        print("Passive %s connection methods:" % name)
-        print("  punix:FILE              "
-              "listen on Unix domain socket FILE")
+
+def usage(name):
+    return """
+Active %s connection methods:
+  unix:FILE               Unix domain socket named FILE
+
+Passive %s connection methods:
+  punix:FILE              Listen on Unix domain socket FILE""" % (name, name)