1 # Copyright (c) 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
# Short alias for the JSON-RPC message class used throughout this module.
Message = ovs.jsonrpc.Message

# Module-level logger, registered under the name "unixctl_server".
vlog = ovs.vlog.Vlog("unixctl_server")

# Python 2 string types (types.StringTypes is the tuple (str, unicode));
# used by the isinstance() checks in this module.
strtypes = types.StringTypes
# Wraps one ovs.jsonrpc.Connection; UnixctlServer creates one instance for
# each stream it accepts.  self._request_id holds the id of the request that
# is currently awaiting a reply, or None when there is none.
33 class UnixctlConnection(object):
34 def __init__(self, rpc):
35 assert isinstance(rpc, ovs.jsonrpc.Connection)
# NOTE(review): listing line 36 is absent from this excerpt.  The other
# methods read self._rpc, so it is presumably assigned from 'rpc' here --
# confirm against the complete file.
# No request is pending on a fresh connection.
37 self._request_id = None
# NOTE(review): the 'def' line of this method and several of its statements
# are absent from this excerpt (the embedded listing numbers jump
# 37 -> 41 -> 42 -> 46 -> 49 ...).  Only the visible statements are
# annotated; their nesting cannot be determined from here.
# Read the connection status into 'error'.
41 error = self._rpc.get_status()
# Taken when the connection reports an error or a non-zero backlog; the
# guarded statement is not visible.
42 if error or self._rpc.get_backlog():
# Taken on an error, or while a request id is still set (it is set in
# _process_command() and cleared in _reply_impl()).
46 if error or self._request_id:
# recv() returns an (error, msg) pair.
49 error, msg = self._rpc.recv()
# Requests are handed to _process_command(); the warning below reports any
# other message type (the branch header for it is presumably among the
# missing lines -- confirm).
51 if msg.type == Message.T_REQUEST:
52 self._process_command(msg)
55 vlog.warn("%s: received unexpected %s message"
57 Message.type_to_string(msg.type)))
# Refresh 'error' from the connection status.
61 error = self._rpc.get_status()
def reply(self, body):
    """Report success for the pending request.

    Hands 'body' to _reply_impl() with success=True.  _reply_impl()
    requires 'body' to be None or a string.
    """
    self._reply_impl(True, body)
def reply_error(self, body):
    """Report failure for the pending request.

    Hands 'body' to _reply_impl() with success=False.  _reply_impl()
    requires 'body' to be None or a string.
    """
    self._reply_impl(False, body)
71 # Called only by unixctl classes.
# NOTE(review): the 'def' line and first statement of this method (listing
# lines 72-73) are absent from this excerpt; only the final statement is
# visible.
# Forget any request that was still awaiting a reply.
74 self._request_id = None
def _wait(self, poller):
    """Register this connection with 'poller'.

    Always calls wait() on the underlying RPC connection.  recv_wait() is
    called as well, but only when the connection reports no backlog.
    """
    self._rpc.wait(poller)
    if not self._rpc.get_backlog():
        self._rpc.recv_wait(poller)
# Build the reply (success) or error (failure) message for the pending
# request and clear the pending request id.
#   success: bool selecting between a reply and an error message.
#   body:    None or a string.
81 def _reply_impl(self, success, body):
82 assert isinstance(success, bool)
83 assert body is None or isinstance(body, strtypes)
# Replying is only valid while a request is pending.
85 assert self._request_id is not None
# NOTE(review): listing lines 86-89 are absent from this excerpt.
# Matches a non-empty body that lacks a trailing newline; the guarded
# statement (listing line 91) is absent -- presumably it appends "\n", confirm.
90 if body and not body.endswith("\n"):
# NOTE(review): the branch headers around the next two statements (listing
# lines 93 and 95) are absent; presumably 'success' selects create_reply()
# and the other branch create_error() -- confirm.
94 reply = Message.create_reply(body, self._request_id)
96 reply = Message.create_error(body, self._request_id)
# NOTE(review): listing line 98 is absent; 'reply' is presumably sent on
# self._rpc there -- confirm.
# The request has been answered, so nothing is pending any more.
99 self._request_id = None
# Validate one JSON-RPC request against the registered unixctl commands and
# either invoke the command's callback or answer with an error string.
#   request: an ovs.jsonrpc.Message of type T_REQUEST.
101 def _process_command(self, request):
102 assert isinstance(request, ovs.jsonrpc.Message)
103 assert request.type == ovs.jsonrpc.Message.T_REQUEST
# Remember which request is awaiting a reply; _reply_impl() clears this.
105 self._request_id = request.id
# NOTE(review): listing line 107 is absent; 'error' is presumably
# initialised there -- confirm.
108 params = request.params
109 method = request.method
# Look the command up by method name in the ovs.unixctl.commands mapping.
110 command = ovs.unixctl.commands.get(method)
# NOTE(review): the 'if' header for this first branch (listing line 111) is
# absent; presumably it tests for a failed lookup -- confirm.
112 error = '"%s" is not a valid command' % method
# Argument-count checks against the command's declared bounds.
113 elif len(params) < command.min_args:
114 error = '"%s" command requires at least %d arguments' \
115 % (method, command.min_args)
116 elif len(params) > command.max_args:
117 error = '"%s" command takes at most %d arguments' \
118 % (method, command.max_args)
# NOTE(review): listing lines 119-120 are absent; 'param' below is presumably
# bound by a loop over 'params' inside a final 'else' branch -- confirm.
121 if not isinstance(param, strtypes):
122 error = '"%s" command has non-string argument' % method
# NOTE(review): the guard on the callback invocation (listing line 125) is
# absent; presumably it runs only when no error was recorded -- confirm.
# Every parameter is converted with unicode() before the callback is called
# with (connection, params, command.aux).
126 unicode_params = [unicode(p) for p in params]
127 command.callback(self, unicode_params, command.aux)
# NOTE(review): the guard on this call (listing line 129) is absent.
130 self.reply_error(error)
# Callback passed to ovs.unixctl.command_register() for the "version"
# command in create().
#   conn:        the UnixctlConnection the request arrived on.
#   unused_argv: not referenced by the visible statements.
#   version:     version string, formatted below together with the program name.
133 def _unixctl_version(conn, unused_argv, version):
134 assert isinstance(conn, UnixctlConnection)
135 version = "%s (Open vSwitch) %s" % (ovs.util.PROGRAM_NAME, version)
# NOTE(review): listing line 136 is absent from this excerpt; the formatted
# string is presumably sent back through 'conn' there -- confirm.
# Accepts connections on an ovs.stream.PassiveStream listener and keeps the
# resulting UnixctlConnection objects in self._conns.
138 class UnixctlServer(object):
139 def __init__(self, listener):
140 assert isinstance(listener, ovs.stream.PassiveStream)
141 self._listener = listener
# NOTE(review): the listing lines after 141 are absent from this excerpt.
# self._conns is used as a list by the other methods (append/remove), so it
# is presumably initialised here -- confirm.
# NOTE(review): the 'def' line of this method and several of its statements
# are absent from this excerpt (listing numbers jump 141 -> 146 -> 148 ...).
# Only the visible statements are annotated.
# accept() returns an (error, stream) pair.
146 error, stream = self._listener.accept()
# Wrap the accepted stream in a JSON-RPC connection and track it.
148 rpc = ovs.jsonrpc.Connection(stream)
149 self._conns.append(UnixctlConnection(rpc))
# EAGAIN gets its own branch; its body is not visible.
150 elif error == errno.EAGAIN:
# Other accept failures are logged (the rest of this call is absent).
154 vlog.warn("%s: accept failed: %s" % (self._listener.name,
# Iterates over a shallow copy because connections are removed from
# self._conns inside the loop.
157 for conn in copy.copy(self._conns):
# NOTE(review): the statement that sets 'error' for 'conn' (listing line
# 158) is absent.  Any error other than EAGAIN leads to removing the connection.
159 if error and error != errno.EAGAIN:
161 self._conns.remove(conn)
# Register the listener, and then each tracked connection, with 'poller'.
163 def wait(self, poller):
164 self._listener.wait(poller)
165 for conn in self._conns:
# NOTE(review): the loop body (listing line 166) is absent from this
# excerpt; presumably it calls conn._wait(poller) -- confirm.
# NOTE(review): the 'def' line of this method and the body of the loop below
# are absent from this excerpt.
169 for conn in self._conns:
# Close the listener and drop the reference to it.
173 self._listener.close()
174 self._listener = None
# NOTE(review): no 'self' parameter, so this is presumably a @staticmethod of
# UnixctlServer whose decorator line is absent from this excerpt -- confirm.
# The visible success path returns the pair (0, UnixctlServer).
177 def create(path, version=None):
178 """Creates a new UnixctlServer which listens on a unixctl socket
179 created at 'path'. If 'path' is None, the default path is chosen.
180 'version' contains the version of the server as reported by the unixctl
181 version command. If None, ovs.version.VERSION is used."""
183 assert path is None or isinstance(path, strtypes)
# NOTE(review): the branch headers for the next two assignments are absent.
# Per the docstring, the first applies to an explicit 'path' and the second
# builds the default path from RUNDIR, the program name and the pid.
186 path = "punix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path)
188 path = "punix:%s/%s.%d.ctl" % (ovs.dirs.RUNDIR,
189 ovs.util.PROGRAM_NAME, os.getpid())
# Default version (its guard, listing line 191, is absent).
192 version = ovs.version.VERSION
# open() returns an (error, listener) pair.
194 error, listener = ovs.stream.PassiveStream.open(path)
# Failure reporting; the guard, the rest of this call and the error return
# are absent from this excerpt.
196 ovs.util.ovs_error(error, "could not initialize control socket %s"
# Register the "version" command with _unixctl_version as its callback
# (the remaining argument line is absent).
200 ovs.unixctl.command_register("version", "", 0, 0, _unixctl_version,
203 return 0, UnixctlServer(listener)
# Client side of unixctl: sends commands over an ovs.jsonrpc.Connection.
206 class UnixctlClient(object):
207 def __init__(self, conn):
208 assert isinstance(conn, ovs.jsonrpc.Connection)
# NOTE(review): the listing lines after 208 are absent from this excerpt.
# transact() reads self._conn, so it is presumably assigned from 'conn'
# here -- confirm.
# Send 'command' with arguments 'argv' and block for the reply.
# Returns a 3-tuple (error, error_text, result_text) per the visible returns:
#   (error, None, None)    transport failure, 'error' being the error number
#   (0, str, None)         the server answered with an error
#   (0, None, str)         the server answered with a result
211 def transact(self, command, argv):
212 assert isinstance(command, strtypes)
213 assert isinstance(argv, list)
# NOTE(review): the loop header binding 'arg' (listing line 214) is absent;
# presumably it iterates over 'argv' -- confirm.
215 assert isinstance(arg, strtypes)
217 request = Message.create_request(command, argv)
218 error, reply = self._conn.transact_block(request)
# NOTE(review): the guard for this failure path (listing lines 219-220) is
# absent from this excerpt.
221 vlog.warn("error communicating with %s: %s"
222 % (self._conn.name, os.strerror(error)))
223 return error, None, None
# NOTE(review): under Python 2, str() raises UnicodeEncodeError if
# reply.error / reply.result is a unicode object with non-ASCII characters --
# confirm whether such replies can occur.
225 if reply.error is not None:
226 return 0, str(reply.error), None
228 assert reply.result is not None
229 return 0, None, str(reply.result)
# NOTE(review): the 'def' line of this function and its failure-path
# statements are absent from this excerpt (listing numbers jump 229 -> 237).
# The visible success path returns the pair (0, UnixctlClient).
# NOTE(review): this check uses 'str' while the rest of the module uses
# 'strtypes'; a unicode 'path' would fail this assert -- confirm intended.
237 assert isinstance(path, str)
# Build a "unix:" stream name from 'path' via abs_file_name() with RUNDIR.
239 unix = "unix:%s" % ovs.util.abs_file_name(ovs.dirs.RUNDIR, path)
# open_block() returns an (error, stream) pair.
240 error, stream = ovs.stream.Stream.open_block(
241 ovs.stream.Stream.open(unix))
# Connection failure is logged (its guard and return are absent).
244 vlog.warn("failed to connect to %s" % path)
247 return 0, UnixctlClient(ovs.jsonrpc.Connection(stream))