from ovs.db import error
import ovs.db.types
+
class ConstraintViolation(error.Error):
def __init__(self, msg, json=None):
error.Error.__init__(self, msg, json, tag="constraint violation")
+
def escapeCString(src):
dst = []
for c in src:
dst.append(c)
return ''.join(dst)
+
def returnUnchanged(x):
return x
+
class Atom(object):
def __init__(self, type_, value=None):
self.type = type_
type_ = base.type
json = ovs.db.parser.float_to_int(json)
if ((type_ == ovs.db.types.IntegerType and type(json) in [int, long])
- or (type_ == ovs.db.types.RealType and type(json) in [int, long, float])
+ or (type_ == ovs.db.types.RealType
+ and type(json) in [int, long, float])
or (type_ == ovs.db.types.BooleanType and type(json) == bool)
- or (type_ == ovs.db.types.StringType and type(json) in [str, unicode])):
+ or (type_ == ovs.db.types.StringType
+ and type(json) in [str, unicode])):
atom = Atom(type_, json)
elif type_ == ovs.db.types.UuidType:
atom = Atom(type_, ovs.ovsuuid.from_json(json, symtab))
raise ConstraintViolation(
'"%s" length %d is greater than maximum allowed '
'length %d' % (s, length, base.max_length))
-
+
def to_json(self):
if self.type == ovs.db.types.UuidType:
return ovs.ovsuuid.to_json(self.value)
return self.value.value
__need_quotes_re = re.compile("$|true|false|[^_a-zA-Z]|.*[^-._a-zA-Z]")
+
@staticmethod
def __string_needs_quotes(s):
return Atom.__need_quotes_re.match(s)
raise TypeError
return Atom(t, x)
+
class Datum(object):
def __init__(self, type_, values={}):
self.type = type_
"""Parses 'json' as a datum of the type described by 'type'. If
successful, returns a new datum. On failure, raises an
ovs.db.error.Error.
-
+
Violations of constraints expressed by 'type' are treated as errors.
-
+
If 'symtab' is nonnull, then named UUIDs in 'symtab' are accepted.
Refer to ovsdb/SPECS for information about this, and for the syntax
that this function accepts."""
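A usage sketch (not part of the patch); the scalar "integer" column type is an arbitrary illustrative choice, and the calls mirror what tests/test-ovsdb.py already does:

    from ovs.db import data, types
    import ovs.json

    # Parse a value as a datum of a simple scalar type, then dump it
    # back out as JSON.
    type_ = types.Type.from_json("integer")
    datum = data.Datum.from_json(type_, 5)
    print ovs.json.to_string(datum.to_json())    # prints: 5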
return [[k.value, v.value] for k, v in self.values.iteritems()]
else:
return [k.value for k in self.values.iterkeys()]
-
+
def as_dict(self):
return dict(self.values)
return self.values[key].value
else:
return default
-
+
def __str__(self):
return self.to_string()
for i, key in enumerate(sorted(self.values)):
s += key.cInitAtom("%s->keys[%d]" % (var, i))
-
+
if self.type.value:
s += ["%s->values = xmalloc(%d * sizeof *%s->values);"
% (var, len(self.values), var)]
import ovs.json
+
class Error(Exception):
def __init__(self, msg, json=None, tag=None):
self.msg = msg
import ovs.ovsuuid
import ovs.poller
+__pychecker__ = 'no-classattr no-objattrs'
+
+
class Idl:
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
# Database locking.
self.lock_name = None # Name of lock we need, None if none.
self.has_lock = False # Has db server said we have the lock?
- self.is_lock_contended = False # Has db server said we can't get lock?
+ self.is_lock_contended = False # Has db server said we can't get lock?
self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
# Transaction support.
if txn:
txn._process_reply(msg)
+
def _uuid_to_row(atom, base):
if base.ref_table:
return base.ref_table.rows.get(atom)
else:
return atom
+
def _row_to_uuid(value):
if type(value) == Row:
return value.uuid
else:
return value
+
class Row(object):
"""A row within an IDL.
self.__dict__["_changes"] = None
del self._table.rows[self.uuid]
+
def _uuid_name_from_uuid(uuid):
return "row%s" % str(uuid).replace("-", "_")
+
def _where_uuid_equals(uuid):
return [["_uuid", "==", ["uuid", str(uuid)]]]
+
class _InsertedRow(object):
def __init__(self, op_index):
self.op_index = op_index
self.real = None
+
class Transaction(object):
# Status values that Transaction.commit() can return.
- UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
- UNCHANGED = "unchanged" # Transaction didn't include any changes.
- INCOMPLETE = "incomplete" # Commit in progress, please wait.
- ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
- SUCCESS = "success" # Commit successful.
- TRY_AGAIN = "try again" # Commit failed because a "verify" operation
- # reported an inconsistency, due to a network
- # problem, or other transient failure.
- NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
- ERROR = "error" # Commit failed due to a hard error.
+ UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
+ UNCHANGED = "unchanged" # Transaction didn't include any changes.
+ INCOMPLETE = "incomplete" # Commit in progress, please wait.
+ ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
+ SUCCESS = "success" # Commit successful.
+ TRY_AGAIN = "try again" # Commit failed because a "verify" operation
+ # reported an inconsistency, due to a network
+ # problem, or other transient failure.
+ NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
+ ERROR = "error" # Commit failed due to a hard error.
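A hedged sketch of how a caller might interpret these status values; commit_and_report() and its empty transaction are illustrative, not part of the IDL itself:

    import ovs.db.idl

    def commit_and_report(idl):
        """Commit an (otherwise empty) transaction against 'idl' and report
        the outcome.  Real callers modify rows first and retry on
        TRY_AGAIN."""
        txn = ovs.db.idl.Transaction(idl)
        status = txn.commit()
        if status not in (ovs.db.idl.Transaction.UNCHANGED,
                          ovs.db.idl.Transaction.SUCCESS,
                          ovs.db.idl.Transaction.INCOMPLETE):
            print ovs.db.idl.Transaction.status_to_string(status)
        return status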
@staticmethod
def status_to_string(status):
self._inc_column = None
self._inc_where = None
- self._inserted_rows = {} # Map from UUID to _InsertedRow
+ self._inserted_rows = {} # Map from UUID to _InsertedRow
    def add_comment(self, comment):
        """Appends 'comment' to the comments that will be passed to the OVSDB
for column_name, datum in row._changes.iteritems():
if row._data is not None or not datum.is_default():
- row_json[column_name] = self._substitute_uuids(datum.to_json())
+ row_json[column_name] = (
+ self._substitute_uuids(datum.to_json()))
# If anything really changed, consider it an update.
# We can't suppress not-really-changed values earlier
operations.append({"op": "mutate",
"table": self._inc_table,
- "where": self._substitute_uuids(self._inc_where),
+ "where": self._substitute_uuids(
+ self._inc_where),
"mutations": [[self._inc_column, "+=", 1]]})
operations.append({"op": "select",
"table": self._inc_table,
- "where": self._substitute_uuids(self._inc_where),
+ "where": self._substitute_uuids(
+ self._inc_where),
"columns": [self._inc_column]})
# Add comment.
from ovs.db import error
+
class Parser(object):
def __init__(self, json, name):
self.name = name
self.__raise_error("Type mismatch for member '%s'." % name)
return member
else:
- if not optional:
+ if not optional:
self.__raise_error("Required '%s' member is missing." % name)
return default
present = "is"
self.__raise_error("Member '%s' %s present but not allowed here" %
(name, present))
-
+
+
def float_to_int(x):
# XXX still needed?
if type(x) == float:
integer = int(x)
- if integer == x and -2**53 <= integer < 2**53:
+ if integer == x and -2 ** 53 <= integer < 2 ** 53:
return integer
return x
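For reference, a few illustrative cases (assuming the module is importable as ovs.db.parser):

    import ovs.db.parser

    # Exact-integer floats within +/- 2**53 collapse to int; everything
    # else passes through unchanged.
    assert ovs.db.parser.float_to_int(5.0) == 5
    assert ovs.db.parser.float_to_int(5.5) == 5.5
    assert type(ovs.db.parser.float_to_int(2.0 ** 60)) == float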
+
id_re = re.compile("[_a-zA-Z][_a-zA-Z0-9]*$")
+
+
def is_identifier(s):
return type(s) in [str, unicode] and id_re.match(s)
+
def json_type_to_string(type_):
if type_ == None:
return "null"
else:
return "<invalid>"
+
def unwrap_json(json, name, types, desc):
if (type(json) not in (list, tuple) or len(json) != 2 or json[0] != name or
type(json[1]) not in types):
raise error.Error('expected ["%s", <%s>]' % (name, desc), json)
return json[1]
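Illustrative use of the helper; the ["set", ...] wrapper is just one example of OVSDB's ["<name>", <value>] convention:

    from ovs.db import parser

    # Validates the two-element wrapper and returns the payload, [1, 2].
    print parser.unwrap_json(["set", [1, 2]], "set", [list], "array")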
+
def parse_json_pair(json):
if type(json) != list or len(json) != 2:
raise error.Error("expected 2-element array", json)
return json
-
import ovs.db.parser
from ovs.db import types
+
def _check_id(name, json):
if name.startswith('_'):
raise error.Error('names beginning with "_" are reserved', json)
elif not ovs.db.parser.is_identifier(name):
raise error.Error("name must be a valid id", json)
+
class DbSchema(object):
"""Schema for an OVSDB database."""
# error.
column.persistent = True
+
class IdlSchema(DbSchema):
def __init__(self, name, version, tables, idlPrefix, idlHeader):
DbSchema.__init__(self, name, version, tables)
return IdlSchema(schema.name, schema.version, schema.tables,
idlPrefix, idlHeader)
+
def column_set_from_json(json, columns):
if json is None:
return tuple(columns)
raise error.Error("array of distinct column names expected", json)
return tuple([columns[column_name] for column_name in json])
+
class TableSchema(object):
def __init__(self, name, columns, mutable=True, max_rows=sys.maxint,
is_root=True, indexes=[]):
return json
+
class ColumnSchema(object):
def __init__(self, name, mutable, persistent, type_):
self.name = name
if not self.persistent:
json["ephemeral"] = True
return json
-
import ovs.db.data
import ovs.ovsuuid
+
class AtomicType(object):
def __init__(self, name, default, python_types):
self.name = name
ATOMIC_TYPES = [VoidType, IntegerType, RealType, BooleanType, StringType,
UuidType]
+
def escapeCString(src):
dst = ""
for c in src:
dst += c
return dst
+
def commafy(x):
"""Returns integer x formatted in decimal with thousands set off by
commas."""
return _commafy("%d" % x)
+
+
def _commafy(s):
if s.startswith('-'):
return '-' + _commafy(s[1:])
else:
return _commafy(s[:-3]) + ',' + _commafy(s[-3:])
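Illustrative output, assuming the elided base case of _commafy() returns strings of three or fewer digits unchanged:

    from ovs.db import types

    print types.commafy(1234567)    # 1,234,567
    print types.commafy(-1000)      # -1,000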
+
def returnUnchanged(x):
return x
+
class BaseType(object):
def __init__(self, type_, enum=None, min=None, max=None,
- min_length = 0, max_length=sys.maxint, ref_table_name=None):
+ min_length=0, max_length=sys.maxint, ref_table_name=None):
assert isinstance(type_, AtomicType)
self.type = type_
self.enum = enum
if value is None:
value = default
else:
- max_value = 2**32 - 1
+ max_value = 2 ** 32 - 1
if not (0 <= value <= max_value):
raise error.Error("%s out of valid range 0 to %d"
% (name, max_value), value)
enum = parser.get_optional("enum", [])
if enum is not None:
- base.enum = ovs.db.data.Datum.from_json(BaseType.get_enum_type(base.type), enum)
+ base.enum = ovs.db.data.Datum.from_json(
+ BaseType.get_enum_type(base.type), enum)
elif base.type == IntegerType:
base.min = parser.get_optional("minInteger", [int, long])
base.max = parser.get_optional("maxInteger", [int, long])
- if base.min is not None and base.max is not None and base.min > base.max:
+ if (base.min is not None and base.max is not None
+ and base.min > base.max):
raise error.Error("minInteger exceeds maxInteger", json)
elif base.type == RealType:
base.min = parser.get_optional("minReal", [int, long, float])
base.max = parser.get_optional("maxReal", [int, long, float])
- if base.min is not None and base.max is not None and base.min > base.max:
+ if (base.min is not None and base.max is not None
+ and base.min > base.max):
raise error.Error("minReal exceeds maxReal", json)
elif base.type == StringType:
base.min_length = BaseType.__parse_uint(parser, "minLength", 0)
return False
def has_constraints(self):
- return (self.enum is not None or self.min is not None or self.max is not None or
+ return (self.enum is not None or self.min is not None or
+ self.max is not None or
self.min_length != 0 or self.max_length != sys.maxint or
self.ref_table_name is not None)
"""Returns the type of the 'enum' member for a BaseType whose
'type' is 'atomic_type'."""
return Type(BaseType(atomic_type), None, 1, sys.maxint)
-
+
def is_ref(self):
return self.type == UuidType and self.ref_table_name is not None
literals = [value.toEnglish(escapeLiteral)
for value in self.enum.values]
if len(literals) == 2:
- return 'either %s or %s' % (literals[0], literals[1])
+ english = 'either %s or %s' % (literals[0], literals[1])
else:
- return 'one of %s, %s, or %s' % (literals[0],
- ', '.join(literals[1:-1]),
- literals[-1])
+ english = 'one of %s, %s, or %s' % (literals[0],
+ ', '.join(literals[1:-1]),
+ literals[-1])
elif self.min is not None and self.max is not None:
if self.type == IntegerType:
- return 'in range %s to %s' % (commafy(self.min),
- commafy(self.max))
+ english = 'in range %s to %s' % (commafy(self.min),
+ commafy(self.max))
else:
- return 'in range %g to %g' % (self.min, self.max)
+ english = 'in range %g to %g' % (self.min, self.max)
elif self.min is not None:
if self.type == IntegerType:
- return 'at least %s' % commafy(self.min)
+ english = 'at least %s' % commafy(self.min)
else:
- return 'at least %g' % self.min
+ english = 'at least %g' % self.min
elif self.max is not None:
if self.type == IntegerType:
- return 'at most %s' % commafy(self.max)
+ english = 'at most %s' % commafy(self.max)
else:
- return 'at most %g' % self.max
+ english = 'at most %g' % self.max
elif self.min_length != 0 and self.max_length != sys.maxint:
if self.min_length == self.max_length:
- return 'exactly %d characters long' % (self.min_length)
+ english = 'exactly %d characters long' % (self.min_length)
else:
- return 'between %d and %d characters long' % (self.min_length, self.max_length)
+ english = ('between %d and %d characters long'
+ % (self.min_length, self.max_length))
elif self.min_length != 0:
return 'at least %d characters long' % self.min_length
elif self.max_length != sys.maxint:
- return 'at most %d characters long' % self.max_length
+ english = 'at most %d characters long' % self.max_length
else:
- return ''
+ english = ''
+
+ return english
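A sketch of the resulting phrasing; it assumes toEnglish()'s escapeLiteral argument defaults to returnUnchanged, as the helper defined above suggests:

    from ovs.db import types

    base = types.BaseType(types.IntegerType, min=1, max=100)
    print base.toEnglish()    # in range 1 to 100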
def toCType(self, prefix):
if self.ref_table_name:
BooleanType: '%s = false;',
StringType: '%s = NULL;'}[self.type]
return pattern % var
-
+
def cInitBaseType(self, indent, var):
stmts = []
stmts.append('ovsdb_base_type_init(&%s, %s);' % (
stmts += self.enum.cInitDatum("%s.enum_" % var)
if self.type == IntegerType:
if self.min is not None:
- stmts.append('%s.u.integer.min = INT64_C(%d);' % (var, self.min))
+ stmts.append('%s.u.integer.min = INT64_C(%d);'
+ % (var, self.min))
if self.max is not None:
- stmts.append('%s.u.integer.max = INT64_C(%d);' % (var, self.max))
+ stmts.append('%s.u.integer.max = INT64_C(%d);'
+ % (var, self.max))
elif self.type == RealType:
if self.min is not None:
stmts.append('%s.u.real.min = %d;' % (var, self.min))
stmts.append('%s.u.real.max = %d;' % (var, self.max))
elif self.type == StringType:
if self.min_length is not None:
- stmts.append('%s.u.string.minLen = %d;' % (var, self.min_length))
+ stmts.append('%s.u.string.minLen = %d;'
+ % (var, self.min_length))
if self.max_length != sys.maxint:
- stmts.append('%s.u.string.maxLen = %d;' % (var, self.max_length))
+ stmts.append('%s.u.string.maxLen = %d;'
+ % (var, self.max_length))
elif self.type == UuidType:
if self.ref_table_name is not None:
- stmts.append('%s.u.uuid.refTableName = "%s";' % (var, escapeCString(self.ref_table_name)))
- stmts.append('%s.u.uuid.refType = OVSDB_REF_%s;' % (var, self.ref_type.upper()))
+ stmts.append('%s.u.uuid.refTableName = "%s";'
+ % (var, escapeCString(self.ref_table_name)))
+ stmts.append('%s.u.uuid.refType = OVSDB_REF_%s;'
+ % (var, self.ref_type.upper()))
return '\n'.join([indent + stmt for stmt in stmts])
+
class Type(object):
DEFAULT_MIN = 1
DEFAULT_MAX = 1
return json
else:
raise error.Error("bad min or max value", json)
-
+
@staticmethod
def from_json(json):
if type(json) in [str, unicode]:
constraints.append('value %s' % valueConstraints)
return ', '.join(constraints)
-
+
def cDeclComment(self):
if self.n_min == 1 and self.n_max == 1 and self.key.type == StringType:
return "\t/* Always nonnull. */"
n_max = self.n_max
initMax = "%s%s.n_max = %s;" % (indent, var, n_max)
return "\n".join((initKey, initValue, initMin, initMax))
-
_hooks = []
+
def add_hook(hook, cancel, run_at_exit):
_init()
_hooks.append((hook, cancel, run_at_exit))
+
def fork():
"""Clears all of the fatal signal hooks without executing them. If any of
the hooks passed a 'cancel' function to add_hook(), then those functions
_added_hook = False
_files = {}
+
def add_file_to_unlink(file):
"""Registers 'file' to be unlinked when the program terminates via
sys.exit() or a fatal signal."""
add_hook(_unlink_files, _cancel_files, True)
_files[file] = None
+
def remove_file_to_unlink(file):
"""Unregisters 'file' from being unlinked when the program terminates via
sys.exit() or a fatal signal."""
if file in _files:
del _files[file]
+
def unlink_file_now(file):
"""Like fatal_signal_remove_file_to_unlink(), but also unlinks 'file'.
Returns 0 if successful, otherwise a positive errno value."""
remove_file_to_unlink(file)
return error
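A usage sketch; the temporary file path is hypothetical:

    import ovs.fatal_signal

    path = "/tmp/example.tmp"
    open(path, "w").close()
    ovs.fatal_signal.add_file_to_unlink(path)   # cleaned up on fatal signal
    # ... work with the file ...
    error = ovs.fatal_signal.unlink_file_now(path)
    if error:
        print "could not remove %s: errno %d" % (path, error)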
+
def _unlink_files():
for file_ in _files:
_unlink(file_)
+
def _cancel_files():
global _added_hook
global _files
_added_hook = False
_files = {}
+
def _unlink(file_):
try:
os.unlink(file_)
return 0
except OSError, e:
return e.errno
+
\f
def _signal_handler(signr, _):
_call_hooks(signr)
signal.signal(signr, signal.SIG_DFL)
os.kill(os.getpid(), signr)
+
def _atexit_handler():
_call_hooks(0)
+
recurse = False
+
+
def _call_hooks(signr):
global recurse
if recurse:
if signr != 0 or run_at_exit:
hook()
+
_inited = False
+
+
def _init():
global _inited
if not _inited:
import StringIO
import sys
+__pychecker__ = 'no-stringiter'
+
escapes = {ord('"'): u"\\\"",
ord("\\"): u"\\\\",
ord("\b"): u"\\b",
def to_stream(obj, stream, pretty=False, sort_keys=True):
_Serializer(stream, pretty, sort_keys).serialize(obj)
+
def to_file(obj, name, pretty=False, sort_keys=True):
stream = open(name, "w")
try:
finally:
stream.close()
+
def to_string(obj, pretty=False, sort_keys=True):
output = StringIO.StringIO()
to_stream(obj, output, pretty, sort_keys)
output.close()
return s
+
def from_stream(stream):
p = Parser(check_trailer=True)
while True:
break
return p.finish()
+
def from_file(name):
stream = open(name, "r")
try:
finally:
stream.close()
+
def from_string(s):
try:
s = unicode(s, 'utf-8')
p.feed(s)
return p.finish()
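A round-trip sketch; note that from_string() reports parse failures by returning a descriptive string rather than raising:

    import ovs.json

    obj = ovs.json.from_string('{"name": "br0", "ports": [1, 2, 3]}')
    if type(obj) in [str, unicode]:
        print "error: %s" % obj
    else:
        print ovs.json.to_string(obj, sort_keys=True)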
+
class Parser(object):
## Maximum height of parsing stack. ##
MAX_HEIGHT = 1000
self.line_number = 0
self.column_number = 0
self.byte_number = 0
-
+
# Parsing.
self.parse_state = Parser.__parse_start
self.stack = []
def __lex_start_space(self, c):
pass
+
def __lex_start_alpha(self, c):
self.buffer = c
self.lex_state = Parser.__lex_keyword
+
def __lex_start_token(self, c):
self.__parser_input(c)
+
def __lex_start_number(self, c):
self.buffer = c
self.lex_state = Parser.__lex_number
+
def __lex_start_string(self, _):
self.lex_state = Parser.__lex_string
+
def __lex_start_error(self, c):
if ord(c) >= 32 and ord(c) < 128:
self.__error("invalid character '%s'" % c)
for c in "-0123456789":
__lex_start_actions[c] = __lex_start_number
__lex_start_actions['"'] = __lex_start_string
+
def __lex_start(self, c):
Parser.__lex_start_actions.get(
c, Parser.__lex_start_error)(self, c)
__lex_alpha = {}
for c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ":
__lex_alpha[c] = True
+
def __lex_finish_keyword(self):
if self.buffer == "false":
self.__parser_input(False)
self.__parser_input(None)
else:
self.__error("invalid keyword '%s'" % self.buffer)
+
def __lex_keyword(self, c):
if c in Parser.__lex_alpha:
self.buffer += c
self.__lex_finish_keyword()
return False
- __number_re = re.compile("(-)?(0|[1-9][0-9]*)(?:\.([0-9]+))?(?:[eE]([-+]?[0-9]+))?$")
+ __number_re = re.compile("(-)?(0|[1-9][0-9]*)"
+ "(?:\.([0-9]+))?(?:[eE]([-+]?[0-9]+))?$")
+
def __lex_finish_number(self):
s = self.buffer
m = Parser.__number_re.match(s)
if m:
- sign, integer, fraction, exp = m.groups()
+ sign, integer, fraction, exp = m.groups()
if (exp is not None and
(long(exp) > sys.maxint or long(exp) < -sys.maxint - 1)):
self.__error("exponent outside valid range")
if significand == 0:
self.__parser_input(0)
return
- elif significand <= 2**63:
- while pow10 > 0 and significand <= 2*63:
+ elif significand <= 2 ** 63:
+            while pow10 > 0 and significand <= 2 ** 63:
significand *= 10
pow10 -= 1
while pow10 < 0 and significand % 10 == 0:
significand /= 10
pow10 += 1
if (pow10 == 0 and
- ((not sign and significand < 2**63) or
- (sign and significand <= 2**63))):
+ ((not sign and significand < 2 ** 63) or
+ (sign and significand <= 2 ** 63))):
if sign:
self.__parser_input(-significand)
else:
self.__error("exponent must contain at least one digit")
else:
self.__error("syntax error in number")
-
+
def __lex_number(self, c):
if c in ".0123456789eE-+":
self.buffer += c
return False
__4hex_re = re.compile("[0-9a-fA-F]{4}")
+
def __lex_4hex(self, s):
if len(s) < 4:
self.__error("quoted string ends within \\u escape")
self.__error("null bytes not supported in quoted strings")
else:
return int(s, 16)
+
@staticmethod
def __is_leading_surrogate(c):
"""Returns true if 'c' is a Unicode code point for a leading
surrogate."""
return c >= 0xd800 and c <= 0xdbff
+
@staticmethod
def __is_trailing_surrogate(c):
"""Returns true if 'c' is a Unicode code point for a trailing
surrogate."""
return c >= 0xdc00 and c <= 0xdfff
+
@staticmethod
def __utf16_decode_surrogate_pair(leading, trailing):
"""Returns the unicode code point corresponding to leading surrogate
"n": u"\n",
"r": u"\r",
"t": u"\t"}
+
def __lex_finish_string(self):
inp = self.buffer
out = u""
elif inp[0] != u'u':
self.__error("bad escape \\%s" % inp[0])
return
-
+
c0 = self.__lex_4hex(inp[1:5])
if c0 is None:
return
self.buffer += c
self.lex_state = Parser.__lex_string
return True
+
def __lex_string(self, c):
if c == '\\':
self.buffer += c
self.__push_array()
else:
self.__error("syntax error at beginning of input")
+
def __parse_end(self, unused_token, unused_string):
self.__error("trailing garbage at end of input")
+
def __parse_object_init(self, token, string):
if token == '}':
self.__parser_pop()
else:
self.__parse_object_name(token, string)
+
def __parse_object_name(self, token, string):
if token == 'string':
self.member_name = string
self.parse_state = Parser.__parse_object_colon
else:
self.__error("syntax error parsing object expecting string")
+
def __parse_object_colon(self, token, unused_string):
if token == ":":
self.parse_state = Parser.__parse_object_value
else:
self.__error("syntax error parsing object expecting ':'")
+
def __parse_object_value(self, token, string):
self.__parse_value(token, string, Parser.__parse_object_next)
+
def __parse_object_next(self, token, unused_string):
if token == ",":
self.parse_state = Parser.__parse_object_name
self.__parser_pop()
else:
self.__error("syntax error expecting '}' or ','")
+
def __parse_array_init(self, token, string):
if token == ']':
self.__parser_pop()
else:
self.__parse_array_value(token, string)
+
def __parse_array_value(self, token, string):
self.__parse_value(token, string, Parser.__parse_array_next)
+
def __parse_array_next(self, token, unused_string):
if token == ",":
self.parse_state = Parser.__parse_array_value
self.__parser_pop()
else:
self.__error("syntax error expecting ']' or ','")
+
def __parser_input(self, token, string=None):
self.lex_state = Parser.__lex_start
self.buffer = ""
else:
self.__error("input exceeds maximum nesting depth %d" %
Parser.MAX_HEIGHT)
+
def __push_object(self):
self.__parser_push({}, Parser.__parse_object_init)
+
def __push_array(self):
self.__parser_push([], Parser.__parse_array_init)
EOF = -1
+
class Message(object):
T_REQUEST = 0 # Request.
T_NOTIFY = 1 # Notification.
self.id = id
_next_id = 0
+
@staticmethod
def _create_id():
this_id = Message._next_id
msg_type = Message.T_REQUEST
else:
msg_type = Message.T_NOTIFY
-
+
msg = Message(msg_type, method, params, result, error, id_)
validation_error = msg.is_valid()
if validation_error is not None:
s.append("id=" + ovs.json.to_string(self.id))
return ", ".join(s)
+
class Connection(object):
def __init__(self, stream):
self.name = stream.name
self.wait(poller)
self.recv_wait(poller)
poller.block()
-
+
def transact_block(self, request):
id_ = request.id
self.__log_msg("received", msg)
return msg
-
+
def recv_wait(self, poller):
if self.status or self.input:
poller.immediate_wake()
self.status = error
self.stream.close()
self.output = ""
-
+
+
class Session(object):
"""A JSON-RPC session with reconnection."""
"""Creates and returns a Session that maintains a JSON-RPC session to
'name', which should be a string acceptable to ovs.stream.Stream or
ovs.stream.PassiveStream's initializer.
-
+
If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
session connects and reconnects, with back-off, to 'name'.
-
+
If 'name' is a passive connection method, e.g. "ptcp:", the new session
listens for connections to 'name'. It maintains at most one connection
at any given time. Any new connection causes the previous one (if any)
self.stream.close()
self.stream = None
self.seqno += 1
-
+
def __connect(self):
self.__disconnect()
else:
max_tries = self.reconnect.get_max_tries()
return max_tries is None or max_tries > 0
-
+
def is_connected(self):
return self.rpc is not None
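A hedged sketch of driving a Session; Session.open(), run(), recv(), wait() and recv_wait() are assumed to mirror the C jsonrpc-session API, and the connection name is hypothetical:

    import ovs.jsonrpc
    import ovs.poller

    def pump_session(name):
        # e.g. name = "unix:/var/run/openvswitch/db.sock" (hypothetical)
        session = ovs.jsonrpc.Session.open(name)
        session.run()
        msg = session.recv()            # None if nothing has arrived yet
        poller = ovs.poller.Poller()
        session.wait(poller)
        session.recv_wait(poller)
        poller.block()
        return msg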
uuidRE = re.compile("^xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx$"
.replace('x', '[0-9a-fA-F]'))
+
def zero():
return uuid.UUID(int=0)
import select
import ovs.timeval
+
class Poller(object):
"""High-level wrapper around the "poll" system call.
be select.POLLIN or select.POLLOUT or their bitwise-OR). The following
call to self.block() will wake up when 'fd' becomes ready for one or
more of the requested events.
-
+
The event registration is one-shot: only the following call to
self.block() is affected. The event will need to be re-registered
after self.block() is called if it is to persist.
self.__timer_wait(msec)
def timer_wait_until(self, msec):
- """Causes the following call to self.block() to wake up when the current
- time, as returned by ovs.timeval.msec(), reaches 'msec' or later. If
- 'msec' is earlier than the current time, the following call to
- self.block() will not block at all.
+ """Causes the following call to self.block() to wake up when the
+ current time, as returned by ovs.timeval.msec(), reaches 'msec' or
+ later. If 'msec' is earlier than the current time, the following call
+ to self.block() will not block at all.
The timer registration is one-shot: only the following call to
self.block() is affected. The timer will need to be re-registered
def __reset(self):
self.poll = select.poll()
- self.timeout = -1
-
+ self.timeout = -1
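A minimal usage sketch: sleep roughly half a second through the poller:

    import ovs.poller
    import ovs.timeval

    poller = ovs.poller.Poller()
    poller.timer_wait_until(ovs.timeval.msec() + 500)
    poller.block()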
import os
import signal
+
def _signal_status_msg(type_, signr):
s = "%s by signal %d" % (type_, signr)
for name in signal.__dict__:
if name.startswith("SIG") and getattr(signal, name) == signr:
return "%s (%s)" % (s, name)
return s
-
+
+
def status_msg(status):
"""Given 'status', which is a process status in the form reported by
waitpid(2) and returned by process_status(), returns a string describing
EOF = -1
+
class Reconnect(object):
"""A finite-state machine for connecting and reconnecting to a network
resource with exponential backoff. It also provides optional support for
debug level, by default keeping them out of log files. This is
appropriate if the connection is one that is expected to be
short-lived, so that the log messages are merely distracting.
-
+
If 'quiet' is false, this object logs informational messages at info
level. This is the default.
-
+
This setting has no effect on the log level of debugging, warning, or
error messages."""
if quiet:
def set_name(self, name):
"""Sets this object's name to 'name'. If 'name' is None, then "void"
is used instead.
-
+
The name is used in log messages."""
if name is None:
self.name = "void"
if (self.state == Reconnect.Backoff and
self.backoff > self.max_backoff):
self.backoff = self.max_backoff
-
+
def set_probe_interval(self, probe_interval):
"""Sets the "probe interval" to 'probe_interval', in milliseconds. If
this is zero, it disables the connection keepalive feature. If it is
def set_passive(self, passive, now):
"""Configures this FSM for active or passive mode. In active mode (the
default), the FSM is attempting to connect to a remote host. In
- passive mode, the FSM is listening for connections from a remote host."""
+ passive mode, the FSM is listening for connections from a remote
+ host."""
if self.passive != passive:
self.passive = passive
else:
self.info_level("%s: connecting..." % self.name)
self._transition(now, Reconnect.ConnectInProgress)
-
+
def listening(self, now):
"""Tell this FSM that the client is listening for connection attempts.
This state last indefinitely until the client reports some change.
-
+
The natural progression from this state is for the client to report
that a connection has been accepted or is in progress of being
accepted, by calling self.connecting() or self.connected().
-
+
The client may also report that listening failed (e.g. accept()
returned an unexpected error such as ENOMEM) by calling
self.listen_error(), in which case the FSM will back off and eventually
def listen_error(self, now, error):
"""Tell this FSM that the client's attempt to accept a connection
failed (e.g. accept() returned an unexpected error such as ENOMEM).
-
+
If the FSM is currently listening (self.listening() was called), it
will back off and eventually return ovs.reconnect.CONNECT from
self.run() to tell the client to try listening again. If there is an
if connected_before:
self.total_connected_duration += now - self.last_connected
self.seqno += 1
-
+
logging.debug("%s: entering %s" % (self.name, state.name))
self.state = state
self.state_entered = now
def run(self, now):
"""Assesses whether any action should be taken on this FSM. The return
value is one of:
-
+
- None: The client need not take any action.
-
+
- Active client, ovs.reconnect.CONNECT: The client should start a
connection attempt and indicate this by calling
self.connecting(). If the connection attempt has definitely
succeeded, it should call self.connected(). If the connection
attempt has definitely failed, it should call
self.connect_failed().
-
+
The FSM is smart enough to back off correctly after successful
connections that quickly abort, so it is OK to call
self.connected() after a low-level successful connection
(e.g. connect()) even if the connection might soon abort due to a
failure at a high-level (e.g. SSL negotiation failure).
-
+
- Passive client, ovs.reconnect.CONNECT: The client should try to
listen for a connection, if it is not already listening. It
should call self.listening() if successful, otherwise
            self.connecting() or self.connect_failed() if the attempt
is in progress or definitely failed, respectively.
-
+
A listening passive client should constantly attempt to accept a
new connection and report an accepted connection with
self.connected().
-
+
- ovs.reconnect.DISCONNECT: The client should abort the current
connection or connection attempt or listen attempt and call
self.disconnected() or self.connect_failed() to indicate it.
-
+
- ovs.reconnect.PROBE: The client should send some kind of request
to the peer that will elicit a response, to ensure that the
connection is indeed in working order. (This will only be
return self.state.run(self, now)
else:
return None
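A sketch of the dispatch a client performs on run()'s return value; the three callbacks are hypothetical hooks supplied by the caller:

    import ovs.reconnect
    import ovs.timeval

    def service_fsm(r, connect_cb, disconnect_cb, probe_cb):
        now = ovs.timeval.msec()
        action = r.run(now)
        if action == ovs.reconnect.CONNECT:
            connect_cb()
        elif action == ovs.reconnect.DISCONNECT:
            disconnect_cb()
        elif action == ovs.reconnect.PROBE:
            probe_cb()
        # action is None when there is nothing to do right now.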
-
+
def wait(self, poller, now):
"""Causes the next call to poller.block() to wake up when self.run()
should be called."""
stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
stats.total_connected_duration = self.total_connected_duration
if self.is_connected():
- stats.total_connected_duration += self.get_last_connect_elapsed(now)
+ stats.total_connected_duration += (
+ self.get_last_connect_elapsed(now))
stats.n_attempted_connections = self.n_attempted_connections
stats.n_successful_connections = self.n_successful_connections
stats.state = self.state.name
import ovs.fatal_signal
+
def make_unix_socket(style, nonblock, bind_path, connect_path):
"""Creates a Unix domain socket in the given 'style' (either
socket.SOCK_DGRAM or socket.SOCK_STREAM) that is bound to 'bind_path' (if
ovs.fatal_signal.add_file_to_unlink(bind_path)
return get_exception_errno(e), None
+
def check_connection_completion(sock):
p = select.poll()
p.register(sock, select.POLLOUT)
else:
return errno.EAGAIN
+
def get_socket_error(sock):
"""Returns the errno value associated with 'socket' (0 if no error) and
resets the socket's error status."""
return sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+
def get_exception_errno(e):
"""A lot of methods on Python socket objects raise socket.error, but that
exception is documented as having two completely different forms of
else:
return errno.EPROTO
+
null_fd = -1
+
+
def get_null_fd():
"""Returns a readable and writable fd for /dev/null, if successful,
otherwise a negative errno value. The caller must not close the returned
return -e.errno
return null_fd
+
def write_fully(fd, buf):
"""Returns an (error, bytes_written) tuple where 'error' is 0 on success,
otherwise a positive errno value, and 'bytes_written' is the number of
except OSError, e:
return e.errno, bytes_written
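Illustrative only; fd 1 is stdout:

    import ovs.socket_util

    error, n = ovs.socket_util.write_fully(1, "hello\n")
    if error:
        print "write failed after %d bytes: errno %d" % (n, error)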
+
def set_nonblocking(sock):
try:
sock.setblocking(0)
import ovs.poller
import ovs.socket_util
+
class Stream(object):
"""Bidirectional byte stream. Currently only Unix domain sockets
are implemented."""
stream.connect_wait(poller)
poller.block()
assert error != errno.EINPROGRESS
-
+
if error and stream:
stream.close()
stream = None
def recv(self, n):
"""Tries to receive up to 'n' bytes from this stream. Returns a
(error, string) tuple:
-
+
- If successful, 'error' is zero and 'string' contains between 1
and 'n' bytes of data.
- If the connection has been closed in the normal fashion or if 'n'
is 0, the tuple is (0, "").
-
+
The recv function will not block waiting for data to arrive. If no
data have been received, it returns (errno.EAGAIN, "") immediately."""
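A hedged sketch of a blocking wrapper built on this contract; recv_blocking() is illustrative, not part of the Stream API:

    import errno
    import ovs.poller

    def recv_blocking(stream, n):
        while True:
            error, data = stream.recv(n)
            if error != errno.EAGAIN:
                return error, data
            poller = ovs.poller.Poller()
            stream.recv_wait(poller)
            poller.block()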
def connect_wait(self, poller):
self.wait(poller, Stream.W_CONNECT)
-
+
def recv_wait(self, poller):
self.wait(poller, Stream.W_RECV)
-
+
def send_wait(self, poller):
self.wait(poller, Stream.W_SEND)
-
+
def __del__(self):
# Don't delete the file: we might have forked.
self.socket.close()
+
class PassiveStream(object):
@staticmethod
def is_valid_name(name):
# Don't delete the file: we might have forked.
self.socket.close()
+
def usage(name, active, passive):
print
if active:
print("Active %s connection methods:" % name)
print(" unix:FILE "
- "Unix domain socket named FILE");
+ "Unix domain socket named FILE")
if passive:
print("Passive %s connection methods:" % name)
import time
+
def msec():
"""Returns the current time, as the amount of time since the epoch, in
milliseconds, as a float."""
return time.time() * 1000.0
+
def postfork():
# Just a stub for now
pass
PROGRAM_NAME = os.path.basename(sys.argv[0])
+
def abs_file_name(dir_, file_name):
"""If 'file_name' starts with '/', returns a copy of 'file_name'.
Otherwise, returns an absolute path to 'file_name' considering it relative
# Copyright (c) 2009, 2010 Nicira Networks.
-#
+#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import ovs.json
+
def print_json(json):
if type(json) in [str, unicode]:
print "error: %s" % json
sys.stdout.write("\n")
return True
+
def parse_multiple(stream):
buf = stream.read(4096)
ok = True
ok = False
return ok
+
def main(argv):
argv0 = argv[0]
if not ok:
sys.exit(1)
+
if __name__ == '__main__':
main(sys.argv)
import ovs.poller
import ovs.util
+
def unbox_json(json):
if type(json) == list and len(json) == 1:
return json[0]
else:
return json
+
def do_default_atoms():
for type_ in types.ATOMIC_TYPES:
if type_ == types.VoidType:
sys.stdout.write("OK\n")
+
def do_default_data():
any_errors = False
for n_min in 0, 1:
if any_errors:
sys.exit(1)
+
def do_parse_atomic_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
atomic_type = types.AtomicType.from_json(type_json)
print ovs.json.to_string(atomic_type.to_json(), sort_keys=True)
+
def do_parse_base_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
base_type = types.BaseType.from_json(type_json)
print ovs.json.to_string(base_type.to_json(), sort_keys=True)
+
def do_parse_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
type_ = types.Type.from_json(type_json)
print ovs.json.to_string(type_.to_json(), sort_keys=True)
+
def do_parse_atoms(type_string, *atom_strings):
type_json = unbox_json(ovs.json.from_string(type_string))
base = types.BaseType.from_json(type_json)
except error.Error, e:
print unicode(e)
+
def do_parse_data(type_string, *data_strings):
type_json = unbox_json(ovs.json.from_string(type_string))
type_ = types.Type.from_json(type_json)
datum = data.Datum.from_json(type_, datum_json)
print ovs.json.to_string(datum.to_json())
+
def do_sort_atoms(type_string, atom_strings):
type_json = unbox_json(ovs.json.from_string(type_string))
base = types.BaseType.from_json(type_json)
print ovs.json.to_string([data.Atom.to_json(atom)
for atom in sorted(atoms)])
+
def do_parse_column(name, column_string):
column_json = unbox_json(ovs.json.from_string(column_string))
column = ovs.db.schema.ColumnSchema.from_json(column_json, name)
print ovs.json.to_string(column.to_json(), sort_keys=True)
+
def do_parse_table(name, table_string, default_is_root_string='false'):
default_is_root = default_is_root_string == 'true'
table_json = unbox_json(ovs.json.from_string(table_string))
table = ovs.db.schema.TableSchema.from_json(table_json, name)
print ovs.json.to_string(table.to_json(default_is_root), sort_keys=True)
+
def do_parse_schema(schema_string):
schema_json = unbox_json(ovs.json.from_string(schema_string))
schema = ovs.db.schema.DbSchema.from_json(schema_json)
print ovs.json.to_string(schema.to_json(), sort_keys=True)
+
def print_idl(idl, step):
simple = idl.tables["simple"].rows
l1 = idl.tables["link1"].rows
print("%03d: empty" % step)
sys.stdout.flush()
+
def substitute_uuids(json, symtab):
if type(json) in [str, unicode]:
symbol = symtab.get(json)
return d
return json
+
def parse_uuids(json, symtab):
if type(json) in [str, unicode] and ovs.ovsuuid.is_valid_string(json):
name = "#%d#" % len(symtab)
for value in json.itervalues():
parse_uuids(value, symtab)
+
def idltest_find_simple(idl, i):
for row in idl.tables["simple"].rows.itervalues():
if row.i == i:
return row
return None
+
def idl_set(idl, commands, step):
txn = ovs.db.idl.Transaction(idl)
increment = False
sys.stdout.write("\n")
sys.stdout.flush()
+
def do_idl(schema_file, remote, *commands):
schema = ovs.db.schema.DbSchema.from_json(ovs.json.from_file(schema_file))
idl = ovs.db.idl.Idl(remote, schema)
idl.wait(poller)
rpc.wait(poller)
poller.block()
-
+
print_idl(idl, step)
step += 1
idl.close()
print("%03d: done" % step)
+
def usage():
print """\
%(program_name)s: test utility for Open vSwitch database Python bindings
""" % {'program_name': ovs.util.PROGRAM_NAME}
sys.exit(0)
+
def main(argv):
try:
options, args = getopt.gnu_getopt(argv[1:], 't:h',
func(*args)
+
if __name__ == '__main__':
try:
main(sys.argv)
# Copyright (c) 2009, 2010 Nicira Networks.
-#
+#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import ovs.reconnect
-def do_enable(arg):
+now = 0
+r = None
+
+
+def do_enable(_):
r.enable(now)
-def do_disable(arg):
+
+def do_disable(_):
r.disable(now)
-def do_force_reconnect(arg):
+
+def do_force_reconnect(_):
r.force_reconnect(now)
+
def error_from_string(s):
if not s:
return 0
sys.stderr.write("unknown error '%s'\n" % s)
sys.exit(1)
+
def do_disconnected(arg):
r.disconnected(now, error_from_string(arg))
-
-def do_connecting(arg):
+
+
+def do_connecting(_):
r.connecting(now)
+
def do_connect_failed(arg):
r.connect_failed(now, error_from_string(arg))
-def do_connected(arg):
+
+def do_connected(_):
r.connected(now)
-def do_received(arg):
+
+def do_received(_):
r.received(now)
+
def do_run(arg):
global now
if arg is not None:
else:
assert False
+
def do_advance(arg):
global now
now += int(arg)
-def do_timeout(arg):
+
+def do_timeout(_):
global now
timeout = r.timeout(now)
if timeout >= 0:
else:
print " no timeout"
+
def do_set_max_tries(arg):
r.set_max_tries(int(arg))
+
def diff_stats(old, new, delta):
if (old.state != new.state or
old.state_elapsed != new.state_elapsed or
if (old.last_connected != new.last_connected or
(new.msec_since_connect != None and
old.msec_since_connect != new.msec_since_connect - delta) or
- (old.total_connected_duration != new.total_connected_duration - delta and
- not (old.total_connected_duration == 0 and
- new.total_connected_duration == 0))):
+ (old.total_connected_duration != new.total_connected_duration - delta
+ and not (old.total_connected_duration == 0 and
+ new.total_connected_duration == 0))):
print(" last connected %d ms ago, connected %d ms total"
% (new.msec_since_connect, new.total_connected_duration))
print(" disconnected at %d ms (%d ms ago)"
% (new.last_disconnected, new.msec_since_disconnect))
-def do_set_passive(arg):
+
+def do_set_passive(_):
r.set_passive(True, now)
-def do_listening(arg):
+
+def do_listening(_):
r.listening(now)
+
def do_listen_error(arg):
r.listen_error(now, int(arg))
+
def main():
commands = {
"enable": do_enable,
old_time = now
+
if __name__ == '__main__':
main()
-
-