From: Ben Pfaff Date: Mon, 14 May 2012 18:26:36 +0000 (-0700) Subject: python: Break unixctl implementation into registry, client, and server. X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=53cf9963ccc60b443d738b31fbb446bc79170693;p=openvswitch python: Break unixctl implementation into registry, client, and server. I wish to add some unixctl commands to the Python vlog module. However, importing ovs.unixctl in ovs.vlog creates a circular dependency, because ovs.unixctl imports ovs.vlog already. The solution, in this commit, is to break the unixctl module into three parts: a register (ovs.unixctl) that does not depend on ovs.vlog, and client (ovs.unixctl.client) and server (ovs.unixctl.server) modules that do. This breaks the circular dependency. Signed-off-by: Ben Pfaff --- diff --git a/debian/ovs-monitor-ipsec b/debian/ovs-monitor-ipsec index 94048047..ffaa979d 100755 --- a/debian/ovs-monitor-ipsec +++ b/debian/ovs-monitor-ipsec @@ -38,6 +38,7 @@ import ovs.util import ovs.daemon import ovs.db.idl import ovs.unixctl +import ovs.unixctl.server import ovs.vlog vlog = ovs.vlog.Vlog("ovs-monitor-ipsec") @@ -414,7 +415,7 @@ def main(): ovs.daemon.daemonize() ovs.unixctl.command_register("exit", "", 0, 0, unixctl_exit, None) - error, unixctl_server = ovs.unixctl.UnixctlServer.create(None) + error, unixctl_server = ovs.unixctl.server.UnixctlServer.create(None) if error: ovs.util.ovs_fatal(error, "could not create unixctl server", vlog) diff --git a/python/automake.mk b/python/automake.mk index 4c0e78b4..96869e30 100644 --- a/python/automake.mk +++ b/python/automake.mk @@ -27,7 +27,9 @@ ovs_pyfiles = \ python/ovs/socket_util.py \ python/ovs/stream.py \ python/ovs/timeval.py \ - python/ovs/unixctl.py \ + python/ovs/unixctl/__init__.py \ + python/ovs/unixctl/client.py \ + python/ovs/unixctl/server.py \ python/ovs/util.py \ python/ovs/version.py \ python/ovs/vlog.py diff --git a/python/ovs/unixctl.py b/python/ovs/unixctl.py deleted file mode 100644 index 0fadaeb6..00000000 --- a/python/ovs/unixctl.py +++ /dev/null @@ -1,312 +0,0 @@ -# Copyright (c) 2012 Nicira, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy -import errno -import os -import types - -import ovs.daemon -import ovs.dirs -import ovs.jsonrpc -import ovs.stream -import ovs.util -import ovs.version -import ovs.vlog - -Message = ovs.jsonrpc.Message -vlog = ovs.vlog.Vlog("unixctl") -commands = {} -strtypes = types.StringTypes - - -class _UnixctlCommand(object): - def __init__(self, usage, min_args, max_args, callback, aux): - self.usage = usage - self.min_args = min_args - self.max_args = max_args - self.callback = callback - self.aux = aux - - -def _unixctl_help(conn, unused_argv, unused_aux): - assert isinstance(conn, UnixctlConnection) - reply = "The available commands are:\n" - command_names = sorted(commands.keys()) - for name in command_names: - reply += " " - usage = commands[name].usage - if usage: - reply += "%-23s %s" % (name, usage) - else: - reply += name - reply += "\n" - conn.reply(reply) - - -def _unixctl_version(conn, unused_argv, version): - assert isinstance(conn, UnixctlConnection) - version = "%s (Open vSwitch) %s" % (ovs.util.PROGRAM_NAME, version) - conn.reply(version) - - -def command_register(name, usage, min_args, max_args, callback, aux): - """ Registers a command with the given 'name' to be exposed by the - UnixctlServer. 'usage' describes the arguments to the command; it is used - only for presentation to the user in "help" output. - - 'callback' is called when the command is received. It is passed a - UnixctlConnection object, the list of arguments as unicode strings, and - 'aux'. Normally 'callback' should reply by calling - UnixctlConnection.reply() or UnixctlConnection.reply_error() before it - returns, but if the command cannot be handled immediately, then it can - defer the reply until later. A given connection can only process a single - request at a time, so a reply must be made eventually to avoid blocking - that connection.""" - - assert isinstance(name, strtypes) - assert isinstance(usage, strtypes) - assert isinstance(min_args, int) - assert isinstance(max_args, int) - assert isinstance(callback, types.FunctionType) - - if name not in commands: - commands[name] = _UnixctlCommand(usage, min_args, max_args, callback, - aux) - - -def socket_name_from_target(target): - assert isinstance(target, strtypes) - - if target.startswith("/"): - return 0, target - - pidfile_name = "%s/%s.pid" % (ovs.dirs.RUNDIR, target) - pid = ovs.daemon.read_pidfile(pidfile_name) - if pid < 0: - return -pid, "cannot read pidfile \"%s\"" % pidfile_name - - return 0, "%s/%s.%d.ctl" % (ovs.dirs.RUNDIR, target, pid) - - -class UnixctlConnection(object): - def __init__(self, rpc): - assert isinstance(rpc, ovs.jsonrpc.Connection) - self._rpc = rpc - self._request_id = None - - def run(self): - self._rpc.run() - error = self._rpc.get_status() - if error or self._rpc.get_backlog(): - return error - - for _ in range(10): - if error or self._request_id: - break - - error, msg = self._rpc.recv() - if msg: - if msg.type == Message.T_REQUEST: - self._process_command(msg) - else: - # XXX: rate-limit - vlog.warn("%s: received unexpected %s message" - % (self._rpc.name, - Message.type_to_string(msg.type))) - error = errno.EINVAL - - if not error: - error = self._rpc.get_status() - - return error - - def reply(self, body): - self._reply_impl(True, body) - - def reply_error(self, body): - self._reply_impl(False, body) - - # Called only by unixctl classes. - def _close(self): - self._rpc.close() - self._request_id = None - - def _wait(self, poller): - self._rpc.wait(poller) - if not self._rpc.get_backlog(): - self._rpc.recv_wait(poller) - - def _reply_impl(self, success, body): - assert isinstance(success, bool) - assert body is None or isinstance(body, strtypes) - - assert self._request_id is not None - - if body is None: - body = "" - - if body and not body.endswith("\n"): - body += "\n" - - if success: - reply = Message.create_reply(body, self._request_id) - else: - reply = Message.create_error(body, self._request_id) - - self._rpc.send(reply) - self._request_id = None - - def _process_command(self, request): - assert isinstance(request, ovs.jsonrpc.Message) - assert request.type == ovs.jsonrpc.Message.T_REQUEST - - self._request_id = request.id - - error = None - params = request.params - method = request.method - command = commands.get(method) - if command is None: - error = '"%s" is not a valid command' % method - elif len(params) < command.min_args: - error = '"%s" command requires at least %d arguments' \ - % (method, command.min_args) - elif len(params) > command.max_args: - error = '"%s" command takes at most %d arguments' \ - % (method, command.max_args) - else: - for param in params: - if not isinstance(param, strtypes): - error = '"%s" command has non-string argument' % method - break - - if error is None: - unicode_params = [unicode(p) for p in params] - command.callback(self, unicode_params, command.aux) - - if error: - self.reply_error(error) - - -class UnixctlServer(object): - def __init__(self, listener): - assert isinstance(listener, ovs.stream.PassiveStream) - self._listener = listener - self._conns = [] - - def run(self): - for _ in range(10): - error, stream = self._listener.accept() - if not error: - rpc = ovs.jsonrpc.Connection(stream) - self._conns.append(UnixctlConnection(rpc)) - elif error == errno.EAGAIN: - break - else: - # XXX: rate-limit - vlog.warn("%s: accept failed: %s" % (self._listener.name, - os.strerror(error))) - - for conn in copy.copy(self._conns): - error = conn.run() - if error and error != errno.EAGAIN: - conn._close() - self._conns.remove(conn) - - def wait(self, poller): - self._listener.wait(poller) - for conn in self._conns: - conn._wait(poller) - - def close(self): - for conn in self._conns: - conn._close() - self._conns = None - - self._listener.close() - self._listener = None - - @staticmethod - def create(path, version=None): - """Creates a new UnixctlServer which listens on a unixctl socket - created at 'path'. If 'path' is None, the default path is chosen. - 'version' contains the version of the server as reported by the unixctl - version command. If None, ovs.version.VERSION is used.""" - - assert path is None or isinstance(path, strtypes) - - if path is not None: - path = "punix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path) - else: - path = "punix:%s/%s.%d.ctl" % (ovs.dirs.RUNDIR, - ovs.util.PROGRAM_NAME, os.getpid()) - - if version is None: - version = ovs.version.VERSION - - error, listener = ovs.stream.PassiveStream.open(path) - if error: - ovs.util.ovs_error(error, "could not initialize control socket %s" - % path) - return error, None - - command_register("help", "", 0, 0, _unixctl_help, None) - command_register("version", "", 0, 0, _unixctl_version, version) - - return 0, UnixctlServer(listener) - - -class UnixctlClient(object): - def __init__(self, conn): - assert isinstance(conn, ovs.jsonrpc.Connection) - self._conn = conn - - def transact(self, command, argv): - assert isinstance(command, strtypes) - assert isinstance(argv, list) - for arg in argv: - assert isinstance(arg, strtypes) - - request = Message.create_request(command, argv) - error, reply = self._conn.transact_block(request) - - if error: - vlog.warn("error communicating with %s: %s" - % (self._conn.name, os.strerror(error))) - return error, None, None - - if reply.error is not None: - return 0, str(reply.error), None - else: - assert reply.result is not None - return 0, None, str(reply.result) - - def close(self): - self._conn.close() - self.conn = None - - @staticmethod - def create(path): - assert isinstance(path, str) - - unix = "unix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path) - error, stream = ovs.stream.Stream.open_block( - ovs.stream.Stream.open(unix)) - - if error: - vlog.warn("failed to connect to %s" % path) - return error, None - - return 0, UnixctlClient(ovs.jsonrpc.Connection(stream)) diff --git a/python/ovs/unixctl/__init__.py b/python/ovs/unixctl/__init__.py new file mode 100644 index 00000000..715f2db5 --- /dev/null +++ b/python/ovs/unixctl/__init__.py @@ -0,0 +1,83 @@ +# Copyright (c) 2011, 2012 Nicira, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import types + +import ovs.util + +commands = {} +strtypes = types.StringTypes + + +class _UnixctlCommand(object): + def __init__(self, usage, min_args, max_args, callback, aux): + self.usage = usage + self.min_args = min_args + self.max_args = max_args + self.callback = callback + self.aux = aux + + +def _unixctl_help(conn, unused_argv, unused_aux): + reply = "The available commands are:\n" + command_names = sorted(commands.keys()) + for name in command_names: + reply += " " + usage = commands[name].usage + if usage: + reply += "%-23s %s" % (name, usage) + else: + reply += name + reply += "\n" + conn.reply(reply) + + +def command_register(name, usage, min_args, max_args, callback, aux): + """ Registers a command with the given 'name' to be exposed by the + UnixctlServer. 'usage' describes the arguments to the command; it is used + only for presentation to the user in "help" output. + + 'callback' is called when the command is received. It is passed a + UnixctlConnection object, the list of arguments as unicode strings, and + 'aux'. Normally 'callback' should reply by calling + UnixctlConnection.reply() or UnixctlConnection.reply_error() before it + returns, but if the command cannot be handled immediately, then it can + defer the reply until later. A given connection can only process a single + request at a time, so a reply must be made eventually to avoid blocking + that connection.""" + + assert isinstance(name, strtypes) + assert isinstance(usage, strtypes) + assert isinstance(min_args, int) + assert isinstance(max_args, int) + assert isinstance(callback, types.FunctionType) + + if name not in commands: + commands[name] = _UnixctlCommand(usage, min_args, max_args, callback, + aux) + +def socket_name_from_target(target): + assert isinstance(target, strtypes) + + if target.startswith("/"): + return 0, target + + pidfile_name = "%s/%s.pid" % (ovs.dirs.RUNDIR, target) + pid = ovs.daemon.read_pidfile(pidfile_name) + if pid < 0: + return -pid, "cannot read pidfile \"%s\"" % pidfile_name + + return 0, "%s/%s.%d.ctl" % (ovs.dirs.RUNDIR, target, pid) + +command_register("help", "", 0, 0, _unixctl_help, None) diff --git a/python/ovs/unixctl/client.py b/python/ovs/unixctl/client.py new file mode 100644 index 00000000..2176009a --- /dev/null +++ b/python/ovs/unixctl/client.py @@ -0,0 +1,70 @@ +# Copyright (c) 2011, 2012 Nicira, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import errno +import os +import types + +import ovs.jsonrpc +import ovs.stream +import ovs.util + + +vlog = ovs.vlog.Vlog("unixctl_client") +strtypes = types.StringTypes + + +class UnixctlClient(object): + def __init__(self, conn): + assert isinstance(conn, ovs.jsonrpc.Connection) + self._conn = conn + + def transact(self, command, argv): + assert isinstance(command, strtypes) + assert isinstance(argv, list) + for arg in argv: + assert isinstance(arg, strtypes) + + request = ovs.jsonrpc.Message.create_request(command, argv) + error, reply = self._conn.transact_block(request) + + if error: + vlog.warn("error communicating with %s: %s" + % (self._conn.name, os.strerror(error))) + return error, None, None + + if reply.error is not None: + return 0, str(reply.error), None + else: + assert reply.result is not None + return 0, None, str(reply.result) + + def close(self): + self._conn.close() + self.conn = None + + @staticmethod + def create(path): + assert isinstance(path, str) + + unix = "unix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path) + error, stream = ovs.stream.Stream.open_block( + ovs.stream.Stream.open(unix)) + + if error: + vlog.warn("failed to connect to %s" % path) + return error, None + + return 0, UnixctlClient(ovs.jsonrpc.Connection(stream)) diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py new file mode 100644 index 00000000..18e1cf20 --- /dev/null +++ b/python/ovs/unixctl/server.py @@ -0,0 +1,247 @@ +# Copyright (c) 2012 Nicira, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import errno +import os +import types + +import ovs.dirs +import ovs.jsonrpc +import ovs.stream +import ovs.unixctl +import ovs.util +import ovs.version +import ovs.vlog + +Message = ovs.jsonrpc.Message +vlog = ovs.vlog.Vlog("unixctl_server") +strtypes = types.StringTypes + + +class UnixctlConnection(object): + def __init__(self, rpc): + assert isinstance(rpc, ovs.jsonrpc.Connection) + self._rpc = rpc + self._request_id = None + + def run(self): + self._rpc.run() + error = self._rpc.get_status() + if error or self._rpc.get_backlog(): + return error + + for _ in range(10): + if error or self._request_id: + break + + error, msg = self._rpc.recv() + if msg: + if msg.type == Message.T_REQUEST: + self._process_command(msg) + else: + # XXX: rate-limit + vlog.warn("%s: received unexpected %s message" + % (self._rpc.name, + Message.type_to_string(msg.type))) + error = errno.EINVAL + + if not error: + error = self._rpc.get_status() + + return error + + def reply(self, body): + self._reply_impl(True, body) + + def reply_error(self, body): + self._reply_impl(False, body) + + # Called only by unixctl classes. + def _close(self): + self._rpc.close() + self._request_id = None + + def _wait(self, poller): + self._rpc.wait(poller) + if not self._rpc.get_backlog(): + self._rpc.recv_wait(poller) + + def _reply_impl(self, success, body): + assert isinstance(success, bool) + assert body is None or isinstance(body, strtypes) + + assert self._request_id is not None + + if body is None: + body = "" + + if body and not body.endswith("\n"): + body += "\n" + + if success: + reply = Message.create_reply(body, self._request_id) + else: + reply = Message.create_error(body, self._request_id) + + self._rpc.send(reply) + self._request_id = None + + def _process_command(self, request): + assert isinstance(request, ovs.jsonrpc.Message) + assert request.type == ovs.jsonrpc.Message.T_REQUEST + + self._request_id = request.id + + error = None + params = request.params + method = request.method + command = ovs.unixctl.commands.get(method) + if command is None: + error = '"%s" is not a valid command' % method + elif len(params) < command.min_args: + error = '"%s" command requires at least %d arguments' \ + % (method, command.min_args) + elif len(params) > command.max_args: + error = '"%s" command takes at most %d arguments' \ + % (method, command.max_args) + else: + for param in params: + if not isinstance(param, strtypes): + error = '"%s" command has non-string argument' % method + break + + if error is None: + unicode_params = [unicode(p) for p in params] + command.callback(self, unicode_params, command.aux) + + if error: + self.reply_error(error) + + +def _unixctl_version(conn, unused_argv, version): + assert isinstance(conn, UnixctlConnection) + version = "%s (Open vSwitch) %s" % (ovs.util.PROGRAM_NAME, version) + conn.reply(version) + +class UnixctlServer(object): + def __init__(self, listener): + assert isinstance(listener, ovs.stream.PassiveStream) + self._listener = listener + self._conns = [] + + def run(self): + for _ in range(10): + error, stream = self._listener.accept() + if not error: + rpc = ovs.jsonrpc.Connection(stream) + self._conns.append(UnixctlConnection(rpc)) + elif error == errno.EAGAIN: + break + else: + # XXX: rate-limit + vlog.warn("%s: accept failed: %s" % (self._listener.name, + os.strerror(error))) + + for conn in copy.copy(self._conns): + error = conn.run() + if error and error != errno.EAGAIN: + conn._close() + self._conns.remove(conn) + + def wait(self, poller): + self._listener.wait(poller) + for conn in self._conns: + conn._wait(poller) + + def close(self): + for conn in self._conns: + conn._close() + self._conns = None + + self._listener.close() + self._listener = None + + @staticmethod + def create(path, version=None): + """Creates a new UnixctlServer which listens on a unixctl socket + created at 'path'. If 'path' is None, the default path is chosen. + 'version' contains the version of the server as reported by the unixctl + version command. If None, ovs.version.VERSION is used.""" + + assert path is None or isinstance(path, strtypes) + + if path is not None: + path = "punix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path) + else: + path = "punix:%s/%s.%d.ctl" % (ovs.dirs.RUNDIR, + ovs.util.PROGRAM_NAME, os.getpid()) + + if version is None: + version = ovs.version.VERSION + + error, listener = ovs.stream.PassiveStream.open(path) + if error: + ovs.util.ovs_error(error, "could not initialize control socket %s" + % path) + return error, None + + ovs.unixctl.command_register("version", "", 0, 0, _unixctl_version, + version) + + return 0, UnixctlServer(listener) + + +class UnixctlClient(object): + def __init__(self, conn): + assert isinstance(conn, ovs.jsonrpc.Connection) + self._conn = conn + + def transact(self, command, argv): + assert isinstance(command, strtypes) + assert isinstance(argv, list) + for arg in argv: + assert isinstance(arg, strtypes) + + request = Message.create_request(command, argv) + error, reply = self._conn.transact_block(request) + + if error: + vlog.warn("error communicating with %s: %s" + % (self._conn.name, os.strerror(error))) + return error, None, None + + if reply.error is not None: + return 0, str(reply.error), None + else: + assert reply.result is not None + return 0, None, str(reply.result) + + def close(self): + self._conn.close() + self.conn = None + + @staticmethod + def create(path): + assert isinstance(path, str) + + unix = "unix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path) + error, stream = ovs.stream.Stream.open_block( + ovs.stream.Stream.open(unix)) + + if error: + vlog.warn("failed to connect to %s" % path) + return error, None + + return 0, UnixctlClient(ovs.jsonrpc.Connection(stream)) diff --git a/tests/appctl.py b/tests/appctl.py index e5698ef8..e5bcf2c0 100644 --- a/tests/appctl.py +++ b/tests/appctl.py @@ -18,6 +18,7 @@ import sys import ovs.daemon import ovs.unixctl +import ovs.unixctl.client import ovs.util import ovs.vlog @@ -29,7 +30,7 @@ def connect_to_target(target): else: socket_name = str_result - error, client = ovs.unixctl.UnixctlClient.create(socket_name) + error, client = ovs.unixctl.client.UnixctlClient.create(socket_name) if error: ovs.util.ovs_fatal(error, "cannot connect to \"%s\"" % socket_name) diff --git a/tests/test-unixctl.py b/tests/test-unixctl.py index 038c1bd6..3d86db17 100644 --- a/tests/test-unixctl.py +++ b/tests/test-unixctl.py @@ -17,6 +17,7 @@ import sys import ovs.daemon import ovs.unixctl +import ovs.unixctl.server vlog = ovs.vlog.Vlog("test-unixctl") exiting = False @@ -55,7 +56,7 @@ def main(): ovs.vlog.handle_args(args) ovs.daemon.daemonize_start() - error, server = ovs.unixctl.UnixctlServer.create(args.unixctl) + error, server = ovs.unixctl.server.UnixctlServer.create(args.unixctl) if error: ovs.util.ovs_fatal(error, "could not create unixctl server at %s" % args.unixctl, vlog) diff --git a/xenserver/usr_share_openvswitch_scripts_ovs-xapi-sync b/xenserver/usr_share_openvswitch_scripts_ovs-xapi-sync index fc215824..5083bbd1 100755 --- a/xenserver/usr_share_openvswitch_scripts_ovs-xapi-sync +++ b/xenserver/usr_share_openvswitch_scripts_ovs-xapi-sync @@ -35,6 +35,7 @@ from ovs.db import types import ovs.daemon import ovs.db.idl import ovs.unixctl +import ovs.unixctl.server vlog = ovs.vlog.Vlog("ovs-xapi-sync") session = None @@ -236,7 +237,7 @@ def main(): ovs.unixctl.command_register("exit", "", 0, 0, unixctl_exit, None) ovs.unixctl.command_register("flush-cache", "", 0, 0, unixctl_flush_cache, None) - error, unixctl_server = ovs.unixctl.UnixctlServer.create(None) + error, unixctl_server = ovs.unixctl.server.UnixctlServer.create(None) if error: ovs.util.ovs_fatal(error, "could not create unixctl server", vlog)