1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
# Values returned by Reconnect.run().  Each one tells the client which action
# the FSM wants it to take next.
CONNECT = 'connect'         # Start a connection (or listening) attempt.
DISCONNECT = 'disconnect'   # Abort the current connection or attempt.
PROBE = 'probe'             # Send a request that will elicit a response.
25 class Reconnect(object):
26 """A finite-state machine for connecting and reconnecting to a network
27 resource with exponential backoff. It also provides optional support for
28 detecting a connection on which the peer is no longer responding.
30 The library does not implement anything networking related, only an FSM for
31 networking code to use.
33 Many Reconnect methods take a "now" argument. This makes testing easier
34 since there is no hidden state. When not testing, just pass the return
35 value of ovs.time.msec(). (Perhaps this design should be revisited
50 class Listening(object):
62 class Backoff(object):
68 return fsm.state_entered + fsm.backoff
74 class ConnectInProgress(object):
80 return fsm.state_entered + max(1000, fsm.backoff)
92 if fsm.probe_interval:
93 base = max(fsm.last_received, fsm.state_entered)
94 return base + fsm.probe_interval
99 logging.debug("%s: idle %d ms, sending inactivity probe"
101 now - max(fsm.last_received, fsm.state_entered)))
102 fsm._transition(now, Reconnect.Idle)
111 return fsm.state_entered + fsm.probe_interval
115 logging.error("%s: no response to inactivity probe after %.3g "
116 "seconds, disconnecting"
117 % (fsm.name, (now - fsm.state_entered) / 1000.0))
120 class Reconnect(object):
126 return fsm.state_entered
def __init__(self, now):
    """Creates a new reconnect FSM with default settings.  The FSM is
    initially disabled.  The caller will likely want to call self.enable()
    and self.set_name() on the new object.

    'now' is the current time in milliseconds; it seeds every timestamp
    kept by the FSM."""

    # Configuration.
    self.name = "void"
    self.min_backoff = 1000
    self.max_backoff = 8000
    self.probe_interval = 5000
    self.passive = False
    self.info_level = logging.info

    # Current state.
    self.state = Reconnect.Void
    self.state_entered = now
    self.backoff = 0
    self.last_received = now
    self.last_connected = None
    self.last_disconnected = None
    self.max_tries = None

    # Statistics, reported by self.get_stats().
    self.creation_time = now
    self.n_attempted_connections = 0
    self.n_successful_connections = 0
    self.total_connected_duration = 0
    self.seqno = 0
def set_quiet(self, quiet):
    """If 'quiet' is true, this object will log informational messages at
    debug level, by default keeping them out of log files.  This is
    appropriate if the connection is one that is expected to be
    short-lived, so that the log messages are merely distracting.

    If 'quiet' is false, this object logs informational messages at info
    level.  This is the default.

    This setting has no effect on the log level of debugging, warning, or
    error messages."""
    if quiet:
        self.info_level = logging.debug
    else:
        self.info_level = logging.info
def set_name(self, name):
    """Sets this object's name to 'name'.  If 'name' is None, then "void"
    is used instead.

    The name is used in log messages."""
    if name is None:
        self.name = "void"
    else:
        self.name = name
def get_min_backoff(self):
    """Returns the shortest delay, in milliseconds, that this FSM waits
    between consecutive connection attempts.  Unless changed with
    self.set_backoff(), this is 1000 ms."""
    return self.min_backoff
def get_max_backoff(self):
    """Returns the longest delay, in milliseconds, that this FSM waits
    between consecutive connection attempts.  Unless changed with
    self.set_backoff(), this is 8000 ms."""
    return self.max_backoff
def get_probe_interval(self):
    """Returns the "probe interval", in milliseconds.

    Zero means that the connection keepalive feature is disabled.  With a
    nonzero interval, if that much time passes while the FSM is connected
    and self.received() is not called, self.run() returns
    ovs.reconnect.PROBE; if the interval then passes a second time, still
    without a call to self.received(), self.run() returns
    ovs.reconnect.DISCONNECT."""
    return self.probe_interval
def set_max_tries(self, max_tries):
    """Caps at 'max_tries' the number of times that this object will ask
    the client to try to reconnect.  None, the default, places no limit on
    the number of tries.

    Once the tries are used up, the FSM disables itself rather than backing
    off and retrying."""
    self.max_tries = max_tries
def get_max_tries(self):
    """Returns how many connection attempts currently remain, or None if
    the number of attempts is unlimited."""
    return self.max_tries
def set_backoff(self, min_backoff, max_backoff):
    """Configures the backoff parameters for this FSM.  'min_backoff' is
    the minimum number of milliseconds, and 'max_backoff' is the maximum,
    between connection attempts.

    'min_backoff' is raised to 1000 if it is smaller.  A 'max_backoff' of
    zero (or None) selects the default of 8000; any other value is raised
    to 1000 if it is smaller, and then to 'min_backoff' if it would
    otherwise fall below it."""
    self.min_backoff = max(min_backoff, 1000)
    if max_backoff:
        self.max_backoff = max(max_backoff, 1000)
    else:
        self.max_backoff = 8000
    if self.min_backoff > self.max_backoff:
        self.max_backoff = self.min_backoff

    # A backoff that is already in progress must honor the new ceiling.
    if (self.state == Reconnect.Backoff and
            self.backoff > self.max_backoff):
        self.backoff = self.max_backoff
def set_probe_interval(self, probe_interval):
    """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
    this is zero, it disables the connection keepalive feature.  If it is
    nonzero, then if the interval passes while this FSM is connected and
    without self.received() being called, self.run() returns
    ovs.reconnect.PROBE.  If the interval passes again without
    self.received() being called, self.run() returns
    ovs.reconnect.DISCONNECT.

    If 'probe_interval' is nonzero, then it will be forced to a value of at
    least 1000 ms."""
    if probe_interval:
        self.probe_interval = max(1000, probe_interval)
    else:
        self.probe_interval = 0
def is_passive(self):
    """Returns true if this FSM is in passive mode, false if it is in
    active mode (the default)."""
    return self.passive
def set_passive(self, passive, now):
    """Switches this FSM between active and passive mode.  In active mode,
    which is the default, the FSM tries to connect to a remote host; in
    passive mode it listens for connections from a remote host.

    Nothing happens if the mode is unchanged.  Otherwise the FSM backs off
    when it becomes passive while in the ConnectInProgress or Reconnect
    state, or when it becomes active while in the Listening state and a
    retry is still allowed."""
    if self.passive == passive:
        return
    self.passive = passive

    if passive:
        must_back_off = self.state in (Reconnect.ConnectInProgress,
                                       Reconnect.Reconnect)
    else:
        # The retry check is consulted only when leaving the Listening
        # state, exactly as the short-circuit evaluation requires.
        must_back_off = (self.state == Reconnect.Listening
                         and self.__may_retry())

    if must_back_off:
        self._transition(now, Reconnect.Backoff)
def is_enabled(self):
    """Returns true if this FSM is enabled, that is, if it is in any state
    other than Void.  self.enable() enables the FSM; calling another
    function that indicates a change in connection state, such as
    self.disconnected() or self.force_reconnect(), will also enable it."""
    return self.state != Reconnect.Void
def enable(self, now):
    """Enables this FSM if it is disabled (as a newly created FSM is), so
    that the next call to self.run() will return ovs.reconnect.CONNECT.

    Has no effect on an FSM that is not disabled."""
    if self.state != Reconnect.Void:
        return
    if self.__may_retry():
        self._transition(now, Reconnect.Backoff)
def disable(self, now):
    """Disables this FSM by moving it to the Void state, in which
    self.is_enabled() reports false.  Has no effect if the FSM is already
    disabled."""
    if self.state == Reconnect.Void:
        return
    self._transition(now, Reconnect.Void)
def force_reconnect(self, now):
    """If this FSM is enabled and currently connected (or attempting to
    connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
    time it is called, which should cause the client to drop the connection
    (or attempt), back off, and then reconnect."""
    # Active and Idle are the connected states; ConnectInProgress covers an
    # attempt that is still under way.
    if self.state in (Reconnect.ConnectInProgress,
                      Reconnect.Active,
                      Reconnect.Idle):
        self._transition(now, Reconnect.Reconnect)
307 def disconnected(self, now, error):
308 """Tell this FSM that the connection dropped or that a connection
309 attempt failed. 'error' specifies the reason: a positive value
310 represents an errno value, EOF indicates that the connection was closed
311 by the peer (e.g. read() returned 0), and 0 indicates no specific
314 The FSM will back off, then reconnect."""
315 if self.state not in (Reconnect.Backoff, Reconnect.Void):
316 # Report what happened
317 if self.state in (Reconnect.Active, Reconnect.Idle):
319 logging.warning("%s: connection dropped (%s)"
320 % (self.name, os.strerror(error)))
322 self.info_level("%s: connection closed by peer"
325 self.info_level("%s: connection dropped" % self.name)
326 elif self.state == Reconnect.Listening:
328 logging.warning("%s: error listening for connections (%s)"
329 % (self.name, os.strerror(error)))
331 self.info_level("%s: error listening for connections"
339 logging.warning("%s: %s attempt failed (%s)"
340 % (self.name, type_, os.strerror(error)))
342 self.info_level("%s: %s attempt timed out"
343 % (self.name, type_))
345 if (self.state in (Reconnect.Active, Reconnect.Idle)):
346 self.last_disconnected = now
349 if (self.state in (Reconnect.Active, Reconnect.Idle) and
350 (self.last_received - self.last_connected >= self.backoff or
355 self.backoff = self.min_backoff
357 if self.backoff < self.min_backoff:
358 self.backoff = self.min_backoff
359 elif self.backoff >= self.max_backoff / 2:
360 self.backoff = self.max_backoff
365 self.info_level("%s: waiting %.3g seconds before trying "
367 % (self.name, self.backoff / 1000.0))
369 self.info_level("%s: waiting %.3g seconds before reconnect"
370 % (self.name, self.backoff / 1000.0))
372 if self.__may_retry():
373 self._transition(now, Reconnect.Backoff)
375 self._transition(now, Reconnect.Void)
def connecting(self, now):
    """Tell this FSM that a connection or listening attempt is in progress.

    The FSM will start a timer, after which the connection or listening
    attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
    self.run())."""
    if self.state != Reconnect.ConnectInProgress:
        # A passive FSM is waiting for a peer rather than dialing out, so
        # the log message says so.
        if self.passive:
            self.info_level("%s: listening..." % self.name)
        else:
            self.info_level("%s: connecting..." % self.name)
        self._transition(now, Reconnect.ConnectInProgress)
def listening(self, now):
    """Tell this FSM that the client is listening for connection attempts.
    This state lasts indefinitely, until the client reports some change.

    Normally the client next reports that a connection has been accepted,
    or is in the process of being accepted, by calling self.connected() or
    self.connecting().

    The client may instead report that listening failed (e.g. accept()
    returned an unexpected error such as ENOMEM) by calling
    self.listen_error(), in which case the FSM will back off and eventually
    return ovs.reconnect.CONNECT from self.run() to tell the client to try
    listening again."""
    if self.state == Reconnect.Listening:
        return
    self.info_level("%s: listening..." % self.name)
    self._transition(now, Reconnect.Listening)
def listen_error(self, now, error):
    """Tell this FSM that the client's attempt to accept a connection
    failed (e.g. accept() returned an unexpected error such as ENOMEM).

    The failure is acted on only while the FSM is in the Listening state
    (self.listening() was called): the FSM then backs off and eventually
    returns ovs.reconnect.CONNECT from self.run() to tell the client to try
    listening again.  In any other state, for instance while a connection
    is active, the call is ignored."""
    if self.state != Reconnect.Listening:
        return
    self.disconnected(now, error)
def connected(self, now):
    """Tell this FSM that the connection was successful.

    The FSM will start the probe interval timer, which is reset by
    self.received().  If the timer expires, a probe will be sent (by
    returning ovs.reconnect.PROBE from self.run()).  If the timer expires
    again without being reset, the connection will be aborted (by returning
    ovs.reconnect.DISCONNECT from self.run())."""
    if not self.state.is_connected:
        # Pass through the connect-in-progress state first: the attempt
        # and success counters are only advanced when that state is left,
        # so a client that reports success without ever calling
        # self.connecting() would otherwise go uncounted.
        self.connecting(now)

        self.info_level("%s: connected" % self.name)
        self._transition(now, Reconnect.Active)
        self.last_connected = now
def connect_failed(self, now, error):
    """Tell this FSM that the connection attempt failed.

    The FSM will back off and attempt to reconnect."""
    # self.disconnected() ignores an FSM that is still backing off, which
    # is where the FSM sits if the client never called self.connecting()
    # for this attempt.  Record the attempt first so that the failure is
    # acted upon and the backoff grows.
    self.connecting(now)
    self.disconnected(now, error)
def received(self, now):
    """Tell this FSM that some data was received.  Receiving data shows
    that the connection is not idle, so the FSM returns to the Active state
    if it had left it, and the probe interval timer restarts from 'now'."""
    already_active = self.state == Reconnect.Active
    if not already_active:
        self._transition(now, Reconnect.Active)
    self.last_received = now
def _transition(self, now, state):
    """Moves this FSM into 'state' as of time 'now', updating the
    connection statistics along the way."""
    # Leaving the connect-in-progress state completes one attempt; landing
    # in the Active state means that the attempt succeeded.
    if self.state == Reconnect.ConnectInProgress:
        self.n_attempted_connections += 1
        if state == Reconnect.Active:
            self.n_successful_connections += 1

    connected_before = self.state.is_connected
    connected_now = state.is_connected
    if connected_before != connected_now:
        # Only a connection that just ended has a duration to record; on
        # the way into a connection self.last_connected may still be None.
        if connected_before:
            self.total_connected_duration += now - self.last_connected
        # NOTE(review): self.get_stats() exports self.seqno, but nothing
        # visible advances it.  If it is meant to count connectivity
        # changes, this is the place to bump it -- confirm.

    logging.debug("%s: entering %s" % (self.name, state.name))
    self.state = state
    self.state_entered = now
def run(self, now):
    """Assesses whether any action should be taken on this FSM.  The return
    value is one of:

        - None: The client need not take any action.

        - Active client, ovs.reconnect.CONNECT: The client should start a
          connection attempt and indicate this by calling
          self.connecting().  If the connection attempt has definitely
          succeeded, it should call self.connected().  If the connection
          attempt has definitely failed, it should call
          self.connect_failed().

          The FSM is smart enough to back off correctly after successful
          connections that quickly abort, so it is OK to call
          self.connected() after a low-level successful connection
          (e.g. connect()) even if the connection might soon abort due to a
          failure at a high-level (e.g. SSL negotiation failure).

        - Passive client, ovs.reconnect.CONNECT: The client should try to
          listen for a connection, if it is not already listening.  It
          should call self.listening() if successful, otherwise
          self.connecting() or self.connect_failed() if the attempt is in
          progress or definitely failed, respectively.

          A listening passive client should constantly attempt to accept a
          new connection and report an accepted connection with
          self.connected().

        - ovs.reconnect.DISCONNECT: The client should abort the current
          connection or connection attempt or listen attempt and call
          self.disconnected() or self.connect_failed() to indicate it.

        - ovs.reconnect.PROBE: The client should send some kind of request
          to the peer that will elicit a response, to ensure that the
          connection is indeed in working order.  (This will only be
          returned if the "probe interval" is nonzero--see
          self.set_probe_interval())."""
    # A state with no deadline has nothing scheduled, which is also how
    # self.timeout() treats it.  Checking for None explicitly avoids
    # ordering 'now' against None.
    deadline = self.state.deadline(self)
    if deadline is not None and now >= deadline:
        return self.state.run(self, now)
    return None
def wait(self, poller, now):
    """Causes the next call to poller.block() to wake up when self.run()
    should be called."""
    timeout = self.timeout(now)
    # self.timeout() returns None when no wakeup is needed; only a real
    # timeout (which may be 0) is registered with the poller.
    if timeout is not None:
        poller.timer_wait(timeout)
def timeout(self, now):
    """Returns how many milliseconds may pass before self.run() should be
    called, assuming that nothing else notable happens in the meantime.
    The result is never negative.  Returns None if no call is currently
    necessary, that is, if the current state has no deadline."""
    deadline = self.state.deadline(self)
    if deadline is None:
        return None
    return max(0, deadline - now)
def is_connected(self):
    """Reports whether this FSM is currently believed to be connected,
    i.e. whether self.connected() was called more recently than any call to
    self.connect_failed(), self.disconnected(), or self.disable().  The
    answer is the 'is_connected' flag of the current state."""
    return self.state.is_connected
def get_last_connect_elapsed(self, now):
    """Returns the number of milliseconds since this FSM was last connected
    to its peer.  Returns None if never connected."""
    # Compare against None explicitly: a connection made at time 0 is a
    # valid timestamp and must not be mistaken for "never connected".
    if self.last_connected is not None:
        return now - self.last_connected
    return None
def get_last_disconnect_elapsed(self, now):
    """Returns the number of milliseconds since this FSM was last
    disconnected from its peer.  Returns None if never disconnected."""
    # Compare against None explicitly: a disconnection at time 0 is a
    # valid timestamp and must not be mistaken for "never disconnected".
    if self.last_disconnected is not None:
        return now - self.last_disconnected
    return None
def get_stats(self, now):
    """Returns an object whose attributes are a snapshot of this FSM's
    statistics as of time 'now'.

    The snapshot's 'total_connected_duration' includes the connection that
    is currently up, if there is one; the FSM's own counter is not
    modified."""
    class Stats(object):
        pass
    stats = Stats()
    stats.creation_time = self.creation_time
    stats.last_connected = self.last_connected
    stats.last_disconnected = self.last_disconnected
    stats.last_received = self.last_received
    stats.backoff = self.backoff
    stats.seqno = self.seqno
    stats.is_connected = self.is_connected()
    stats.msec_since_connect = self.get_last_connect_elapsed(now)
    stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
    stats.total_connected_duration = self.total_connected_duration
    if self.is_connected():
        # The stored total only covers connections that have ended, so add
        # the time spent so far in the current one.
        stats.total_connected_duration += self.get_last_connect_elapsed(now)
    stats.n_attempted_connections = self.n_attempted_connections
    stats.n_successful_connections = self.n_successful_connections
    stats.state = self.state.name
    stats.state_elapsed = now - self.state_entered
    return stats
570 def __may_retry(self):
571 if self.max_tries is None:
573 elif self.max_tries > 0: