diff --git a/etc/neutron/plugins/openvswitch/ovs_neutron_plugin.ini b/etc/neutron/plugins/openvswitch/ovs_neutron_plugin.ini index 7196b9a9c74..072bd7293b9 100644 --- a/etc/neutron/plugins/openvswitch/ovs_neutron_plugin.ini +++ b/etc/neutron/plugins/openvswitch/ovs_neutron_plugin.ini @@ -40,6 +40,16 @@ # so long as it is set to True. # use_veth_interconnection = False +# (StrOpt) Which OVSDB backend to use, defaults to 'vsctl' +# vsctl - The backend based on executing ovs-vsctl +# native - The backend based on using native OVSDB +# ovsdb_interface = vsctl + +# (StrOpt) The connection string for the native OVSDB backend +# To enable ovsdb-server to listen on port 6640: +# ovs-vsctl set-manager ptcp:6640:127.0.0.1 +# ovsdb_connection = tcp:127.0.0.1:6640 + [agent] # Agent's polling interval in seconds # polling_interval = 2 diff --git a/neutron/agent/linux/ovs_lib.py b/neutron/agent/linux/ovs_lib.py index 531232388f0..8ab1888bc01 100644 --- a/neutron/agent/linux/ovs_lib.py +++ b/neutron/agent/linux/ovs_lib.py @@ -102,7 +102,10 @@ class BaseOVS(object): def add_bridge(self, bridge_name): self.ovsdb.add_br(bridge_name).execute() - return OVSBridge(bridge_name) + br = OVSBridge(bridge_name) + # Don't return until vswitchd sets up the internal port + br.get_port_ofport(bridge_name) + return br def delete_bridge(self, bridge_name): self.ovsdb.del_br(bridge_name).execute() @@ -162,6 +165,8 @@ class OVSBridge(BaseOVS): def create(self): self.ovsdb.add_br(self.br_name).execute() + # Don't return until vswitchd sets up the internal port + self.get_port_ofport(self.br_name) def destroy(self): self.delete_bridge(self.br_name) @@ -191,6 +196,8 @@ class OVSBridge(BaseOVS): if interface_attr_tuples: txn.add(self.ovsdb.db_set('Interface', port_name, *interface_attr_tuples)) + # Don't return until the port has been assigned by vswitchd + self.get_port_ofport(port_name) def delete_port(self, port_name): self.ovsdb.del_port(port_name, self.br_name).execute() diff --git a/neutron/agent/ovsdb/api.py b/neutron/agent/ovsdb/api.py index 10d365acff2..2d6634ae695 100644 --- a/neutron/agent/ovsdb/api.py +++ b/neutron/agent/ovsdb/api.py @@ -20,6 +20,7 @@ import six interface_map = { 'vsctl': 'neutron.agent.ovsdb.impl_vsctl.OvsdbVsctl', + 'native': 'neutron.agent.ovsdb.impl_idl.OvsdbIdl', } OPTS = [ diff --git a/neutron/agent/ovsdb/impl_idl.py b/neutron/agent/ovsdb/impl_idl.py new file mode 100644 index 00000000000..49d9605e608 --- /dev/null +++ b/neutron/agent/ovsdb/impl_idl.py @@ -0,0 +1,195 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import Queue +import time + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import excutils +from ovs.db import idl + +from neutron.agent.ovsdb import api +from neutron.agent.ovsdb.native import commands as cmd +from neutron.agent.ovsdb.native import connection +from neutron.agent.ovsdb.native import idlutils +from neutron.i18n import _LE + + +OPTS = [ + cfg.StrOpt('ovsdb_connection', + default='tcp:127.0.0.1:6640', + help=_('The connection string for the native OVSDB backend')), +] +cfg.CONF.register_opts(OPTS, 'OVS') +# TODO(twilson) DEFAULT.ovs_vsctl_timeout should be OVS.vsctl_timeout +cfg.CONF.import_opt('ovs_vsctl_timeout', 'neutron.agent.linux.ovs_lib') + +LOG = logging.getLogger(__name__) + + +ovsdb_connection = connection.Connection(cfg.CONF.OVS.ovsdb_connection, + cfg.CONF.ovs_vsctl_timeout) + + +class Transaction(api.Transaction): + def __init__(self, context, api, check_error=False, log_errors=False): + self.context = context + self.api = api + self.check_error = check_error + self.log_errors = log_errors + self.commands = [] + self.results = Queue.Queue(1) + + def add(self, command): + """Add a command to the transaction + + returns The command passed as a convenience + """ + + self.commands.append(command) + return command + + def commit(self): + ovsdb_connection.queue_txn(self) + result = self.results.get() + if isinstance(result, Exception) and self.check_error: + raise result + return result + + def do_commit(self): + start_time = time.time() + attempts = 0 + while True: + elapsed_time = time.time() - start_time + if attempts > 0 and elapsed_time > self.context.vsctl_timeout: + raise RuntimeError("OVS transaction timed out") + attempts += 1 + # TODO(twilson) Make sure we don't loop longer than vsctl_timeout + txn = idl.Transaction(self.api.idl) + for i, command in enumerate(self.commands): + LOG.debug("Running txn command(idx=%(idx)s): %(cmd)s", + {'idx': i, 'cmd': command}) + try: + command.run_idl(txn) + except Exception: + with excutils.save_and_reraise_exception() as ctx: + txn.abort() + if not self.check_error: + ctx.reraise = False + seqno = self.api.idl.change_seqno + status = txn.commit_block() + if status == txn.TRY_AGAIN: + LOG.debug("OVSDB transaction returned TRY_AGAIN, retrying") + if self.api.idl._session.rpc.status != 0: + LOG.debug("Lost connection to OVSDB, reconnecting!") + self.api.idl.force_reconnect() + idlutils.wait_for_change( + self.api.idl, self.context.vsctl_timeout - elapsed_time, + seqno) + continue + elif status == txn.ERROR: + msg = _LE("OVSDB Error: %s") % txn.get_error() + if self.log_errors: + LOG.error(msg) + if self.check_error: + # For now, raise similar error to vsctl/utils.execute() + raise RuntimeError(msg) + return + elif status == txn.ABORTED: + LOG.debug("Transaction aborted") + return + elif status == txn.UNCHANGED: + LOG.debug("Transaction caused no change") + + return [cmd.result for cmd in self.commands] + + +class OvsdbIdl(api.API): + def __init__(self, context): + super(OvsdbIdl, self).__init__(context) + ovsdb_connection.start() + self.idl = ovsdb_connection.idl + + @property + def _tables(self): + return self.idl.tables + + @property + def _ovs(self): + return self._tables['Open_vSwitch'].rows.values()[0] + + def transaction(self, check_error=False, log_errors=True, **kwargs): + return Transaction(self.context, self, check_error, log_errors) + + def add_br(self, name, may_exist=True): + return cmd.AddBridgeCommand(self, name, may_exist) + + def del_br(self, name, if_exists=True): + return cmd.DelBridgeCommand(self, name, if_exists) + + def br_exists(self, name): + return cmd.BridgeExistsCommand(self, name) + + def port_to_br(self, name): + return cmd.PortToBridgeCommand(self, name) + + def iface_to_br(self, name): + # For our purposes, ports and interfaces always have the same name + return cmd.PortToBridgeCommand(self, name) + + def list_br(self): + return cmd.ListBridgesCommand(self) + + def br_get_external_id(self, name, field): + return cmd.BrGetExternalIdCommand(self, name, field) + + def br_set_external_id(self, name, field, value): + return cmd.BrSetExternalIdCommand(self, name, field, value) + + def db_set(self, table, record, *col_values): + return cmd.DbSetCommand(self, table, record, *col_values) + + def db_clear(self, table, record, column): + return cmd.DbClearCommand(self, table, record, column) + + def db_get(self, table, record, column): + return cmd.DbGetCommand(self, table, record, column) + + def db_list(self, table, records=None, columns=None, if_exists=False): + return cmd.DbListCommand(self, table, records, columns, if_exists) + + def db_find(self, table, *conditions, **kwargs): + return cmd.DbFindCommand(self, table, *conditions, **kwargs) + + def set_controller(self, bridge, controllers): + return cmd.SetControllerCommand(self, bridge, controllers) + + def del_controller(self, bridge): + return cmd.DelControllerCommand(self, bridge) + + def get_controller(self, bridge): + return cmd.GetControllerCommand(self, bridge) + + def set_fail_mode(self, bridge, mode): + return cmd.SetFailModeCommand(self, bridge, mode) + + def add_port(self, bridge, port, may_exist=True): + return cmd.AddPortCommand(self, bridge, port, may_exist) + + def del_port(self, port, bridge=None, if_exists=True): + return cmd.DelPortCommand(self, port, bridge, if_exists) + + def list_ports(self, bridge): + return cmd.ListPortsCommand(self, bridge) diff --git a/neutron/agent/ovsdb/native/__init__.py b/neutron/agent/ovsdb/native/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/agent/ovsdb/native/commands.py b/neutron/agent/ovsdb/native/commands.py new file mode 100644 index 00000000000..5928e123c79 --- /dev/null +++ b/neutron/agent/ovsdb/native/commands.py @@ -0,0 +1,406 @@ +# Copyright (c) 2015 Openstack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections + +from oslo_log import log as logging +from oslo_utils import excutils + +from neutron.agent.ovsdb import api +from neutron.agent.ovsdb.native import idlutils +from neutron.common import exceptions +from neutron.i18n import _LE + +LOG = logging.getLogger(__name__) + + +class RowNotFound(exceptions.NeutronException): + message = _("Table %(table)s has no row with %(col)s=%(match)s") + + +class BaseCommand(api.Command): + def __init__(self, api): + self.api = api + self.result = None + + def execute(self, check_error=False, log_errors=True): + try: + with self.api.transaction(check_error, log_errors) as txn: + txn.add(self) + return self.result + except Exception: + with excutils.save_and_reraise_exception() as ctx: + if log_errors: + LOG.exception(_LE("Error executing command")) + if not check_error: + ctx.reraise = False + + def row_by_index(self, table, match, *default): + tab = self.api._tables[table] + idx = idlutils.get_index_column(tab) + return self.row_by_value(table, idx, match, *default) + + def row_by_value(self, table, column, match, *default): + tab = self.api._tables[table] + try: + return next(r for r in tab.rows.values() + if getattr(r, column) == match) + except StopIteration: + if len(default) == 1: + return default[0] + else: + raise RowNotFound(table=table, col=column, match=match) + + def __str__(self): + command_info = self.__dict__ + return "%s(%s)" % ( + self.__class__.__name__, + ", ".join("%s=%s" % (k, v) for k, v in command_info.items() + if k not in ['api', 'result'])) + + +class AddBridgeCommand(BaseCommand): + def __init__(self, api, name, may_exist): + super(AddBridgeCommand, self).__init__(api) + self.name = name + self.may_exist = may_exist + + def run_idl(self, txn): + if self.may_exist: + br = self.row_by_value('Bridge', 'name', self.name, None) + if br: + return + row = txn.insert(self.api._tables['Bridge']) + row.name = self.name + self.api._ovs.verify('bridges') + self.api._ovs.bridges = self.api._ovs.bridges + [row] + + # Add the internal bridge port + cmd = AddPortCommand(self.api, self.name, self.name, self.may_exist) + cmd.run_idl(txn) + + cmd = DbSetCommand(self.api, 'Interface', self.name, + ('type', 'internal')) + cmd.run_idl(txn) + + +class DelBridgeCommand(BaseCommand): + def __init__(self, api, name, if_exists): + super(DelBridgeCommand, self).__init__(api) + self.name = name + self.if_exists = if_exists + + def run_idl(self, txn): + try: + br = self.row_by_value('Bridge', 'name', self.name) + except RowNotFound: + if self.if_exists: + return + else: + msg = _LE("Bridge %s does not exist") % self.name + LOG.error(msg) + raise RuntimeError(msg) + self.api._ovs.verify('bridges') + for port in br.ports: + cmd = DelPortCommand(self.api, port.name, self.name, + if_exists=True) + cmd.run_idl(txn) + bridges = self.api._ovs.bridges + bridges.remove(br) + self.api._ovs.bridges = bridges + del self.api._tables['Bridge'].rows[br.uuid] + + +class BridgeExistsCommand(BaseCommand): + def __init__(self, api, name): + super(BridgeExistsCommand, self).__init__(api) + self.name = name + + def run_idl(self, txn): + self.result = bool(self.row_by_value('Bridge', + 'name', self.name, None)) + + +class ListBridgesCommand(BaseCommand): + def __init__(self, api): + super(ListBridgesCommand, self).__init__(api) + + def run_idl(self, txn): + # NOTE (twilson) [x.name for x in rows.values()] if no index + self.result = [x.name for x in + self.api._tables['Bridge'].rows.values()] + + +class BrGetExternalIdCommand(BaseCommand): + def __init__(self, api, name, field): + super(BrGetExternalIdCommand, self).__init__(api) + self.name = name + self.field = field + + def run_idl(self, txn): + br = self.row_by_value('Bridge', 'name', self.name) + self.result = br.external_ids[self.field] + + +class BrSetExternalIdCommand(BaseCommand): + def __init__(self, api, name, field, value): + super(BrSetExternalIdCommand, self).__init__(api) + self.name = name + self.field = field + self.value = value + + def run_idl(self, txn): + br = self.row_by_value('Bridge', 'name', self.name) + external_ids = getattr(br, 'external_ids', {}) + external_ids[self.field] = self.value + br.external_ids = external_ids + + +class DbSetCommand(BaseCommand): + def __init__(self, api, table, record, *col_values): + super(DbSetCommand, self).__init__(api) + self.table = table + self.record = record + self.col_values = col_values + + def run_idl(self, txn): + record = self.row_by_index(self.table, self.record) + for col, val in self.col_values: + # TODO(twilson) Ugh, the OVS library doesn't like OrderedDict + # We're only using it to make a unit test work, so we should fix + # this soon. + if isinstance(val, collections.OrderedDict): + val = dict(val) + setattr(record, col, val) + + +class DbClearCommand(BaseCommand): + def __init__(self, api, table, record, column): + super(DbClearCommand, self).__init__(api) + self.table = table + self.record = record + self.column = column + + def run_idl(self, txn): + record = self.row_by_index(self.table, self.record) + # Create an empty value of the column type + value = type(getattr(record, self.column))() + setattr(record, self.column, value) + + +class DbGetCommand(BaseCommand): + def __init__(self, api, table, record, column): + super(DbGetCommand, self).__init__(api) + self.table = table + self.record = record + self.column = column + + def run_idl(self, txn): + record = self.row_by_index(self.table, self.record) + # TODO(twilson) This feels wrong, but ovs-vsctl returns single results + # on set types without the list. The IDL is returning them as lists, + # even if the set has the maximum number of items set to 1. Might be + # able to inspect the Schema and just do this conversion for that case. + result = getattr(record, self.column) + if isinstance(result, list) and len(result) == 1: + self.result = result[0] + else: + self.result = result + + +class SetControllerCommand(BaseCommand): + def __init__(self, api, bridge, targets): + super(SetControllerCommand, self).__init__(api) + self.bridge = bridge + self.targets = targets + + def run_idl(self, txn): + br = self.row_by_value('Bridge', 'name', self.bridge) + controllers = [] + for target in self.targets: + controller = txn.insert(self.api._tables['Controller']) + controller.target = target + controllers.append(controller) + br.verify('controller') + br.controller = controllers + + +class DelControllerCommand(BaseCommand): + def __init__(self, api, bridge): + super(DelControllerCommand, self).__init__(api) + self.bridge = bridge + + def run_idl(self, txn): + br = self.row_by_value('Bridge', 'name', self.bridge) + br.controller = [] + + +class GetControllerCommand(BaseCommand): + def __init__(self, api, bridge): + super(GetControllerCommand, self).__init__(api) + self.bridge = bridge + + def run_idl(self, txn): + br = self.row_by_value('Bridge', 'name', self.bridge) + br.verify('controller') + self.result = [c.target for c in br.controller] + + +class SetFailModeCommand(BaseCommand): + def __init__(self, api, bridge, mode): + super(SetFailModeCommand, self).__init__(api) + self.bridge = bridge + self.mode = mode + + def run_idl(self, txn): + br = self.row_by_value('Bridge', 'name', self.bridge) + br.verify('fail_mode') + br.fail_mode = self.mode + + +class AddPortCommand(BaseCommand): + def __init__(self, api, bridge, port, may_exist): + super(AddPortCommand, self).__init__(api) + self.bridge = bridge + self.port = port + self.may_exist = may_exist + + def run_idl(self, txn): + br = self.row_by_value('Bridge', 'name', self.bridge) + if self.may_exist: + port = self.row_by_value('Port', 'name', self.port, None) + if port: + return + port = txn.insert(self.api._tables['Port']) + port.name = self.port + br.verify('ports') + ports = getattr(br, 'ports', []) + ports.append(port) + br.ports = ports + + iface = txn.insert(self.api._tables['Interface']) + iface.name = self.port + port.verify('interfaces') + ifaces = getattr(port, 'interfaces', []) + ifaces.append(iface) + port.interfaces = ifaces + + +class DelPortCommand(BaseCommand): + def __init__(self, api, port, bridge, if_exists): + super(DelPortCommand, self).__init__(api) + self.port = port + self.bridge = bridge + self.if_exists = if_exists + + def run_idl(self, txn): + try: + port = self.row_by_value('Port', 'name', self.port) + except RowNotFound: + if self.if_exists: + return + msg = _LE("Port %s does not exist") % self.port + raise RuntimeError(msg) + if self.bridge: + br = self.row_by_value('Bridge', 'name', self.bridge) + else: + br = next(b for b in self.api._tables['Bridge'].rows.values() + if port in b.ports) + + if port.uuid not in br.ports and not self.if_exists: + # TODO(twilson) Make real errors across both implementations + msg = _LE("Port %(port)s does not exist on %(bridge)s!") % { + 'port': self.name, 'bridge': self.bridge + } + LOG.error(msg) + raise RuntimeError(msg) + + br.verify('ports') + ports = br.ports + ports.remove(port) + br.ports = ports + + # Also remove port/interface directly for indexing? + port.verify('interfaces') + for iface in port.interfaces: + del self.api._tables['Interface'].rows[iface.uuid] + del self.api._tables['Port'].rows[port.uuid] + + +class ListPortsCommand(BaseCommand): + def __init__(self, api, bridge): + super(ListPortsCommand, self).__init__(api) + self.bridge = bridge + + def run_idl(self, txn): + br = self.row_by_value('Bridge', 'name', self.bridge) + self.result = [p.name for p in br.ports if p.name != self.bridge] + + +class PortToBridgeCommand(BaseCommand): + def __init__(self, api, name): + super(PortToBridgeCommand, self).__init__(api) + self.name = name + + def run_idl(self, txn): + # TODO(twilson) This is expensive! + # This traversal of all ports could be eliminated by caching the bridge + # name on the Port's (or Interface's for iface_to_br) external_id field + # In fact, if we did that, the only place that uses to_br functions + # could just add the external_id field to the conditions passed to find + port = self.row_by_value('Port', 'name', self.name) + bridges = self.api._tables['Bridge'].rows.values() + self.result = next(br.name for br in bridges if port in br.ports) + + +class DbListCommand(BaseCommand): + def __init__(self, api, table, records, columns, if_exists): + super(DbListCommand, self).__init__(api) + self.table = self.api._tables[table] + self.columns = columns or self.table.columns.keys() + ['_uuid'] + self.if_exists = if_exists + idx = idlutils.get_index_column(self.table) + if records: + self.records = [uuid for uuid, row in self.table.rows.items() + if getattr(row, idx) in records] + else: + self.records = self.table.rows.keys() + + def run_idl(self, txn): + self.result = [ + { + c: idlutils.get_column_value(self.table.rows[uuid], c) + for c in self.columns + } + for uuid in self.records + ] + + +class DbFindCommand(BaseCommand): + def __init__(self, api, table, *conditions, **kwargs): + super(DbFindCommand, self).__init__(api) + self.table = self.api._tables[table] + self.conditions = conditions + self.columns = (kwargs.get('columns') or + self.table.columns.keys() + ['_uuid']) + + def run_idl(self, txn): + self.result = [ + { + c: idlutils.get_column_value(r, c) + for c in self.columns + } + for r in self.table.rows.values() + if idlutils.row_match(r, self.conditions) + ] diff --git a/neutron/agent/ovsdb/native/connection.py b/neutron/agent/ovsdb/native/connection.py new file mode 100644 index 00000000000..25ea55ead51 --- /dev/null +++ b/neutron/agent/ovsdb/native/connection.py @@ -0,0 +1,87 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import Queue +import threading + +from ovs.db import idl +from ovs import poller + +from neutron.agent.ovsdb.native import idlutils + + +class TransactionQueue(Queue.Queue, object): + def __init__(self, *args, **kwargs): + super(TransactionQueue, self).__init__(*args, **kwargs) + alertpipe = os.pipe() + self.alertin = os.fdopen(alertpipe[0], 'r', 0) + self.alertout = os.fdopen(alertpipe[1], 'w', 0) + + def get_nowait(self, *args, **kwargs): + try: + result = super(TransactionQueue, self).get_nowait(*args, **kwargs) + except Queue.Empty: + return None + self.alertin.read(1) + return result + + def put(self, *args, **kwargs): + super(TransactionQueue, self).put(*args, **kwargs) + self.alertout.write('X') + self.alertout.flush() + + @property + def alert_fileno(self): + return self.alertin.fileno() + + +class Connection(object): + def __init__(self, connection, timeout): + self.idl = None + self.connection = connection + self.timeout = timeout + self.txns = TransactionQueue(1) + self.lock = threading.Lock() + + def start(self): + with self.lock: + if self.idl is not None: + return + + helper = idlutils.get_schema_helper(self.connection) + helper.register_all() + self.idl = idl.Idl(self.connection, helper) + idlutils.wait_for_change(self.idl, self.timeout) + self.poller = poller.Poller() + self.thread = threading.Thread(target=self.run) + self.thread.setDaemon(True) + self.thread.start() + + def run(self): + while True: + self.idl.wait(self.poller) + self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN) + self.poller.block() + self.idl.run() + txn = self.txns.get_nowait() + if txn is not None: + try: + txn.results.put(txn.do_commit()) + except Exception as ex: + txn.results.put(ex) + self.txns.task_done() + + def queue_txn(self, txn): + self.txns.put(txn) diff --git a/neutron/agent/ovsdb/native/idlutils.py b/neutron/agent/ovsdb/native/idlutils.py new file mode 100644 index 00000000000..89f32245395 --- /dev/null +++ b/neutron/agent/ovsdb/native/idlutils.py @@ -0,0 +1,118 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import time + +from ovs.db import idl +from ovs import jsonrpc +from ovs import poller +from ovs import stream + + +def get_schema_helper(connection): + err, strm = stream.Stream.open_block( + stream.Stream.open(connection)) + if err: + raise Exception("Could not connect to %s" % ( + connection,)) + rpc = jsonrpc.Connection(strm) + req = jsonrpc.Message.create_request('get_schema', ['Open_vSwitch']) + err, resp = rpc.transact_block(req) + rpc.close() + if err: + raise Exception("Could not retrieve schema from %s: %s" % ( + connection, os.strerror(err))) + elif resp.error: + raise Exception(resp.error) + return idl.SchemaHelper(None, resp.result) + + +def wait_for_change(_idl, timeout, seqno=None): + if seqno is None: + seqno = _idl.change_seqno + stop = time.time() + timeout + while _idl.change_seqno == seqno: + _idl.run() + ovs_poller = poller.Poller() + _idl.wait(ovs_poller) + ovs_poller.timer_wait(timeout * 1000) + ovs_poller.block() + if time.time() > stop: + raise Exception("Timeout") + + +def get_column_value(row, col): + if col == '_uuid': + val = row.uuid + else: + val = getattr(row, col) + + # Idl returns lists of Rows where ovs-vsctl returns lists of UUIDs + if isinstance(val, list) and len(val): + if isinstance(val[0], idl.Row): + val = [v.uuid for v in val] + # ovs-vsctl treats lists of 1 as single results + if len(val) == 1: + val = val[0] + return val + + +def condition_match(row, condition): + """Return whether a condition matches a row + + :param row An OVSDB Row + :param condition A 3-tuple containing (column, operation, match) + """ + + col, op, match = condition + val = get_column_value(row, col) + matched = True + + # TODO(twilson) Implement other operators and type comparisons + # ovs_lib only uses dict '=' and '!=' searches for now + if isinstance(match, dict): + for key in match: + if op == '=': + if (key not in val or match[key] != val[key]): + matched = False + break + elif op == '!=': + if key not in val or match[key] == val[key]: + matched = False + break + else: + raise NotImplementedError() + elif isinstance(match, list): + raise NotImplementedError() + else: + if op == '==' and val != match: + matched = False + elif op == '!=' and val == match: + matched = False + else: + raise NotImplementedError() + return matched + + +def row_match(row, conditions): + """Return whether the row matches the list of conditions""" + return all(condition_match(row, cond) for cond in conditions) + + +def get_index_column(table): + if len(table.indexes) == 1: + idx = table.indexes[0] + if len(idx) == 1: + return idx[0].name diff --git a/neutron/tests/functional/agent/linux/base.py b/neutron/tests/functional/agent/linux/base.py index 49c2a561c21..68a0051b46e 100644 --- a/neutron/tests/functional/agent/linux/base.py +++ b/neutron/tests/functional/agent/linux/base.py @@ -129,6 +129,7 @@ class BaseLinuxTestCase(functional_base.BaseSudoTestCase): class BaseOVSLinuxTestCase(testscenarios.WithScenarios, BaseLinuxTestCase): scenarios = [ ('vsctl', dict(ovsdb_interface='vsctl')), + ('native', dict(ovsdb_interface='native')), ] def setUp(self): diff --git a/neutron/tests/functional/agent/test_ovs_lib.py b/neutron/tests/functional/agent/test_ovs_lib.py index 3ed3e6313fa..eba1361ffb7 100644 --- a/neutron/tests/functional/agent/test_ovs_lib.py +++ b/neutron/tests/functional/agent/test_ovs_lib.py @@ -54,6 +54,16 @@ class OVSBridgeTestCase(base.BaseOVSLinuxTestCase): self.br.delete_port(port_name) self.assertFalse(self.br.port_exists(port_name)) + def test_duplicate_port_may_exist_false(self): + port_name, ofport = self.create_ovs_port(('type', 'internal')) + cmd = self.br.ovsdb.add_port(self.br.br_name, + port_name, may_exist=False) + self.assertRaises(RuntimeError, cmd.execute, check_error=True) + + def test_delete_port_if_exists_false(self): + cmd = self.br.ovsdb.del_port('nonexistantport', if_exists=False) + self.assertRaises(RuntimeError, cmd.execute, check_error=True) + def test_replace_port(self): port_name = base.get_rand_port_name() self.br.replace_port(port_name, ('type', 'internal')) diff --git a/neutron/tests/unit/agent/linux/test_ovs_lib.py b/neutron/tests/unit/agent/linux/test_ovs_lib.py index 2376fdbd509..3924dbfcbaf 100644 --- a/neutron/tests/unit/agent/linux/test_ovs_lib.py +++ b/neutron/tests/unit/agent/linux/test_ovs_lib.py @@ -146,22 +146,6 @@ class OVS_Lib_Test(base.BaseTestCase): def _build_timeout_opt(self, exp_timeout): return "--timeout=%d" % exp_timeout if exp_timeout else self.TO - def test_replace_port(self): - pname = "tap5" - self.br.replace_port(pname) - self._verify_vsctl_mock("--if-exists", "del-port", pname, - "--", "add-port", self.BR_NAME, pname) - - def test_replace_port_with_attrs(self): - pname = "tap5" - self.br.replace_port(pname, ('type', 'internal'), - ('external_ids:iface-status', 'active')) - self._verify_vsctl_mock("--if-exists", "del-port", pname, - "--", "add-port", self.BR_NAME, pname, - "--", "set", "Interface", pname, - "type=internal", - "external_ids:iface-status=active") - def _test_delete_port(self, exp_timeout=None): pname = "tap5" self.br.delete_port(pname)