1 # Copyright (c) 2010, 2011, 2012 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 # Values returned by Reconnect.run()
22 DISCONNECT = 'disconnect'
26 vlog = ovs.vlog.Vlog("reconnect")
29 class Reconnect(object):
30 """A finite-state machine for connecting and reconnecting to a network
31 resource with exponential backoff. It also provides optional support for
32 detecting a connection on which the peer is no longer responding.
34 The library does not implement anything networking related, only an FSM for
35 networking code to use.
37 Many Reconnect methods take a "now" argument. This makes testing easier
38 since there is no hidden state. When not testing, just pass the return
39 value of ovs.time.msec(). (Perhaps this design should be revisited
54 class Listening(object):
66 class Backoff(object):
72 return fsm.state_entered + fsm.backoff
78 class ConnectInProgress(object):
84 return fsm.state_entered + max(1000, fsm.backoff)
96 if fsm.probe_interval:
97 base = max(fsm.last_received, fsm.state_entered)
98 return base + fsm.probe_interval
103 vlog.dbg("%s: idle %d ms, sending inactivity probe"
105 now - max(fsm.last_received, fsm.state_entered)))
106 fsm._transition(now, Reconnect.Idle)
115 return fsm.state_entered + fsm.probe_interval
119 vlog.err("%s: no response to inactivity probe after %.3g "
120 "seconds, disconnecting"
121 % (fsm.name, (now - fsm.state_entered) / 1000.0))
124 class Reconnect(object):
130 return fsm.state_entered
136 def __init__(self, now):
137 """Creates and returns a new reconnect FSM with default settings. The
138 FSM is initially disabled. The caller will likely want to call
139 self.enable() and self.set_name() on the returned object."""
142 self.min_backoff = 1000
143 self.max_backoff = 8000
144 self.probe_interval = 5000
146 self.info_level = vlog.info
148 self.state = Reconnect.Void
149 self.state_entered = now
151 self.last_received = now
152 self.last_connected = None
153 self.last_disconnected = None
154 self.max_tries = None
156 self.creation_time = now
157 self.n_attempted_connections = 0
158 self.n_successful_connections = 0
159 self.total_connected_duration = 0
def set_quiet(self, quiet):
    """Selects the level at which this object logs informational messages.

    If 'quiet' is true, informational messages are logged at debug level,
    which by default keeps them out of log files.  This suits connections
    that are expected to be short-lived, where such messages would merely
    be distracting.

    If 'quiet' is false, informational messages are logged at info level.
    This is the default.

    Only messages emitted through self.info_level are affected; messages
    logged directly at debug, warning, or error level keep their level."""
    # Choose exactly one logger; assigning both in sequence would make the
    # 'quiet' argument meaningless.
    if quiet:
        self.info_level = vlog.dbg
    else:
        self.info_level = vlog.info
181 def set_name(self, name):
182 """Sets this object's name to 'name'. If 'name' is None, then "void"
185 The name is used in log messages."""
def get_min_backoff(self):
    """Reports the shortest delay, in milliseconds, that this FSM leaves
    between consecutive connection attempts.

    Unless it has been reconfigured, the value is 1000 ms."""
    shortest_delay = self.min_backoff
    return shortest_delay
def get_max_backoff(self):
    """Reports the longest delay, in milliseconds, that this FSM leaves
    between consecutive connection attempts.

    Unless it has been reconfigured, the value is 8000 ms."""
    longest_delay = self.max_backoff
    return longest_delay
def get_probe_interval(self):
    """Reports the "probe interval" in milliseconds.

    A value of zero means the connection keepalive feature is disabled.
    With a nonzero value, if that interval elapses while the FSM is
    connected and self.received() has not been called, self.run() returns
    ovs.reconnect.PROBE.  If the interval then elapses a second time, still
    without self.received() being called, self.run() returns
    ovs.reconnect.DISCONNECT."""
    interval_ms = self.probe_interval
    return interval_ms
def set_max_tries(self, max_tries):
    """Caps at 'max_tries' the number of times this object will ask the
    client to try to reconnect.  Passing None, which is the default
    setting, removes the cap.

    Once the allowed tries are used up, the FSM disables itself rather
    than backing off and retrying."""
    tries_allowed = max_tries
    self.max_tries = tries_allowed
def get_max_tries(self):
    """Reports how many connection attempts currently remain, or None when
    the number of attempts is unlimited."""
    tries_left = self.max_tries
    return tries_left
def set_backoff(self, min_backoff, max_backoff):
    """Configures the backoff parameters for this FSM.  'min_backoff' is
    the minimum number of milliseconds, and 'max_backoff' is the maximum,
    between connection attempts.

    'min_backoff' must be at least 1000, and 'max_backoff' must be greater
    than or equal to 'min_backoff'.  Values that violate these rules are
    adjusted rather than rejected: each bound is raised to 1000 if it is
    smaller, and the maximum is raised to the minimum if it would
    otherwise be below it.  A false 'max_backoff' (0 or None) selects the
    default maximum of 8000 ms."""
    self.min_backoff = max(min_backoff, 1000)
    # Fall back to the default only when the caller gave no maximum;
    # resetting it unconditionally would discard the requested value.
    if max_backoff:
        self.max_backoff = max(max_backoff, 1000)
    else:
        self.max_backoff = 8000
    if self.min_backoff > self.max_backoff:
        self.max_backoff = self.min_backoff

    # A backoff already in progress must not outlast the new maximum.
    if (self.state == Reconnect.Backoff and
            self.backoff > self.max_backoff):
        self.backoff = self.max_backoff
def set_probe_interval(self, probe_interval):
    """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
    this is zero, it disables the connection keepalive feature.  If it is
    nonzero, then if the interval passes while this FSM is connected and
    without self.received() being called, self.run() returns
    ovs.reconnect.PROBE.  If the interval passes again without
    self.received() being called, self.run() returns
    ovs.reconnect.DISCONNECT.

    If 'probe_interval' is nonzero, then it will be forced to a value of at
    least 1000 ms."""
    # Only a false value (0 or None) disables probing; any other request is
    # honored, subject to the 1000 ms floor.
    if probe_interval:
        self.probe_interval = max(1000, probe_interval)
    else:
        self.probe_interval = 0
def is_passive(self):
    """Returns true if this FSM is in passive mode, false if it is in
    active mode (the default).

    The mode is the value most recently stored by self.set_passive()."""
    return self.passive
264 def set_passive(self, passive, now):
265 """Configures this FSM for active or passive mode. In active mode (the
266 default), the FSM is attempting to connect to a remote host. In
267 passive mode, the FSM is listening for connections from a remote
269 if self.passive != passive:
270 self.passive = passive
272 if ((passive and self.state in (Reconnect.ConnectInProgress,
273 Reconnect.Reconnect)) or
274 (not passive and self.state == Reconnect.Listening
275 and self.__may_retry())):
276 self._transition(now, Reconnect.Backoff)
def is_enabled(self):
    """Reports whether this FSM is enabled, that is, whether it is in any
    state other than Reconnect.Void.

    self.enable() is the usual way to enable the FSM.  Functions that
    signal a change in connection state, such as self.disconnected() or
    self.force_reconnect(), are documented as enabling it as well."""
    disabled_state = Reconnect.Void
    return self.state != disabled_state
286 def enable(self, now):
287 """If this FSM is disabled (the default for newly created FSMs),
288 enables it, so that the next call to reconnect_run() for 'fsm' will
289 return ovs.reconnect.CONNECT.
291 If this FSM is not disabled, this function has no effect."""
292 if self.state == Reconnect.Void and self.__may_retry():
293 self._transition(now, Reconnect.Backoff)
def disable(self, now):
    """Disables this FSM by moving it to the Reconnect.Void state at time
    'now'.  Calling this on an FSM that is already disabled does
    nothing."""
    currently_enabled = self.state != Reconnect.Void
    if not currently_enabled:
        return
    self._transition(now, Reconnect.Void)
302 def force_reconnect(self, now):
303 """If this FSM is enabled and currently connected (or attempting to
304 connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
305 time it is called, which should cause the client to drop the connection
306 (or attempt), back off, and then reconnect."""
307 if self.state in (Reconnect.ConnectInProgress,
310 self._transition(now, Reconnect.Reconnect)
312 def disconnected(self, now, error):
313 """Tell this FSM that the connection dropped or that a connection
314 attempt failed. 'error' specifies the reason: a positive value
315 represents an errno value, EOF indicates that the connection was closed
316 by the peer (e.g. read() returned 0), and 0 indicates no specific
319 The FSM will back off, then reconnect."""
320 if self.state not in (Reconnect.Backoff, Reconnect.Void):
321 # Report what happened
322 if self.state in (Reconnect.Active, Reconnect.Idle):
324 vlog.warn("%s: connection dropped (%s)"
325 % (self.name, os.strerror(error)))
327 self.info_level("%s: connection closed by peer"
330 self.info_level("%s: connection dropped" % self.name)
331 elif self.state == Reconnect.Listening:
333 vlog.warn("%s: error listening for connections (%s)"
334 % (self.name, os.strerror(error)))
336 self.info_level("%s: error listening for connections"
344 vlog.warn("%s: %s attempt failed (%s)"
345 % (self.name, type_, os.strerror(error)))
347 self.info_level("%s: %s attempt timed out"
348 % (self.name, type_))
350 if (self.state in (Reconnect.Active, Reconnect.Idle)):
351 self.last_disconnected = now
354 if (self.state in (Reconnect.Active, Reconnect.Idle) and
355 (self.last_received - self.last_connected >= self.backoff or
360 self.backoff = self.min_backoff
362 if self.backoff < self.min_backoff:
363 self.backoff = self.min_backoff
364 elif self.backoff >= self.max_backoff / 2:
365 self.backoff = self.max_backoff
370 self.info_level("%s: waiting %.3g seconds before trying "
372 % (self.name, self.backoff / 1000.0))
374 self.info_level("%s: waiting %.3g seconds before reconnect"
375 % (self.name, self.backoff / 1000.0))
377 if self.__may_retry():
378 self._transition(now, Reconnect.Backoff)
380 self._transition(now, Reconnect.Void)
382 def connecting(self, now):
383 """Tell this FSM that a connection or listening attempt is in progress.
385 The FSM will start a timer, after which the connection or listening
386 attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
388 if self.state != Reconnect.ConnectInProgress:
390 self.info_level("%s: listening..." % self.name)
392 self.info_level("%s: connecting..." % self.name)
393 self._transition(now, Reconnect.ConnectInProgress)
def listening(self, now):
    """Informs this FSM that the client is listening for connection
    attempts.  This state lasts indefinitely, until the client reports
    some change.

    Normally the client next reports that a connection has been accepted,
    or is in the process of being accepted, by calling self.connected() or
    self.connecting().

    The client may instead report that listening failed (e.g. accept()
    returned an unexpected error such as ENOMEM) by calling
    self.listen_error().  The FSM then backs off and eventually returns
    ovs.reconnect.CONNECT from self.run() to tell the client to try
    listening again."""
    needs_transition = self.state != Reconnect.Listening
    if not needs_transition:
        # Already listening: no log message and no state change.
        return
    self.info_level("%s: listening..." % self.name)
    self._transition(now, Reconnect.Listening)
def listen_error(self, now, error):
    """Informs this FSM that the client failed to accept a connection
    (e.g. accept() returned an unexpected error such as ENOMEM).

    When the FSM is currently listening (self.listening() was called), the
    failure is handed to self.disconnected(), so the FSM backs off and
    eventually returns ovs.reconnect.CONNECT from self.run() to tell the
    client to try listening again.  In any other state, for example while
    a connection is active, the call does nothing; the retry is delayed
    until that connection drops."""
    is_listening = self.state == Reconnect.Listening
    if not is_listening:
        return
    self.disconnected(now, error)
423 def connected(self, now):
424 """Tell this FSM that the connection was successful.
426 The FSM will start the probe interval timer, which is reset by
427 self.received(). If the timer expires, a probe will be sent (by
428 returning ovs.reconnect.PROBE from self.run(). If the timer expires
429 again without being reset, the connection will be aborted (by returning
430 ovs.reconnect.DISCONNECT from self.run()."""
431 if not self.state.is_connected:
434 self.info_level("%s: connected" % self.name)
435 self._transition(now, Reconnect.Active)
436 self.last_connected = now
438 def connect_failed(self, now, error):
439 """Tell this FSM that the connection attempt failed.
441 The FSM will back off and attempt to reconnect."""
443 self.disconnected(now, error)
def received(self, now):
    """Informs this FSM that some data arrived at time 'now'.

    The FSM moves to the Active state if it is not already there, and the
    time of the last received data is updated.  This restarts the probe
    interval timer, so the connection is known not to be idle."""
    needs_activation = self.state != Reconnect.Active
    if needs_activation:
        self._transition(now, Reconnect.Active)
    self.last_received = now
452 def _transition(self, now, state):
453 if self.state == Reconnect.ConnectInProgress:
454 self.n_attempted_connections += 1
455 if state == Reconnect.Active:
456 self.n_successful_connections += 1
458 connected_before = self.state.is_connected
459 connected_now = state.is_connected
460 if connected_before != connected_now:
462 self.total_connected_duration += now - self.last_connected
465 vlog.dbg("%s: entering %s" % (self.name, state.name))
467 self.state_entered = now
470 """Assesses whether any action should be taken on this FSM. The return
473 - None: The client need not take any action.
475 - Active client, ovs.reconnect.CONNECT: The client should start a
476 connection attempt and indicate this by calling
477 self.connecting(). If the connection attempt has definitely
478 succeeded, it should call self.connected(). If the connection
479 attempt has definitely failed, it should call
480 self.connect_failed().
482 The FSM is smart enough to back off correctly after successful
483 connections that quickly abort, so it is OK to call
484 self.connected() after a low-level successful connection
485 (e.g. connect()) even if the connection might soon abort due to a
486 failure at a high-level (e.g. SSL negotiation failure).
488 - Passive client, ovs.reconnect.CONNECT: The client should try to
489 listen for a connection, if it is not already listening. It
490 should call self.listening() if successful, otherwise
491 self.connecting() or reconnected_connect_failed() if the attempt
492 is in progress or definitely failed, respectively.
494 A listening passive client should constantly attempt to accept a
495 new connection and report an accepted connection with
498 - ovs.reconnect.DISCONNECT: The client should abort the current
499 connection or connection attempt or listen attempt and call
500 self.disconnected() or self.connect_failed() to indicate it.
502 - ovs.reconnect.PROBE: The client should send some kind of request
503 to the peer that will elicit a response, to ensure that the
504 connection is indeed in working order. (This will only be
505 returned if the "probe interval" is nonzero--see
506 self.set_probe_interval())."""
507 if now >= self.state.deadline(self):
508 return self.state.run(self, now)
512 def wait(self, poller, now):
513 """Causes the next call to poller.block() to wake up when self.run()
515 timeout = self.timeout(now)
517 poller.timer_wait(timeout)
def timeout(self, now):
    """Computes how many milliseconds from 'now' self.run() should next be
    called if nothing else notable happens first.

    Returns None when the current state has no deadline, so no call is
    currently needed.  A deadline that has already passed yields 0, never
    a negative number."""
    due = self.state.deadline(self)
    if due is None:
        return None
    return max(0, due - now)
def is_connected(self):
    """Reports whether this FSM currently believes it is connected.

    The answer is the 'is_connected' flag of the current state.  It is
    meant to be true when self.connected() was called more recently than
    any of self.connect_failed(), self.disconnected() or self.disable()."""
    current_state = self.state
    return current_state.is_connected
def get_last_connect_elapsed(self, now):
    """Returns the number of milliseconds between the time this FSM was
    last connected to its peer and 'now'.  Returns None if it was never
    connected."""
    # "Never connected" is recorded as None.  Test for None explicitly so
    # that a connection made at timestamp 0 is not mistaken for it.
    if self.last_connected is not None:
        return now - self.last_connected
    return None
def get_last_disconnect_elapsed(self, now):
    """Returns the number of milliseconds between the time this FSM was
    last disconnected from its peer and 'now'.  Returns None if it was
    never disconnected."""
    # "Never disconnected" is recorded as None.  Test for None explicitly
    # so that a disconnection at timestamp 0 is not mistaken for it.
    if self.last_disconnected is not None:
        return now - self.last_disconnected
    return None
553 def get_stats(self, now):
557 stats.creation_time = self.creation_time
558 stats.last_connected = self.last_connected
559 stats.last_disconnected = self.last_disconnected
560 stats.last_received = self.last_received
561 stats.backoff = self.backoff
562 stats.seqno = self.seqno
563 stats.is_connected = self.is_connected()
564 stats.msec_since_connect = self.get_last_connect_elapsed(now)
565 stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
566 stats.total_connected_duration = self.total_connected_duration
567 if self.is_connected():
568 stats.total_connected_duration += (
569 self.get_last_connect_elapsed(now))
570 stats.n_attempted_connections = self.n_attempted_connections
571 stats.n_successful_connections = self.n_successful_connections
572 stats.state = self.state.name
573 stats.state_elapsed = now - self.state_entered
576 def __may_retry(self):
577 if self.max_tries is None:
579 elif self.max_tries > 0: