python: Add ovs_error() helper function to Python.
[openvswitch] / python / ovs / reconnect.py
1 # Copyright (c) 2010, 2011, 2012 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import os
16
17 import ovs.vlog
18 import ovs.util
19
20 # Values returned by Reconnect.run()
21 CONNECT = 'connect'
22 DISCONNECT = 'disconnect'
23 PROBE = 'probe'
24
25 EOF = ovs.util.EOF
26 vlog = ovs.vlog.Vlog("reconnect")
27
28
29 class Reconnect(object):
30     """A finite-state machine for connecting and reconnecting to a network
31     resource with exponential backoff.  It also provides optional support for
32     detecting a connection on which the peer is no longer responding.
33
34     The library does not implement anything networking related, only an FSM for
35     networking code to use.
36
37     Many Reconnect methods take a "now" argument.  This makes testing easier
38     since there is no hidden state.  When not testing, just pass the return
39     value of ovs.time.msec().  (Perhaps this design should be revisited
40     later.)"""
41
42     class Void(object):
43         name = "VOID"
44         is_connected = False
45
46         @staticmethod
47         def deadline(fsm):
48             return None
49
50         @staticmethod
51         def run(fsm, now):
52             return None
53
54     class Listening(object):
55         name = "LISTENING"
56         is_connected = False
57
58         @staticmethod
59         def deadline(fsm):
60             return None
61
62         @staticmethod
63         def run(fsm, now):
64             return None
65
66     class Backoff(object):
67         name = "BACKOFF"
68         is_connected = False
69
70         @staticmethod
71         def deadline(fsm):
72             return fsm.state_entered + fsm.backoff
73
74         @staticmethod
75         def run(fsm, now):
76             return CONNECT
77
78     class ConnectInProgress(object):
79         name = "CONNECTING"
80         is_connected = False
81
82         @staticmethod
83         def deadline(fsm):
84             return fsm.state_entered + max(1000, fsm.backoff)
85
86         @staticmethod
87         def run(fsm, now):
88             return DISCONNECT
89
90     class Active(object):
91         name = "ACTIVE"
92         is_connected = True
93
94         @staticmethod
95         def deadline(fsm):
96             if fsm.probe_interval:
97                 base = max(fsm.last_received, fsm.state_entered)
98                 return base + fsm.probe_interval
99             return None
100
101         @staticmethod
102         def run(fsm, now):
103             vlog.dbg("%s: idle %d ms, sending inactivity probe"
104                      % (fsm.name,
105                         now - max(fsm.last_received, fsm.state_entered)))
106             fsm._transition(now, Reconnect.Idle)
107             return PROBE
108
109     class Idle(object):
110         name = "IDLE"
111         is_connected = True
112
113         @staticmethod
114         def deadline(fsm):
115             return fsm.state_entered + fsm.probe_interval
116
117         @staticmethod
118         def run(fsm, now):
119             vlog.err("%s: no response to inactivity probe after %.3g "
120                      "seconds, disconnecting"
121                       % (fsm.name, (now - fsm.state_entered) / 1000.0))
122             return DISCONNECT
123
124     class Reconnect(object):
125         name = "RECONNECT"
126         is_connected = False
127
128         @staticmethod
129         def deadline(fsm):
130             return fsm.state_entered
131
132         @staticmethod
133         def run(fsm, now):
134             return DISCONNECT
135
136     def __init__(self, now):
137         """Creates and returns a new reconnect FSM with default settings.  The
138         FSM is initially disabled.  The caller will likely want to call
139         self.enable() and self.set_name() on the returned object."""
140
141         self.name = "void"
142         self.min_backoff = 1000
143         self.max_backoff = 8000
144         self.probe_interval = 5000
145         self.passive = False
146         self.info_level = vlog.info
147
148         self.state = Reconnect.Void
149         self.state_entered = now
150         self.backoff = 0
151         self.last_received = now
152         self.last_connected = None
153         self.last_disconnected = None
154         self.max_tries = None
155
156         self.creation_time = now
157         self.n_attempted_connections = 0
158         self.n_successful_connections = 0
159         self.total_connected_duration = 0
160         self.seqno = 0
161
162     def set_quiet(self, quiet):
163         """If 'quiet' is true, this object will log informational messages at
164         debug level, by default keeping them out of log files.  This is
165         appropriate if the connection is one that is expected to be
166         short-lived, so that the log messages are merely distracting.
167
168         If 'quiet' is false, this object logs informational messages at info
169         level.  This is the default.
170
171         This setting has no effect on the log level of debugging, warning, or
172         error messages."""
173         if quiet:
174             self.info_level = vlog.dbg
175         else:
176             self.info_level = vlog.info
177
178     def get_name(self):
179         return self.name
180
181     def set_name(self, name):
182         """Sets this object's name to 'name'.  If 'name' is None, then "void"
183         is used instead.
184
185         The name is used in log messages."""
186         if name is None:
187             self.name = "void"
188         else:
189             self.name = name
190
191     def get_min_backoff(self):
192         """Return the minimum number of milliseconds to back off between
193         consecutive connection attempts.  The default is 1000 ms."""
194         return self.min_backoff
195
196     def get_max_backoff(self):
197         """Return the maximum number of milliseconds to back off between
198         consecutive connection attempts.  The default is 8000 ms."""
199         return self.max_backoff
200
201     def get_probe_interval(self):
202         """Returns the "probe interval" in milliseconds.  If this is zero, it
203         disables the connection keepalive feature.  If it is nonzero, then if
204         the interval passes while the FSM is connected and without
205         self.received() being called, self.run() returns ovs.reconnect.PROBE.
206         If the interval passes again without self.received() being called,
207         self.run() returns ovs.reconnect.DISCONNECT."""
208         return self.probe_interval
209
210     def set_max_tries(self, max_tries):
211         """Limits the maximum number of times that this object will ask the
212         client to try to reconnect to 'max_tries'.  None (the default) means an
213         unlimited number of tries.
214
215         After the number of tries has expired, the FSM will disable itself
216         instead of backing off and retrying."""
217         self.max_tries = max_tries
218
219     def get_max_tries(self):
220         """Returns the current remaining number of connection attempts,
221         None if the number is unlimited."""
222         return self.max_tries
223
224     def set_backoff(self, min_backoff, max_backoff):
225         """Configures the backoff parameters for this FSM.  'min_backoff' is
226         the minimum number of milliseconds, and 'max_backoff' is the maximum,
227         between connection attempts.
228
229         'min_backoff' must be at least 1000, and 'max_backoff' must be greater
230         than or equal to 'min_backoff'."""
231         self.min_backoff = max(min_backoff, 1000)
232         if self.max_backoff:
233             self.max_backoff = max(max_backoff, 1000)
234         else:
235             self.max_backoff = 8000
236         if self.min_backoff > self.max_backoff:
237             self.max_backoff = self.min_backoff
238
239         if (self.state == Reconnect.Backoff and
240             self.backoff > self.max_backoff):
241                 self.backoff = self.max_backoff
242
243     def set_probe_interval(self, probe_interval):
244         """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
245         this is zero, it disables the connection keepalive feature.  If it is
246         nonzero, then if the interval passes while this FSM is connected and
247         without self.received() being called, self.run() returns
248         ovs.reconnect.PROBE.  If the interval passes again without
249         self.received() being called, self.run() returns
250         ovs.reconnect.DISCONNECT.
251
252         If 'probe_interval' is nonzero, then it will be forced to a value of at
253         least 1000 ms."""
254         if probe_interval:
255             self.probe_interval = max(1000, probe_interval)
256         else:
257             self.probe_interval = 0
258
259     def is_passive(self):
260         """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
261         active mode (the default)."""
262         return self.passive
263
264     def set_passive(self, passive, now):
265         """Configures this FSM for active or passive mode.  In active mode (the
266         default), the FSM is attempting to connect to a remote host.  In
267         passive mode, the FSM is listening for connections from a remote
268         host."""
269         if self.passive != passive:
270             self.passive = passive
271
272             if ((passive and self.state in (Reconnect.ConnectInProgress,
273                                             Reconnect.Reconnect)) or
274                 (not passive and self.state == Reconnect.Listening
275                  and self.__may_retry())):
276                 self._transition(now, Reconnect.Backoff)
277                 self.backoff = 0
278
279     def is_enabled(self):
280         """Returns true if this FSM has been enabled with self.enable().
281         Calling another function that indicates a change in connection state,
282         such as self.disconnected() or self.force_reconnect(), will also enable
283         a reconnect FSM."""
284         return self.state != Reconnect.Void
285
286     def enable(self, now):
287         """If this FSM is disabled (the default for newly created FSMs),
288         enables it, so that the next call to reconnect_run() for 'fsm' will
289         return ovs.reconnect.CONNECT.
290
291         If this FSM is not disabled, this function has no effect."""
292         if self.state == Reconnect.Void and self.__may_retry():
293             self._transition(now, Reconnect.Backoff)
294             self.backoff = 0
295
296     def disable(self, now):
297         """Disables this FSM.  Until 'fsm' is enabled again, self.run() will
298         always return 0."""
299         if self.state != Reconnect.Void:
300             self._transition(now, Reconnect.Void)
301
302     def force_reconnect(self, now):
303         """If this FSM is enabled and currently connected (or attempting to
304         connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
305         time it is called, which should cause the client to drop the connection
306         (or attempt), back off, and then reconnect."""
307         if self.state in (Reconnect.ConnectInProgress,
308                           Reconnect.Active,
309                           Reconnect.Idle):
310             self._transition(now, Reconnect.Reconnect)
311
312     def disconnected(self, now, error):
313         """Tell this FSM that the connection dropped or that a connection
314         attempt failed.  'error' specifies the reason: a positive value
315         represents an errno value, EOF indicates that the connection was closed
316         by the peer (e.g. read() returned 0), and 0 indicates no specific
317         error.
318
319         The FSM will back off, then reconnect."""
320         if self.state not in (Reconnect.Backoff, Reconnect.Void):
321             # Report what happened
322             if self.state in (Reconnect.Active, Reconnect.Idle):
323                 if error > 0:
324                     vlog.warn("%s: connection dropped (%s)"
325                               % (self.name, os.strerror(error)))
326                 elif error == EOF:
327                     self.info_level("%s: connection closed by peer"
328                                     % self.name)
329                 else:
330                     self.info_level("%s: connection dropped" % self.name)
331             elif self.state == Reconnect.Listening:
332                 if error > 0:
333                     vlog.warn("%s: error listening for connections (%s)"
334                               % (self.name, os.strerror(error)))
335                 else:
336                     self.info_level("%s: error listening for connections"
337                                     % self.name)
338             else:
339                 if self.passive:
340                     type_ = "listen"
341                 else:
342                     type_ = "connection"
343                 if error > 0:
344                     vlog.warn("%s: %s attempt failed (%s)"
345                               % (self.name, type_, os.strerror(error)))
346                 else:
347                     self.info_level("%s: %s attempt timed out"
348                                     % (self.name, type_))
349
350             if (self.state in (Reconnect.Active, Reconnect.Idle)):
351                 self.last_disconnected = now
352
353             # Back off
354             if (self.state in (Reconnect.Active, Reconnect.Idle) and
355                 (self.last_received - self.last_connected >= self.backoff or
356                  self.passive)):
357                 if self.passive:
358                     self.backoff = 0
359                 else:
360                     self.backoff = self.min_backoff
361             else:
362                 if self.backoff < self.min_backoff:
363                     self.backoff = self.min_backoff
364                 elif self.backoff >= self.max_backoff / 2:
365                     self.backoff = self.max_backoff
366                 else:
367                     self.backoff *= 2
368
369                 if self.passive:
370                     self.info_level("%s: waiting %.3g seconds before trying "
371                                     "to listen again"
372                                     % (self.name, self.backoff / 1000.0))
373                 else:
374                     self.info_level("%s: waiting %.3g seconds before reconnect"
375                                     % (self.name, self.backoff / 1000.0))
376
377             if self.__may_retry():
378                 self._transition(now, Reconnect.Backoff)
379             else:
380                 self._transition(now, Reconnect.Void)
381
382     def connecting(self, now):
383         """Tell this FSM that a connection or listening attempt is in progress.
384
385         The FSM will start a timer, after which the connection or listening
386         attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
387         self.run())."""
388         if self.state != Reconnect.ConnectInProgress:
389             if self.passive:
390                 self.info_level("%s: listening..." % self.name)
391             else:
392                 self.info_level("%s: connecting..." % self.name)
393             self._transition(now, Reconnect.ConnectInProgress)
394
395     def listening(self, now):
396         """Tell this FSM that the client is listening for connection attempts.
397         This state last indefinitely until the client reports some change.
398
399         The natural progression from this state is for the client to report
400         that a connection has been accepted or is in progress of being
401         accepted, by calling self.connecting() or self.connected().
402
403         The client may also report that listening failed (e.g. accept()
404         returned an unexpected error such as ENOMEM) by calling
405         self.listen_error(), in which case the FSM will back off and eventually
406         return ovs.reconnect.CONNECT from self.run() to tell the client to try
407         listening again."""
408         if self.state != Reconnect.Listening:
409             self.info_level("%s: listening..." % self.name)
410             self._transition(now, Reconnect.Listening)
411
412     def listen_error(self, now, error):
413         """Tell this FSM that the client's attempt to accept a connection
414         failed (e.g. accept() returned an unexpected error such as ENOMEM).
415
416         If the FSM is currently listening (self.listening() was called), it
417         will back off and eventually return ovs.reconnect.CONNECT from
418         self.run() to tell the client to try listening again.  If there is an
419         active connection, this will be delayed until that connection drops."""
420         if self.state == Reconnect.Listening:
421             self.disconnected(now, error)
422
423     def connected(self, now):
424         """Tell this FSM that the connection was successful.
425
426         The FSM will start the probe interval timer, which is reset by
427         self.received().  If the timer expires, a probe will be sent (by
428         returning ovs.reconnect.PROBE from self.run().  If the timer expires
429         again without being reset, the connection will be aborted (by returning
430         ovs.reconnect.DISCONNECT from self.run()."""
431         if not self.state.is_connected:
432             self.connecting(now)
433
434             self.info_level("%s: connected" % self.name)
435             self._transition(now, Reconnect.Active)
436             self.last_connected = now
437
438     def connect_failed(self, now, error):
439         """Tell this FSM that the connection attempt failed.
440
441         The FSM will back off and attempt to reconnect."""
442         self.connecting(now)
443         self.disconnected(now, error)
444
445     def received(self, now):
446         """Tell this FSM that some data was received.  This resets the probe
447         interval timer, so that the connection is known not to be idle."""
448         if self.state != Reconnect.Active:
449             self._transition(now, Reconnect.Active)
450         self.last_received = now
451
452     def _transition(self, now, state):
453         if self.state == Reconnect.ConnectInProgress:
454             self.n_attempted_connections += 1
455             if state == Reconnect.Active:
456                 self.n_successful_connections += 1
457
458         connected_before = self.state.is_connected
459         connected_now = state.is_connected
460         if connected_before != connected_now:
461             if connected_before:
462                 self.total_connected_duration += now - self.last_connected
463             self.seqno += 1
464
465         vlog.dbg("%s: entering %s" % (self.name, state.name))
466         self.state = state
467         self.state_entered = now
468
469     def run(self, now):
470         """Assesses whether any action should be taken on this FSM.  The return
471         value is one of:
472
473             - None: The client need not take any action.
474
475             - Active client, ovs.reconnect.CONNECT: The client should start a
476               connection attempt and indicate this by calling
477               self.connecting().  If the connection attempt has definitely
478               succeeded, it should call self.connected().  If the connection
479               attempt has definitely failed, it should call
480               self.connect_failed().
481
482               The FSM is smart enough to back off correctly after successful
483               connections that quickly abort, so it is OK to call
484               self.connected() after a low-level successful connection
485               (e.g. connect()) even if the connection might soon abort due to a
486               failure at a high-level (e.g. SSL negotiation failure).
487
488             - Passive client, ovs.reconnect.CONNECT: The client should try to
489               listen for a connection, if it is not already listening.  It
490               should call self.listening() if successful, otherwise
491               self.connecting() or reconnected_connect_failed() if the attempt
492               is in progress or definitely failed, respectively.
493
494               A listening passive client should constantly attempt to accept a
495               new connection and report an accepted connection with
496               self.connected().
497
498             - ovs.reconnect.DISCONNECT: The client should abort the current
499               connection or connection attempt or listen attempt and call
500               self.disconnected() or self.connect_failed() to indicate it.
501
502             - ovs.reconnect.PROBE: The client should send some kind of request
503               to the peer that will elicit a response, to ensure that the
504               connection is indeed in working order.  (This will only be
505               returned if the "probe interval" is nonzero--see
506               self.set_probe_interval())."""
507         if now >= self.state.deadline(self):
508             return self.state.run(self, now)
509         else:
510             return None
511
512     def wait(self, poller, now):
513         """Causes the next call to poller.block() to wake up when self.run()
514         should be called."""
515         timeout = self.timeout(now)
516         if timeout >= 0:
517             poller.timer_wait(timeout)
518
519     def timeout(self, now):
520         """Returns the number of milliseconds after which self.run() should be
521         called if nothing else notable happens in the meantime, or None if this
522         is currently unnecessary."""
523         deadline = self.state.deadline(self)
524         if deadline is not None:
525             remaining = deadline - now
526             return max(0, remaining)
527         else:
528             return None
529
530     def is_connected(self):
531         """Returns True if this FSM is currently believed to be connected, that
532         is, if self.connected() was called more recently than any call to
533         self.connect_failed() or self.disconnected() or self.disable(), and
534         False otherwise."""
535         return self.state.is_connected
536
537     def get_last_connect_elapsed(self, now):
538         """Returns the number of milliseconds since 'fsm' was last connected
539         to its peer. Returns None if never connected."""
540         if self.last_connected:
541             return now - self.last_connected
542         else:
543             return None
544
545     def get_last_disconnect_elapsed(self, now):
546         """Returns the number of milliseconds since 'fsm' was last disconnected
547         from its peer. Returns None if never disconnected."""
548         if self.last_disconnected:
549             return now - self.last_disconnected
550         else:
551             return None
552
553     def get_stats(self, now):
554         class Stats(object):
555             pass
556         stats = Stats()
557         stats.creation_time = self.creation_time
558         stats.last_connected = self.last_connected
559         stats.last_disconnected = self.last_disconnected
560         stats.last_received = self.last_received
561         stats.backoff = self.backoff
562         stats.seqno = self.seqno
563         stats.is_connected = self.is_connected()
564         stats.msec_since_connect = self.get_last_connect_elapsed(now)
565         stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
566         stats.total_connected_duration = self.total_connected_duration
567         if self.is_connected():
568             stats.total_connected_duration += (
569                     self.get_last_connect_elapsed(now))
570         stats.n_attempted_connections = self.n_attempted_connections
571         stats.n_successful_connections = self.n_successful_connections
572         stats.state = self.state.name
573         stats.state_elapsed = now - self.state_entered
574         return stats
575
576     def __may_retry(self):
577         if self.max_tries is None:
578             return True
579         elif self.max_tries > 0:
580             self.max_tries -= 1
581             return True
582         else:
583             return False