1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 # Values returned by Reconnect.run()
22 DISCONNECT = 'disconnect'
26 vlog = ovs.vlog.Vlog("reconnect")
29 class Reconnect(object):
30 """A finite-state machine for connecting and reconnecting to a network
31 resource with exponential backoff. It also provides optional support for
32 detecting a connection on which the peer is no longer responding.
34 The library does not implement anything networking related, only an FSM for
35 networking code to use.
37 Many Reconnect methods take a "now" argument. This makes testing easier
38 since there is no hidden state. When not testing, just pass the return
39 value of ovs.time.msec(). (Perhaps this design should be revisited
54 class Listening(object):
66 class Backoff(object):
72 return fsm.state_entered + fsm.backoff
78 class ConnectInProgress(object):
84 return fsm.state_entered + max(1000, fsm.backoff)
96 if fsm.probe_interval:
97 base = max(fsm.last_activity, fsm.state_entered)
98 return base + fsm.probe_interval
103 vlog.dbg("%s: idle %d ms, sending inactivity probe"
105 now - max(fsm.last_activity, fsm.state_entered)))
106 fsm._transition(now, Reconnect.Idle)
115 if fsm.probe_interval:
116 return fsm.state_entered + fsm.probe_interval
121 vlog.err("%s: no response to inactivity probe after %.3g "
122 "seconds, disconnecting"
123 % (fsm.name, (now - fsm.state_entered) / 1000.0))
126 class Reconnect(object):
132 return fsm.state_entered
138 def __init__(self, now):
139 """Creates and returns a new reconnect FSM with default settings. The
140 FSM is initially disabled. The caller will likely want to call
141 self.enable() and self.set_name() on the returned object."""
144 self.min_backoff = 1000
145 self.max_backoff = 8000
146 self.probe_interval = 5000
148 self.info_level = vlog.info
150 self.state = Reconnect.Void
151 self.state_entered = now
153 self.last_activity = now
154 self.last_connected = None
155 self.last_disconnected = None
156 self.max_tries = None
158 self.creation_time = now
159 self.n_attempted_connections = 0
160 self.n_successful_connections = 0
161 self.total_connected_duration = 0
164 def set_quiet(self, quiet):
165 """If 'quiet' is true, this object will log informational messages at
166 debug level, by default keeping them out of log files. This is
167 appropriate if the connection is one that is expected to be
168 short-lived, so that the log messages are merely distracting.
170 If 'quiet' is false, this object logs informational messages at info
171 level. This is the default.
173 This setting has no effect on the log level of debugging, warning, or
176 self.info_level = vlog.dbg
178 self.info_level = vlog.info
183 def set_name(self, name):
184 """Sets this object's name to 'name'. If 'name' is None, then "void"
187 The name is used in log messages."""
def get_min_backoff(self):
    """Report the lower bound of the reconnection backoff.

    Returns the smallest delay, in milliseconds, that this FSM waits
    between consecutive connection attempts. A newly created FSM uses
    1000 ms."""
    floor_ms = self.min_backoff
    return floor_ms
def get_max_backoff(self):
    """Report the upper bound of the reconnection backoff.

    Returns the largest delay, in milliseconds, that this FSM waits
    between consecutive connection attempts. A newly created FSM uses
    8000 ms."""
    ceiling_ms = self.max_backoff
    return ceiling_ms
def get_probe_interval(self):
    """Report the configured "probe interval" in milliseconds.

    A value of zero means the connection keepalive feature is disabled.

    With a nonzero value, if the interval elapses while the FSM is
    connected and self.activity() has not been called, self.run() returns
    ovs.reconnect.PROBE. If the interval then elapses a second time,
    still without self.activity() being called, self.run() returns
    ovs.reconnect.DISCONNECT."""
    interval_ms = self.probe_interval
    return interval_ms
def set_max_tries(self, max_tries):
    """Cap how many times this object asks the client to reconnect.

    'max_tries' is the number of attempts allowed; None (the default)
    places no limit on the number of tries.

    Once the allowed tries are used up, the FSM disables itself rather
    than backing off and retrying."""
    setattr(self, "max_tries", max_tries)
def get_max_tries(self):
    """Report how many connection attempts remain.

    Returns the current remaining number of tries, or None when the
    number of tries is unlimited."""
    remaining_tries = self.max_tries
    return remaining_tries
226 def set_backoff(self, min_backoff, max_backoff):
227 """Configures the backoff parameters for this FSM. 'min_backoff' is
228 the minimum number of milliseconds, and 'max_backoff' is the maximum,
229 between connection attempts.
231 'min_backoff' must be at least 1000, and 'max_backoff' must be greater
232 than or equal to 'min_backoff'."""
233 self.min_backoff = max(min_backoff, 1000)
235 self.max_backoff = max(max_backoff, 1000)
237 self.max_backoff = 8000
238 if self.min_backoff > self.max_backoff:
239 self.max_backoff = self.min_backoff
241 if (self.state == Reconnect.Backoff and
242 self.backoff > self.max_backoff):
243 self.backoff = self.max_backoff
245 def set_probe_interval(self, probe_interval):
246 """Sets the "probe interval" to 'probe_interval', in milliseconds. If
247 this is zero, it disables the connection keepalive feature. If it is
248 nonzero, then if the interval passes while this FSM is connected and
249 without self.activity() being called, self.run() returns
250 ovs.reconnect.PROBE. If the interval passes again without
251 self.activity() being called, self.run() returns
252 ovs.reconnect.DISCONNECT.
254 If 'probe_interval' is nonzero, then it will be forced to a value of at
257 self.probe_interval = max(1000, probe_interval)
259 self.probe_interval = 0
261 def is_passive(self):
262 """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
263 active mode (the default)."""
266 def set_passive(self, passive, now):
267 """Configures this FSM for active or passive mode. In active mode (the
268 default), the FSM is attempting to connect to a remote host. In
269 passive mode, the FSM is listening for connections from a remote
271 if self.passive != passive:
272 self.passive = passive
274 if ((passive and self.state in (Reconnect.ConnectInProgress,
275 Reconnect.Reconnect)) or
276 (not passive and self.state == Reconnect.Listening
277 and self.__may_retry())):
278 self._transition(now, Reconnect.Backoff)
def is_enabled(self):
    """Tell whether this FSM has been enabled.

    The FSM counts as enabled whenever its state is anything other than
    Reconnect.Void. self.enable() enables it; calling another function
    that indicates a change in connection state, such as
    self.disconnected() or self.force_reconnect(), will also enable it."""
    disabled = self.state == Reconnect.Void
    return not disabled
288 def enable(self, now):
289 """If this FSM is disabled (the default for newly created FSMs),
290 enables it, so that the next call to reconnect_run() for 'fsm' will
291 return ovs.reconnect.CONNECT.
293 If this FSM is not disabled, this function has no effect."""
294 if self.state == Reconnect.Void and self.__may_retry():
295 self._transition(now, Reconnect.Backoff)
def disable(self, now):
    """Disable this FSM by moving it into the Reconnect.Void state.

    'now' is the current time, recorded by the state transition. If the
    FSM is already in Reconnect.Void, nothing happens."""
    if self.state == Reconnect.Void:
        # Already disabled; avoid a redundant transition.
        return
    self._transition(now, Reconnect.Void)
304 def force_reconnect(self, now):
305 """If this FSM is enabled and currently connected (or attempting to
306 connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
307 time it is called, which should cause the client to drop the connection
308 (or attempt), back off, and then reconnect."""
309 if self.state in (Reconnect.ConnectInProgress,
312 self._transition(now, Reconnect.Reconnect)
314 def disconnected(self, now, error):
315 """Tell this FSM that the connection dropped or that a connection
316 attempt failed. 'error' specifies the reason: a positive value
317 represents an errno value, EOF indicates that the connection was closed
318 by the peer (e.g. read() returned 0), and 0 indicates no specific
321 The FSM will back off, then reconnect."""
322 if self.state not in (Reconnect.Backoff, Reconnect.Void):
323 # Report what happened
324 if self.state in (Reconnect.Active, Reconnect.Idle):
326 vlog.warn("%s: connection dropped (%s)"
327 % (self.name, os.strerror(error)))
329 self.info_level("%s: connection closed by peer"
332 self.info_level("%s: connection dropped" % self.name)
333 elif self.state == Reconnect.Listening:
335 vlog.warn("%s: error listening for connections (%s)"
336 % (self.name, os.strerror(error)))
338 self.info_level("%s: error listening for connections"
346 vlog.warn("%s: %s attempt failed (%s)"
347 % (self.name, type_, os.strerror(error)))
349 self.info_level("%s: %s attempt timed out"
350 % (self.name, type_))
352 if (self.state in (Reconnect.Active, Reconnect.Idle)):
353 self.last_disconnected = now
356 if (self.state in (Reconnect.Active, Reconnect.Idle) and
357 (self.last_activity - self.last_connected >= self.backoff or
362 self.backoff = self.min_backoff
364 if self.backoff < self.min_backoff:
365 self.backoff = self.min_backoff
366 elif self.backoff >= self.max_backoff / 2:
367 self.backoff = self.max_backoff
372 self.info_level("%s: waiting %.3g seconds before trying "
374 % (self.name, self.backoff / 1000.0))
376 self.info_level("%s: waiting %.3g seconds before reconnect"
377 % (self.name, self.backoff / 1000.0))
379 if self.__may_retry():
380 self._transition(now, Reconnect.Backoff)
382 self._transition(now, Reconnect.Void)
384 def connecting(self, now):
385 """Tell this FSM that a connection or listening attempt is in progress.
387 The FSM will start a timer, after which the connection or listening
388 attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
390 if self.state != Reconnect.ConnectInProgress:
392 self.info_level("%s: listening..." % self.name)
394 self.info_level("%s: connecting..." % self.name)
395 self._transition(now, Reconnect.ConnectInProgress)
def listening(self, now):
    """Tell this FSM that the client is listening for connection attempts.

    This state lasts indefinitely until the client reports some change.

    Normally the client next reports that a connection has been accepted,
    or is in the process of being accepted, by calling self.connecting()
    or self.connected().

    The client may instead report that listening failed (e.g. accept()
    returned an unexpected error such as ENOMEM) by calling
    self.listen_error(), in which case the FSM will back off and
    eventually return ovs.reconnect.CONNECT from self.run() to tell the
    client to try listening again."""
    if self.state == Reconnect.Listening:
        # Already listening: do not log or transition a second time.
        return
    self.info_level("%s: listening..." % self.name)
    self._transition(now, Reconnect.Listening)
def listen_error(self, now, error):
    """Tell this FSM that the client's attempt to accept a connection
    failed (e.g. accept() returned an unexpected error such as ENOMEM).

    When the FSM is currently listening (self.listening() was called),
    the failure is passed to self.disconnected(), so the FSM backs off
    and eventually returns ovs.reconnect.CONNECT from self.run() to tell
    the client to try listening again. In any other state (for example,
    while a connection is active) this call does nothing, so the retry is
    delayed until that connection drops."""
    if self.state != Reconnect.Listening:
        return
    self.disconnected(now, error)
425 def connected(self, now):
426 """Tell this FSM that the connection was successful.
428 The FSM will start the probe interval timer, which is reset by
429 self.activity(). If the timer expires, a probe will be sent (by
430 returning ovs.reconnect.PROBE from self.run(). If the timer expires
431 again without being reset, the connection will be aborted (by returning
432 ovs.reconnect.DISCONNECT from self.run()."""
433 if not self.state.is_connected:
436 self.info_level("%s: connected" % self.name)
437 self._transition(now, Reconnect.Active)
438 self.last_connected = now
440 def connect_failed(self, now, error):
441 """Tell this FSM that the connection attempt failed.
443 The FSM will back off and attempt to reconnect."""
445 self.disconnected(now, error)
def activity(self, now):
    """Tell this FSM that some activity occurred on the connection.

    The FSM is moved to Reconnect.Active if it is in any other state, and
    'now' is recorded as the time of the last activity. This resets the
    probe interval timer, so the connection is not treated as idle."""
    already_active = self.state == Reconnect.Active
    if not already_active:
        self._transition(now, Reconnect.Active)
    # Always remember when the peer was last heard from.
    self.last_activity = now
455 def _transition(self, now, state):
456 if self.state == Reconnect.ConnectInProgress:
457 self.n_attempted_connections += 1
458 if state == Reconnect.Active:
459 self.n_successful_connections += 1
461 connected_before = self.state.is_connected
462 connected_now = state.is_connected
463 if connected_before != connected_now:
465 self.total_connected_duration += now - self.last_connected
468 vlog.dbg("%s: entering %s" % (self.name, state.name))
470 self.state_entered = now
473 """Assesses whether any action should be taken on this FSM. The return
476 - None: The client need not take any action.
478 - Active client, ovs.reconnect.CONNECT: The client should start a
479 connection attempt and indicate this by calling
480 self.connecting(). If the connection attempt has definitely
481 succeeded, it should call self.connected(). If the connection
482 attempt has definitely failed, it should call
483 self.connect_failed().
485 The FSM is smart enough to back off correctly after successful
486 connections that quickly abort, so it is OK to call
487 self.connected() after a low-level successful connection
488 (e.g. connect()) even if the connection might soon abort due to a
489 failure at a high-level (e.g. SSL negotiation failure).
491 - Passive client, ovs.reconnect.CONNECT: The client should try to
492 listen for a connection, if it is not already listening. It
493 should call self.listening() if successful, otherwise
494 self.connecting() or reconnected_connect_failed() if the attempt
495 is in progress or definitely failed, respectively.
497 A listening passive client should constantly attempt to accept a
498 new connection and report an accepted connection with
501 - ovs.reconnect.DISCONNECT: The client should abort the current
502 connection or connection attempt or listen attempt and call
503 self.disconnected() or self.connect_failed() to indicate it.
505 - ovs.reconnect.PROBE: The client should send some kind of request
506 to the peer that will elicit a response, to ensure that the
507 connection is indeed in working order. (This will only be
508 returned if the "probe interval" is nonzero--see
509 self.set_probe_interval())."""
511 deadline = self.state.deadline(self)
512 if deadline is not None and now >= deadline:
513 return self.state.run(self, now)
517 def wait(self, poller, now):
518 """Causes the next call to poller.block() to wake up when self.run()
520 timeout = self.timeout(now)
522 poller.timer_wait(timeout)
def timeout(self, now):
    """Compute how long the caller may wait before calling self.run().

    Returns the number of milliseconds from 'now' until the current
    state's deadline, never less than zero, or None when the current
    state has no deadline and so no wakeup is needed."""
    expiry = self.state.deadline(self)
    if expiry is None:
        return None
    # A deadline already in the past means "run immediately".
    return max(0, expiry - now)
def is_connected(self):
    """Tell whether this FSM is currently believed to be connected.

    That is the case when self.connected() was called more recently than
    any call to self.connect_failed(), self.disconnected() or
    self.disable(). The answer is taken from the current state's
    'is_connected' attribute."""
    current_state = self.state
    return current_state.is_connected
def get_last_connect_elapsed(self, now):
    """Returns the number of milliseconds since 'fsm' was last connected
    to its peer.

    'now' is the current time in milliseconds, on the same clock as the
    timestamps passed to the other methods of this object.

    Returns None if never connected."""
    # Compare against None explicitly: a connection recorded at timestamp 0
    # is valid and must not be mistaken for "never connected".
    if self.last_connected is None:
        return None
    return now - self.last_connected
def get_last_disconnect_elapsed(self, now):
    """Returns the number of milliseconds since 'fsm' was last disconnected
    from its peer.

    'now' is the current time in milliseconds, on the same clock as the
    timestamps passed to the other methods of this object.

    Returns None if never disconnected."""
    # Compare against None explicitly: a disconnection recorded at
    # timestamp 0 is valid and must not be mistaken for "never disconnected".
    if self.last_disconnected is None:
        return None
    return now - self.last_disconnected
558 def get_stats(self, now):
562 stats.creation_time = self.creation_time
563 stats.last_connected = self.last_connected
564 stats.last_disconnected = self.last_disconnected
565 stats.last_activity = self.last_activity
566 stats.backoff = self.backoff
567 stats.seqno = self.seqno
568 stats.is_connected = self.is_connected()
569 stats.msec_since_connect = self.get_last_connect_elapsed(now)
570 stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
571 stats.total_connected_duration = self.total_connected_duration
572 if self.is_connected():
573 stats.total_connected_duration += (
574 self.get_last_connect_elapsed(now))
575 stats.n_attempted_connections = self.n_attempted_connections
576 stats.n_successful_connections = self.n_successful_connections
577 stats.state = self.state.name
578 stats.state_elapsed = now - self.state_entered
581 def __may_retry(self):
582 if self.max_tries is None:
584 elif self.max_tries > 0: