1 # Copyright (c) 2010, 2011, 2012 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 # Values returned by Reconnect.run()
22 DISCONNECT = 'disconnect'
26 vlog = ovs.vlog.Vlog("reconnect")
29 class Reconnect(object):
30 """A finite-state machine for connecting and reconnecting to a network
31 resource with exponential backoff. It also provides optional support for
32 detecting a connection on which the peer is no longer responding.
34 The library does not implement anything networking related, only an FSM for
35 networking code to use.
37 Many Reconnect methods take a "now" argument. This makes testing easier
38 since there is no hidden state. When not testing, just pass the return
39 value of ovs.time.msec(). (Perhaps this design should be revisited
54 class Listening(object):
66 class Backoff(object):
72 return fsm.state_entered + fsm.backoff
78 class ConnectInProgress(object):
84 return fsm.state_entered + max(1000, fsm.backoff)
96 if fsm.probe_interval:
97 base = max(fsm.last_received, fsm.state_entered)
98 return base + fsm.probe_interval
103 vlog.dbg("%s: idle %d ms, sending inactivity probe"
105 now - max(fsm.last_received, fsm.state_entered)))
106 fsm._transition(now, Reconnect.Idle)
115 if fsm.probe_interval:
116 return fsm.state_entered + fsm.probe_interval
121 vlog.err("%s: no response to inactivity probe after %.3g "
122 "seconds, disconnecting"
123 % (fsm.name, (now - fsm.state_entered) / 1000.0))
126 class Reconnect(object):
132 return fsm.state_entered
138 def __init__(self, now):
139 """Creates and returns a new reconnect FSM with default settings. The
140 FSM is initially disabled. The caller will likely want to call
141 self.enable() and self.set_name() on the returned object."""
144 self.min_backoff = 1000
145 self.max_backoff = 8000
146 self.probe_interval = 5000
148 self.info_level = vlog.info
150 self.state = Reconnect.Void
151 self.state_entered = now
153 self.last_received = now
154 self.last_connected = None
155 self.last_disconnected = None
156 self.max_tries = None
158 self.creation_time = now
159 self.n_attempted_connections = 0
160 self.n_successful_connections = 0
161 self.total_connected_duration = 0
164 def set_quiet(self, quiet):
165 """If 'quiet' is true, this object will log informational messages at
166 debug level, by default keeping them out of log files. This is
167 appropriate if the connection is one that is expected to be
168 short-lived, so that the log messages are merely distracting.
170 If 'quiet' is false, this object logs informational messages at info
171 level. This is the default.
173 This setting has no effect on the log level of debugging, warning, or
176 self.info_level = vlog.dbg
178 self.info_level = vlog.info
183 def set_name(self, name):
184 """Sets this object's name to 'name'. If 'name' is None, then "void"
187 The name is used in log messages."""
def get_min_backoff(self):
    """Returns the lower bound, in milliseconds, on the delay this FSM
    leaves between consecutive connection attempts.  Unless it has been
    changed, the value is 1000 ms."""
    floor_ms = self.min_backoff
    return floor_ms
def get_max_backoff(self):
    """Returns the upper bound, in milliseconds, on the delay this FSM
    leaves between consecutive connection attempts.  Unless it has been
    changed, the value is 8000 ms."""
    ceiling_ms = self.max_backoff
    return ceiling_ms
def get_probe_interval(self):
    """Returns the configured "probe interval", in milliseconds.

    A value of zero means the connection keepalive feature is turned off.
    With a nonzero value, letting the interval elapse while the FSM is
    connected, with no call to self.received(), makes self.run() return
    ovs.reconnect.PROBE.  Letting it elapse a second time, still with no
    call to self.received(), makes self.run() return
    ovs.reconnect.DISCONNECT."""
    interval_ms = self.probe_interval
    return interval_ms
def set_max_tries(self, max_tries):
    """Caps at 'max_tries' the number of times this object will ask the
    client to try to reconnect.  Passing None, which is also the default
    setting, removes the cap so that tries are unlimited.

    Once the allowed tries have been used up, the FSM disables itself
    rather than backing off and retrying."""
    self.max_tries = max_tries
def get_max_tries(self):
    """Returns how many connection attempts currently remain, or None when
    the number of attempts is unlimited."""
    tries_left = self.max_tries
    return tries_left
226 def set_backoff(self, min_backoff, max_backoff):
227 """Configures the backoff parameters for this FSM. 'min_backoff' is
228 the minimum number of milliseconds, and 'max_backoff' is the maximum,
229 between connection attempts.
231 'min_backoff' must be at least 1000, and 'max_backoff' must be greater
232 than or equal to 'min_backoff'."""
233 self.min_backoff = max(min_backoff, 1000)
235 self.max_backoff = max(max_backoff, 1000)
237 self.max_backoff = 8000
238 if self.min_backoff > self.max_backoff:
239 self.max_backoff = self.min_backoff
241 if (self.state == Reconnect.Backoff and
242 self.backoff > self.max_backoff):
243 self.backoff = self.max_backoff
245 def set_probe_interval(self, probe_interval):
246 """Sets the "probe interval" to 'probe_interval', in milliseconds. If
247 this is zero, it disables the connection keepalive feature. If it is
248 nonzero, then if the interval passes while this FSM is connected and
249 without self.received() being called, self.run() returns
250 ovs.reconnect.PROBE. If the interval passes again without
251 self.received() being called, self.run() returns
252 ovs.reconnect.DISCONNECT.
254 If 'probe_interval' is nonzero, then it will be forced to a value of at
257 self.probe_interval = max(1000, probe_interval)
259 self.probe_interval = 0
261 def is_passive(self):
262 """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
263 active mode (the default)."""
266 def set_passive(self, passive, now):
267 """Configures this FSM for active or passive mode. In active mode (the
268 default), the FSM is attempting to connect to a remote host. In
269 passive mode, the FSM is listening for connections from a remote
271 if self.passive != passive:
272 self.passive = passive
274 if ((passive and self.state in (Reconnect.ConnectInProgress,
275 Reconnect.Reconnect)) or
276 (not passive and self.state == Reconnect.Listening
277 and self.__may_retry())):
278 self._transition(now, Reconnect.Backoff)
281 def is_enabled(self):
282 """Returns true if this FSM has been enabled with self.enable().
283 Calling another function that indicates a change in connection state,
284 such as self.disconnected() or self.force_reconnect(), will also enable
286 return self.state != Reconnect.Void
def enable(self, now):
    """Enables this FSM if it is currently disabled (the state in which
    newly created FSMs start), so that the next call to self.run() will
    return ovs.reconnect.CONNECT.

    Nothing happens if the FSM is not disabled, or if self.__may_retry()
    reports that no further attempts are permitted."""
    # Already enabled: leave the current state alone.
    if self.state != Reconnect.Void:
        return
    # Only leave the disabled state while retries are still allowed.
    if self.__may_retry():
        self._transition(now, Reconnect.Backoff)
298 def disable(self, now):
299 """Disables this FSM. Until 'fsm' is enabled again, self.run() will
301 if self.state != Reconnect.Void:
302 self._transition(now, Reconnect.Void)
304 def force_reconnect(self, now):
305 """If this FSM is enabled and currently connected (or attempting to
306 connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
307 time it is called, which should cause the client to drop the connection
308 (or attempt), back off, and then reconnect."""
309 if self.state in (Reconnect.ConnectInProgress,
312 self._transition(now, Reconnect.Reconnect)
314 def disconnected(self, now, error):
315 """Tell this FSM that the connection dropped or that a connection
316 attempt failed. 'error' specifies the reason: a positive value
317 represents an errno value, EOF indicates that the connection was closed
318 by the peer (e.g. read() returned 0), and 0 indicates no specific
321 The FSM will back off, then reconnect."""
322 if self.state not in (Reconnect.Backoff, Reconnect.Void):
323 # Report what happened
324 if self.state in (Reconnect.Active, Reconnect.Idle):
326 vlog.warn("%s: connection dropped (%s)"
327 % (self.name, os.strerror(error)))
329 self.info_level("%s: connection closed by peer"
332 self.info_level("%s: connection dropped" % self.name)
333 elif self.state == Reconnect.Listening:
335 vlog.warn("%s: error listening for connections (%s)"
336 % (self.name, os.strerror(error)))
338 self.info_level("%s: error listening for connections"
346 vlog.warn("%s: %s attempt failed (%s)"
347 % (self.name, type_, os.strerror(error)))
349 self.info_level("%s: %s attempt timed out"
350 % (self.name, type_))
352 if (self.state in (Reconnect.Active, Reconnect.Idle)):
353 self.last_disconnected = now
356 if (self.state in (Reconnect.Active, Reconnect.Idle) and
357 (self.last_received - self.last_connected >= self.backoff or
362 self.backoff = self.min_backoff
364 if self.backoff < self.min_backoff:
365 self.backoff = self.min_backoff
366 elif self.backoff >= self.max_backoff / 2:
367 self.backoff = self.max_backoff
372 self.info_level("%s: waiting %.3g seconds before trying "
374 % (self.name, self.backoff / 1000.0))
376 self.info_level("%s: waiting %.3g seconds before reconnect"
377 % (self.name, self.backoff / 1000.0))
379 if self.__may_retry():
380 self._transition(now, Reconnect.Backoff)
382 self._transition(now, Reconnect.Void)
384 def connecting(self, now):
385 """Tell this FSM that a connection or listening attempt is in progress.
387 The FSM will start a timer, after which the connection or listening
388 attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
390 if self.state != Reconnect.ConnectInProgress:
392 self.info_level("%s: listening..." % self.name)
394 self.info_level("%s: connecting..." % self.name)
395 self._transition(now, Reconnect.ConnectInProgress)
397 def listening(self, now):
398 """Tell this FSM that the client is listening for connection attempts.
399 This state last indefinitely until the client reports some change.
401 The natural progression from this state is for the client to report
402 that a connection has been accepted or is in progress of being
403 accepted, by calling self.connecting() or self.connected().
405 The client may also report that listening failed (e.g. accept()
406 returned an unexpected error such as ENOMEM) by calling
407 self.listen_error(), in which case the FSM will back off and eventually
408 return ovs.reconnect.CONNECT from self.run() to tell the client to try
410 if self.state != Reconnect.Listening:
411 self.info_level("%s: listening..." % self.name)
412 self._transition(now, Reconnect.Listening)
def listen_error(self, now, error):
    """Reports to this FSM that the client failed to accept a connection,
    for example because accept() returned an unexpected error such as
    ENOMEM.

    When the FSM is in the listening state (entered through
    self.listening()), the failure is handed to self.disconnected(), so the
    FSM backs off and eventually returns ovs.reconnect.CONNECT from
    self.run() to tell the client to try listening again.  In any other
    state, including while a connection is active, the call does
    nothing."""
    if self.state != Reconnect.Listening:
        return
    self.disconnected(now, error)
425 def connected(self, now):
426 """Tell this FSM that the connection was successful.
428 The FSM will start the probe interval timer, which is reset by
429 self.received(). If the timer expires, a probe will be sent (by
430 returning ovs.reconnect.PROBE from self.run(). If the timer expires
431 again without being reset, the connection will be aborted (by returning
432 ovs.reconnect.DISCONNECT from self.run()."""
433 if not self.state.is_connected:
436 self.info_level("%s: connected" % self.name)
437 self._transition(now, Reconnect.Active)
438 self.last_connected = now
440 def connect_failed(self, now, error):
441 """Tell this FSM that the connection attempt failed.
443 The FSM will back off and attempt to reconnect."""
445 self.disconnected(now, error)
def received(self, now):
    """Reports to this FSM that data arrived from the peer.  The probe
    interval timer restarts from 'now', marking the connection as not
    idle."""
    active_state = Reconnect.Active
    needs_transition = self.state != active_state
    if needs_transition:
        self._transition(now, active_state)
    # Recorded after any transition, exactly as the timer restart requires.
    self.last_received = now
454 def _transition(self, now, state):
455 if self.state == Reconnect.ConnectInProgress:
456 self.n_attempted_connections += 1
457 if state == Reconnect.Active:
458 self.n_successful_connections += 1
460 connected_before = self.state.is_connected
461 connected_now = state.is_connected
462 if connected_before != connected_now:
464 self.total_connected_duration += now - self.last_connected
467 vlog.dbg("%s: entering %s" % (self.name, state.name))
469 self.state_entered = now
472 """Assesses whether any action should be taken on this FSM. The return
475 - None: The client need not take any action.
477 - Active client, ovs.reconnect.CONNECT: The client should start a
478 connection attempt and indicate this by calling
479 self.connecting(). If the connection attempt has definitely
480 succeeded, it should call self.connected(). If the connection
481 attempt has definitely failed, it should call
482 self.connect_failed().
484 The FSM is smart enough to back off correctly after successful
485 connections that quickly abort, so it is OK to call
486 self.connected() after a low-level successful connection
487 (e.g. connect()) even if the connection might soon abort due to a
488 failure at a high-level (e.g. SSL negotiation failure).
490 - Passive client, ovs.reconnect.CONNECT: The client should try to
491 listen for a connection, if it is not already listening. It
492 should call self.listening() if successful, otherwise
493 self.connecting() or reconnected_connect_failed() if the attempt
494 is in progress or definitely failed, respectively.
496 A listening passive client should constantly attempt to accept a
497 new connection and report an accepted connection with
500 - ovs.reconnect.DISCONNECT: The client should abort the current
501 connection or connection attempt or listen attempt and call
502 self.disconnected() or self.connect_failed() to indicate it.
504 - ovs.reconnect.PROBE: The client should send some kind of request
505 to the peer that will elicit a response, to ensure that the
506 connection is indeed in working order. (This will only be
507 returned if the "probe interval" is nonzero--see
508 self.set_probe_interval())."""
510 deadline = self.state.deadline(self)
511 if deadline is not None and now >= deadline:
512 return self.state.run(self, now)
516 def wait(self, poller, now):
517 """Causes the next call to poller.block() to wake up when self.run()
519 timeout = self.timeout(now)
521 poller.timer_wait(timeout)
def timeout(self, now):
    """Returns how many milliseconds may pass before self.run() ought to be
    called again, assuming nothing else notable happens first.  The result
    is never negative: a deadline that has already passed yields 0.
    Returns None when the current state has no deadline, so no call is
    currently needed."""
    deadline = self.state.deadline(self)
    if deadline is None:
        return None
    # Clamp so an overdue deadline reads as "call run() right away".
    return max(0, deadline - now)
534 def is_connected(self):
535 """Returns True if this FSM is currently believed to be connected, that
536 is, if self.connected() was called more recently than any call to
537 self.connect_failed() or self.disconnected() or self.disable(), and
539 return self.state.is_connected
def get_last_connect_elapsed(self, now):
    """Returns the number of milliseconds between the time this FSM last
    connected to its peer and 'now'.  Returns None if no connection time
    has been recorded, that is, if self.last_connected is None.

    The check is made against None rather than by truthiness, so that a
    connection recorded at time 0 is still reported instead of being
    mistaken for "never connected"."""
    if self.last_connected is None:
        return None
    return now - self.last_connected
def get_last_disconnect_elapsed(self, now):
    """Returns the number of milliseconds between the time this FSM last
    disconnected from its peer and 'now'.  Returns None if no
    disconnection time has been recorded, that is, if
    self.last_disconnected is None.

    The check is made against None rather than by truthiness, so that a
    disconnection recorded at time 0 is still reported instead of being
    mistaken for "never disconnected"."""
    if self.last_disconnected is None:
        return None
    return now - self.last_disconnected
557 def get_stats(self, now):
561 stats.creation_time = self.creation_time
562 stats.last_connected = self.last_connected
563 stats.last_disconnected = self.last_disconnected
564 stats.last_received = self.last_received
565 stats.backoff = self.backoff
566 stats.seqno = self.seqno
567 stats.is_connected = self.is_connected()
568 stats.msec_since_connect = self.get_last_connect_elapsed(now)
569 stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
570 stats.total_connected_duration = self.total_connected_duration
571 if self.is_connected():
572 stats.total_connected_duration += (
573 self.get_last_connect_elapsed(now))
574 stats.n_attempted_connections = self.n_attempted_connections
575 stats.n_successful_connections = self.n_successful_connections
576 stats.state = self.state.name
577 stats.state_elapsed = now - self.state_entered
580 def __may_retry(self):
581 if self.max_tries is None:
583 elif self.max_tries > 0: