3 # Copyright (c) 2010 Nicira, Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at:
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
23 import signal #Causes keyboard interrupts to go to the main thread.
27 print_safe_lock = threading.Lock()
29 print_safe_lock.acquire()
31 print_safe_lock.release()
33 def start_thread(target, args):
34 t = threading.Thread(target=target, args=args)
39 #Caller is responsible for catching socket.error exceptions.
40 def send_packet(key, length, dest_ip, dest_port):
42 length -= 20 + 8 #IP and UDP headers.
45 packet += chr(0) * (length - len(packet))
47 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
48 sock.sendto(packet, (dest_ip, dest_port))
53 def __init__(self, vlan_ip, vlan_port):
54 self.vlan_ip = vlan_ip
55 self.vlan_port = vlan_port
56 self.recv_callbacks = {}
59 def recv_packet(self, key, success_callback, timeout_callback):
61 event = threading.Event()
67 timer = threading.Timer(30, timeout_cb)
75 # Start the timer first to avoid a timer.cancel() race condition.
77 self.recv_callbacks[key] = success_cb
80 def udp_receiver(self):
82 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
86 sock.bind((self.vlan_ip, self.vlan_port))
87 except socket.error, e:
88 print_safe('Failed to bind to %s:%d with error: %s'
89 % (self.vlan_ip, self.vlan_port, e))
90 os._exit(1) #sys.exit only exits the current thread.
95 data, _ = sock.recvfrom(4096)
96 except socket.timeout:
98 except socket.error, e:
99 print_safe('Failed to receive from %s:%d with error: %s'
100 % (self.vlan_ip, self.vlan_port, e))
103 data_str = data.split(chr(0))[0]
105 if not data_str.isdigit():
110 if key in self.recv_callbacks:
111 self.recv_callbacks[key]()
112 del self.recv_callbacks[key]
116 start_thread(self.udp_receiver, ())
125 def __init__(self, server_ip, server_port, vlan_ip, vlan_port):
130 self.server_ip = server_ip
131 self.server_port = server_port
133 self.recv_response = '%s:%d:' % (vlan_ip, vlan_port)
136 self.result_lock = threading.Lock()
139 self._test_id_lock = threading.Lock()
141 self.udp_recv = UDPReceiver(vlan_ip, vlan_port)
143 def get_test_id(self):
144 self._test_id_lock.acquire()
149 self._test_id_lock.release()
def set_result(self, key, value):
    """Store `value` as the outcome of test `key`, keeping the first one.

    Only the first result reported for a given key is kept; later calls
    for the same key leave the stored value unchanged.  The update is done
    while holding self.result_lock.
    """
    guard = self.result_lock
    guard.acquire()
    try:
        # setdefault only inserts when the key is absent, so an earlier
        # outcome is never overwritten by a later one.
        # NOTE(review): assumes self.result is a plain dict -- confirm.
        self.result.setdefault(key, value)
    finally:
        guard.release()
def recv(self, test_id):
    """Arm the UDP receiver for `test_id` and return the reply text.

    Registers two callbacks with self.udp_recv: one records 'Success' for
    the test, the other records 'Timeout'.  Both go through set_result().

    Returns self.recv_response with the test id appended.
    """
    def on_packet():
        self.set_result(test_id, 'Success')

    def on_timeout():
        self.set_result(test_id, 'Timeout')

    self.udp_recv.recv_packet(test_id, on_packet, on_timeout)

    return ''.join((self.recv_response, str(test_id)))
168 def send(self, test_id, data):
170 ip, port, size = data.split(':')
174 self.set_result(test_id,
175 'Server failed to parse send request: %s' % data)
180 for _ in range(send_time * 2):
182 send_packet(test_id, size, ip, port)
183 except socket.error, e:
184 self.set_result(test_id, 'Failure: ' + str(e))
188 self.set_result(test_id, 'Success')
190 start_thread(send_thread, ())
195 self.udp_recv.start()
197 BaseHTTPServer.HTTPServer((self.server_ip, self.server_port),
198 VlanServerHandler).serve_forever()
199 except socket.error, e:
200 print_safe('Failed to start control server: %s' % e)
205 class VlanServerHandler(BaseHTTPServer.BaseHTTPRequestHandler):
208 #Guarantee three arguments.
209 path = (self.path.lower().lstrip('/') + '//').split('/')
214 if path[0] == 'start':
215 test_id = vlan_server.get_test_id()
217 if path[1] == 'recv':
219 body = vlan_server.recv(test_id)
220 elif path[1] == 'send':
222 body = vlan_server.send(test_id, path[2])
223 elif (path[0] == 'result'
224 and path[1].isdigit()
225 and int(path[1]) in vlan_server.result):
227 body = vlan_server.result[int(path[1])]
228 elif path[0] == 'ping':
232 self.send_response(resp)
236 self.wfile.write(body)
def __init__(self, server_ip, server_port, vlan_ip, vlan_port):
    """Remember the control-server and VLAN endpoints and build a receiver.

    server_ip, server_port -- control server address; stored as "ip:port"
                              and used for HTTP connections.
    vlan_ip, vlan_port     -- address under test; stored as "ip:port" and
                              also handed to a UDPReceiver.

    Both ports are formatted with %d, so they must be numeric.
    """
    endpoint_fmt = '%s:%d'
    self.server_ip_port = endpoint_fmt % (server_ip, server_port)
    self.vlan_ip_port = endpoint_fmt % (vlan_ip, vlan_port)
    self.udp_recv = UDPReceiver(vlan_ip, vlan_port)
def request(self, resource):
    """Send an HTTP GET for `resource` to the control server.

    Returns the HTTPConnection with the request already sent; callers are
    expected to call getresponse() on it.  Connection and HTTP errors are
    not caught here.
    """
    connection = httplib.HTTPConnection(self.server_ip_port)
    connection.request('GET', resource)
    return connection
251 def send(self, size):
254 print_safe('Send size %d unsuccessful: %s' % (size, e))
257 conn = self.request('/start/recv')
258 data = conn.getresponse().read()
259 except (socket.error, httplib.HTTPException), e:
264 ip, port, test_id = data.split(':')
266 test_id = int(test_id)
268 error_msg("Received invalid response from control server (%s)" %
274 for _ in range(send_time * 4):
277 send_packet(test_id, size, ip, port)
278 resp = self.request('/result/%d' % test_id).getresponse()
280 except (socket.error, httplib.HTTPException), e:
284 if resp.status == 200 and data == 'Success':
285 print_safe('Send size %d successful' % size)
287 elif resp.status == 200:
296 def recv(self, size):
299 print_safe('Receive size %d unsuccessful: %s' % (size, e))
301 resource = '/start/send/%s:%d' % (self.vlan_ip_port, size)
303 conn = self.request(resource)
304 test_id = conn.getresponse().read()
305 except (socket.error, httplib.HTTPException), e:
309 if not test_id.isdigit():
310 error_msg('Invalid response %s' % test_id)
313 success = [False] #Primitive datatypes can't be set from closures.
321 self.udp_recv.recv_packet(int(test_id), success_cb, failure_cb).wait()
324 print_safe('Receive size %d successful' % size)
333 print_safe('Failed control server connectivity test: %s' % e)
336 resp = self.request('/ping').getresponse()
338 except (socket.error, httplib.HTTPException), e:
342 if resp.status != 200:
343 error_msg('Invalid status %d' % resp.status)
345 error_msg('Invalid response %s' % data)
351 if not self.server_up():
354 self.udp_recv.start()
357 for size in [50, 500, 1000, 1500]:
358 success = self.send(size) and success
359 success = self.recv(size) and success
372 %(argv0)s: Test vlan connectivity
373 usage: %(argv0)s server vlan
375 The following options are also available:
376 -s, --server run in server mode
377 -h, --help display this help message
378 -V, --version display version information\
379 """ % {'argv0': sys.argv[0]})
384 options, args = getopt.gnu_getopt(sys.argv[1:], 'hVs',
385 ['help', 'version', 'server'])
386 except getopt.GetoptError, geo:
387 print_safe('%s: %s\n' % (sys.argv[0], geo.msg))
391 for key, _ in options:
392 if key in ['-h', '--help']:
395 elif key in ['-V', '--version']:
396 print_safe('ovs-vlan-test (Open vSwitch) @VERSION@')
398 elif key in ['-s', '--server']:
401 print_safe('Unexpected option %s. (use --help for help)' % key)
405 print_safe('Expecting two arguments. (use --help for help)')
409 server_ip, server_port = args[0].split(':')
410 server_port = int(server_port)
416 vlan_ip, vlan_port = args[1].split(':')
417 vlan_port = int(vlan_port)
423 return VlanServer(server_ip, server_port, vlan_ip, vlan_port).run()
425 return VlanClient(server_ip, server_port, vlan_ip, vlan_port).run()
427 if __name__ == '__main__':
430 # Python can throw exceptions if threads are running at exit.
431 for th in threading.enumerate():
432 if th != threading.currentThread():