1 # Copyright (c) 2010 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
# Values returned by Reconnect.run()
# DISCONNECT: the client should abort its current connection, connection
# attempt, or listen attempt, then report it via disconnected() or
# connect_failed().
DISCONNECT = 'disconnect'
25 class Reconnect(object):
26 """A finite-state machine for connecting and reconnecting to a network
27 resource with exponential backoff. It also provides optional support for
28 detecting a connection on which the peer is no longer responding.
30 The library does not implement anything networking related, only an FSM for
31 networking code to use.
33 Many Reconnect methods take a "now" argument. This makes testing easier
34 since there is no hidden state. When not testing, just pass the return
35 value of ovs.time.msec(). (Perhaps this design should be revisited
50 class Listening(object):
62 class Backoff(object):
68 return fsm.state_entered + fsm.backoff
74 class ConnectInProgress(object):
75 name = "CONNECT_IN_PROGRESS"
80 return fsm.state_entered + max(1000, fsm.backoff)
92 if fsm.probe_interval:
93 base = max(fsm.last_received, fsm.state_entered)
94 return base + fsm.probe_interval
99 logging.debug("%s: idle %d ms, sending inactivity probe"
101 now - max(fsm.last_received, fsm.state_entered)))
102 fsm._transition(now, Reconnect.Idle)
111 return fsm.state_entered + fsm.probe_interval
115 logging.error("%s: no response to inactivity probe after %.3g "
116 "seconds, disconnecting"
117 % (fsm.name, (now - fsm.state_entered) / 1000.0))
126 return fsm.state_entered
132 def __init__(self, now):
133 """Creates and returns a new reconnect FSM with default settings. The
134 FSM is initially disabled. The caller will likely want to call
135 self.enable() and self.set_name() on the returned object."""
138 self.min_backoff = 1000
139 self.max_backoff = 8000
140 self.probe_interval = 5000
142 self.info_level = logging.info
144 self.state = Reconnect.Void
145 self.state_entered = now
147 self.last_received = now
148 self.last_connected = now
149 self.max_tries = None
151 self.creation_time = now
152 self.n_attempted_connections = 0
153 self.n_successful_connections = 0
154 self.total_connected_duration = 0
157 def set_quiet(self, quiet):
158 """If 'quiet' is true, this object will log informational messages at
159 debug level, by default keeping them out of log files. This is
160 appropriate if the connection is one that is expected to be
161 short-lived, so that the log messages are merely distracting.
163 If 'quiet' is false, this object logs informational messages at info
164 level. This is the default.
166 This setting has no effect on the log level of debugging, warning, or
169 self.info_level = logging.debug
171 self.info_level = logging.info
176 def set_name(self, name):
177 """Sets this object's name to 'name'. If 'name' is None, then "void"
180 The name is used in log messages."""
def get_min_backoff(self):
    """Return the lower bound, in milliseconds, on the delay between
    consecutive connection attempts.  Defaults to 1000 ms."""
    floor_ms = self.min_backoff
    return floor_ms
def get_max_backoff(self):
    """Return the upper bound, in milliseconds, on the delay between
    consecutive connection attempts.  Defaults to 8000 ms."""
    ceiling_ms = self.max_backoff
    return ceiling_ms
def get_probe_interval(self):
    """Return the keepalive "probe interval", in milliseconds.

    A value of zero means the keepalive feature is turned off.  Otherwise,
    once this interval elapses while the FSM is connected and
    self.received() has not been called, self.run() returns
    ovs.reconnect.PROBE; if the interval then elapses again with still no
    call to self.received(), self.run() returns ovs.reconnect.DISCONNECT."""
    interval_ms = self.probe_interval
    return interval_ms
def set_max_tries(self, max_tries):
    """Cap at 'max_tries' the number of times this object will ask the
    client to try to reconnect.  Pass None (the default) for no limit.

    Once the allowed tries are used up, the FSM disables itself rather
    than backing off and retrying again."""
    self.max_tries = max_tries
def get_max_tries(self):
    """Return how many connection attempts remain, or None when the
    number of attempts is unlimited."""
    remaining = self.max_tries
    return remaining
219 def set_backoff(self, min_backoff, max_backoff):
220 """Configures the backoff parameters for this FSM. 'min_backoff' is
221 the minimum number of milliseconds, and 'max_backoff' is the maximum,
222 between connection attempts.
224 'min_backoff' must be at least 1000, and 'max_backoff' must be greater
225 than or equal to 'min_backoff'."""
226 self.min_backoff = max(min_backoff, 1000)
228 self.max_backoff = max(max_backoff, 1000)
230 self.max_backoff = 8000
231 if self.min_backoff > self.max_backoff:
232 self.max_backoff = self.min_backoff
234 if (self.state == Reconnect.Backoff and
235 self.backoff > self.max_backoff):
236 self.backoff = self.max_backoff
238 def set_probe_interval(self, probe_interval):
239 """Sets the "probe interval" to 'probe_interval', in milliseconds. If
240 this is zero, it disables the connection keepalive feature. If it is
241 nonzero, then if the interval passes while this FSM is connected and
242 without self.received() being called, self.run() returns
243 ovs.reconnect.PROBE. If the interval passes again without
244 self.received() being called, self.run() returns
245 ovs.reconnect.DISCONNECT.
247 If 'probe_interval' is nonzero, then it will be forced to a value of at
250 self.probe_interval = max(1000, probe_interval)
252 self.probe_interval = 0
254 def is_passive(self):
255 """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
256 active mode (the default)."""
259 def set_passive(self, passive, now):
260 """Configures this FSM for active or passive mode. In active mode (the
261 default), the FSM is attempting to connect to a remote host. In
262 passive mode, the FSM is listening for connections from a remote host."""
263 if self.passive != passive:
264 self.passive = passive
266 if ((passive and self.state in (Reconnect.ConnectInProgress,
267 Reconnect.Reconnect)) or
268 (not passive and self.state == Reconnect.Listening
269 and self.__may_retry())):
270 self._transition(now, Reconnect.Backoff)
273 def is_enabled(self):
274 """Returns true if this FSM has been enabled with self.enable().
275 Calling another function that indicates a change in connection state,
276 such as self.disconnected() or self.force_reconnect(), will also enable
278 return self.state != Reconnect.Void
280 def enable(self, now):
281 """If this FSM is disabled (the default for newly created FSMs),
282 enables it, so that the next call to reconnect_run() for 'fsm' will
283 return ovs.reconnect.CONNECT.
285 If this FSM is not disabled, this function has no effect."""
286 if self.state == Reconnect.Void and self.__may_retry():
287 self._transition(now, Reconnect.Backoff)
290 def disable(self, now):
291 """Disables this FSM. Until 'fsm' is enabled again, self.run() will
293 if self.state != Reconnect.Void:
294 self._transition(now, Reconnect.Void)
296 def force_reconnect(self, now):
297 """If this FSM is enabled and currently connected (or attempting to
298 connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
299 time it is called, which should cause the client to drop the connection
300 (or attempt), back off, and then reconnect."""
301 if self.state in (Reconnect.ConnectInProgress,
304 self._transition(now, Reconnect.Reconnect)
306 def disconnected(self, now, error):
307 """Tell this FSM that the connection dropped or that a connection
308 attempt failed. 'error' specifies the reason: a positive value
309 represents an errno value, EOF indicates that the connection was closed
310 by the peer (e.g. read() returned 0), and 0 indicates no specific
313 The FSM will back off, then reconnect."""
314 if self.state not in (Reconnect.Backoff, Reconnect.Void):
315 # Report what happened
316 if self.state in (Reconnect.Active, Reconnect.Idle):
318 logging.warning("%s: connection dropped (%s)"
319 % (self.name, os.strerror(error)))
321 self.info_level("%s: connection closed by peer"
324 self.info_level("%s: connection dropped" % self.name)
325 elif self.state == Reconnect.Listening:
327 logging.warning("%s: error listening for connections (%s)"
328 % (self.name, os.strerror(error)))
330 self.info_level("%s: error listening for connections"
338 logging.warning("%s: %s attempt failed (%s)"
339 % (self.name, type, os.strerror(error)))
341 self.info_level("%s: %s attempt timed out"
345 if (self.state in (Reconnect.Active, Reconnect.Idle) and
346 (self.last_received - self.last_connected >= self.backoff or
351 self.backoff = self.min_backoff
353 if self.backoff < self.min_backoff:
354 self.backoff = self.min_backoff
355 elif self.backoff >= self.max_backoff / 2:
356 self.backoff = self.max_backoff
361 self.info_level("%s: waiting %.3g seconds before trying "
363 % (self.name, self.backoff / 1000.0))
365 self.info_level("%s: waiting %.3g seconds before reconnect"
366 % (self.name, self.backoff / 1000.0))
368 if self.__may_retry():
369 self._transition(now, Reconnect.Backoff)
371 self._transition(now, Reconnect.Void)
373 def connecting(self, now):
374 """Tell this FSM that a connection or listening attempt is in progress.
376 The FSM will start a timer, after which the connection or listening
377 attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
379 if self.state != Reconnect.ConnectInProgress:
381 self.info_level("%s: listening..." % self.name)
383 self.info_level("%s: connecting..." % self.name)
384 self._transition(now, Reconnect.ConnectInProgress)
386 def listening(self, now):
387 """Tell this FSM that the client is listening for connection attempts.
388 This state last indefinitely until the client reports some change.
390 The natural progression from this state is for the client to report
391 that a connection has been accepted or is in progress of being
392 accepted, by calling self.connecting() or self.connected().
394 The client may also report that listening failed (e.g. accept()
395 returned an unexpected error such as ENOMEM) by calling
396 self.listen_error(), in which case the FSM will back off and eventually
397 return ovs.reconnect.CONNECT from self.run() to tell the client to try
399 if self.state != Reconnect.Listening:
400 self.info_level("%s: listening..." % self.name)
401 self._transition(now, Reconnect.Listening)
def listen_error(self, now, error):
    """Report that the client's attempt to accept a connection failed
    (for example, accept() returned an unexpected error such as ENOMEM).

    Only meaningful while the FSM is listening (self.listening() was
    called): the error is then handled exactly like self.disconnected(),
    so the FSM backs off and eventually returns ovs.reconnect.CONNECT from
    self.run() to have the client try listening again.  In any other
    state, e.g. while a connection is active, the call has no effect."""
    if self.state != Reconnect.Listening:
        # Not listening: nothing to recover from here.
        return
    self.disconnected(now, error)
414 def connected(self, now):
415 """Tell this FSM that the connection was successful.
417 The FSM will start the probe interval timer, which is reset by
418 self.received(). If the timer expires, a probe will be sent (by
419 returning ovs.reconnect.PROBE from self.run(). If the timer expires
420 again without being reset, the connection will be aborted (by returning
421 ovs.reconnect.DISCONNECT from self.run()."""
422 if not self.state.is_connected:
425 self.info_level("%s: connected" % self.name)
426 self._transition(now, Reconnect.Active)
427 self.last_connected = now
429 def connect_failed(self, now, error):
430 """Tell this FSM that the connection attempt failed.
432 The FSM will back off and attempt to reconnect."""
434 self.disconnected(now, error)
def received(self, now):
    """Report that data arrived from the peer.

    This restarts the probe-interval timer, since the connection is now
    known not to be idle: an FSM in any state other than Active is moved
    to Active, and the last-received timestamp is set to 'now'."""
    already_active = self.state == Reconnect.Active
    if not already_active:
        self._transition(now, Reconnect.Active)
    self.last_received = now
443 def _transition(self, now, state):
444 if self.state == Reconnect.ConnectInProgress:
445 self.n_attempted_connections += 1
446 if state == Reconnect.Active:
447 self.n_successful_connections += 1
449 connected_before = self.state.is_connected
450 connected_now = state.is_connected
451 if connected_before != connected_now:
453 self.total_connected_duration += now - self.last_connected
456 logging.debug("%s: entering %s" % (self.name, state.name))
458 self.state_entered = now
461 """Assesses whether any action should be taken on this FSM. The return
464 - None: The client need not take any action.
466 - Active client, ovs.reconnect.CONNECT: The client should start a
467 connection attempt and indicate this by calling
468 self.connecting(). If the connection attempt has definitely
469 succeeded, it should call self.connected(). If the connection
470 attempt has definitely failed, it should call
471 self.connect_failed().
473 The FSM is smart enough to back off correctly after successful
474 connections that quickly abort, so it is OK to call
475 self.connected() after a low-level successful connection
476 (e.g. connect()) even if the connection might soon abort due to a
477 failure at a high-level (e.g. SSL negotiation failure).
479 - Passive client, ovs.reconnect.CONNECT: The client should try to
480 listen for a connection, if it is not already listening. It
481 should call self.listening() if successful, otherwise
482 self.connecting() or reconnected_connect_failed() if the attempt
483 is in progress or definitely failed, respectively.
485 A listening passive client should constantly attempt to accept a
486 new connection and report an accepted connection with
489 - ovs.reconnect.DISCONNECT: The client should abort the current
490 connection or connection attempt or listen attempt and call
491 self.disconnected() or self.connect_failed() to indicate it.
493 - ovs.reconnect.PROBE: The client should send some kind of request
494 to the peer that will elicit a response, to ensure that the
495 connection is indeed in working order. (This will only be
496 returned if the "probe interval" is nonzero--see
497 self.set_probe_interval())."""
498 if now >= self.state.deadline(self):
499 return self.state.run(self, now)
503 def wait(self, poller, now):
504 """Causes the next call to poller.block() to wake up when self.run()
506 timeout = self.timeout(now)
508 poller.timer_wait(timeout)
510 def timeout(self, now):
511 """Returns the number of milliseconds after which self.run() should be
512 called if nothing else notable happens in the meantime, or a negative
513 number if this is currently unnecessary."""
514 deadline = self.state.deadline(self)
515 if deadline is not None:
516 remaining = deadline - now
517 return max(0, remaining)
521 def is_connected(self):
522 """Returns True if this FSM is currently believed to be connected, that
523 is, if self.connected() was called more recently than any call to
524 self.connect_failed() or self.disconnected() or self.disable(), and
526 return self.state.is_connected
528 def get_connection_duration(self, now):
529 """Returns the number of milliseconds for which this FSM has been
530 continuously connected to its peer. (If this FSM is not currently
531 connected, this is 0.)"""
532 if self.is_connected():
533 return now - self.last_connected
537 def get_stats(self, now):
541 stats.creation_time = self.creation_time
542 stats.last_connected = self.last_connected
543 stats.last_received = self.last_received
544 stats.backoff = self.backoff
545 stats.seqno = self.seqno
546 stats.is_connected = self.is_connected()
547 stats.current_connection_duration = self.get_connection_duration(now)
548 stats.total_connected_duration = (stats.current_connection_duration +
549 self.total_connected_duration)
550 stats.n_attempted_connections = self.n_attempted_connections
551 stats.n_successful_connections = self.n_successful_connections
552 stats.state = self.state.name
553 stats.state_elapsed = now - self.state_entered
556 def __may_retry(self):
557 if self.max_tries is None:
559 elif self.max_tries > 0: