python: Upgrade to vlog.
[openvswitch] / python / ovs / reconnect.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import os
16
17 import ovs.vlog
18
19 # Values returned by Reconnect.run()
20 CONNECT = 'connect'
21 DISCONNECT = 'disconnect'
22 PROBE = 'probe'
23
24 EOF = -1
25 vlog = ovs.vlog.Vlog("reconnect")
26
27
28 class Reconnect(object):
29     """A finite-state machine for connecting and reconnecting to a network
30     resource with exponential backoff.  It also provides optional support for
31     detecting a connection on which the peer is no longer responding.
32
33     The library does not implement anything networking related, only an FSM for
34     networking code to use.
35
36     Many Reconnect methods take a "now" argument.  This makes testing easier
37     since there is no hidden state.  When not testing, just pass the return
38     value of ovs.time.msec().  (Perhaps this design should be revisited
39     later.)"""
40
41     class Void(object):
42         name = "VOID"
43         is_connected = False
44
45         @staticmethod
46         def deadline(fsm):
47             return None
48
49         @staticmethod
50         def run(fsm, now):
51             return None
52
53     class Listening(object):
54         name = "LISTENING"
55         is_connected = False
56
57         @staticmethod
58         def deadline(fsm):
59             return None
60
61         @staticmethod
62         def run(fsm, now):
63             return None
64
65     class Backoff(object):
66         name = "BACKOFF"
67         is_connected = False
68
69         @staticmethod
70         def deadline(fsm):
71             return fsm.state_entered + fsm.backoff
72
73         @staticmethod
74         def run(fsm, now):
75             return CONNECT
76
77     class ConnectInProgress(object):
78         name = "CONNECTING"
79         is_connected = False
80
81         @staticmethod
82         def deadline(fsm):
83             return fsm.state_entered + max(1000, fsm.backoff)
84
85         @staticmethod
86         def run(fsm, now):
87             return DISCONNECT
88
89     class Active(object):
90         name = "ACTIVE"
91         is_connected = True
92
93         @staticmethod
94         def deadline(fsm):
95             if fsm.probe_interval:
96                 base = max(fsm.last_received, fsm.state_entered)
97                 return base + fsm.probe_interval
98             return None
99
100         @staticmethod
101         def run(fsm, now):
102             vlog.dbg("%s: idle %d ms, sending inactivity probe"
103                      % (fsm.name,
104                         now - max(fsm.last_received, fsm.state_entered)))
105             fsm._transition(now, Reconnect.Idle)
106             return PROBE
107
108     class Idle(object):
109         name = "IDLE"
110         is_connected = True
111
112         @staticmethod
113         def deadline(fsm):
114             return fsm.state_entered + fsm.probe_interval
115
116         @staticmethod
117         def run(fsm, now):
118             vlog.err("%s: no response to inactivity probe after %.3g "
119                      "seconds, disconnecting"
120                       % (fsm.name, (now - fsm.state_entered) / 1000.0))
121             return DISCONNECT
122
123     class Reconnect(object):
124         name = "RECONNECT"
125         is_connected = False
126
127         @staticmethod
128         def deadline(fsm):
129             return fsm.state_entered
130
131         @staticmethod
132         def run(fsm, now):
133             return DISCONNECT
134
135     def __init__(self, now):
136         """Creates and returns a new reconnect FSM with default settings.  The
137         FSM is initially disabled.  The caller will likely want to call
138         self.enable() and self.set_name() on the returned object."""
139
140         self.name = "void"
141         self.min_backoff = 1000
142         self.max_backoff = 8000
143         self.probe_interval = 5000
144         self.passive = False
145         self.info_level = vlog.info
146
147         self.state = Reconnect.Void
148         self.state_entered = now
149         self.backoff = 0
150         self.last_received = now
151         self.last_connected = None
152         self.last_disconnected = None
153         self.max_tries = None
154
155         self.creation_time = now
156         self.n_attempted_connections = 0
157         self.n_successful_connections = 0
158         self.total_connected_duration = 0
159         self.seqno = 0
160
161     def set_quiet(self, quiet):
162         """If 'quiet' is true, this object will log informational messages at
163         debug level, by default keeping them out of log files.  This is
164         appropriate if the connection is one that is expected to be
165         short-lived, so that the log messages are merely distracting.
166
167         If 'quiet' is false, this object logs informational messages at info
168         level.  This is the default.
169
170         This setting has no effect on the log level of debugging, warning, or
171         error messages."""
172         if quiet:
173             self.info_level = vlog.dbg
174         else:
175             self.info_level = vlog.info
176
177     def get_name(self):
178         return self.name
179
180     def set_name(self, name):
181         """Sets this object's name to 'name'.  If 'name' is None, then "void"
182         is used instead.
183
184         The name is used in log messages."""
185         if name is None:
186             self.name = "void"
187         else:
188             self.name = name
189
190     def get_min_backoff(self):
191         """Return the minimum number of milliseconds to back off between
192         consecutive connection attempts.  The default is 1000 ms."""
193         return self.min_backoff
194
195     def get_max_backoff(self):
196         """Return the maximum number of milliseconds to back off between
197         consecutive connection attempts.  The default is 8000 ms."""
198         return self.max_backoff
199
200     def get_probe_interval(self):
201         """Returns the "probe interval" in milliseconds.  If this is zero, it
202         disables the connection keepalive feature.  If it is nonzero, then if
203         the interval passes while the FSM is connected and without
204         self.received() being called, self.run() returns ovs.reconnect.PROBE.
205         If the interval passes again without self.received() being called,
206         self.run() returns ovs.reconnect.DISCONNECT."""
207         return self.probe_interval
208
209     def set_max_tries(self, max_tries):
210         """Limits the maximum number of times that this object will ask the
211         client to try to reconnect to 'max_tries'.  None (the default) means an
212         unlimited number of tries.
213
214         After the number of tries has expired, the FSM will disable itself
215         instead of backing off and retrying."""
216         self.max_tries = max_tries
217
218     def get_max_tries(self):
219         """Returns the current remaining number of connection attempts,
220         None if the number is unlimited."""
221         return self.max_tries
222
223     def set_backoff(self, min_backoff, max_backoff):
224         """Configures the backoff parameters for this FSM.  'min_backoff' is
225         the minimum number of milliseconds, and 'max_backoff' is the maximum,
226         between connection attempts.
227
228         'min_backoff' must be at least 1000, and 'max_backoff' must be greater
229         than or equal to 'min_backoff'."""
230         self.min_backoff = max(min_backoff, 1000)
231         if self.max_backoff:
232             self.max_backoff = max(max_backoff, 1000)
233         else:
234             self.max_backoff = 8000
235         if self.min_backoff > self.max_backoff:
236             self.max_backoff = self.min_backoff
237
238         if (self.state == Reconnect.Backoff and
239             self.backoff > self.max_backoff):
240                 self.backoff = self.max_backoff
241
242     def set_probe_interval(self, probe_interval):
243         """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
244         this is zero, it disables the connection keepalive feature.  If it is
245         nonzero, then if the interval passes while this FSM is connected and
246         without self.received() being called, self.run() returns
247         ovs.reconnect.PROBE.  If the interval passes again without
248         self.received() being called, self.run() returns
249         ovs.reconnect.DISCONNECT.
250
251         If 'probe_interval' is nonzero, then it will be forced to a value of at
252         least 1000 ms."""
253         if probe_interval:
254             self.probe_interval = max(1000, probe_interval)
255         else:
256             self.probe_interval = 0
257
258     def is_passive(self):
259         """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
260         active mode (the default)."""
261         return self.passive
262
263     def set_passive(self, passive, now):
264         """Configures this FSM for active or passive mode.  In active mode (the
265         default), the FSM is attempting to connect to a remote host.  In
266         passive mode, the FSM is listening for connections from a remote
267         host."""
268         if self.passive != passive:
269             self.passive = passive
270
271             if ((passive and self.state in (Reconnect.ConnectInProgress,
272                                             Reconnect.Reconnect)) or
273                 (not passive and self.state == Reconnect.Listening
274                  and self.__may_retry())):
275                 self._transition(now, Reconnect.Backoff)
276                 self.backoff = 0
277
278     def is_enabled(self):
279         """Returns true if this FSM has been enabled with self.enable().
280         Calling another function that indicates a change in connection state,
281         such as self.disconnected() or self.force_reconnect(), will also enable
282         a reconnect FSM."""
283         return self.state != Reconnect.Void
284
285     def enable(self, now):
286         """If this FSM is disabled (the default for newly created FSMs),
287         enables it, so that the next call to reconnect_run() for 'fsm' will
288         return ovs.reconnect.CONNECT.
289
290         If this FSM is not disabled, this function has no effect."""
291         if self.state == Reconnect.Void and self.__may_retry():
292             self._transition(now, Reconnect.Backoff)
293             self.backoff = 0
294
295     def disable(self, now):
296         """Disables this FSM.  Until 'fsm' is enabled again, self.run() will
297         always return 0."""
298         if self.state != Reconnect.Void:
299             self._transition(now, Reconnect.Void)
300
301     def force_reconnect(self, now):
302         """If this FSM is enabled and currently connected (or attempting to
303         connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
304         time it is called, which should cause the client to drop the connection
305         (or attempt), back off, and then reconnect."""
306         if self.state in (Reconnect.ConnectInProgress,
307                           Reconnect.Active,
308                           Reconnect.Idle):
309             self._transition(now, Reconnect.Reconnect)
310
311     def disconnected(self, now, error):
312         """Tell this FSM that the connection dropped or that a connection
313         attempt failed.  'error' specifies the reason: a positive value
314         represents an errno value, EOF indicates that the connection was closed
315         by the peer (e.g. read() returned 0), and 0 indicates no specific
316         error.
317
318         The FSM will back off, then reconnect."""
319         if self.state not in (Reconnect.Backoff, Reconnect.Void):
320             # Report what happened
321             if self.state in (Reconnect.Active, Reconnect.Idle):
322                 if error > 0:
323                     vlog.warn("%s: connection dropped (%s)"
324                               % (self.name, os.strerror(error)))
325                 elif error == EOF:
326                     self.info_level("%s: connection closed by peer"
327                                     % self.name)
328                 else:
329                     self.info_level("%s: connection dropped" % self.name)
330             elif self.state == Reconnect.Listening:
331                 if error > 0:
332                     vlog.warn("%s: error listening for connections (%s)"
333                               % (self.name, os.strerror(error)))
334                 else:
335                     self.info_level("%s: error listening for connections"
336                                     % self.name)
337             else:
338                 if self.passive:
339                     type_ = "listen"
340                 else:
341                     type_ = "connection"
342                 if error > 0:
343                     vlog.warn("%s: %s attempt failed (%s)"
344                               % (self.name, type_, os.strerror(error)))
345                 else:
346                     self.info_level("%s: %s attempt timed out"
347                                     % (self.name, type_))
348
349             if (self.state in (Reconnect.Active, Reconnect.Idle)):
350                 self.last_disconnected = now
351
352             # Back off
353             if (self.state in (Reconnect.Active, Reconnect.Idle) and
354                 (self.last_received - self.last_connected >= self.backoff or
355                  self.passive)):
356                 if self.passive:
357                     self.backoff = 0
358                 else:
359                     self.backoff = self.min_backoff
360             else:
361                 if self.backoff < self.min_backoff:
362                     self.backoff = self.min_backoff
363                 elif self.backoff >= self.max_backoff / 2:
364                     self.backoff = self.max_backoff
365                 else:
366                     self.backoff *= 2
367
368                 if self.passive:
369                     self.info_level("%s: waiting %.3g seconds before trying "
370                                     "to listen again"
371                                     % (self.name, self.backoff / 1000.0))
372                 else:
373                     self.info_level("%s: waiting %.3g seconds before reconnect"
374                                     % (self.name, self.backoff / 1000.0))
375
376             if self.__may_retry():
377                 self._transition(now, Reconnect.Backoff)
378             else:
379                 self._transition(now, Reconnect.Void)
380
381     def connecting(self, now):
382         """Tell this FSM that a connection or listening attempt is in progress.
383
384         The FSM will start a timer, after which the connection or listening
385         attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
386         self.run())."""
387         if self.state != Reconnect.ConnectInProgress:
388             if self.passive:
389                 self.info_level("%s: listening..." % self.name)
390             else:
391                 self.info_level("%s: connecting..." % self.name)
392             self._transition(now, Reconnect.ConnectInProgress)
393
394     def listening(self, now):
395         """Tell this FSM that the client is listening for connection attempts.
396         This state last indefinitely until the client reports some change.
397
398         The natural progression from this state is for the client to report
399         that a connection has been accepted or is in progress of being
400         accepted, by calling self.connecting() or self.connected().
401
402         The client may also report that listening failed (e.g. accept()
403         returned an unexpected error such as ENOMEM) by calling
404         self.listen_error(), in which case the FSM will back off and eventually
405         return ovs.reconnect.CONNECT from self.run() to tell the client to try
406         listening again."""
407         if self.state != Reconnect.Listening:
408             self.info_level("%s: listening..." % self.name)
409             self._transition(now, Reconnect.Listening)
410
411     def listen_error(self, now, error):
412         """Tell this FSM that the client's attempt to accept a connection
413         failed (e.g. accept() returned an unexpected error such as ENOMEM).
414
415         If the FSM is currently listening (self.listening() was called), it
416         will back off and eventually return ovs.reconnect.CONNECT from
417         self.run() to tell the client to try listening again.  If there is an
418         active connection, this will be delayed until that connection drops."""
419         if self.state == Reconnect.Listening:
420             self.disconnected(now, error)
421
422     def connected(self, now):
423         """Tell this FSM that the connection was successful.
424
425         The FSM will start the probe interval timer, which is reset by
426         self.received().  If the timer expires, a probe will be sent (by
427         returning ovs.reconnect.PROBE from self.run().  If the timer expires
428         again without being reset, the connection will be aborted (by returning
429         ovs.reconnect.DISCONNECT from self.run()."""
430         if not self.state.is_connected:
431             self.connecting(now)
432
433             self.info_level("%s: connected" % self.name)
434             self._transition(now, Reconnect.Active)
435             self.last_connected = now
436
437     def connect_failed(self, now, error):
438         """Tell this FSM that the connection attempt failed.
439
440         The FSM will back off and attempt to reconnect."""
441         self.connecting(now)
442         self.disconnected(now, error)
443
444     def received(self, now):
445         """Tell this FSM that some data was received.  This resets the probe
446         interval timer, so that the connection is known not to be idle."""
447         if self.state != Reconnect.Active:
448             self._transition(now, Reconnect.Active)
449         self.last_received = now
450
451     def _transition(self, now, state):
452         if self.state == Reconnect.ConnectInProgress:
453             self.n_attempted_connections += 1
454             if state == Reconnect.Active:
455                 self.n_successful_connections += 1
456
457         connected_before = self.state.is_connected
458         connected_now = state.is_connected
459         if connected_before != connected_now:
460             if connected_before:
461                 self.total_connected_duration += now - self.last_connected
462             self.seqno += 1
463
464         vlog.dbg("%s: entering %s" % (self.name, state.name))
465         self.state = state
466         self.state_entered = now
467
468     def run(self, now):
469         """Assesses whether any action should be taken on this FSM.  The return
470         value is one of:
471
472             - None: The client need not take any action.
473
474             - Active client, ovs.reconnect.CONNECT: The client should start a
475               connection attempt and indicate this by calling
476               self.connecting().  If the connection attempt has definitely
477               succeeded, it should call self.connected().  If the connection
478               attempt has definitely failed, it should call
479               self.connect_failed().
480
481               The FSM is smart enough to back off correctly after successful
482               connections that quickly abort, so it is OK to call
483               self.connected() after a low-level successful connection
484               (e.g. connect()) even if the connection might soon abort due to a
485               failure at a high-level (e.g. SSL negotiation failure).
486
487             - Passive client, ovs.reconnect.CONNECT: The client should try to
488               listen for a connection, if it is not already listening.  It
489               should call self.listening() if successful, otherwise
490               self.connecting() or reconnected_connect_failed() if the attempt
491               is in progress or definitely failed, respectively.
492
493               A listening passive client should constantly attempt to accept a
494               new connection and report an accepted connection with
495               self.connected().
496
497             - ovs.reconnect.DISCONNECT: The client should abort the current
498               connection or connection attempt or listen attempt and call
499               self.disconnected() or self.connect_failed() to indicate it.
500
501             - ovs.reconnect.PROBE: The client should send some kind of request
502               to the peer that will elicit a response, to ensure that the
503               connection is indeed in working order.  (This will only be
504               returned if the "probe interval" is nonzero--see
505               self.set_probe_interval())."""
506         if now >= self.state.deadline(self):
507             return self.state.run(self, now)
508         else:
509             return None
510
511     def wait(self, poller, now):
512         """Causes the next call to poller.block() to wake up when self.run()
513         should be called."""
514         timeout = self.timeout(now)
515         if timeout >= 0:
516             poller.timer_wait(timeout)
517
518     def timeout(self, now):
519         """Returns the number of milliseconds after which self.run() should be
520         called if nothing else notable happens in the meantime, or None if this
521         is currently unnecessary."""
522         deadline = self.state.deadline(self)
523         if deadline is not None:
524             remaining = deadline - now
525             return max(0, remaining)
526         else:
527             return None
528
529     def is_connected(self):
530         """Returns True if this FSM is currently believed to be connected, that
531         is, if self.connected() was called more recently than any call to
532         self.connect_failed() or self.disconnected() or self.disable(), and
533         False otherwise."""
534         return self.state.is_connected
535
536     def get_last_connect_elapsed(self, now):
537         """Returns the number of milliseconds since 'fsm' was last connected
538         to its peer. Returns None if never connected."""
539         if self.last_connected:
540             return now - self.last_connected
541         else:
542             return None
543
544     def get_last_disconnect_elapsed(self, now):
545         """Returns the number of milliseconds since 'fsm' was last disconnected
546         from its peer. Returns None if never disconnected."""
547         if self.last_disconnected:
548             return now - self.last_disconnected
549         else:
550             return None
551
552     def get_stats(self, now):
553         class Stats(object):
554             pass
555         stats = Stats()
556         stats.creation_time = self.creation_time
557         stats.last_connected = self.last_connected
558         stats.last_disconnected = self.last_disconnected
559         stats.last_received = self.last_received
560         stats.backoff = self.backoff
561         stats.seqno = self.seqno
562         stats.is_connected = self.is_connected()
563         stats.msec_since_connect = self.get_last_connect_elapsed(now)
564         stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
565         stats.total_connected_duration = self.total_connected_duration
566         if self.is_connected():
567             stats.total_connected_duration += (
568                     self.get_last_connect_elapsed(now))
569         stats.n_attempted_connections = self.n_attempted_connections
570         stats.n_successful_connections = self.n_successful_connections
571         stats.state = self.state.name
572         stats.state_elapsed = now - self.state_entered
573         return stats
574
575     def __may_retry(self):
576         if self.max_tries is None:
577             return True
578         elif self.max_tries > 0:
579             self.max_tries -= 1
580             return True
581         else:
582             return False