Merge "[OVN] Add OVN functional tests - part 1"

This commit is contained in:
Zuul 2020-02-21 16:54:38 +00:00 committed by Gerrit Code Review
commit 6b9765c991
17 changed files with 2175 additions and 1 deletions

View File

@ -0,0 +1,194 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import fixture as fixture_config
from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import event
from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp.tests.functional.schema.ovn_southbound import event as test_event
from neutron.agent.ovn.metadata import agent
from neutron.agent.ovn.metadata import ovsdb
from neutron.agent.ovn.metadata import server as metadata_server
from neutron.common.ovn import constants as ovn_const
from neutron.common import utils as n_utils
from neutron.conf.agent.metadata import config as meta_config
from neutron.conf.agent.ovn.metadata import config as meta_config_ovn
from neutron.tests.functional import base
class MetadataAgentHealthEvent(event.WaitEvent):
event_name = 'MetadataAgentHealthEvent'
def __init__(self, chassis, sb_cfg, timeout=5):
self.chassis = chassis
self.sb_cfg = sb_cfg
super(MetadataAgentHealthEvent, self).__init__(
(self.ROW_UPDATE,), 'Chassis', (('name', '=', self.chassis),),
timeout=timeout)
def matches(self, event, row, old=None):
if not super(MetadataAgentHealthEvent, self).matches(event, row, old):
return False
return int(row.external_ids.get(
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, 0)) >= self.sb_cfg
class TestMetadataAgent(base.TestOVNFunctionalBase):
OVN_BRIDGE = 'br-int'
FAKE_CHASSIS_HOST = 'ovn-host-fake'
def setUp(self):
super(TestMetadataAgent, self).setUp()
self.handler = self.sb_api.idl.notify_handler
# We only have OVN NB and OVN SB running for functional tests
mock.patch.object(ovsdb, 'MetadataAgentOvsIdl').start()
self._mock_get_ovn_br = mock.patch.object(
agent.MetadataAgent,
'_get_ovn_bridge',
return_value=self.OVN_BRIDGE).start()
self.agent = self._start_metadata_agent()
def _start_metadata_agent(self):
conf = self.useFixture(fixture_config.Config()).conf
conf.register_opts(meta_config.SHARED_OPTS)
conf.register_opts(meta_config.UNIX_DOMAIN_METADATA_PROXY_OPTS)
conf.register_opts(meta_config.METADATA_PROXY_HANDLER_OPTS)
conf.register_opts(meta_config_ovn.OVS_OPTS, group='ovs')
meta_config_ovn.setup_privsep()
ovn_sb_db = self.ovsdb_server_mgr.get_ovsdb_connection_path('sb')
conf.set_override('ovn_sb_connection', ovn_sb_db, group='ovn')
# We don't need the HA proxy server running for now
p = mock.patch.object(metadata_server, 'UnixDomainMetadataProxy')
p.start()
self.addCleanup(p.stop)
self.chassis_name = self.add_fake_chassis(self.FAKE_CHASSIS_HOST)
mock.patch.object(agent.MetadataAgent,
'_get_own_chassis_name',
return_value=self.chassis_name).start()
agt = agent.MetadataAgent(conf)
agt.start()
# Metadata agent will open connections to OVS and SB databases.
# Close connections to them when the test ends,
self.addCleanup(agt.ovs_idl.ovsdb_connection.stop)
self.addCleanup(agt.sb_idl.ovsdb_connection.stop)
return agt
def test_metadata_agent_healthcheck(self):
chassis_row = self.sb_api.db_find(
'Chassis', ('name', '=', self.chassis_name)).execute(
check_error=True)[0]
# Assert that, prior to creating a resource the metadata agent
# didn't populate the external_ids from the Chassis
self.assertNotIn(ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY,
chassis_row['external_ids'])
# Let's list the agents to force the nb_cfg to be bumped on NB
# db, which will automatically increment the nb_cfg counter on
# NB_Global and make ovn-controller copy it over to SB_Global. Upon
# this event, Metadata agent will update the external_ids on its
# Chassis row to signal that it's healthy.
row_event = MetadataAgentHealthEvent(self.chassis_name, 1)
self.handler.watch_event(row_event)
self.new_list_request('agents').get_response(self.api)
# If we do not time out waiting for the event, then we are assured
# that the metadata agent has populated the external_ids from the
# chassis with the nb_cfg, 1 revisions when listing the agents.
self.assertTrue(row_event.wait())
def _create_metadata_port(self, txn, lswitch_name):
mdt_port_name = 'ovn-mdt-' + uuidutils.generate_uuid()
txn.add(
self.nb_api.lsp_add(
lswitch_name,
mdt_port_name,
type='localport',
addresses='AA:AA:AA:AA:AA:AA 192.168.122.123',
external_ids={
ovn_const.OVN_CIDRS_EXT_ID_KEY: '192.168.122.123/24'}))
def test_agent_resync_on_non_existing_bridge(self):
BR_NEW = 'br-new'
self._mock_get_ovn_br.return_value = BR_NEW
self.agent.ovs_idl.list_br.return_value.execute.return_value = [BR_NEW]
# The agent has initialized with br-int and above list_br doesn't
# return it, hence the agent should trigger reconfiguration and store
# new br-new value to its attribute.
self.assertEqual(self.OVN_BRIDGE, self.agent.ovn_bridge)
lswitch_name = 'ovn-' + uuidutils.generate_uuid()
lswitchport_name = 'ovn-port-' + uuidutils.generate_uuid()
# It may take some time to ovn-northd to translate from OVN NB DB to
# the OVN SB DB. Wait for port binding event to happen before binding
# the port to chassis.
pb_event = test_event.WaitForPortBindingEvent(lswitchport_name)
self.handler.watch_event(pb_event)
with self.nb_api.transaction(check_error=True, log_errors=True) as txn:
txn.add(
self.nb_api.ls_add(lswitch_name))
txn.add(
self.nb_api.create_lswitch_port(
lswitchport_name, lswitch_name))
self._create_metadata_port(txn, lswitch_name)
self.assertTrue(pb_event.wait())
# Trigger PortBindingChassisEvent
self.sb_api.lsp_bind(lswitchport_name, self.chassis_name).execute(
check_error=True, log_errors=True)
exc = Exception("Agent bridge hasn't changed from %s to %s "
"in 10 seconds after Port_Binding event" %
(self.agent.ovn_bridge, BR_NEW))
n_utils.wait_until_true(
lambda: BR_NEW == self.agent.ovn_bridge,
timeout=10,
exception=exc)
def test_agent_registration_at_chassis_create_event(self):
chassis = self.sb_api.lookup('Chassis', self.chassis_name)
self.assertIn(ovn_const.OVN_AGENT_METADATA_ID_KEY,
chassis.external_ids)
# Delete Chassis and assert
self.del_fake_chassis(chassis.name)
self.assertRaises(idlutils.RowNotFound, self.sb_api.lookup,
'Chassis', self.chassis_name)
# Re-add the Chassis
self.add_fake_chassis(self.FAKE_CHASSIS_HOST, name=self.chassis_name)
def check_for_metadata():
chassis = self.sb_api.lookup('Chassis', self.chassis_name)
return ovn_const.OVN_AGENT_METADATA_ID_KEY in chassis.external_ids
exc = Exception('Agent metadata failed to re-register itself '
'after the Chassis %s was re-created' %
self.chassis_name)
# Check if metadata agent was re-registered
chassis = self.sb_api.lookup('Chassis', self.chassis_name)
n_utils.wait_until_true(
check_for_metadata,
timeout=10,
exception=exc)

View File

@ -13,22 +13,46 @@
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
import errno
import os
import shutil
import warnings
import fixtures
import mock
from neutron_lib import fixture
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_db import exception as os_db_exc
from oslo_db.sqlalchemy import provision
from oslo_log import log
from oslo_utils import uuidutils
from neutron.agent.linux import utils
from neutron.api import extensions as exts
from neutron.conf.agent import common as config
from neutron.conf.agent import ovs_conf
from neutron.conf.plugins.ml2 import config as ml2_config
# Load all the models to register them into SQLAlchemy metadata before using
# the SqlFixture
from neutron.db import models # noqa
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron.tests import base
from neutron.tests.common import base as common_base
from neutron.tests.common import helpers
from neutron.tests.functional.resources import process
from neutron.tests.unit.plugins.ml2 import test_plugin
LOG = log.getLogger(__name__)
# This is the directory from which infra fetches log files for functional tests
DEFAULT_LOG_DIR = os.path.join(helpers.get_test_log_path(),
'dsvm-functional-logs')
SQL_FIXTURE_LOCK = 'sql_fixture_lock'
def config_decorator(method_to_decorate, config_tuples):
@ -99,3 +123,237 @@ class BaseSudoTestCase(BaseLoggingTestCase):
ovs_conf.register_ovs_agent_opts, ovs_agent_opts)
mock.patch.object(ovs_conf, 'register_ovs_agent_opts',
new=ovs_agent_decorator).start()
class OVNSqlFixture(fixture.StaticSqlFixture):
@classmethod
@lockutils.synchronized(SQL_FIXTURE_LOCK)
def _init_resources(cls):
cls.schema_resource = provision.SchemaResource(
provision.DatabaseResource("sqlite"),
cls._generate_schema, teardown=False)
dependency_resources = {}
for name, resource in cls.schema_resource.resources:
dependency_resources[name] = resource.getResource()
cls.schema_resource.make(dependency_resources)
cls.engine = dependency_resources['database'].engine
def _delete_from_schema(self, engine):
try:
super(OVNSqlFixture, self)._delete_from_schema(engine)
except os_db_exc.DBNonExistentTable:
pass
class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
BaseLoggingTestCase):
OVS_DISTRIBUTION = 'openvswitch'
OVN_DISTRIBUTION = 'ovn'
OVN_SCHEMA_FILES = ['ovn-nb.ovsschema', 'ovn-sb.ovsschema']
_mechanism_drivers = ['logger', 'ovn']
_extension_drivers = ['port_security']
_counter = 0
l3_plugin = 'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin'
def setUp(self, maintenance_worker=False):
ml2_config.cfg.CONF.set_override('extension_drivers',
self._extension_drivers,
group='ml2')
ml2_config.cfg.CONF.set_override('tenant_network_types',
['geneve'],
group='ml2')
ml2_config.cfg.CONF.set_override('vni_ranges',
['1:65536'],
group='ml2_type_geneve')
self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
super(TestOVNFunctionalBase, self).setUp()
self.test_log_dir = os.path.join(DEFAULT_LOG_DIR, self.id())
base.setup_test_logging(
cfg.CONF, self.test_log_dir, "testrun.txt")
mm = directory.get_plugin().mechanism_manager
self.mech_driver = mm.mech_drivers['ovn'].obj
self.l3_plugin = directory.get_plugin(constants.L3)
self.ovsdb_server_mgr = None
self.ovn_northd_mgr = None
self.maintenance_worker = maintenance_worker
self.temp_dir = self.useFixture(fixtures.TempDir()).path
self._start_ovsdb_server_and_idls()
self._start_ovn_northd()
def _get_install_share_path(self):
lookup_paths = set()
for installation in ['local', '']:
for distribution in [self.OVN_DISTRIBUTION, self.OVS_DISTRIBUTION]:
exists = True
for ovn_file in self.OVN_SCHEMA_FILES:
path = os.path.join(os.path.sep, 'usr', installation,
'share', distribution, ovn_file)
exists &= os.path.isfile(path)
lookup_paths.add(os.path.dirname(path))
if exists:
return os.path.dirname(path)
msg = 'Either ovn-nb.ovsschema and/or ovn-sb.ovsschema are missing. '
msg += 'Looked for schemas in paths:' + ', '.join(sorted(lookup_paths))
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), msg)
# FIXME(lucasagomes): Workaround for
# https://bugs.launchpad.net/networking-ovn/+bug/1808146. We should
# investigate and properly fix the problem. This method is just a
# workaround to alleviate the gate for now and should not be considered
# a proper fix.
def _setup_database_fixtures(self):
fixture = OVNSqlFixture()
self.useFixture(fixture)
self.engine = fixture.engine
def get_additional_service_plugins(self):
p = super(TestOVNFunctionalBase, self).get_additional_service_plugins()
p.update({'revision_plugin_name': 'revisions'})
return p
@property
def _ovsdb_protocol(self):
return self.get_ovsdb_server_protocol()
def get_ovsdb_server_protocol(self):
return 'unix'
def _start_ovn_northd(self):
if not self.ovsdb_server_mgr:
return
ovn_nb_db = self.ovsdb_server_mgr.get_ovsdb_connection_path('nb')
ovn_sb_db = self.ovsdb_server_mgr.get_ovsdb_connection_path('sb')
self.ovn_northd_mgr = self.useFixture(
process.OvnNorthd(self.temp_dir,
ovn_nb_db, ovn_sb_db,
protocol=self._ovsdb_protocol))
def _start_ovsdb_server_and_idls(self):
# Start 2 ovsdb-servers one each for OVN NB DB and OVN SB DB
# ovsdb-server with OVN SB DB can be used to test the chassis up/down
# events.
install_share_path = self._get_install_share_path()
self.ovsdb_server_mgr = self.useFixture(
process.OvsdbServer(self.temp_dir, install_share_path,
ovn_nb_db=True, ovn_sb_db=True,
protocol=self._ovsdb_protocol))
set_cfg = cfg.CONF.set_override
set_cfg('ovn_nb_connection',
self.ovsdb_server_mgr.get_ovsdb_connection_path(), 'ovn')
set_cfg('ovn_sb_connection',
self.ovsdb_server_mgr.get_ovsdb_connection_path(
db_type='sb'), 'ovn')
set_cfg('ovn_nb_private_key', self.ovsdb_server_mgr.private_key, 'ovn')
set_cfg('ovn_nb_certificate', self.ovsdb_server_mgr.certificate, 'ovn')
set_cfg('ovn_nb_ca_cert', self.ovsdb_server_mgr.ca_cert, 'ovn')
set_cfg('ovn_sb_private_key', self.ovsdb_server_mgr.private_key, 'ovn')
set_cfg('ovn_sb_certificate', self.ovsdb_server_mgr.certificate, 'ovn')
set_cfg('ovn_sb_ca_cert', self.ovsdb_server_mgr.ca_cert, 'ovn')
# 5 seconds should be more than enough for the transaction to complete
# for the test cases.
# This also fixes the bug #1607639.
cfg.CONF.set_override(
'ovsdb_connection_timeout', 5,
'ovn')
class TriggerCls(mock.MagicMock):
def trigger(self):
pass
trigger_cls = TriggerCls()
if self.maintenance_worker:
trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
self.addCleanup(self._collect_processes_logs)
self.addCleanup(self.stop)
# mech_driver.post_fork_initialize creates the IDL connections
self.mech_driver.post_fork_initialize(
mock.ANY, mock.ANY, trigger_cls.trigger)
self.nb_api = self.mech_driver._nb_ovn
self.sb_api = self.mech_driver._sb_ovn
def _collect_processes_logs(self):
for database in ("nb", "sb"):
for file_suffix in ("log", "db"):
src_filename = "ovn_%(db)s.%(suffix)s" % {
'db': database,
'suffix': file_suffix
}
dst_filename = "ovn_%(db)s-%(timestamp)s.%(suffix)s" % {
'db': database,
'suffix': file_suffix,
'timestamp': datetime.now().strftime('%y-%m-%d_%H-%M-%S'),
}
filepath = os.path.join(self.temp_dir, src_filename)
shutil.copyfile(
filepath, os.path.join(self.test_log_dir, dst_filename))
def stop(self):
if self.maintenance_worker:
self.mech_driver.nb_synchronizer.stop()
self.mech_driver.sb_synchronizer.stop()
self.mech_driver._nb_ovn.ovsdb_connection.stop()
self.mech_driver._sb_ovn.ovsdb_connection.stop()
def restart(self):
self.stop()
# The OVN sync test starts its own synchronizers...
self.l3_plugin._nb_ovn_idl.ovsdb_connection.stop()
self.l3_plugin._sb_ovn_idl.ovsdb_connection.stop()
# Stop our monitor connections
self.nb_api.ovsdb_connection.stop()
self.sb_api.ovsdb_connection.stop()
if self.ovsdb_server_mgr:
self.ovsdb_server_mgr.stop()
if self.ovn_northd_mgr:
self.ovn_northd_mgr.stop()
self.mech_driver._nb_ovn = None
self.mech_driver._sb_ovn = None
self.l3_plugin._nb_ovn_idl = None
self.l3_plugin._sb_ovn_idl = None
self.nb_api.ovsdb_connection = None
self.sb_api.ovsdb_connection = None
self.ovsdb_server_mgr.delete_dbs()
self._start_ovsdb_server_and_idls()
self._start_ovn_northd()
def add_fake_chassis(self, host, physical_nets=None, external_ids=None,
name=None):
physical_nets = physical_nets or []
external_ids = external_ids or {}
bridge_mapping = ",".join(["%s:br-provider%s" % (phys_net, i)
for i, phys_net in enumerate(physical_nets)])
if name is None:
name = uuidutils.generate_uuid()
external_ids['ovn-bridge-mappings'] = bridge_mapping
# We'll be using different IP addresses every time for the Encap of
# the fake chassis as the SB schema doesn't allow to have two entries
# with same (ip,type) pairs as of OVS 2.11. This shouldn't have any
# impact as the tunnels won't get created anyways since ovn-controller
# is not running. Ideally we shouldn't be creating more than 255
# fake chassis but from the SB db point of view, 'ip' column can be
# any string so we could add entries with ip='172.24.4.1000'.
self._counter += 1
self.sb_api.chassis_add(
name, ['geneve'], '172.24.4.%d' % self._counter,
external_ids=external_ids, hostname=host).execute(check_error=True)
return name
def del_fake_chassis(self, chassis, if_exists=True):
self.sb_api.chassis_del(
chassis, if_exists=if_exists).execute(check_error=True)

