1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
# Values returned by Reconnect.run()
# DISCONNECT asks the client to abort the current connection, connection
# attempt or listen attempt (see the Reconnect.run() docstring).
DISCONNECT = 'disconnect'

# Module logger, created with the name "reconnect".
vlog = ovs.vlog.Vlog("reconnect")
28 class Reconnect(object):
29 """A finite-state machine for connecting and reconnecting to a network
30 resource with exponential backoff. It also provides optional support for
31 detecting a connection on which the peer is no longer responding.
33 The library does not implement anything networking related, only an FSM for
34 networking code to use.
36 Many Reconnect methods take a "now" argument. This makes testing easier
37 since there is no hidden state. When not testing, just pass the return
38 value of ovs.time.msec(). (Perhaps this design should be revisited
53 class Listening(object):
65 class Backoff(object):
71 return fsm.state_entered + fsm.backoff
77 class ConnectInProgress(object):
83 return fsm.state_entered + max(1000, fsm.backoff)
95 if fsm.probe_interval:
96 base = max(fsm.last_received, fsm.state_entered)
97 return base + fsm.probe_interval
102 vlog.dbg("%s: idle %d ms, sending inactivity probe"
104 now - max(fsm.last_received, fsm.state_entered)))
105 fsm._transition(now, Reconnect.Idle)
114 return fsm.state_entered + fsm.probe_interval
118 vlog.err("%s: no response to inactivity probe after %.3g "
119 "seconds, disconnecting"
120 % (fsm.name, (now - fsm.state_entered) / 1000.0))
123 class Reconnect(object):
129 return fsm.state_entered
def __init__(self, now):
    """Creates and returns a new reconnect FSM with default settings.  The
    FSM is initially disabled.  The caller will likely want to call
    self.enable() and self.set_name() on the returned object.

    'now' is the current time in milliseconds; it seeds state_entered,
    last_received and creation_time."""
    # NOTE(review): other methods read self.name, self.passive, self.backoff
    # and self.seqno, none of which is assigned in the lines shown here --
    # confirm they are initialized before first use.

    # Default timing parameters, in milliseconds (see the matching getters).
    self.min_backoff = 1000
    self.max_backoff = 8000
    self.probe_interval = 5000
    # Informational messages go to info level unless set_quiet() changes it.
    self.info_level = vlog.info
    # Void is the disabled state; is_enabled() tests against it.
    self.state = Reconnect.Void
    self.state_entered = now
    self.last_received = now
    # None means "never connected" / "never disconnected".
    self.last_connected = None
    self.last_disconnected = None
    # None means an unlimited number of tries (see set_max_tries()).
    self.max_tries = None
    # Counters reported by get_stats().
    self.creation_time = now
    self.n_attempted_connections = 0
    self.n_successful_connections = 0
    self.total_connected_duration = 0
def set_quiet(self, quiet):
    """Selects the log level used for this object's informational messages.

    If 'quiet' is true, this object will log informational messages at
    debug level, by default keeping them out of log files.  This is
    appropriate if the connection is one that is expected to be
    short-lived, so that the log messages are merely distracting.

    If 'quiet' is false, this object logs informational messages at info
    level.  This is the default.

    This setting has no effect on messages that are logged directly at
    debug, warning, or error level."""
    # Choose exactly one logger method.  Assigning both in sequence would
    # leave info level in force no matter what the caller asked for.
    if quiet:
        self.info_level = vlog.dbg
    else:
        self.info_level = vlog.info
def set_name(self, name):
    """Sets this object's name to 'name'.  If 'name' is None, then "void"
    is used instead.

    The name is used in log messages."""
    # Only None selects the placeholder; any other value, including an
    # empty string, is stored as given.
    if name is None:
        self.name = "void"
    else:
        self.name = name
def get_min_backoff(self):
    """Returns the shortest delay, in milliseconds, that this FSM leaves
    between consecutive connection attempts.  Defaults to 1000 ms."""
    shortest_ms = self.min_backoff
    return shortest_ms
def get_max_backoff(self):
    """Returns the longest delay, in milliseconds, that this FSM leaves
    between consecutive connection attempts.  Defaults to 8000 ms."""
    longest_ms = self.max_backoff
    return longest_ms
def get_probe_interval(self):
    """Returns the "probe interval" in milliseconds.

    Zero means that the connection keepalive feature is disabled.  A
    nonzero value is how long the FSM may stay connected without
    self.received() being called before self.run() returns
    ovs.reconnect.PROBE; if the same interval passes again, still without
    self.received() being called, self.run() returns
    ovs.reconnect.DISCONNECT."""
    interval_ms = self.probe_interval
    return interval_ms
def set_max_tries(self, max_tries):
    """Caps at 'max_tries' the number of times this object will ask the
    client to try to reconnect.  None, the default, places no limit on the
    number of tries.

    Once the tries are used up the FSM disables itself rather than backing
    off and retrying."""
    tries_allowed = max_tries
    self.max_tries = tries_allowed
def get_max_tries(self):
    """Returns how many connection attempts currently remain, or None if
    the number of attempts is unlimited."""
    tries_left = self.max_tries
    return tries_left
def set_backoff(self, min_backoff, max_backoff):
    """Configures the backoff parameters for this FSM.  'min_backoff' is
    the minimum number of milliseconds, and 'max_backoff' is the maximum,
    between connection attempts.

    'min_backoff' must be at least 1000, and 'max_backoff' must be greater
    than or equal to 'min_backoff'; values outside those limits are raised
    to the nearest permitted value.  A 'max_backoff' of 0 or None selects
    the default maximum of 8000 ms."""
    self.min_backoff = max(min_backoff, 1000)

    # Honor the caller's maximum when one is given and fall back to the
    # 8000 ms default only when it is absent.  Applying the default
    # unconditionally would discard the requested value.
    if max_backoff:
        self.max_backoff = max(max_backoff, 1000)
    else:
        self.max_backoff = 8000

    # Keep the invariant min_backoff <= max_backoff.
    if self.min_backoff > self.max_backoff:
        self.max_backoff = self.min_backoff

    # If a backoff is already in progress, do not let it run longer than
    # the new maximum allows.
    if (self.state == Reconnect.Backoff and
            self.backoff > self.max_backoff):
        self.backoff = self.max_backoff
def set_probe_interval(self, probe_interval):
    """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
    this is zero, it disables the connection keepalive feature.  If it is
    nonzero, then if the interval passes while this FSM is connected and
    without self.received() being called, self.run() returns
    ovs.reconnect.PROBE.  If the interval passes again without
    self.received() being called, self.run() returns
    ovs.reconnect.DISCONNECT.

    If 'probe_interval' is nonzero, then it will be forced to a value of at
    least 1000 ms."""
    # Zero must stay zero so the keepalive can be switched off; only a
    # nonzero request is clamped up to the 1000 ms floor.
    if probe_interval:
        self.probe_interval = max(1000, probe_interval)
    else:
        self.probe_interval = 0
def is_passive(self):
    """Returns true if this FSM is in passive mode, false if it is in
    active mode (the default)."""
    # self.passive is the flag maintained by set_passive().
    return self.passive
def set_passive(self, passive, now):
    """Switches this FSM between active and passive mode.

    In active mode (the default) the FSM is attempting to connect to a
    remote host.  In passive mode the FSM is listening for connections
    from a remote host.  Asking for the mode the FSM is already in does
    nothing.  'now' is the time recorded if the change forces a state
    transition."""
    if self.passive == passive:
        return
    self.passive = passive

    if passive:
        # Going passive: restart from Backoff if a connection attempt or a
        # forced reconnect is pending.
        restart = self.state in (Reconnect.ConnectInProgress,
                                 Reconnect.Reconnect)
    else:
        # Going active: stop listening, provided another try is permitted.
        restart = (self.state == Reconnect.Listening
                   and self.__may_retry())

    if restart:
        self._transition(now, Reconnect.Backoff)
def is_enabled(self):
    """Returns true if this FSM is enabled, that is, if its state is
    anything other than Reconnect.Void.

    self.enable() is the usual way to get there; any other call that moves
    the FSM out of the Void state also counts."""
    disabled = self.state == Reconnect.Void
    return not disabled
def enable(self, now):
    """If this FSM is disabled (the default for newly created FSMs),
    enables it by entering the Backoff state, subject to the retry limit,
    with the intent that the next call to self.run() returns
    ovs.reconnect.CONNECT.

    If this FSM is not disabled, this function has no effect."""
    if self.state != Reconnect.Void:
        return
    # NOTE(review): the Backoff deadline is state_entered + backoff, so
    # run() reports CONNECT right away only if self.backoff is 0 at this
    # point -- confirm that it is.
    if self.__may_retry():
        self._transition(now, Reconnect.Backoff)
def disable(self, now):
    """Disables this FSM by moving it to the Void state at time 'now'.

    If the FSM is already disabled, this function has no effect."""
    if self.state == Reconnect.Void:
        return
    self._transition(now, Reconnect.Void)
301 def force_reconnect(self, now):
302 """If this FSM is enabled and currently connected (or attempting to
303 connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
304 time it is called, which should cause the client to drop the connection
305 (or attempt), back off, and then reconnect."""
306 if self.state in (Reconnect.ConnectInProgress,
309 self._transition(now, Reconnect.Reconnect)
311 def disconnected(self, now, error):
312 """Tell this FSM that the connection dropped or that a connection
313 attempt failed. 'error' specifies the reason: a positive value
314 represents an errno value, EOF indicates that the connection was closed
315 by the peer (e.g. read() returned 0), and 0 indicates no specific
318 The FSM will back off, then reconnect."""
319 if self.state not in (Reconnect.Backoff, Reconnect.Void):
320 # Report what happened
321 if self.state in (Reconnect.Active, Reconnect.Idle):
323 vlog.warn("%s: connection dropped (%s)"
324 % (self.name, os.strerror(error)))
326 self.info_level("%s: connection closed by peer"
329 self.info_level("%s: connection dropped" % self.name)
330 elif self.state == Reconnect.Listening:
332 vlog.warn("%s: error listening for connections (%s)"
333 % (self.name, os.strerror(error)))
335 self.info_level("%s: error listening for connections"
343 vlog.warn("%s: %s attempt failed (%s)"
344 % (self.name, type_, os.strerror(error)))
346 self.info_level("%s: %s attempt timed out"
347 % (self.name, type_))
349 if (self.state in (Reconnect.Active, Reconnect.Idle)):
350 self.last_disconnected = now
353 if (self.state in (Reconnect.Active, Reconnect.Idle) and
354 (self.last_received - self.last_connected >= self.backoff or
359 self.backoff = self.min_backoff
361 if self.backoff < self.min_backoff:
362 self.backoff = self.min_backoff
363 elif self.backoff >= self.max_backoff / 2:
364 self.backoff = self.max_backoff
369 self.info_level("%s: waiting %.3g seconds before trying "
371 % (self.name, self.backoff / 1000.0))
373 self.info_level("%s: waiting %.3g seconds before reconnect"
374 % (self.name, self.backoff / 1000.0))
376 if self.__may_retry():
377 self._transition(now, Reconnect.Backoff)
379 self._transition(now, Reconnect.Void)
def connecting(self, now):
    """Tell this FSM that a connection or listening attempt is in progress.

    The FSM will start a timer, after which the connection or listening
    attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
    self.run()).

    Calling this while an attempt is already in progress has no effect."""
    if self.state != Reconnect.ConnectInProgress:
        # Log only the message that matches the FSM's mode: a passive FSM
        # listens, an active one connects.
        if self.passive:
            self.info_level("%s: listening..." % self.name)
        else:
            self.info_level("%s: connecting..." % self.name)
        self._transition(now, Reconnect.ConnectInProgress)
def listening(self, now):
    """Tell this FSM that the client is listening for connection attempts.
    This state lasts indefinitely until the client reports some change.

    The natural progression from this state is for the client to report
    that a connection has been accepted or is in progress of being
    accepted, by calling self.connecting() or self.connected().

    The client may also report that listening failed (e.g. accept()
    returned an unexpected error such as ENOMEM) by calling
    self.listen_error(), in which case the FSM will back off and eventually
    return ovs.reconnect.CONNECT from self.run() to tell the client to try
    listening again."""
    if self.state == Reconnect.Listening:
        # Already listening: do not log again or restart the state timer.
        return
    self.info_level("%s: listening..." % self.name)
    self._transition(now, Reconnect.Listening)
def listen_error(self, now, error):
    """Tell this FSM that the client's attempt to accept a connection
    failed (e.g. accept() returned an unexpected error such as ENOMEM).

    If the FSM is currently listening (self.listening() was called), the
    failure is handed to self.disconnected(), so that the FSM backs off and
    eventually returns ovs.reconnect.CONNECT from self.run() to tell the
    client to try listening again.  In any other state the call is
    ignored."""
    if self.state != Reconnect.Listening:
        return
    self.disconnected(now, error)
def connected(self, now):
    """Tell this FSM that the connection was successful.

    The FSM will start the probe interval timer, which is reset by
    self.received().  If the timer expires, a probe will be sent (by
    returning ovs.reconnect.PROBE from self.run()).  If the timer expires
    again without being reset, the connection will be aborted (by returning
    ovs.reconnect.DISCONNECT from self.run()).

    Calling this while the FSM already considers itself connected has no
    effect."""
    if self.state.is_connected:
        return
    self.info_level("%s: connected" % self.name)
    self._transition(now, Reconnect.Active)
    self.last_connected = now
def connect_failed(self, now, error):
    """Tell this FSM that the connection attempt failed.

    The failure is reported through self.disconnected(), after which the
    FSM backs off and attempts to reconnect.  'error' is passed through
    unchanged."""
    self.disconnected(now, error)
def received(self, now):
    """Tell this FSM that some data was received.  This resets the probe
    interval timer, so that the connection is known not to be idle."""
    already_active = self.state == Reconnect.Active
    if not already_active:
        self._transition(now, Reconnect.Active)
    # Recorded on every call: the Active deadline is measured from here.
    self.last_received = now
451 def _transition(self, now, state):
452 if self.state == Reconnect.ConnectInProgress:
453 self.n_attempted_connections += 1
454 if state == Reconnect.Active:
455 self.n_successful_connections += 1
457 connected_before = self.state.is_connected
458 connected_now = state.is_connected
459 if connected_before != connected_now:
461 self.total_connected_duration += now - self.last_connected
464 vlog.dbg("%s: entering %s" % (self.name, state.name))
466 self.state_entered = now
469 """Assesses whether any action should be taken on this FSM. The return
472 - None: The client need not take any action.
474 - Active client, ovs.reconnect.CONNECT: The client should start a
475 connection attempt and indicate this by calling
476 self.connecting(). If the connection attempt has definitely
477 succeeded, it should call self.connected(). If the connection
478 attempt has definitely failed, it should call
479 self.connect_failed().
481 The FSM is smart enough to back off correctly after successful
482 connections that quickly abort, so it is OK to call
483 self.connected() after a low-level successful connection
484 (e.g. connect()) even if the connection might soon abort due to a
485 failure at a high-level (e.g. SSL negotiation failure).
487 - Passive client, ovs.reconnect.CONNECT: The client should try to
488 listen for a connection, if it is not already listening. It
489 should call self.listening() if successful, otherwise
490 self.connecting() or reconnected_connect_failed() if the attempt
491 is in progress or definitely failed, respectively.
493 A listening passive client should constantly attempt to accept a
494 new connection and report an accepted connection with
497 - ovs.reconnect.DISCONNECT: The client should abort the current
498 connection or connection attempt or listen attempt and call
499 self.disconnected() or self.connect_failed() to indicate it.
501 - ovs.reconnect.PROBE: The client should send some kind of request
502 to the peer that will elicit a response, to ensure that the
503 connection is indeed in working order. (This will only be
504 returned if the "probe interval" is nonzero--see
505 self.set_probe_interval())."""
506 if now >= self.state.deadline(self):
507 return self.state.run(self, now)
def wait(self, poller, now):
    """Causes the next call to poller.block() to wake up when self.run()
    should next be called.

    Registers self.timeout(now) with 'poller' as a timer.  When no timeout
    is pending (self.timeout() returns None), nothing is registered."""
    timeout = self.timeout(now)
    # None means "no deadline"; it must not be handed to the poller as
    # though it were a duration.
    if timeout is not None:
        poller.timer_wait(timeout)
def timeout(self, now):
    """Returns the number of milliseconds after which self.run() should be
    called if nothing else notable happens in the meantime, or None if this
    is currently unnecessary.

    A deadline that has already passed yields 0, never a negative
    number."""
    deadline = self.state.deadline(self)
    if deadline is None:
        return None
    return max(0, deadline - now)
def is_connected(self):
    """Returns True if this FSM is currently believed to be connected,
    False otherwise.

    The answer is the is_connected flag of the FSM's current state."""
    current_state = self.state
    return current_state.is_connected
def get_last_connect_elapsed(self, now):
    """Returns the number of milliseconds, measured at time 'now', since
    this FSM was last connected to its peer.  Returns None if never
    connected."""
    # Compare against None explicitly: a connection recorded at time 0 is
    # a valid timestamp and must not be mistaken for "never connected".
    if self.last_connected is not None:
        return now - self.last_connected
    return None
def get_last_disconnect_elapsed(self, now):
    """Returns the number of milliseconds, measured at time 'now', since
    this FSM was last disconnected from its peer.  Returns None if never
    disconnected."""
    # Compare against None explicitly: a disconnection recorded at time 0
    # is a valid timestamp and must not be mistaken for "never
    # disconnected".
    if self.last_disconnected is not None:
        return now - self.last_disconnected
    return None
552 def get_stats(self, now):
556 stats.creation_time = self.creation_time
557 stats.last_connected = self.last_connected
558 stats.last_disconnected = self.last_disconnected
559 stats.last_received = self.last_received
560 stats.backoff = self.backoff
561 stats.seqno = self.seqno
562 stats.is_connected = self.is_connected()
563 stats.msec_since_connect = self.get_last_connect_elapsed(now)
564 stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
565 stats.total_connected_duration = self.total_connected_duration
566 if self.is_connected():
567 stats.total_connected_duration += (
568 self.get_last_connect_elapsed(now))
569 stats.n_attempted_connections = self.n_attempted_connections
570 stats.n_successful_connections = self.n_successful_connections
571 stats.state = self.state.name
572 stats.state_elapsed = now - self.state_entered
575 def __may_retry(self):
576 if self.max_tries is None:
578 elif self.max_tries > 0: