1 # Copyright (c) 2010 Citrix Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from OVEStandard import *
16 from OVEConfig import *
17 from OVELogger import *
# This sequence installs the qt4reactor before twisted gets a chance to install its reactor
# NOTE(review): no qt4reactor import/install call is visible in the reviewed lines;
# confirm it is still performed around the QApplication creation below.
globalApp = QtGui.QApplication([])

try:
    from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
    from twisted.internet import defer, protocol, reactor
    from twisted.application import reactors
except ImportError:
    # Only report the missing dependency when the import really failed, then
    # re-raise so the original traceback is not hidden.
    print('+++ Python Twisted Conch module is required\n')
    raise
class OVEFetchUserAuth(userauth.SSHUserAuthClient):
    """SSH user-auth client that answers password requests from the host config."""

    def __init__(self, fetch, *params):
        """Store the owning fetch object and reset the failure counter.

        fetch  -- object whose config() returns the host settings dictionary
        params -- passed through unchanged to userauth.SSHUserAuthClient.__init__
        """
        userauth.SSHUserAuthClient.__init__(self, *params)
        # Both attributes are read by the methods below, so they must exist
        # before the first authentication packet arrives.
        self.fetch = fetch
        self.authFails = 0

    def getPassword(self):
        """Return a Deferred already fired with the configured 'password' entry."""
        return defer.succeed(self.fetch.config()['password'])

    def ssh_USERAUTH_FAILURE(self, packet):
        """Log repeated authentication failures, then defer to the base class."""
        if self.authFails > 0: # We normally get one so ignore. Real failures send these repeatedly
            OVELog('Authentication failure for '+self.fetch.config()['address'])
        # Count every failure packet so the second and later ones get logged
        self.authFails += 1
        userauth.SSHUserAuthClient.ssh_USERAUTH_FAILURE(self, packet)
class OVEFetchConnection(connection.SSHConnection, QtCore.QObject):
    """SSH connection service that opens one OVECommandChannel per command.

    Emits 'connectionService(QObject)' with itself when the service starts
    and with None when it stops.
    """

    # For 30 second timeouts at 1 second refresh interval and three windows open
    # on a single host, need 90 channels
    MAX_OLD_CHANNELS = 90

    def __init__(self, fetch, *params):
        """Initialise both base classes and the channel bookkeeping.

        fetch  -- owning fetch object, handed on to every OVECommandChannel
        params -- passed through unchanged to connection.SSHConnection.__init__
        """
        connection.SSHConnection.__init__(self, *params)
        QtCore.QObject.__init__(self)
        self.fetch = fetch
        # Most recently opened channel, or None before the first command
        self._channel = None
        # Superseded channels kept referenced for a while (see execCommand)
        self._oldChannels = []

    def serviceStarted(self):
        """Announce that this connection is ready to run commands."""
        self.emit(QtCore.SIGNAL('connectionService(QObject)'), self)

    def serviceStopped(self):
        """Announce that this connection is no longer usable."""
        self.emit(QtCore.SIGNAL('connectionService(QObject)'), None)

    def execCommand(self, requester, ref, command, commandType):
        """Open a new channel that executes command on the remote host.

        requester, ref, command and commandType are forwarded unchanged to
        OVECommandChannel.
        """
        if self._channel is not None:
            # Don't delete old channels immediately in case they're e.g. going to time out with a failure
            self._oldChannels.append(self._channel)
            if len(self._oldChannels) > self.MAX_OLD_CHANNELS:
                # Drop the oldest retained channel; deleting index 1 would
                # keep the very first channel referenced forever.
                del self._oldChannels[0]
        self._channel = OVECommandChannel(self.fetch, requester, ref, command, commandType, 2**16, 2**15, self)
        self.openChannel(self._channel)

    def connectionLost(self, reason):
        """Forward a lost connection to the current channel, if there is one."""
        if self._channel is not None:
            self._channel.connectionLost(reason)
class OVEFetchTransport(transport.SSHClientTransport, QtCore.QObject):
    """SSH client transport that starts authentication once the link is secure."""

    def __init__(self, fetch, *params):
        """Store the owning fetch object and wire up failure reporting.

        fetch  -- owning fetch object; supplies config() and the xon_* slots
        params -- accepted for signature compatibility; not used
        """
        # There is no __init__ method for this class
        # transport.SSHClientTransport.__init__(self, *params)
        QtCore.QObject.__init__(self)
        # Must be set before the connect() below, which reads self.fetch
        self.fetch = fetch
        self._connection = None
        self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)

    def verifyHostKey(self, hostKey, fingerprint):
        """Accept the server's host key.

        NOTE(review): every host key is accepted without checking, so the
        server identity is not verified.
        """
        return defer.succeed(1)

    def connectionSecure(self):
        """Create the connection service and request user authentication for it."""
        self._connection = OVEFetchConnection(self.fetch)
        QtCore.QObject.connect(self._connection, QtCore.SIGNAL('connectionService(QObject)'), self.fetch.xon_connectionService)
        # The auth service is given the connection service so that it can be
        # started once authentication has succeeded.
        self.requestService(
            OVEFetchUserAuth(self.fetch, self.fetch.config().get('username', 'root'),
                self._connection))

    def connectionLost(self, reason):
        """Forward a lost connection to the connection service, if created."""
        if self._connection is not None:
            self._connection.connectionLost(reason)
class OVEFetchWrapper:
    """Thin holder exposing a single value through its `contents` attribute."""

    def __init__(self, contents):
        """Keep a reference to contents; the value is not copied."""
        self.contents = contents