View File

@ -0,0 +1,149 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import uuid
from ovsdbapp import event as ovsdb_event
from ovsdbapp.tests.functional import base
from ovsdbapp.tests import utils
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb \
import impl_idl_ovn as impl
from neutron.tests.functional import base as n_base
from neutron.tests.functional.resources.ovsdb import events
class TestSbApi(base.FunctionalTestCase,
n_base.BaseLoggingTestCase):
schemas = ['OVN_Southbound', 'OVN_Northbound']
def setUp(self):
super(TestSbApi, self).setUp()
self.data = {
'chassis': [
{'external_ids': {'ovn-bridge-mappings':
'public:br-ex,private:br-0'}},
{'external_ids': {'ovn-bridge-mappings':
'public:br-ex,public2:br-ex'}},
{'external_ids': {'ovn-bridge-mappings':
'public:br-ex'}},
]
}
self.api = impl.OvsdbSbOvnIdl(self.connection['OVN_Southbound'])
self.nbapi = impl.OvsdbNbOvnIdl(self.connection['OVN_Northbound'])
self.load_test_data()
self.handler = ovsdb_event.RowEventHandler()
self.api.idl.notify = self.handler.notify
def load_test_data(self):
with self.api.transaction(check_error=True) as txn:
for chassis in self.data['chassis']:
chassis['name'] = utils.get_rand_device_name('chassis')
chassis['hostname'] = '%s.localdomain.com' % chassis['name']
txn.add(self.api.chassis_add(
chassis['name'], ['geneve'], chassis['hostname'],
hostname=chassis['hostname'],
external_ids=chassis['external_ids']))
def test_get_chassis_hostname_and_physnets(self):
mapping = self.api.get_chassis_hostname_and_physnets()
self.assertLessEqual(len(self.data['chassis']), len(mapping))
self.assertGreaterEqual(set(mapping.keys()),
{c['hostname'] for c in self.data['chassis']})
def test_get_all_chassis(self):
chassis_list = set(self.api.get_all_chassis())
our_chassis = {c['name'] for c in self.data['chassis']}
self.assertLessEqual(our_chassis, chassis_list)
def test_get_chassis_data_for_ml2_bind_port(self):
host = self.data['chassis'][0]['hostname']
dp, iface, phys = self.api.get_chassis_data_for_ml2_bind_port(host)
self.assertEqual('', dp)
self.assertEqual('', iface)
self.assertItemsEqual(phys, ['private', 'public'])
def test_chassis_exists(self):
self.assertTrue(self.api.chassis_exists(
self.data['chassis'][0]['hostname']))
self.assertFalse(self.api.chassis_exists("nochassishere"))
def test_get_chassis_and_physnets(self):
mapping = self.api.get_chassis_and_physnets()
self.assertLessEqual(len(self.data['chassis']), len(mapping))
self.assertGreaterEqual(set(mapping.keys()),
{c['name'] for c in self.data['chassis']})
def _add_switch_port(self, chassis_name, type='localport'):
sname, pname = (utils.get_rand_device_name(prefix=p)
for p in ('switch', 'port'))
chassis = self.api.lookup('Chassis', chassis_name)
row_event = events.WaitForCreatePortBindingEvent(pname)
self.handler.watch_event(row_event)
with self.nbapi.transaction(check_error=True) as txn:
switch = txn.add(self.nbapi.ls_add(sname))
port = txn.add(self.nbapi.lsp_add(sname, pname, type=type))
row_event.wait()
return chassis, switch.result, port.result, row_event.row
def test_get_metadata_port_network(self):
chassis, switch, port, binding = self._add_switch_port(
self.data['chassis'][0]['name'])
result = self.api.get_metadata_port_network(str(binding.datapath.uuid))
self.assertEqual(binding, result)
self.assertEqual(binding.datapath.external_ids['logical-switch'],
str(switch.uuid))
def test_get_metadata_port_network_missing(self):
val = str(uuid.uuid4())
self.assertIsNone(self.api.get_metadata_port_network(val))
def test_set_get_chassis_metadata_networks(self):
name = self.data['chassis'][0]['name']
nets = [str(uuid.uuid4()) for _ in range(3)]
self.api.set_chassis_metadata_networks(name, nets).execute(
check_error=True)
self.assertEqual(nets, self.api.get_chassis_metadata_networks(name))
def test_get_network_port_bindings_by_ip(self):
chassis, switch, port, binding = self._add_switch_port(
self.data['chassis'][0]['name'])
mac = 'de:ad:be:ef:4d:ad'
ipaddr = '192.0.2.1'
mac_ip = '%s %s' % (mac, ipaddr)
pb_update_event = events.WaitForUpdatePortBindingEvent(
port.name, mac=[mac_ip])
self.handler.watch_event(pb_update_event)
self.nbapi.lsp_set_addresses(
port.name, [mac_ip]).execute(check_error=True)
self.assertTrue(pb_update_event.wait())
self.api.lsp_bind(port.name, chassis.name).execute(check_error=True)
result = self.api.get_network_port_bindings_by_ip(
str(binding.datapath.uuid), ipaddr)
self.assertIn(binding, result)
def test_get_ports_on_chassis(self):
chassis, switch, port, binding = self._add_switch_port(
self.data['chassis'][0]['name'])
self.api.lsp_bind(port.name, chassis.name).execute(check_error=True)
self.assertEqual([binding],
self.api.get_ports_on_chassis(chassis.name))
def test_get_logical_port_chassis_and_datapath(self):
chassis, switch, port, binding = self._add_switch_port(
self.data['chassis'][0]['name'])
self.api.lsp_bind(port.name, chassis.name).execute(check_error=True)
self.assertEqual(
(chassis.name, str(binding.datapath.uuid)),
self.api.get_logical_port_chassis_and_datapath(port.name))

View File

@ -0,0 +1,800 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from futurist import periodics
from neutron_lib.api.definitions import external_net as extnet_apidef
from neutron_lib import constants as n_const
from neutron_lib import context as n_context
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import maintenance
from neutron.tests.functional import base
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.extensions import test_extraroute
class _TestMaintenanceHelper(base.TestOVNFunctionalBase):
"""A helper class to keep the code more organized."""
def setUp(self):
super(_TestMaintenanceHelper, self).setUp()
self._ovn_client = self.mech_driver._ovn_client
self._l3_ovn_client = self.l3_plugin._ovn_client
ext_mgr = test_extraroute.ExtraRouteTestExtensionManager()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.maint = maintenance.DBInconsistenciesPeriodics(self._ovn_client)
self.context = n_context.get_admin_context()
# Always verify inconsistencies for all objects.
db_rev.INCONSISTENCIES_OLDER_THAN = -1
def _find_network_row_by_name(self, name):
for row in self.nb_api._tables['Logical_Switch'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_NETWORK_NAME_EXT_ID_KEY) == name):
return row
def _create_network(self, name, external=False):
data = {'network': {'name': name, 'tenant_id': self._tenant_id,
extnet_apidef.EXTERNAL: external}}
req = self.new_create_request('networks', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['network']
def _update_network_name(self, net_id, new_name):
data = {'network': {'name': new_name}}
req = self.new_update_request('networks', data, net_id, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['network']
def _create_port(self, name, net_id, security_groups=None,
device_owner=None):
data = {'port': {'name': name,
'tenant_id': self._tenant_id,
'network_id': net_id}}
if security_groups is not None:
data['port']['security_groups'] = security_groups
if device_owner is not None:
data['port']['device_owner'] = device_owner
req = self.new_create_request('ports', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['port']
def _update_port_name(self, port_id, new_name):
data = {'port': {'name': new_name}}
req = self.new_update_request('ports', data, port_id, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['port']
def _find_port_row_by_name(self, name):
for row in self.nb_api._tables['Logical_Switch_Port'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_PORT_NAME_EXT_ID_KEY) == name):
return row
def _set_global_dhcp_opts(self, ip_version, opts):
opt_string = ','.join(['{0}:{1}'.format(key, value)
for key, value
in opts.items()])
if ip_version == 6:
ovn_config.cfg.CONF.set_override('ovn_dhcp6_global_options',
opt_string,
group='ovn')
if ip_version == 4:
ovn_config.cfg.CONF.set_override('ovn_dhcp4_global_options',
opt_string,
group='ovn')
def _unset_global_dhcp_opts(self, ip_version):
if ip_version == 6:
ovn_config.cfg.CONF.clear_override('ovn_dhcp6_global_options',
group='ovn')
if ip_version == 4:
ovn_config.cfg.CONF.clear_override('ovn_dhcp4_global_options',
group='ovn')
def _create_subnet(self, name, net_id, ip_version=4):
data = {'subnet': {'name': name,
'tenant_id': self._tenant_id,
'network_id': net_id,
'ip_version': ip_version,
'enable_dhcp': True}}
if ip_version == 4:
data['subnet']['cidr'] = '10.0.0.0/24'
else:
data['subnet']['cidr'] = 'eef0::/64'
req = self.new_create_request('subnets', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['subnet']
def _update_subnet_enable_dhcp(self, subnet_id, value):
data = {'subnet': {'enable_dhcp': value}}
req = self.new_update_request('subnets', data, subnet_id, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['subnet']
def _find_subnet_row_by_id(self, subnet_id):
for row in self.nb_api._tables['DHCP_Options'].rows.values():
if (row.external_ids.get('subnet_id') == subnet_id and
not row.external_ids.get('port_id')):
return row
def _create_router(self, name, external_gateway_info=None):
data = {'router': {'name': name, 'tenant_id': self._tenant_id}}
if external_gateway_info is not None:
data['router']['external_gateway_info'] = external_gateway_info
req = self.new_create_request('routers', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['router']
def _update_router_name(self, net_id, new_name):
data = {'router': {'name': new_name}}
req = self.new_update_request('routers', data, net_id, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['router']
def _find_router_row_by_name(self, name):
for row in self.nb_api._tables['Logical_Router'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY) == name):
return row
def _create_security_group(self):
data = {'security_group': {'name': 'sgtest',
'tenant_id': self._tenant_id,
'description': 'SpongeBob Rocks!'}}
req = self.new_create_request('security-groups', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['security_group']
def _find_security_group_row_by_id(self, sg_id):
if self.nb_api.is_port_groups_supported():
for row in self.nb_api._tables['Port_Group'].rows.values():
if row.name == utils.ovn_port_group_name(sg_id):
return row
else:
for row in self.nb_api._tables['Address_Set'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_SG_EXT_ID_KEY) == sg_id):
return row
def _create_security_group_rule(self, sg_id):
data = {'security_group_rule': {'security_group_id': sg_id,
'direction': 'ingress',
'protocol': n_const.PROTO_NAME_TCP,
'ethertype': n_const.IPv4,
'port_range_min': 22,
'port_range_max': 22,
'tenant_id': self._tenant_id}}
req = self.new_create_request('security-group-rules', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['security_group_rule']
def _find_security_group_rule_row_by_id(self, sgr_id):
for row in self.nb_api._tables['ACL'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_SG_RULE_EXT_ID_KEY) == sgr_id):
return row
def _process_router_interface(self, action, router_id, subnet_id):
req = self.new_action_request(
'routers', {'subnet_id': subnet_id}, router_id,
'%s_router_interface' % action)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)
def _add_router_interface(self, router_id, subnet_id):
return self._process_router_interface('add', router_id, subnet_id)
def _remove_router_interface(self, router_id, subnet_id):
return self._process_router_interface('remove', router_id, subnet_id)
def _find_router_port_row_by_port_id(self, port_id):
for row in self.nb_api._tables['Logical_Router_Port'].rows.values():
if row.name == utils.ovn_lrouter_port_name(port_id):
return row
class TestMaintenance(_TestMaintenanceHelper):
def test_network(self):
net_name = 'networktest'
with mock.patch.object(self._ovn_client, 'create_network'):
neutron_obj = self._create_network(net_name)
# Assert the network doesn't exist in OVN
self.assertIsNone(self._find_network_row_by_name(net_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the network was now created
ovn_obj = self._find_network_row_by_name(net_name)
self.assertIsNotNone(ovn_obj)
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Update
new_obj_name = 'networktest_updated'
with mock.patch.object(self._ovn_client, 'update_network'):
new_neutron_obj = self._update_network_name(neutron_obj['id'],
new_obj_name)
# Assert the revision numbers are out-of-sync
ovn_obj = self._find_network_row_by_name(net_name)
self.assertNotEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the old name doesn't exist anymore in the OVNDB
self.assertIsNone(self._find_network_row_by_name(net_name))
# Assert the network is now in sync
ovn_obj = self._find_network_row_by_name(new_obj_name)
self.assertEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Delete
with mock.patch.object(self._ovn_client, 'delete_network'):
self._delete('networks', new_neutron_obj['id'])
# Assert the network still exists in OVNDB
self.assertIsNotNone(self._find_network_row_by_name(new_obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the network is now deleted from OVNDB
self.assertIsNone(self._find_network_row_by_name(new_obj_name))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
new_neutron_obj['id']))
def test_port(self):
obj_name = 'porttest'
neutron_net = self._create_network('network1')
with mock.patch.object(self._ovn_client, 'create_port'):
neutron_obj = self._create_port(obj_name, neutron_net['id'])
# Assert the port doesn't exist in OVN
self.assertIsNone(self._find_port_row_by_name(obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the port was now created
ovn_obj = self._find_port_row_by_name(obj_name)
self.assertIsNotNone(ovn_obj)
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Update
new_obj_name = 'porttest_updated'
with mock.patch.object(self._ovn_client, 'update_port'):
new_neutron_obj = self._update_port_name(neutron_obj['id'],
new_obj_name)
# Assert the revision numbers are out-of-sync
ovn_obj = self._find_port_row_by_name(obj_name)
self.assertNotEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the old name doesn't exist anymore in the OVNDB
self.assertIsNone(self._find_port_row_by_name(obj_name))
# Assert the port is now in sync. Note that for ports we are
# fetching it again from the Neutron database prior to comparison
# because of the monitor code that can update the ports again upon
# changes to it.
ovn_obj = self._find_port_row_by_name(new_obj_name)
new_neutron_obj = self._ovn_client._plugin.get_port(
self.context, neutron_obj['id'])
self.assertEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Delete
with mock.patch.object(self._ovn_client, 'delete_port'):
self._delete('ports', new_neutron_obj['id'])
# Assert the port still exists in OVNDB
self.assertIsNotNone(self._find_port_row_by_name(new_obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the port is now deleted from OVNDB
self.assertIsNone(self._find_port_row_by_name(new_obj_name))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
neutron_obj['id']))
def test_subnet_global_dhcp4_opts(self):
obj_name = 'globaltestsubnet'
options = {'ntp_server': '1.2.3.4'}
neutron_net = self._create_network('network1')
# Create a subnet without global options
neutron_sub = self._create_subnet(obj_name, neutron_net['id'])
# Assert that the option is not set
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertIsNone(ovn_obj.options.get('ntp_server', None))
# Set some global DHCP Options
self._set_global_dhcp_opts(ip_version=4, opts=options)
# Run the maintenance task to add the new options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was added
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertEqual(
ovn_obj.options.get('ntp_server', None),
'1.2.3.4')
# Change the global option
new_options = {'ntp_server': '4.3.2.1'}
self._set_global_dhcp_opts(ip_version=4, opts=new_options)
# Run the maintenance task to update the options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was changed
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertEqual(
ovn_obj.options.get('ntp_server', None),
'4.3.2.1')
# Change the global option to null
new_options = {'ntp_server': ''}
self._set_global_dhcp_opts(ip_version=4, opts=new_options)
# Run the maintenance task to update the options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was removed
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertIsNone(ovn_obj.options.get('ntp_server', None))
def test_subnet_global_dhcp6_opts(self):
obj_name = 'globaltestsubnet'
options = {'ntp_server': '1.2.3.4'}
neutron_net = self._create_network('network1')
# Create a subnet without global options
neutron_sub = self._create_subnet(obj_name, neutron_net['id'], 6)
# Assert that the option is not set
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertIsNone(ovn_obj.options.get('ntp_server', None))
# Set some global DHCP Options
self._set_global_dhcp_opts(ip_version=6, opts=options)
# Run the maintenance task to add the new options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was added
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertEqual(
ovn_obj.options.get('ntp_server', None),
'1.2.3.4')
# Change the global option
new_options = {'ntp_server': '4.3.2.1'}
self._set_global_dhcp_opts(ip_version=6, opts=new_options)
# Run the maintenance task to update the options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was changed
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertEqual(
ovn_obj.options.get('ntp_server', None),
'4.3.2.1')
# Change the global option to null
new_options = {'ntp_server': ''}
self._set_global_dhcp_opts(ip_version=6, opts=new_options)
# Run the maintenance task to update the options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was removed
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertIsNone(ovn_obj.options.get('ntp_server', None))
def test_subnet(self):
obj_name = 'subnettest'
neutron_net = self._create_network('network1')
with mock.patch.object(self._ovn_client, 'create_subnet'):
neutron_obj = self._create_subnet(obj_name, neutron_net['id'])
# Assert the subnet doesn't exist in OVN
self.assertIsNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the subnet was now created
ovn_obj = self._find_subnet_row_by_id(neutron_obj['id'])
self.assertIsNotNone(ovn_obj)
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Update
with mock.patch.object(self._ovn_client, 'update_subnet'):
neutron_obj = self._update_subnet_enable_dhcp(
neutron_obj['id'], False)
# Assert the revision numbers are out-of-sync
ovn_obj = self._find_subnet_row_by_id(neutron_obj['id'])
self.assertNotEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the old name doesn't exist anymore in the OVNDB. When
# the subnet's enable_dhcp's is set to False, OVN will remove the
# DHCP_Options entry related to that subnet.
self.assertIsNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Re-enable the DHCP for the subnet and check if the maintenance
# thread will re-create it in OVN
with mock.patch.object(self._ovn_client, 'update_subnet'):
neutron_obj = self._update_subnet_enable_dhcp(
neutron_obj['id'], True)
# Assert the DHCP_Options still doesn't exist in OVNDB
self.assertIsNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the subnet is now in sync
ovn_obj = self._find_subnet_row_by_id(neutron_obj['id'])
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Delete
with mock.patch.object(self._ovn_client, 'delete_subnet'):
self._delete('subnets', neutron_obj['id'])
# Assert the subnet still exists in OVNDB
self.assertIsNotNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the subnet is now deleted from OVNDB
self.assertIsNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
neutron_obj['id']))
def test_router(self):
obj_name = 'routertest'
with mock.patch.object(self._l3_ovn_client, 'create_router'):
neutron_obj = self._create_router(obj_name)
# Assert the router doesn't exist in OVN
self.assertIsNone(self._find_router_row_by_name(obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the router was now created
ovn_obj = self._find_router_row_by_name(obj_name)
self.assertIsNotNone(ovn_obj)
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Update
new_obj_name = 'routertest_updated'
with mock.patch.object(self._l3_ovn_client, 'update_router'):
new_neutron_obj = self._update_router_name(neutron_obj['id'],
new_obj_name)
# Assert the revision numbers are out-of-sync
ovn_obj = self._find_router_row_by_name(obj_name)
self.assertNotEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the old name doesn't exist anymore in the OVNDB
self.assertIsNone(self._find_router_row_by_name(obj_name))
# Assert the router is now in sync
ovn_obj = self._find_router_row_by_name(new_obj_name)
self.assertEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Delete
with mock.patch.object(self._l3_ovn_client, 'delete_router'):
self._delete('routers', new_neutron_obj['id'])
# Assert the router still exists in OVNDB
self.assertIsNotNone(self._find_router_row_by_name(new_obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the router is now deleted from OVNDB
self.assertIsNone(self._find_router_row_by_name(new_obj_name))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
new_neutron_obj['id']))
def test_security_group(self):
with mock.patch.object(self._ovn_client, 'create_security_group'):
neutron_obj = self._create_security_group()
# Assert the sg doesn't exist in OVN
self.assertIsNone(
self._find_security_group_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the sg was now created. We don't save the revision number
# in the Security Group because OVN doesn't support updating it,
# all we care about is whether it exists or not.
self.assertIsNotNone(
self._find_security_group_row_by_id(neutron_obj['id']))
# > Delete
with mock.patch.object(self._ovn_client, 'delete_security_group'):
self._delete('security-groups', neutron_obj['id'])
# Assert the sg still exists in OVNDB
self.assertIsNotNone(
self._find_security_group_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the sg is now deleted from OVNDB
self.assertIsNone(
self._find_security_group_row_by_id(neutron_obj['id']))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
neutron_obj['id']))
def test_security_group_rule(self):
neutron_sg = self._create_security_group()
neutron_net = self._create_network('network1')
self._create_port('portsgtest', neutron_net['id'],
security_groups=[neutron_sg['id']])
with mock.patch.object(self._ovn_client, 'create_security_group_rule'):
neutron_obj = self._create_security_group_rule(neutron_sg['id'])
# Assert the sg rule doesn't exist in OVN
self.assertIsNone(
self._find_security_group_rule_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the sg rule was now created. We don't save the revision number
# in the Security Group because OVN doesn't support updating it,
# all we care about is whether it exists or not.
self.assertIsNotNone(
self._find_security_group_rule_row_by_id(neutron_obj['id']))
# > Delete
# FIXME(lucasagomes): Maintenance thread fixing deleted
# security group rules is currently broken due to:
# https://bugs.launchpad.net/networking-ovn/+bug/1756123
def test_router_port(self):
neutron_net = self._create_network('networktest', external=True)
neutron_subnet = self._create_subnet('subnettest', neutron_net['id'])
neutron_router = self._create_router('routertest')
with mock.patch.object(self._l3_ovn_client, 'create_router_port'):
with mock.patch('neutron.db.ovn_revision_numbers_db.'
'bump_revision'):
neutron_obj = self._add_router_interface(neutron_router['id'],
neutron_subnet['id'])
# Assert the router port doesn't exist in OVN
self.assertIsNone(
self._find_router_port_row_by_port_id(neutron_obj['port_id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the router port was now created
self.assertIsNotNone(
self._find_router_port_row_by_port_id(neutron_obj['port_id']))
# > Delete
with mock.patch.object(self._l3_ovn_client, 'delete_router_port'):
self._remove_router_interface(neutron_router['id'],
neutron_subnet['id'])
# Assert the router port still exists in OVNDB
self.assertIsNotNone(
self._find_router_port_row_by_port_id(neutron_obj['port_id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the router port is now deleted from OVNDB
self.assertIsNone(
self._find_router_port_row_by_port_id(neutron_obj['port_id']))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
neutron_obj['port_id']))
def test_check_metadata_ports(self):
ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', True,
group='ovn')
neutron_net = self._create_network('network1')
metadata_port = self._ovn_client._find_metadata_port(
self.context, neutron_net['id'])
# Assert the metadata port exists
self.assertIsNotNone(metadata_port)
# Delete the metadata port
self._delete('ports', metadata_port['id'])
metadata_port = self._ovn_client._find_metadata_port(
self.context, neutron_net['id'])
# Assert the metadata port is gone
self.assertIsNone(metadata_port)
# Call the maintenance thread to fix the problem, it will raise
# NeverAgain so that the job only runs once at startup
self.assertRaises(periodics.NeverAgain,
self.maint.check_metadata_ports)
metadata_port = self._ovn_client._find_metadata_port(
self.context, neutron_net['id'])
# Assert the metadata port was re-created
self.assertIsNotNone(metadata_port)
def test_check_metadata_ports_not_enabled(self):
ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', False,
group='ovn')
with mock.patch.object(self._ovn_client,
'create_metadata_port') as mock_create_port:
self.assertRaises(periodics.NeverAgain,
self.maint.check_metadata_ports)
# Assert create_metadata_port() wasn't called since metadata
# is not enabled
self.assertFalse(mock_create_port.called)
def test_check_for_port_security_unknown_address(self):
neutron_net = self._create_network('network1')
neutron_port = self._create_port('port1', neutron_net['id'])
# Let's force disabling port security for the LSP
self.nb_api.lsp_set_port_security(neutron_port['id'], []).execute(
check_error=True)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that port security is now disabled but the 'unknown'
# is not set in the addresses column
self.assertFalse(ovn_port['port_security'])
self.assertNotIn('unknown', ovn_port['addresses'])
# Call the maintenance task to fix the problem. Note that
# NeverAgain is raised so it only runs once at start up
self.assertRaises(periodics.NeverAgain,
self.maint.check_for_port_security_unknown_address)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that 'unknown' was set in the addresses column for
# the port
self.assertFalse(ovn_port['port_security'])
self.assertIn('unknown', ovn_port['addresses'])
# Now the other way around, let's set port_security in the OVN
# table while the 'unknown' address is set in the addresses column
self.nb_api.lsp_set_port_security(
neutron_port['id'], ovn_port['addresses']).execute(
check_error=True)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
self.assertTrue(ovn_port['port_security'])
self.assertIn('unknown', ovn_port['addresses'])
# Call the maintenance task to fix the problem. Note that
# NeverAgain is raised so it only runs once at start up
self.assertRaises(periodics.NeverAgain,
self.maint.check_for_port_security_unknown_address)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that 'unknown' was removed from the addresses column
# for the port
self.assertTrue(ovn_port['port_security'])
self.assertNotIn('unknown', ovn_port['addresses'])

View File

@ -0,0 +1,432 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.common import utils as n_utils
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.tests.functional import base
class TestPortBinding(base.TestOVNFunctionalBase):
def setUp(self):
super(TestPortBinding, self).setUp()
self.ovs_host = 'ovs-host'
self.dpdk_host = 'dpdk-host'
self.invalid_dpdk_host = 'invalid-host'
self.vhu_mode = 'server'
self.add_fake_chassis(self.ovs_host)
self.add_fake_chassis(
self.dpdk_host,
external_ids={'datapath-type': 'netdev',
'iface-types': 'dummy,dummy-internal,dpdkvhostuser'})
self.add_fake_chassis(
self.invalid_dpdk_host,
external_ids={'datapath-type': 'netdev',
'iface-types': 'dummy,dummy-internal,geneve,vxlan'})
self.n1 = self._make_network(self.fmt, 'n1', True)
res = self._create_subnet(self.fmt, self.n1['network']['id'],
'10.0.0.0/24')
self.deserialize(self.fmt, res)
def _create_or_update_port(self, port_id=None, hostname=None):
if port_id is None:
port_data = {
'port': {'network_id': self.n1['network']['id'],
'tenant_id': self._tenant_id}}
if hostname:
port_data['port']['device_id'] = uuidutils.generate_uuid()
port_data['port']['device_owner'] = 'compute:None'
port_data['port']['binding:host_id'] = hostname
port_req = self.new_create_request('ports', port_data, self.fmt)
port_res = port_req.get_response(self.api)
p = self.deserialize(self.fmt, port_res)
port_id = p['port']['id']
else:
port_data = {
'port': {'device_id': uuidutils.generate_uuid(),
'device_owner': 'compute:None',
'binding:host_id': hostname}}
port_req = self.new_update_request('ports', port_data, port_id,
self.fmt)
port_res = port_req.get_response(self.api)
self.deserialize(self.fmt, port_res)
return port_id
def _verify_vif_details(self, port_id, expected_host_name,
expected_vif_type, expected_vif_details):
port_req = self.new_show_request('ports', port_id)
port_res = port_req.get_response(self.api)
p = self.deserialize(self.fmt, port_res)
self.assertEqual(expected_host_name, p['port']['binding:host_id'])
self.assertEqual(expected_vif_type, p['port']['binding:vif_type'])
self.assertEqual(expected_vif_details,
p['port']['binding:vif_details'])
def test_port_binding_create_port(self):
port_id = self._create_or_update_port(hostname=self.ovs_host)
self._verify_vif_details(port_id, self.ovs_host, 'ovs',
{'port_filter': True})
port_id = self._create_or_update_port(hostname=self.dpdk_host)
expected_vif_details = {'port_filter': False,
'vhostuser_mode': self.vhu_mode,
'vhostuser_ovs_plug': True}
expected_vif_details['vhostuser_socket'] = (
utils.ovn_vhu_sockpath(cfg.CONF.ovn.vhost_sock_dir, port_id))
self._verify_vif_details(port_id, self.dpdk_host, 'vhostuser',
expected_vif_details)
port_id = self._create_or_update_port(hostname=self.invalid_dpdk_host)
self._verify_vif_details(port_id, self.invalid_dpdk_host, 'ovs',
{'port_filter': True})
def test_port_binding_update_port(self):
port_id = self._create_or_update_port()
self._verify_vif_details(port_id, '', 'unbound', {})
port_id = self._create_or_update_port(port_id=port_id,
hostname=self.ovs_host)
self._verify_vif_details(port_id, self.ovs_host, 'ovs',
{'port_filter': True})
port_id = self._create_or_update_port(port_id=port_id,
hostname=self.dpdk_host)
expected_vif_details = {'port_filter': False,
'vhostuser_mode': self.vhu_mode,
'vhostuser_ovs_plug': True}
expected_vif_details['vhostuser_socket'] = (
utils.ovn_vhu_sockpath(cfg.CONF.ovn.vhost_sock_dir, port_id))
self._verify_vif_details(port_id, self.dpdk_host, 'vhostuser',
expected_vif_details)
port_id = self._create_or_update_port(port_id=port_id,
hostname=self.invalid_dpdk_host)
self._verify_vif_details(port_id, self.invalid_dpdk_host, 'ovs',
{'port_filter': True})
class TestPortBindingOverTcp(TestPortBinding):
def get_ovsdb_server_protocol(self):
return 'tcp'
# TODO(mjozefcz): This test class hangs during execution.
class TestPortBindingOverSsl(TestPortBinding):
def get_ovsdb_server_protocol(self):
return 'ssl'
class TestNetworkMTUUpdate(base.TestOVNFunctionalBase):
def setUp(self):
super(TestNetworkMTUUpdate, self).setUp()
self._ovn_client = self.mech_driver._ovn_client
self.n1 = self._make_network(self.fmt, 'n1', True)
res = self._create_subnet(self.fmt, self.n1['network']['id'],
'10.0.0.0/24')
self.sub = self.deserialize(self.fmt, res)
def test_update_network_mtu(self):
mtu_value = self.n1['network']['mtu'] - 100
dhcp_options = (
self.mech_driver._ovn_client._nb_idl.get_subnet_dhcp_options(
self.sub['subnet']['id'])
)
self.assertNotEqual(
int(dhcp_options['subnet']['options']['mtu']),
mtu_value)
data = {'network': {'mtu': mtu_value}}
req = self.new_update_request(
'networks', data, self.n1['network']['id'], self.fmt)
req.get_response(self.api)
dhcp_options = (
self.mech_driver._ovn_client._nb_idl.get_subnet_dhcp_options(
self.sub['subnet']['id'])
)
self.assertEqual(
int(dhcp_options['subnet']['options']['mtu']),
mtu_value)
def test_no_update_network_mtu(self):
mtu_value = self.n1['network']['mtu']
base_revision = db_rev.get_revision_row(
self.context,
self.sub['subnet']['id'])
data = {'network': {'mtu': mtu_value}}
req = self.new_update_request(
'networks', data, self.n1['network']['id'], self.fmt)
req.get_response(self.api)
second_revision = db_rev.get_revision_row(
self.context,
self.sub['subnet']['id'])
self.assertEqual(
base_revision.updated_at,
second_revision.updated_at)
@mock.patch('neutron.plugins.ml2.drivers.ovn.mech_driver.'
'ovsdb.ovn_client.OVNClient._is_virtual_port_supported',
lambda *args: True)
class TestVirtualPorts(base.TestOVNFunctionalBase):
def setUp(self):
super(TestVirtualPorts, self).setUp()
self._ovn_client = self.mech_driver._ovn_client
self.n1 = self._make_network(self.fmt, 'n1', True)
res = self._create_subnet(self.fmt, self.n1['network']['id'],
'10.0.0.0/24')
self.sub = self.deserialize(self.fmt, res)
def _create_port(self, fixed_ip=None, allowed_address=None):
port_data = {
'port': {'network_id': self.n1['network']['id'],
'tenant_id': self._tenant_id}}
if fixed_ip:
port_data['port']['fixed_ips'] = [{'ip_address': fixed_ip}]
if allowed_address:
port_data['port']['allowed_address_pairs'] = [
{'ip_address': allowed_address}]
port_req = self.new_create_request('ports', port_data, self.fmt)
port_res = port_req.get_response(self.api)
self.assertEqual(201, port_res.status_int)
return self.deserialize(self.fmt, port_res)['port']
def _update_allowed_address_pair(self, port_id, data):
port_data = {
'port': {'allowed_address_pairs': data}}
port_req = self.new_update_request('ports', port_data, port_id,
self.fmt)
port_res = port_req.get_response(self.api)
self.assertEqual(200, port_res.status_int)
return self.deserialize(self.fmt, port_res)['port']
def _set_allowed_address_pair(self, port_id, ip):
return self._update_allowed_address_pair(port_id, [{'ip_address': ip}])
def _unset_allowed_address_pair(self, port_id):
return self._update_allowed_address_pair(port_id, [])
def _find_port_row(self, port_id):
for row in self.nb_api._tables['Logical_Switch_Port'].rows.values():
if row.name == port_id:
return row
def _is_ovn_port_type(self, port_id, port_type):
ovn_vport = self._find_port_row(port_id)
return port_type == ovn_vport.type
def _check_port_type(self, port_id, type):
check = functools.partial(self._is_ovn_port_type, port_id, type)
n_utils.wait_until_true(check, timeout=10)
def test_virtual_port_created_before(self):
virt_port = self._create_port()
virt_ip = virt_port['fixed_ips'][0]['ip_address']
# Create the master port with the VIP address already set in
# the allowed_address_pairs field
master = self._create_port(allowed_address=virt_ip)
# Assert the virt port has the type virtual and master is set
# as parent
self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertEqual(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Create the backport parent port
backup = self._create_port(allowed_address=virt_ip)
# Assert the virt port now also includes the backup port as a parent
self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertIn(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
self.assertIn(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
def test_virtual_port_update_address_pairs(self):
master = self._create_port()
backup = self._create_port()
virt_port = self._create_port()
virt_ip = virt_port['fixed_ips'][0]['ip_address']
# Assert the virt port does not yet have the type virtual (no
# address pairs were set yet)
self._check_port_type(virt_port['id'], ''),
ovn_vport = self._find_port_row(virt_port['id'])
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY,
ovn_vport.options)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY,
ovn_vport.options)
# Set the virt IP to the allowed address pairs of the master port
self._set_allowed_address_pair(master['id'], virt_ip)
# Assert the virt port is now updated
self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL),
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertEqual(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Set the virt IP to the allowed address pairs of the backup port
self._set_allowed_address_pair(backup['id'], virt_ip)
# Assert the virt port now includes the backup port as a parent
self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL),
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertIn(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
self.assertIn(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Remove the address pairs from the master port
self._unset_allowed_address_pair(master['id'])
# Assert the virt port now only has the backup port as a parent
self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL),
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertEqual(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Remove the address pairs from the backup port
self._unset_allowed_address_pair(backup['id'])
# Assert the virt port is not type virtual anymore and the virtual
# port options are cleared
self._check_port_type(virt_port['id'], ''),
ovn_vport = self._find_port_row(virt_port['id'])
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY,
ovn_vport.options)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY,
ovn_vport.options)
def test_virtual_port_created_after(self):
master = self._create_port(fixed_ip='10.0.0.11')
backup = self._create_port(fixed_ip='10.0.0.12')
virt_ip = '10.0.0.55'
# Set the virt IP to the master and backup ports *before* creating
# the virtual port
self._set_allowed_address_pair(master['id'], virt_ip)
self._set_allowed_address_pair(backup['id'], virt_ip)
virt_port = self._create_port(fixed_ip=virt_ip)
# Assert the virtual port has been created with the
# right type and parents
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(ovn_const.LSP_TYPE_VIRTUAL, ovn_vport.type)
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertIn(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
self.assertIn(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
def test_virtual_port_delete_parents(self):
master = self._create_port()
backup = self._create_port()
virt_port = self._create_port()
virt_ip = virt_port['fixed_ips'][0]['ip_address']
# Assert the virt port does not yet have the type virtual (no
# address pairs were set yet)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual("", ovn_vport.type)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY,
ovn_vport.options)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY,
ovn_vport.options)
# Set allowed address paris to the master and backup ports
self._set_allowed_address_pair(master['id'], virt_ip)
self._set_allowed_address_pair(backup['id'], virt_ip)
# Assert the virtual port is correct
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(ovn_const.LSP_TYPE_VIRTUAL, ovn_vport.type)
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertIn(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
self.assertIn(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Delete the backup port
self._delete('ports', backup['id'])
# Assert the virt port now only has the master port as a parent
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(ovn_const.LSP_TYPE_VIRTUAL, ovn_vport.type)
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertEqual(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Delete the master port
self._delete('ports', master['id'])
# Assert the virt port is not type virtual anymore and the virtual
# port options are cleared
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual("", ovn_vport.type)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY,
ovn_vport.options)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY,
ovn_vport.options)

View File

@ -5,3 +5,5 @@
# process, which may cause wedges in the gate later.
psycopg2
psutil>=1.1.1,<3.2.2
PyMySQL>=0.6.2 # MIT License

View File

@ -0,0 +1,65 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import threading
from ovsdbapp.backend.ovs_idl import event
from ovsdbapp.tests.functional.schema.ovn_southbound import event as test_event
class WaitForCrLrpPortBindingEvent(event.RowEvent):
event_name = 'WaitForCrLrpPortBindingEvent'
PREFIX = 'cr-lrp-'
TABLE = 'Port_Binding'
def __init__(self, timeout=5):
self.logical_port_events = collections.defaultdict(threading.Event)
self.timeout = timeout
super(WaitForCrLrpPortBindingEvent, self).__init__(
(self.ROW_CREATE,), 'Port_Binding', None)
def match_fn(self, event, row, old=None):
return row.logical_port.startswith(self.PREFIX)
def run(self, event, row, old):
self.logical_port_events[row.logical_port].set()
def wait(self, logical_port_name):
wait_val = self.logical_port_events[logical_port_name].wait(
self.timeout)
del self.logical_port_events[logical_port_name]
return wait_val
class WaitForCreatePortBindingEvent(test_event.WaitForPortBindingEvent):
event_name = 'WaitForCreatePortBindingEvent'
def run(self, event, row, old):
self.row = row
super(WaitForCreatePortBindingEvent, self).run(event, row, old)
class WaitForUpdatePortBindingEvent(test_event.WaitForPortBindingEvent):
event_name = 'WaitForUpdatePortBindingEvent'
def __init__(self, port, mac, timeout=5):
# Call the super of the superclass to avoid passing CREATE event type
# to the superclass.
super(test_event.WaitForPortBindingEvent, self).__init__(
(self.ROW_UPDATE,),
'Port_Binding',
(('logical_port', '=', port),
('mac', '=', mac)),
timeout=timeout)

View File

@ -0,0 +1,33 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
from ovsdbapp.backend.ovs_idl import connection
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
class OVNIdlConnectionFixture(fixtures.Fixture):
def __init__(self, idl=None, constr=None, schema=None, timeout=60):
self.idl = idl or ovsdb_monitor.BaseOvnIdl.from_server(
constr, schema)
self.connection = connection.Connection(
idl=self.idl, timeout=timeout)
def _setUp(self):
self.addCleanup(self.stop)
self.connection.start()
def stop(self):
self.connection.stop()

View File

@ -0,0 +1,240 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from distutils import spawn
import os
import fixtures
import psutil
import tenacity
from neutron.agent.linux import utils
class DaemonProcessFixture(fixtures.Fixture):
def __init__(self, temp_dir):
super(DaemonProcessFixture, self).__init__()
self.temp_dir = temp_dir
def _get_pid_from_pidfile(self, pidfile):
with open(os.path.join(self.temp_dir, pidfile), 'r') as pidfile_f:
pid = pidfile_f.read().strip()
try:
return int(pid)
except ValueError:
raise RuntimeError(
"Pidfile %(pidfile)s contains %(pid)s that "
"is not a pid" % {'pidfile': pidfile, 'pid': pid}
)
class OvnNorthd(DaemonProcessFixture):
def __init__(self, temp_dir, ovn_nb_db, ovn_sb_db, protocol='unix',
debug=True):
super(OvnNorthd, self).__init__(temp_dir)
self.ovn_nb_db = ovn_nb_db
self.ovn_sb_db = ovn_sb_db
self.protocol = protocol
self.unixctl_path = os.path.join(self.temp_dir, 'ovn_northd.ctl')
self.log_file_path = os.path.join(self.temp_dir, 'ovn_northd.log')
self.debug = debug
if self.protocol == 'ssl':
self.private_key = os.path.join(self.temp_dir, 'ovn-privkey.pem')
self.certificate = os.path.join(self.temp_dir, 'ovn-cert.pem')
self.ca_cert = os.path.join(self.temp_dir, 'controllerca',
'cacert.pem')
def _setUp(self):
self.addCleanup(self.stop)
self.start()
def start(self):
# start the ovn-northd
ovn_northd_cmd = [
spawn.find_executable('ovn-northd'), '-vconsole:off',
'--detach',
'--ovnnb-db=%s' % self.ovn_nb_db,
'--ovnsb-db=%s' % self.ovn_sb_db,
'--no-chdir',
'--unixctl=%s' % self.unixctl_path,
'--log-file=%s' % (self.log_file_path)]
if self.protocol == 'ssl':
ovn_northd_cmd.append('--private-key=%s' % self.private_key)
ovn_northd_cmd.append('--certificate=%s' % self.certificate)
ovn_northd_cmd.append('--ca-cert=%s' % self.ca_cert)
if self.debug:
ovn_northd_cmd.append('--verbose')
obj, _ = utils.create_process(ovn_northd_cmd)
obj.communicate()
def stop(self):
try:
stop_cmd = ['ovs-appctl', '-t', self.unixctl_path, 'exit']
utils.execute(stop_cmd)
except Exception:
pass
class OvsdbServer(DaemonProcessFixture):
def __init__(self, temp_dir, ovs_dir, ovn_nb_db=True, ovn_sb_db=False,
protocol='unix', debug=True):
super(OvsdbServer, self).__init__(temp_dir)
self.ovs_dir = ovs_dir
self.ovn_nb_db = ovn_nb_db
self.ovn_sb_db = ovn_sb_db
# The value of the protocol must be unix or tcp or ssl
self.protocol = protocol
self.ovsdb_server_processes = []
self.private_key = os.path.join(self.temp_dir, 'ovn-privkey.pem')
self.certificate = os.path.join(self.temp_dir, 'ovn-cert.pem')
self.ca_cert = os.path.join(self.temp_dir, 'controllerca',
'cacert.pem')
self.debug = debug
def _setUp(self):
if self.ovn_nb_db:
self.ovsdb_server_processes.append(
{'db_path': os.path.join(self.temp_dir, 'ovn_nb.db'),
'schema_path': os.path.join(self.ovs_dir, 'ovn-nb.ovsschema'),
'remote_path': os.path.join(self.temp_dir, 'ovnnb_db.sock'),
'protocol': self.protocol,
'remote_ip': '127.0.0.1',
'remote_port': '0',
'pidfile': 'ovn-nb.pid',
'unixctl_path': os.path.join(self.temp_dir, 'ovnnb_db.ctl'),
'log_file_path': os.path.join(self.temp_dir, 'ovn_nb.log'),
'db_type': 'nb',
'connection': 'db:OVN_Northbound,NB_Global,connections',
'ctl_cmd': 'ovn-nbctl'})
if self.ovn_sb_db:
self.ovsdb_server_processes.append(
{'db_path': os.path.join(self.temp_dir, 'ovn_sb.db'),
'schema_path': os.path.join(self.ovs_dir, 'ovn-sb.ovsschema'),
'remote_path': os.path.join(self.temp_dir, 'ovnsb_db.sock'),
'protocol': self.protocol,
'remote_ip': '127.0.0.1',
'remote_port': '0',
'pidfile': 'ovn-sb.pid',
'unixctl_path': os.path.join(self.temp_dir, 'ovnsb_db.ctl'),
'log_file_path': os.path.join(self.temp_dir, 'ovn_sb.log'),
'db_type': 'sb',
'connection': 'db:OVN_Southbound,SB_Global,connections',
'ctl_cmd': 'ovn-sbctl'})
self.addCleanup(self.stop)
self.start()
def _init_ovsdb_pki(self):
os.chdir(self.temp_dir)
pki_init_cmd = [spawn.find_executable('ovs-pki'), 'init',
'-d', self.temp_dir, '-l',
os.path.join(self.temp_dir, 'pki.log'), '--force']
utils.execute(pki_init_cmd)
pki_req_sign = [spawn.find_executable('ovs-pki'), 'req+sign', 'ovn',
'controller', '-d', self.temp_dir, '-l',
os.path.join(self.temp_dir, 'pki.log'), '--force']
utils.execute(pki_req_sign)
def delete_dbs(self):
for ovsdb in self.ovsdb_server_processes:
try:
os.remove(ovsdb['db_path'])
except OSError:
pass
def start(self):
pki_done = False
for ovsdb_process in self.ovsdb_server_processes:
# create the db from the schema using ovsdb-tool
ovsdb_tool_cmd = [spawn.find_executable('ovsdb-tool'),
'create', ovsdb_process['db_path'],
ovsdb_process['schema_path']]
utils.execute(ovsdb_tool_cmd)
# start the ovsdb-server
ovsdb_server_cmd = [
spawn.find_executable('ovsdb-server'), '-vconsole:off',
'--detach',
'--pidfile=%s' % os.path.join(
self.temp_dir, ovsdb_process['pidfile']),
'--log-file=%s' % (ovsdb_process['log_file_path']),
'--remote=punix:%s' % (ovsdb_process['remote_path']),
'--remote=%s' % (ovsdb_process['connection']),
'--unixctl=%s' % (ovsdb_process['unixctl_path']),
'--detach']
if ovsdb_process['protocol'] == 'ssl':
if not pki_done:
pki_done = True
self._init_ovsdb_pki()
ovsdb_server_cmd.append('--private-key=%s' % self.private_key)
ovsdb_server_cmd.append('--certificate=%s' % self.certificate)
ovsdb_server_cmd.append('--ca-cert=%s' % self.ca_cert)
ovsdb_server_cmd.append(ovsdb_process['db_path'])
if self.debug:
ovsdb_server_cmd.append('--verbose')
obj, _ = utils.create_process(ovsdb_server_cmd)
obj.communicate()
conn_cmd = [spawn.find_executable(ovsdb_process['ctl_cmd']),
'--db=unix:%s' % ovsdb_process['remote_path'],
'set-connection',
'p%s:%s:%s' % (ovsdb_process['protocol'],
ovsdb_process['remote_port'],
ovsdb_process['remote_ip']),
'--', 'set', 'connection', '.',
'inactivity_probe=60000']
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.1),
stop=tenacity.stop_after_delay(3), reraise=True)
def _set_connection():
utils.execute(conn_cmd)
@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=0.1),
stop=tenacity.stop_after_delay(10),
reraise=True)
def get_ovsdb_remote_port_retry(pid):
process = psutil.Process(pid)
for connect in process.connections():
if connect.status == 'LISTEN':
return connect.laddr[1]
raise Exception(_("Could not find LISTEN port."))
if ovsdb_process['protocol'] != 'unix':
_set_connection()
pid = self._get_pid_from_pidfile(ovsdb_process['pidfile'])
ovsdb_process['remote_port'] = \
get_ovsdb_remote_port_retry(pid)
def stop(self):
for ovsdb_process in self.ovsdb_server_processes:
try:
stop_cmd = ['ovs-appctl', '-t', ovsdb_process['unixctl_path'],
'exit']
utils.execute(stop_cmd)
except Exception:
pass
def get_ovsdb_connection_path(self, db_type='nb'):
for ovsdb_process in self.ovsdb_server_processes:
if ovsdb_process['db_type'] == db_type:
if ovsdb_process['protocol'] == 'unix':
return 'unix:' + ovsdb_process['remote_path']
else:
return '%s:%s:%s' % (ovsdb_process['protocol'],
ovsdb_process['remote_ip'],
ovsdb_process['remote_port'])

View File

@ -574,7 +574,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return self.deserialize(fmt, res)
def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports', 'subnetpools']:
if resource in ['networks', 'subnets', 'ports', 'subnetpools',
'security-groups']:
return self.api
else:
return self.ext_api