class OVECommandChannel(channel.SSHChannel, QtCore.QObject):
    """SSH channel that runs one remote command and reports the result via Qt signals.

    Output is accumulated until both the exit status and the complete data
    have arrived; then exactly one of 'channelSuccess' or 'channelFailure'
    is emitted per sendResult() call. 'channelData' / 'channelExtData' are
    emitted only when traffic logging is enabled.
    """

    # Channel type announced when the channel is opened.
    # NOTE(review): not visible in the reviewed source; 'exec' requests are
    # issued on 'session' channels - confirm.
    name = 'session'
    # Command timeout in milliseconds.
    # NOTE(review): value not visible in the reviewed source; 30 s follows the
    # "30 second timeouts" sizing comment in OVEFetchConnection - confirm.
    MSEC_TIMEOUT = 30000
    # Pseudo exit statuses used for failures that are not remote exit codes
    STATUS_CONNECTION_LOST = 100001
    STATUS_TIMEOUT = 100002
    # Line appended to 'framed' command output to mark its end
    END_MARKER='END-MARKER'
    END_MARKER_RE=re.compile(r'^END-MARKER$', re.MULTILINE)

    def __init__(self, fetch, requester, ref, command, commandType, *params):
        """Set up the channel state and connect its signals to the fetch object.

        fetch       -- owning fetch object providing the xon_channel* slots
        requester   -- object the result is ultimately delivered to
        ref         -- caller's reference, passed back with every signal
        command     -- remote command line to execute
        commandType -- 'JSON' or 'framed'; decides how completion is detected
        params      -- passed through unchanged to channel.SSHChannel.__init__
        """
        channel.SSHChannel.__init__(self, *params)
        QtCore.QObject.__init__(self)
        self.fetch = fetch
        self.requester = requester
        self.ref = ref
        self.command = command
        self.commandType= commandType
        self._data = ''         # accumulated stdout
        self._extData = ''      # accumulated stderr plus local '+++' notes
        self._jsonValues = None # decoded reply for 'JSON' commands
        self._timerId = None    # Qt timer id of the running timeout, if any
        self._status = None     # exit status, or one of the STATUS_* values
        self.connect(self, QtCore.SIGNAL('channelData(QObject, int, QString)'), self.fetch.xon_channelData)
        self.connect(self, QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.fetch.xon_channelExtData)
        self.connect(self, QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.fetch.xon_channelSuccess)
        self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)

    def _stopTimer(self):
        """Cancel the timeout timer if it is running."""
        if self._timerId is not None:
            self.killTimer(self._timerId)
            # Forget the id so the same timer is never killed twice
            self._timerId = None

    def openFailed(self, reason):
        """Report that the channel could not be opened."""
        self._stopTimer()
        self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
            'Open failed:'+str(reason), '', '')

    def channelOpen(self, ignoredData):
        """Start the timeout and send the 'exec' request for the command."""
        try:
            nsCommand = common.NS(str(self.command))
            self._timerId = self.startTimer(self.MSEC_TIMEOUT)
            self.conn.sendRequest(self, 'exec', nsCommand, wantReply=1)
        except Exception as e:
            self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
                'Open failed:'+str(e), self._data, self._extData)

    def dataReceived(self, data):
        """Accumulate stdout data and check whether the command is complete."""
        self._data += data
        if OVEConfig.Inst().logTraffic:
            self.emit(QtCore.SIGNAL('channelData(QObject, int, QString)'), self.requester, self.ref, data)
        self.testIfDone()

    def extDataReceived(self, extData):
        """Accumulate stderr data."""
        self._extData += extData
        if OVEConfig.Inst().logTraffic:
            self.emit(QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.requester, self.ref, extData)

    def request_exit_status(self, data):
        """Record the remote exit status (big-endian unsigned 32-bit value)."""
        # We can get the exit status before the data, so delay calling sendResult until we get both
        self._status = struct.unpack('>L', data)[0]
        self.testIfDone()

    def testIfDone(self):
        """Send the result once the exit status is known and the data is complete."""
        if self._status is None:
            return
        if self._status != 0:
            self.sendResult() # Failed, so send what we have
        elif len(self._data) > 0:
            # Status == success and we have some data
            if self.commandType == 'JSON':
                try:
                    # Decode the JSON data, to confirm that we have all of the data
                    self._jsonValues = json.read(str(self._data)) # FIXME: Should handle unicode
                except Exception:
                    return # Incomplete reply; wait for more data
                self.sendResult()
            elif self.commandType == 'framed':
                match = self.END_MARKER_RE.search(self._data)
                if match:
                    self._data = self._data[:match.start()] # Remove end marker
                    self.sendResult()
                # No marker yet: wait for more data
            else:
                OVELog('Bad command type')

    def sendResult(self):
        """Emit the success or failure signal and close the channel."""
        self._stopTimer()
        if self.commandType == 'JSON' and self._status == 0 and self._jsonValues is not None:
            self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(OVEFetchWrapper(self._jsonValues)))
        elif self.commandType != 'JSON' and self._status == 0:
            self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(None))
        else:
            self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref, 'Remote command failed (rc='+str(self._status)+')', self._data, self._extData)
        if self._status != self.STATUS_CONNECTION_LOST:
            # Best effort: the channel may already be closing
            try:
                self.loseConnection()
            except Exception as e:
                OVELog('OVECommandChannel.sendResult loseConnection error: '+str(e))

    def connectionLost(self, reason):
        """Report the pending command as failed because the connection dropped."""
        self._extData += '+++ Connection lost'
        self._status = self.STATUS_CONNECTION_LOST
        self.sendResult()

    def timerEvent(self, event):
        """Fail the command when its own timeout timer fires."""
        if event.timerId() == self._timerId:
            self._extData += '+++ Timeout'
            self._status = self.STATUS_TIMEOUT
            self.sendResult()
        else:
            QtCore.QObject.timerEvent(self, event)
class OVEFetchEvent(QtCore.QEvent):
    """Qt event carrying the result of a successful fetch to the requester."""

    # Event type id obtained from Qt's registry when the class is defined
    TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

    def __init__(self, ref, data):
        """Create the event.

        ref  -- the caller's reference for the request being answered
        data -- the fetched values
        """
        QtCore.QEvent.__init__(self, self.TYPE)
        # Keep the payload on the event; otherwise the receiver has nothing to read
        self.ref = ref
        self.data = data
class OVEFetchFailEvent(QtCore.QEvent):
    """Qt event telling the requester that a fetch failed."""

    # Event type id obtained from Qt's registry when the class is defined
    TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

    def __init__(self, ref, message):
        """Create the event.

        ref     -- the caller's reference for the request that failed
        message -- failure description; stored as str(message)
        """
        QtCore.QEvent.__init__(self, self.TYPE)
        # Keep the reference so the receiver can tell which request failed
        self.ref = ref
        self.message = str(message)
class OVEFetch(QtCore.QObject):
    """Per-host command runner: one SSH transport, with commands queued until it is up.

    Results are delivered to the requester as OVEFetchEvent (success) or
    OVEFetchFailEvent (failure) through QCoreApplication.postEvent. The most
    recent ref/values/message per requester are also kept for the snoop*()
    workaround accessors.
    """

    # uuid -> OVEFetch, filled lazily by Inst()
    instances = {}
    # Connection timeout in seconds.
    # NOTE(review): value not visible in the reviewed source - confirm.
    SEC_TIMEOUT = 30.0

    def __init__(self, uuid):
        """Create the fetcher for the host identified by uuid."""
        QtCore.QObject.__init__(self)
        self._hostUuid = uuid
        self._config = None     # cached host settings, see config()
        self._transport = None
        self._connection = None
        self._commandQueue = []
        # Generation counter used to invalidate pending connect-timeout timers
        self._timerRef = 0
        self.refs = {}          # For PySide workaround
        self.messages = {}      # For PySide workaround
        self.values = {}        # For PySide workaround
        self.connect(OVEConfig.Inst(), QtCore.SIGNAL("configUpdated()"), self.xon_configUpdated)

    @classmethod
    def Inst(cls, uuid):
        """Return the shared OVEFetch for uuid, creating it on first use."""
        if uuid not in cls.instances:
            cls.instances[uuid] = OVEFetch(uuid)
        return cls.instances[uuid]

    @classmethod
    def startReactor(cls):
        """Start the Twisted reactor.

        NOTE(review): the body is not visible in the reviewed source.
        runReturn() is the non-blocking start offered by qt4reactor, which
        the module header says is the installed reactor - confirm.
        """
        reactor.runReturn()

    def _postFailure(self, requester, ref, message):
        """Record ref/message for the workaround and post a fail event to requester."""
        self.refs[requester] = ref # For PySide workaround
        self.messages[requester] = message # For PySide workaround
        event = OVEFetchFailEvent(ref, message)
        QtCore.QCoreApplication.postEvent(requester, event)

    def xon_configUpdated(self):
        """Drop the cached settings and the transport after a config change."""
        # Without this the settings cached by config() would never be refreshed
        self._config = None
        self.resetTransport()

    def xon_connectionService(self, connection):
        """Track the connection service; run queued commands once it is available."""
        self._connection = connection
        if self._connection is not None:
            OVELog('SSH connection to '+self.config()['address'] +' established')
            for command in self._commandQueue:
                # OVELog('Unqueueing '+str(command))
                self.execCommand2(*command)
            self._commandQueue = []

    def xon_channelData(self, requester, ref, data):
        """Log stdout traffic when traffic logging is enabled."""
        if OVEConfig.Inst().logTraffic:
            OVELog('Channel data received: '+str(data))

    def xon_channelExtData(self, requester, ref, data):
        """Log stderr traffic when traffic logging is enabled."""
        if OVEConfig.Inst().logTraffic:
            OVELog('+++ Channel extData (stderr) received: '+str(data))

    def xon_channelFailure(self, requester, ref, message, data, extData):
        """Deliver a command failure to requester and reset the transport."""
        if OVEConfig.Inst().logTraffic:
            OVELog('+++ Channel failure: '+str(message))
            OVELog("Closing SSH session due to failure")
        errMessage = message
        if len(data) > 0:
            errMessage += '\n+++ Failed command output: '+data
        if len(extData) > 0:
            errMessage += '\n+++ Failed command output (stderr): '+extData
        self._postFailure(requester, ref, errMessage)
        self.resetTransport()

    def xon_channelSuccess(self, requester, ref, data, extData, jsonValueVariant):
        """Deliver a command result to requester.

        jsonValueVariant wraps an OVEFetchWrapper for 'JSON' commands and
        None otherwise, in which case the raw output is delivered as a string.
        """
        jsonValues = jsonValueVariant.toPyObject()
        if OVEConfig.Inst().logTraffic:
            OVELog('--- Channel success')
        try:
            if jsonValues is not None:
                values = jsonValues.contents
            else:
                values = str(data)
            self.refs[requester] = ref # For PySide workaround
            self.values[requester] = values # For PySide workaround
            event = OVEFetchEvent(ref, values)
            QtCore.QCoreApplication.postEvent(requester, event)
        except Exception as e:
            message = ('+++ Failed to decode JSON reply: '+str(e))
            if len(data) > 0: message += "\n++++++ Data (stdout): "+str(data)
            if len(extData) > 0: message += '\n++++++ Error (stderr): '+str(extData)
            self._postFailure(requester, ref, message)

    # Use for workaround only
    def snoopRef(self, requester):
        """Return the last ref recorded for requester, or None."""
        return self.refs.get(requester, None)

    # Use for workaround only
    def snoopValues(self, requester):
        """Return the last values recorded for requester, or None."""
        return self.values.get(requester, None)

    # Use for workaround only
    def snoopMessage(self, requester):
        """Return the last failure message recorded for requester, or None."""
        return self.messages.get(requester, None)

    def config(self):
        """Return the settings for this host, looked up once and then cached."""
        if self._config is None:
            self._config = OVEConfig.Inst().hostFromUuid(self._hostUuid)
        return self._config

    def resetTransport(self):
        """Forget the current transport and connection; the next command reconnects."""
        if OVEConfig.Inst().logTraffic:
            # Same fallback as execCommand: the settings lookup may yield nothing
            address = (self.config() or {}).get('address', '<Address not set>')
            OVELog('Transport reset for '+address)
        self._connection = None
        self._transport = None

    def transportErrback(self, failure, requester, ref, address):
        """Handle a failed connection attempt reported by Twisted."""
        self._timerRef += 1 # Prevent timeout handling
        self.resetTransport()
        message = 'Failure connecting to '+address+': '+failure.getErrorMessage()
        self._postFailure(requester, ref, message)

    def transportTimeout(self, timerRef, requester, ref, address):
        """Handle the connect timer firing; ignored unless still waiting on that attempt."""
        if self._timerRef == timerRef and self._transport is not None and self._connection is None:
            message = 'Connection attempt to ' +address+' timed out'
            self._postFailure(requester, ref, message)
            self.resetTransport()

    def execCommand(self, requester, ref, command, commandType):
        """Run command on the host, connecting first if there is no transport.

        requester   -- object that receives the result event
        ref         -- caller's reference, returned with the result
        command     -- remote command line
        commandType -- 'JSON' or 'framed'
        """
        if OVEConfig.Inst().logTraffic:
            hostName = (self.config() or {}).get('address', '<Address not set>')
            OVELog(str(QtCore.QTime.currentTime().toString())+' '+hostName+': Executing '+command)
        if self._transport is None:
            self._connection = None
            self._commandQueue.append((requester, ref, command, commandType))
            config = self.config()
            creator = protocol.ClientCreator(reactor, OVEFetchTransport, self)
            self._transport = creator.connectTCP(config['address'], config.get('port', 22), timeout = self.SEC_TIMEOUT)
            self._transport.addErrback(self.transportErrback, requester, ref, config['address'])
            # New generation, so timers left over from earlier attempts are ignored
            self._timerRef += 1
            # Set this timer slightly longer than the twisted.conch timeout, as transportErrback can cancel
            # the timeout and prevent double handling
            # lambda timerRef = self._timerRef: takes a copy of self._timerRef
            QtCore.QTimer.singleShot(int((1+self.SEC_TIMEOUT) * 1000), lambda timerRef = self._timerRef: self.transportTimeout(timerRef, requester, ref, config['address']))
        else:
            self.execCommand2(requester, ref, command, commandType)

    def execCommand2(self, requester, ref, command, commandType):
        """Run command now if the connection is up, otherwise queue it."""
        if self._connection is None:
            self._commandQueue.append((requester, ref, command, commandType))
        else:
            self._connection.execCommand(requester, ref, command, commandType)

    def getTable(self, requester, tableName, ref = QtCore.QObject()):
        """Request every row of tableName from Open_vSwitch as a 'JSON' command.

        The default ref is a single QObject created when the class is defined
        and shared by all calls that omit ref.
        """
        command = '/usr/bin/ovsdb-client transact '+self.config()['connectTarget']+' \'["Open_vSwitch", {"op":"select","table":"'+tableName+'", "where":[]}]\''
        self.execCommand(requester, ref, command, 'JSON')

    def execCommandFramed(self, requester, ref, command):
        """Run command with the end marker echoed after it on success."""
        self.execCommand(requester, ref, command + ' && echo ' + OVECommandChannel.END_MARKER, 'framed')