[OVN] Add OVN functional tests - part 1

This patch adds functional tests for:
  * base
  * ovn metadata agent
  * ovn mechanism driver
  * impl_idl
  * maintenance task
and the resources needed to run the tests.

Previous paths in networking-ovn tree:
./networking_ovn/tests/functional/base.py ->
  ./neutron/tests/functional/base.py
./networking_ovn/tests/functional/resources ->
  ./neutron/tests/functional/resources
./networking_ovn/tests/functional/test_metadata_agent.py ->
  ./neutron/tests/functional/agent/ovn/metadata/test_metadata_agent.py
./networking_ovn/tests/functional/test_mech_driver.py ->
  ./neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
./networking_ovn/tests/functional/test_impl_idl.py ->
  ./neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_impl_idl.py
./networking_ovn/tests/functional/test_maintenance.py ->
  ./neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_maintenance.py
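
As an illustrative sketch (not part of the change itself; the class and test
names below are hypothetical), tests under the new tree import the relocated
base class directly from neutron:

    from neutron.tests.functional import base

    class MyOVNTestCase(base.TestOVNFunctionalBase):
        def test_idls_are_created(self):
            # nb_api/sb_api are set up by the base class in setUp()
            self.assertIsNotNone(self.nb_api)
            self.assertIsNotNone(self.sb_api)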

Co-Authored-By: Amitabha Biswas <abiswas@us.ibm.com>
Co-Authored-By: Andrew Austin <aaustin@redhat.com>
Co-Authored-By: Armando Migliaccio <armamig@gmail.com>
Co-Authored-By: Boden R <bodenvmw@gmail.com>
Co-Authored-By: Brian Haley <bhaley@redhat.com>
Co-Authored-By: Cao Xuan Hoang <hoangcx@vn.fujitsu.com>
Co-Authored-By: Daniel Alvarez <dalvarez@redhat.com>
Co-Authored-By: Dong Jun <dongj@dtdream.com>
Co-Authored-By: Doug Wiegley <dougw@a10networks.com>
Co-Authored-By: Flavio Fernandes <flaviof@redhat.com>
Co-Authored-By: Gary Kotton <gkotton@vmware.com>
Co-Authored-By: Guoshuai Li <ligs@dtdream.com>
Co-Authored-By: Hong Hui Xiao <honghui_xiao@yeah.net>
Co-Authored-By: Ihar Hrachyshka <ihrachys@redhat.com>
Co-Authored-By: Jakub Libosvar <libosvar@redhat.com>
Co-Authored-By: Kamil Sambor <ksambor@redhat.com>
Co-Authored-By: Lucas Alvares Gomes <lucasagomes@gmail.com>
Co-Authored-By: Numan Siddique <nusiddiq@redhat.com>
Co-Authored-By: Reedip <rbanerje@redhat.com>
Co-Authored-By: Richard Theis <rtheis@us.ibm.com>
Co-Authored-By: Rodolfo Alonso Hernandez <ralonsoh@redhat.com>
Co-Authored-By: Terry Wilson <twilson@redhat.com>
Co-Authored-By: bailinzhang <zhang.bailin@zte.com.cn>
Co-Authored-By: reedip <rbanerje@redhat.com>
Co-Authored-By: venkata anil <anilvenkata@redhat.com>

Depends-On: https://review.opendev.org/#/c/703883/

Change-Id: Ic45ab26c28fdb757d1ee8a9e0c774a1543fb4bff
Related-Blueprint: neutron-ovn-merge
Author: Maciej Józefczyk
Date: 2020-01-09 13:25:34 +00:00
Parent: 98266664c8
Commit: 03203cc4c8
17 changed files with 2175 additions and 1 deletion

neutron/tests/functional/agent/ovn/metadata/test_metadata_agent.py

@@ -0,0 +1,194 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import fixture as fixture_config
from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import event
from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp.tests.functional.schema.ovn_southbound import event as test_event
from neutron.agent.ovn.metadata import agent
from neutron.agent.ovn.metadata import ovsdb
from neutron.agent.ovn.metadata import server as metadata_server
from neutron.common.ovn import constants as ovn_const
from neutron.common import utils as n_utils
from neutron.conf.agent.metadata import config as meta_config
from neutron.conf.agent.ovn.metadata import config as meta_config_ovn
from neutron.tests.functional import base
class MetadataAgentHealthEvent(event.WaitEvent):
event_name = 'MetadataAgentHealthEvent'
def __init__(self, chassis, sb_cfg, timeout=5):
self.chassis = chassis
self.sb_cfg = sb_cfg
super(MetadataAgentHealthEvent, self).__init__(
(self.ROW_UPDATE,), 'Chassis', (('name', '=', self.chassis),),
timeout=timeout)
def matches(self, event, row, old=None):
if not super(MetadataAgentHealthEvent, self).matches(event, row, old):
return False
return int(row.external_ids.get(
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, 0)) >= self.sb_cfg
class TestMetadataAgent(base.TestOVNFunctionalBase):
OVN_BRIDGE = 'br-int'
FAKE_CHASSIS_HOST = 'ovn-host-fake'
def setUp(self):
super(TestMetadataAgent, self).setUp()
self.handler = self.sb_api.idl.notify_handler
# We only have OVN NB and OVN SB running for functional tests
mock.patch.object(ovsdb, 'MetadataAgentOvsIdl').start()
self._mock_get_ovn_br = mock.patch.object(
agent.MetadataAgent,
'_get_ovn_bridge',
return_value=self.OVN_BRIDGE).start()
self.agent = self._start_metadata_agent()
def _start_metadata_agent(self):
conf = self.useFixture(fixture_config.Config()).conf
conf.register_opts(meta_config.SHARED_OPTS)
conf.register_opts(meta_config.UNIX_DOMAIN_METADATA_PROXY_OPTS)
conf.register_opts(meta_config.METADATA_PROXY_HANDLER_OPTS)
conf.register_opts(meta_config_ovn.OVS_OPTS, group='ovs')
meta_config_ovn.setup_privsep()
ovn_sb_db = self.ovsdb_server_mgr.get_ovsdb_connection_path('sb')
conf.set_override('ovn_sb_connection', ovn_sb_db, group='ovn')
# We don't need the HA proxy server running for now
p = mock.patch.object(metadata_server, 'UnixDomainMetadataProxy')
p.start()
self.addCleanup(p.stop)
self.chassis_name = self.add_fake_chassis(self.FAKE_CHASSIS_HOST)
mock.patch.object(agent.MetadataAgent,
'_get_own_chassis_name',
return_value=self.chassis_name).start()
agt = agent.MetadataAgent(conf)
agt.start()
# The metadata agent will open connections to the OVS and SB databases.
# Close those connections when the test ends.
self.addCleanup(agt.ovs_idl.ovsdb_connection.stop)
self.addCleanup(agt.sb_idl.ovsdb_connection.stop)
return agt
def test_metadata_agent_healthcheck(self):
chassis_row = self.sb_api.db_find(
'Chassis', ('name', '=', self.chassis_name)).execute(
check_error=True)[0]
# Assert that, prior to creating a resource, the metadata agent
# didn't populate the external_ids on the Chassis
self.assertNotIn(ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY,
chassis_row['external_ids'])
# Let's list the agents to force the nb_cfg to be bumped on NB
# db, which will automatically increment the nb_cfg counter on
# NB_Global and make ovn-controller copy it over to SB_Global. Upon
# this event, Metadata agent will update the external_ids on its
# Chassis row to signal that it's healthy.
row_event = MetadataAgentHealthEvent(self.chassis_name, 1)
self.handler.watch_event(row_event)
self.new_list_request('agents').get_response(self.api)
# If we do not time out waiting for the event, then we are assured
# that the metadata agent has populated the external_ids of the
# chassis with nb_cfg revision 1 after listing the agents.
self.assertTrue(row_event.wait())
def _create_metadata_port(self, txn, lswitch_name):
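# Add a 'localport' logical switch port carrying the metadata CIDR in its
# external_ids, like the ports the OVN metadata agent serves.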
mdt_port_name = 'ovn-mdt-' + uuidutils.generate_uuid()
txn.add(
self.nb_api.lsp_add(
lswitch_name,
mdt_port_name,
type='localport',
addresses='AA:AA:AA:AA:AA:AA 192.168.122.123',
external_ids={
ovn_const.OVN_CIDRS_EXT_ID_KEY: '192.168.122.123/24'}))
def test_agent_resync_on_non_existing_bridge(self):
BR_NEW = 'br-new'
self._mock_get_ovn_br.return_value = BR_NEW
self.agent.ovs_idl.list_br.return_value.execute.return_value = [BR_NEW]
# The agent was initialized with br-int and the list_br above doesn't
# return it, so the agent should trigger a reconfiguration and store
# the new br-new value in its attribute.
self.assertEqual(self.OVN_BRIDGE, self.agent.ovn_bridge)
lswitch_name = 'ovn-' + uuidutils.generate_uuid()
lswitchport_name = 'ovn-port-' + uuidutils.generate_uuid()
# It may take some time for ovn-northd to translate from the OVN NB DB
# to the OVN SB DB. Wait for the port binding event to happen before
# binding the port to the chassis.
pb_event = test_event.WaitForPortBindingEvent(lswitchport_name)
self.handler.watch_event(pb_event)
with self.nb_api.transaction(check_error=True, log_errors=True) as txn:
txn.add(
self.nb_api.ls_add(lswitch_name))
txn.add(
self.nb_api.create_lswitch_port(
lswitchport_name, lswitch_name))
self._create_metadata_port(txn, lswitch_name)
self.assertTrue(pb_event.wait())
# Trigger PortBindingChassisEvent
self.sb_api.lsp_bind(lswitchport_name, self.chassis_name).execute(
check_error=True, log_errors=True)
exc = Exception("Agent bridge hasn't changed from %s to %s "
"in 10 seconds after Port_Binding event" %
(self.agent.ovn_bridge, BR_NEW))
n_utils.wait_until_true(
lambda: BR_NEW == self.agent.ovn_bridge,
timeout=10,
exception=exc)
def test_agent_registration_at_chassis_create_event(self):
chassis = self.sb_api.lookup('Chassis', self.chassis_name)
self.assertIn(ovn_const.OVN_AGENT_METADATA_ID_KEY,
chassis.external_ids)
# Delete Chassis and assert
self.del_fake_chassis(chassis.name)
self.assertRaises(idlutils.RowNotFound, self.sb_api.lookup,
'Chassis', self.chassis_name)
# Re-add the Chassis
self.add_fake_chassis(self.FAKE_CHASSIS_HOST, name=self.chassis_name)
def check_for_metadata():
chassis = self.sb_api.lookup('Chassis', self.chassis_name)
return ovn_const.OVN_AGENT_METADATA_ID_KEY in chassis.external_ids
exc = Exception('Agent metadata failed to re-register itself '
'after the Chassis %s was re-created' %
self.chassis_name)
# Check if metadata agent was re-registered
chassis = self.sb_api.lookup('Chassis', self.chassis_name)
n_utils.wait_until_true(
check_for_metadata,
timeout=10,
exception=exc)

neutron/tests/functional/base.py

@@ -13,22 +13,46 @@
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
import errno
import os
import shutil
import warnings
import fixtures
import mock
from neutron_lib import fixture
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_db import exception as os_db_exc
from oslo_db.sqlalchemy import provision
from oslo_log import log
from oslo_utils import uuidutils
from neutron.agent.linux import utils
from neutron.api import extensions as exts
from neutron.conf.agent import common as config
from neutron.conf.agent import ovs_conf
from neutron.conf.plugins.ml2 import config as ml2_config
# Load all the models to register them into SQLAlchemy metadata before using
# the SqlFixture
from neutron.db import models # noqa
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron.tests import base
from neutron.tests.common import base as common_base
from neutron.tests.common import helpers
from neutron.tests.functional.resources import process
from neutron.tests.unit.plugins.ml2 import test_plugin
LOG = log.getLogger(__name__)
# This is the directory from which infra fetches log files for functional tests
DEFAULT_LOG_DIR = os.path.join(helpers.get_test_log_path(),
'dsvm-functional-logs')
SQL_FIXTURE_LOCK = 'sql_fixture_lock'
def config_decorator(method_to_decorate, config_tuples):
@@ -99,3 +123,237 @@ class BaseSudoTestCase(BaseLoggingTestCase):
ovs_conf.register_ovs_agent_opts, ovs_agent_opts)
mock.patch.object(ovs_conf, 'register_ovs_agent_opts',
new=ovs_agent_decorator).start()
class OVNSqlFixture(fixture.StaticSqlFixture):
@classmethod
@lockutils.synchronized(SQL_FIXTURE_LOCK)
def _init_resources(cls):
cls.schema_resource = provision.SchemaResource(
provision.DatabaseResource("sqlite"),
cls._generate_schema, teardown=False)
dependency_resources = {}
for name, resource in cls.schema_resource.resources:
dependency_resources[name] = resource.getResource()
cls.schema_resource.make(dependency_resources)
cls.engine = dependency_resources['database'].engine
def _delete_from_schema(self, engine):
try:
super(OVNSqlFixture, self)._delete_from_schema(engine)
except os_db_exc.DBNonExistentTable:
pass
class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
BaseLoggingTestCase):
OVS_DISTRIBUTION = 'openvswitch'
OVN_DISTRIBUTION = 'ovn'
OVN_SCHEMA_FILES = ['ovn-nb.ovsschema', 'ovn-sb.ovsschema']
_mechanism_drivers = ['logger', 'ovn']
_extension_drivers = ['port_security']
_counter = 0
l3_plugin = 'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin'
def setUp(self, maintenance_worker=False):
ml2_config.cfg.CONF.set_override('extension_drivers',
self._extension_drivers,
group='ml2')
ml2_config.cfg.CONF.set_override('tenant_network_types',
['geneve'],
group='ml2')
ml2_config.cfg.CONF.set_override('vni_ranges',
['1:65536'],
group='ml2_type_geneve')
self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
super(TestOVNFunctionalBase, self).setUp()
self.test_log_dir = os.path.join(DEFAULT_LOG_DIR, self.id())
base.setup_test_logging(
cfg.CONF, self.test_log_dir, "testrun.txt")
mm = directory.get_plugin().mechanism_manager
self.mech_driver = mm.mech_drivers['ovn'].obj
self.l3_plugin = directory.get_plugin(constants.L3)
self.ovsdb_server_mgr = None
self.ovn_northd_mgr = None
self.maintenance_worker = maintenance_worker
self.temp_dir = self.useFixture(fixtures.TempDir()).path
self._start_ovsdb_server_and_idls()
self._start_ovn_northd()
def _get_install_share_path(self):
lookup_paths = set()
for installation in ['local', '']:
for distribution in [self.OVN_DISTRIBUTION, self.OVS_DISTRIBUTION]:
exists = True
for ovn_file in self.OVN_SCHEMA_FILES:
path = os.path.join(os.path.sep, 'usr', installation,
'share', distribution, ovn_file)
exists &= os.path.isfile(path)
lookup_paths.add(os.path.dirname(path))
if exists:
return os.path.dirname(path)
msg = 'Either ovn-nb.ovsschema and/or ovn-sb.ovsschema are missing. '
msg += 'Looked for schemas in paths:' + ', '.join(sorted(lookup_paths))
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), msg)
# FIXME(lucasagomes): Workaround for
# https://bugs.launchpad.net/networking-ovn/+bug/1808146. We should
# investigate and properly fix the problem. This method is just a
# workaround to alleviate the gate for now and should not be considered
# a proper fix.
def _setup_database_fixtures(self):
fixture = OVNSqlFixture()
self.useFixture(fixture)
self.engine = fixture.engine
def get_additional_service_plugins(self):
p = super(TestOVNFunctionalBase, self).get_additional_service_plugins()
p.update({'revision_plugin_name': 'revisions'})
return p
@property
def _ovsdb_protocol(self):
return self.get_ovsdb_server_protocol()
def get_ovsdb_server_protocol(self):
return 'unix'
def _start_ovn_northd(self):
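# Start an ovn-northd process connected to the NB and SB ovsdb-servers
# spawned by this fixture; no-op if those servers are not running.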
if not self.ovsdb_server_mgr:
return
ovn_nb_db = self.ovsdb_server_mgr.get_ovsdb_connection_path('nb')
ovn_sb_db = self.ovsdb_server_mgr.get_ovsdb_connection_path('sb')
self.ovn_northd_mgr = self.useFixture(
process.OvnNorthd(self.temp_dir,
ovn_nb_db, ovn_sb_db,
protocol=self._ovsdb_protocol))
def _start_ovsdb_server_and_idls(self):
# Start 2 ovsdb-servers one each for OVN NB DB and OVN SB DB
# ovsdb-server with OVN SB DB can be used to test the chassis up/down
# events.
install_share_path = self._get_install_share_path()
self.ovsdb_server_mgr = self.useFixture(
process.OvsdbServer(self.temp_dir, install_share_path,
ovn_nb_db=True, ovn_sb_db=True,
protocol=self._ovsdb_protocol))
set_cfg = cfg.CONF.set_override
set_cfg('ovn_nb_connection',
self.ovsdb_server_mgr.get_ovsdb_connection_path(), 'ovn')
set_cfg('ovn_sb_connection',
self.ovsdb_server_mgr.get_ovsdb_connection_path(
db_type='sb'), 'ovn')
set_cfg('ovn_nb_private_key', self.ovsdb_server_mgr.private_key, 'ovn')
set_cfg('ovn_nb_certificate', self.ovsdb_server_mgr.certificate, 'ovn')
set_cfg('ovn_nb_ca_cert', self.ovsdb_server_mgr.ca_cert, 'ovn')
set_cfg('ovn_sb_private_key', self.ovsdb_server_mgr.private_key, 'ovn')
set_cfg('ovn_sb_certificate', self.ovsdb_server_mgr.certificate, 'ovn')
set_cfg('ovn_sb_ca_cert', self.ovsdb_server_mgr.ca_cert, 'ovn')
# 5 seconds should be more than enough for the transaction to complete
# for the test cases.
# This also fixes the bug #1607639.
cfg.CONF.set_override(
'ovsdb_connection_timeout', 5,
'ovn')
class TriggerCls(mock.MagicMock):
def trigger(self):
pass
trigger_cls = TriggerCls()
if self.maintenance_worker:
trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
self.addCleanup(self._collect_processes_logs)
self.addCleanup(self.stop)
# mech_driver.post_fork_initialize creates the IDL connections
self.mech_driver.post_fork_initialize(
mock.ANY, mock.ANY, trigger_cls.trigger)
self.nb_api = self.mech_driver._nb_ovn
self.sb_api = self.mech_driver._sb_ovn
def _collect_processes_logs(self):
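# Copy the OVN NB/SB database and log files into the per-test log
# directory so they end up with the collected job artifacts.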
for database in ("nb", "sb"):
for file_suffix in ("log", "db"):
src_filename = "ovn_%(db)s.%(suffix)s" % {
'db': database,
'suffix': file_suffix
}
dst_filename = "ovn_%(db)s-%(timestamp)s.%(suffix)s" % {
'db': database,
'suffix': file_suffix,
'timestamp': datetime.now().strftime('%y-%m-%d_%H-%M-%S'),
}
filepath = os.path.join(self.temp_dir, src_filename)
shutil.copyfile(
filepath, os.path.join(self.test_log_dir, dst_filename))
def stop(self):
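# Stop the NB/SB synchronizers (when running as a maintenance worker)
# and close the mechanism driver's OVSDB connections.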
if self.maintenance_worker:
self.mech_driver.nb_synchronizer.stop()
self.mech_driver.sb_synchronizer.stop()
self.mech_driver._nb_ovn.ovsdb_connection.stop()
self.mech_driver._sb_ovn.ovsdb_connection.stop()
def restart(self):
self.stop()
# The OVN sync test starts its own synchronizers...
self.l3_plugin._nb_ovn_idl.ovsdb_connection.stop()
self.l3_plugin._sb_ovn_idl.ovsdb_connection.stop()
# Stop our monitor connections
self.nb_api.ovsdb_connection.stop()
self.sb_api.ovsdb_connection.stop()
if self.ovsdb_server_mgr:
self.ovsdb_server_mgr.stop()
if self.ovn_northd_mgr:
self.ovn_northd_mgr.stop()
self.mech_driver._nb_ovn = None
self.mech_driver._sb_ovn = None
self.l3_plugin._nb_ovn_idl = None
self.l3_plugin._sb_ovn_idl = None
self.nb_api.ovsdb_connection = None
self.sb_api.ovsdb_connection = None
self.ovsdb_server_mgr.delete_dbs()
self._start_ovsdb_server_and_idls()
self._start_ovn_northd()
def add_fake_chassis(self, host, physical_nets=None, external_ids=None,
name=None):
physical_nets = physical_nets or []
external_ids = external_ids or {}
bridge_mapping = ",".join(["%s:br-provider%s" % (phys_net, i)
for i, phys_net in enumerate(physical_nets)])
if name is None:
name = uuidutils.generate_uuid()
external_ids['ovn-bridge-mappings'] = bridge_mapping
# We'll be using a different IP address every time for the Encap of
# the fake chassis as the SB schema doesn't allow two entries with the
# same (ip, type) pair as of OVS 2.11. This shouldn't have any impact
# as the tunnels won't get created anyway since ovn-controller is not
# running. Ideally we shouldn't be creating more than 255 fake chassis
# but, from the SB DB point of view, the 'ip' column can be any string
# so we could add entries with ip='172.24.4.1000'.
self._counter += 1
self.sb_api.chassis_add(
name, ['geneve'], '172.24.4.%d' % self._counter,
external_ids=external_ids, hostname=host).execute(check_error=True)
return name
def del_fake_chassis(self, chassis, if_exists=True):
self.sb_api.chassis_del(
chassis, if_exists=if_exists).execute(check_error=True)

neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_impl_idl.py

@@ -0,0 +1,149 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import uuid
from ovsdbapp import event as ovsdb_event
from ovsdbapp.tests.functional import base
from ovsdbapp.tests import utils
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb \
import impl_idl_ovn as impl
from neutron.tests.functional import base as n_base
from neutron.tests.functional.resources.ovsdb import events
class TestSbApi(base.FunctionalTestCase,
n_base.BaseLoggingTestCase):
schemas = ['OVN_Southbound', 'OVN_Northbound']
def setUp(self):
super(TestSbApi, self).setUp()
self.data = {
'chassis': [
{'external_ids': {'ovn-bridge-mappings':
'public:br-ex,private:br-0'}},
{'external_ids': {'ovn-bridge-mappings':
'public:br-ex,public2:br-ex'}},
{'external_ids': {'ovn-bridge-mappings':
'public:br-ex'}},
]
}
self.api = impl.OvsdbSbOvnIdl(self.connection['OVN_Southbound'])
self.nbapi = impl.OvsdbNbOvnIdl(self.connection['OVN_Northbound'])
self.load_test_data()
self.handler = ovsdb_event.RowEventHandler()
self.api.idl.notify = self.handler.notify
def load_test_data(self):
with self.api.transaction(check_error=True) as txn:
for chassis in self.data['chassis']:
chassis['name'] = utils.get_rand_device_name('chassis')
chassis['hostname'] = '%s.localdomain.com' % chassis['name']
txn.add(self.api.chassis_add(
chassis['name'], ['geneve'], chassis['hostname'],
hostname=chassis['hostname'],
external_ids=chassis['external_ids']))
def test_get_chassis_hostname_and_physnets(self):
mapping = self.api.get_chassis_hostname_and_physnets()
self.assertLessEqual(len(self.data['chassis']), len(mapping))
self.assertGreaterEqual(set(mapping.keys()),
{c['hostname'] for c in self.data['chassis']})
def test_get_all_chassis(self):
chassis_list = set(self.api.get_all_chassis())
our_chassis = {c['name'] for c in self.data['chassis']}
self.assertLessEqual(our_chassis, chassis_list)
def test_get_chassis_data_for_ml2_bind_port(self):
host = self.data['chassis'][0]['hostname']
dp, iface, phys = self.api.get_chassis_data_for_ml2_bind_port(host)
self.assertEqual('', dp)
self.assertEqual('', iface)
self.assertItemsEqual(phys, ['private', 'public'])
def test_chassis_exists(self):
self.assertTrue(self.api.chassis_exists(
self.data['chassis'][0]['hostname']))
self.assertFalse(self.api.chassis_exists("nochassishere"))
def test_get_chassis_and_physnets(self):
mapping = self.api.get_chassis_and_physnets()
self.assertLessEqual(len(self.data['chassis']), len(mapping))
self.assertGreaterEqual(set(mapping.keys()),
{c['name'] for c in self.data['chassis']})
def _add_switch_port(self, chassis_name, type='localport'):
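# Create a logical switch with a single port and wait for ovn-northd to
# create the corresponding Port_Binding row in the southbound DB.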
sname, pname = (utils.get_rand_device_name(prefix=p)
for p in ('switch', 'port'))
chassis = self.api.lookup('Chassis', chassis_name)
row_event = events.WaitForCreatePortBindingEvent(pname)
self.handler.watch_event(row_event)
with self.nbapi.transaction(check_error=True) as txn:
switch = txn.add(self.nbapi.ls_add(sname))
port = txn.add(self.nbapi.lsp_add(sname, pname, type=type))
row_event.wait()
return chassis, switch.result, port.result, row_event.row
def test_get_metadata_port_network(self):
chassis, switch, port, binding = self._add_switch_port(
self.data['chassis'][0]['name'])
result = self.api.get_metadata_port_network(str(binding.datapath.uuid))
self.assertEqual(binding, result)
self.assertEqual(binding.datapath.external_ids['logical-switch'],
str(switch.uuid))
def test_get_metadata_port_network_missing(self):
val = str(uuid.uuid4())
self.assertIsNone(self.api.get_metadata_port_network(val))
def test_set_get_chassis_metadata_networks(self):
name = self.data['chassis'][0]['name']
nets = [str(uuid.uuid4()) for _ in range(3)]
self.api.set_chassis_metadata_networks(name, nets).execute(
check_error=True)
self.assertEqual(nets, self.api.get_chassis_metadata_networks(name))
def test_get_network_port_bindings_by_ip(self):
chassis, switch, port, binding = self._add_switch_port(
self.data['chassis'][0]['name'])
mac = 'de:ad:be:ef:4d:ad'
ipaddr = '192.0.2.1'
mac_ip = '%s %s' % (mac, ipaddr)
pb_update_event = events.WaitForUpdatePortBindingEvent(
port.name, mac=[mac_ip])
self.handler.watch_event(pb_update_event)
self.nbapi.lsp_set_addresses(
port.name, [mac_ip]).execute(check_error=True)
self.assertTrue(pb_update_event.wait())
self.api.lsp_bind(port.name, chassis.name).execute(check_error=True)
result = self.api.get_network_port_bindings_by_ip(
str(binding.datapath.uuid), ipaddr)
self.assertIn(binding, result)
def test_get_ports_on_chassis(self):
chassis, switch, port, binding = self._add_switch_port(
self.data['chassis'][0]['name'])
self.api.lsp_bind(port.name, chassis.name).execute(check_error=True)
self.assertEqual([binding],
self.api.get_ports_on_chassis(chassis.name))
def test_get_logical_port_chassis_and_datapath(self):
chassis, switch, port, binding = self._add_switch_port(
self.data['chassis'][0]['name'])
self.api.lsp_bind(port.name, chassis.name).execute(check_error=True)
self.assertEqual(
(chassis.name, str(binding.datapath.uuid)),
self.api.get_logical_port_chassis_and_datapath(port.name))

neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_maintenance.py

@@ -0,0 +1,800 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from futurist import periodics
from neutron_lib.api.definitions import external_net as extnet_apidef
from neutron_lib import constants as n_const
from neutron_lib import context as n_context
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import maintenance
from neutron.tests.functional import base
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.extensions import test_extraroute
class _TestMaintenanceHelper(base.TestOVNFunctionalBase):
"""A helper class to keep the code more organized."""
def setUp(self):
super(_TestMaintenanceHelper, self).setUp()
self._ovn_client = self.mech_driver._ovn_client
self._l3_ovn_client = self.l3_plugin._ovn_client
ext_mgr = test_extraroute.ExtraRouteTestExtensionManager()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.maint = maintenance.DBInconsistenciesPeriodics(self._ovn_client)
self.context = n_context.get_admin_context()
# Always verify inconsistencies for all objects.
db_rev.INCONSISTENCIES_OLDER_THAN = -1
def _find_network_row_by_name(self, name):
for row in self.nb_api._tables['Logical_Switch'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_NETWORK_NAME_EXT_ID_KEY) == name):
return row
def _create_network(self, name, external=False):
data = {'network': {'name': name, 'tenant_id': self._tenant_id,
extnet_apidef.EXTERNAL: external}}
req = self.new_create_request('networks', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['network']
def _update_network_name(self, net_id, new_name):
data = {'network': {'name': new_name}}
req = self.new_update_request('networks', data, net_id, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['network']
def _create_port(self, name, net_id, security_groups=None,
device_owner=None):
data = {'port': {'name': name,
'tenant_id': self._tenant_id,
'network_id': net_id}}
if security_groups is not None:
data['port']['security_groups'] = security_groups
if device_owner is not None:
data['port']['device_owner'] = device_owner
req = self.new_create_request('ports', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['port']
def _update_port_name(self, port_id, new_name):
data = {'port': {'name': new_name}}
req = self.new_update_request('ports', data, port_id, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['port']
def _find_port_row_by_name(self, name):
for row in self.nb_api._tables['Logical_Switch_Port'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_PORT_NAME_EXT_ID_KEY) == name):
return row
def _set_global_dhcp_opts(self, ip_version, opts):
opt_string = ','.join(['{0}:{1}'.format(key, value)
for key, value
in opts.items()])
if ip_version == 6:
ovn_config.cfg.CONF.set_override('ovn_dhcp6_global_options',
opt_string,
group='ovn')
if ip_version == 4:
ovn_config.cfg.CONF.set_override('ovn_dhcp4_global_options',
opt_string,
group='ovn')
def _unset_global_dhcp_opts(self, ip_version):
if ip_version == 6:
ovn_config.cfg.CONF.clear_override('ovn_dhcp6_global_options',
group='ovn')
if ip_version == 4:
ovn_config.cfg.CONF.clear_override('ovn_dhcp4_global_options',
group='ovn')
def _create_subnet(self, name, net_id, ip_version=4):
data = {'subnet': {'name': name,
'tenant_id': self._tenant_id,
'network_id': net_id,
'ip_version': ip_version,
'enable_dhcp': True}}
if ip_version == 4:
data['subnet']['cidr'] = '10.0.0.0/24'
else:
data['subnet']['cidr'] = 'eef0::/64'
req = self.new_create_request('subnets', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['subnet']
def _update_subnet_enable_dhcp(self, subnet_id, value):
data = {'subnet': {'enable_dhcp': value}}
req = self.new_update_request('subnets', data, subnet_id, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['subnet']
def _find_subnet_row_by_id(self, subnet_id):
for row in self.nb_api._tables['DHCP_Options'].rows.values():
if (row.external_ids.get('subnet_id') == subnet_id and
not row.external_ids.get('port_id')):
return row
def _create_router(self, name, external_gateway_info=None):
data = {'router': {'name': name, 'tenant_id': self._tenant_id}}
if external_gateway_info is not None:
data['router']['external_gateway_info'] = external_gateway_info
req = self.new_create_request('routers', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['router']
def _update_router_name(self, net_id, new_name):
data = {'router': {'name': new_name}}
req = self.new_update_request('routers', data, net_id, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['router']
def _find_router_row_by_name(self, name):
for row in self.nb_api._tables['Logical_Router'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY) == name):
return row
def _create_security_group(self):
data = {'security_group': {'name': 'sgtest',
'tenant_id': self._tenant_id,
'description': 'SpongeBob Rocks!'}}
req = self.new_create_request('security-groups', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['security_group']
def _find_security_group_row_by_id(self, sg_id):
if self.nb_api.is_port_groups_supported():
for row in self.nb_api._tables['Port_Group'].rows.values():
if row.name == utils.ovn_port_group_name(sg_id):
return row
else:
for row in self.nb_api._tables['Address_Set'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_SG_EXT_ID_KEY) == sg_id):
return row
def _create_security_group_rule(self, sg_id):
data = {'security_group_rule': {'security_group_id': sg_id,
'direction': 'ingress',
'protocol': n_const.PROTO_NAME_TCP,
'ethertype': n_const.IPv4,
'port_range_min': 22,
'port_range_max': 22,
'tenant_id': self._tenant_id}}
req = self.new_create_request('security-group-rules', data, self.fmt)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)['security_group_rule']
def _find_security_group_rule_row_by_id(self, sgr_id):
for row in self.nb_api._tables['ACL'].rows.values():
if (row.external_ids.get(
ovn_const.OVN_SG_RULE_EXT_ID_KEY) == sgr_id):
return row
def _process_router_interface(self, action, router_id, subnet_id):
req = self.new_action_request(
'routers', {'subnet_id': subnet_id}, router_id,
'%s_router_interface' % action)
res = req.get_response(self.api)
return self.deserialize(self.fmt, res)
def _add_router_interface(self, router_id, subnet_id):
return self._process_router_interface('add', router_id, subnet_id)
def _remove_router_interface(self, router_id, subnet_id):
return self._process_router_interface('remove', router_id, subnet_id)
def _find_router_port_row_by_port_id(self, port_id):
for row in self.nb_api._tables['Logical_Router_Port'].rows.values():
if row.name == utils.ovn_lrouter_port_name(port_id):
return row
class TestMaintenance(_TestMaintenanceHelper):
def test_network(self):
net_name = 'networktest'
with mock.patch.object(self._ovn_client, 'create_network'):
neutron_obj = self._create_network(net_name)
# Assert the network doesn't exist in OVN
self.assertIsNone(self._find_network_row_by_name(net_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the network was now created
ovn_obj = self._find_network_row_by_name(net_name)
self.assertIsNotNone(ovn_obj)
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Update
new_obj_name = 'networktest_updated'
with mock.patch.object(self._ovn_client, 'update_network'):
new_neutron_obj = self._update_network_name(neutron_obj['id'],
new_obj_name)
# Assert the revision numbers are out-of-sync
ovn_obj = self._find_network_row_by_name(net_name)
self.assertNotEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the old name doesn't exist anymore in the OVNDB
self.assertIsNone(self._find_network_row_by_name(net_name))
# Assert the network is now in sync
ovn_obj = self._find_network_row_by_name(new_obj_name)
self.assertEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Delete
with mock.patch.object(self._ovn_client, 'delete_network'):
self._delete('networks', new_neutron_obj['id'])
# Assert the network still exists in OVNDB
self.assertIsNotNone(self._find_network_row_by_name(new_obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the network is now deleted from OVNDB
self.assertIsNone(self._find_network_row_by_name(new_obj_name))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
new_neutron_obj['id']))
def test_port(self):
obj_name = 'porttest'
neutron_net = self._create_network('network1')
with mock.patch.object(self._ovn_client, 'create_port'):
neutron_obj = self._create_port(obj_name, neutron_net['id'])
# Assert the port doesn't exist in OVN
self.assertIsNone(self._find_port_row_by_name(obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the port was now created
ovn_obj = self._find_port_row_by_name(obj_name)
self.assertIsNotNone(ovn_obj)
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Update
new_obj_name = 'porttest_updated'
with mock.patch.object(self._ovn_client, 'update_port'):
new_neutron_obj = self._update_port_name(neutron_obj['id'],
new_obj_name)
# Assert the revision numbers are out-of-sync
ovn_obj = self._find_port_row_by_name(obj_name)
self.assertNotEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the old name doesn't exist anymore in the OVNDB
self.assertIsNone(self._find_port_row_by_name(obj_name))
# Assert the port is now in sync. Note that for ports we fetch the
# port again from the Neutron database prior to the comparison because
# the monitor code can update the port again upon changes to it.
ovn_obj = self._find_port_row_by_name(new_obj_name)
new_neutron_obj = self._ovn_client._plugin.get_port(
self.context, neutron_obj['id'])
self.assertEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Delete
with mock.patch.object(self._ovn_client, 'delete_port'):
self._delete('ports', new_neutron_obj['id'])
# Assert the port still exists in OVNDB
self.assertIsNotNone(self._find_port_row_by_name(new_obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the port is now deleted from OVNDB
self.assertIsNone(self._find_port_row_by_name(new_obj_name))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
neutron_obj['id']))
def test_subnet_global_dhcp4_opts(self):
obj_name = 'globaltestsubnet'
options = {'ntp_server': '1.2.3.4'}
neutron_net = self._create_network('network1')
# Create a subnet without global options
neutron_sub = self._create_subnet(obj_name, neutron_net['id'])
# Assert that the option is not set
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertIsNone(ovn_obj.options.get('ntp_server', None))
# Set some global DHCP Options
self._set_global_dhcp_opts(ip_version=4, opts=options)
# Run the maintenance task to add the new options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was added
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertEqual(
ovn_obj.options.get('ntp_server', None),
'1.2.3.4')
# Change the global option
new_options = {'ntp_server': '4.3.2.1'}
self._set_global_dhcp_opts(ip_version=4, opts=new_options)
# Run the maintenance task to update the options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was changed
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertEqual(
ovn_obj.options.get('ntp_server', None),
'4.3.2.1')
# Change the global option to null
new_options = {'ntp_server': ''}
self._set_global_dhcp_opts(ip_version=4, opts=new_options)
# Run the maintenance task to update the options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was removed
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertIsNone(ovn_obj.options.get('ntp_server', None))
def test_subnet_global_dhcp6_opts(self):
obj_name = 'globaltestsubnet'
options = {'ntp_server': '1.2.3.4'}
neutron_net = self._create_network('network1')
# Create a subnet without global options
neutron_sub = self._create_subnet(obj_name, neutron_net['id'], 6)
# Assert that the option is not set
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertIsNone(ovn_obj.options.get('ntp_server', None))
# Set some global DHCP Options
self._set_global_dhcp_opts(ip_version=6, opts=options)
# Run the maintenance task to add the new options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was added
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertEqual(
ovn_obj.options.get('ntp_server', None),
'1.2.3.4')
# Change the global option
new_options = {'ntp_server': '4.3.2.1'}
self._set_global_dhcp_opts(ip_version=6, opts=new_options)
# Run the maintenance task to update the options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was changed
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertEqual(
ovn_obj.options.get('ntp_server', None),
'4.3.2.1')
# Change the global option to null
new_options = {'ntp_server': ''}
self._set_global_dhcp_opts(ip_version=6, opts=new_options)
# Run the maintenance task to update the options
self.assertRaises(periodics.NeverAgain,
self.maint.check_global_dhcp_opts)
# Assert that the option was removed
ovn_obj = self._find_subnet_row_by_id(neutron_sub['id'])
self.assertIsNone(ovn_obj.options.get('ntp_server', None))
def test_subnet(self):
obj_name = 'subnettest'
neutron_net = self._create_network('network1')
with mock.patch.object(self._ovn_client, 'create_subnet'):
neutron_obj = self._create_subnet(obj_name, neutron_net['id'])
# Assert the subnet doesn't exist in OVN
self.assertIsNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the subnet was now created
ovn_obj = self._find_subnet_row_by_id(neutron_obj['id'])
self.assertIsNotNone(ovn_obj)
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Update
with mock.patch.object(self._ovn_client, 'update_subnet'):
neutron_obj = self._update_subnet_enable_dhcp(
neutron_obj['id'], False)
# Assert the revision numbers are out-of-sync
ovn_obj = self._find_subnet_row_by_id(neutron_obj['id'])
self.assertNotEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the DHCP_Options row doesn't exist anymore in the OVN DB.
# When the subnet's enable_dhcp is set to False, OVN will remove the
# DHCP_Options entry related to that subnet.
self.assertIsNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Re-enable the DHCP for the subnet and check if the maintenance
# thread will re-create it in OVN
with mock.patch.object(self._ovn_client, 'update_subnet'):
neutron_obj = self._update_subnet_enable_dhcp(
neutron_obj['id'], True)
# Assert the DHCP_Options still doesn't exist in OVNDB
self.assertIsNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the subnet is now in sync
ovn_obj = self._find_subnet_row_by_id(neutron_obj['id'])
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Delete
with mock.patch.object(self._ovn_client, 'delete_subnet'):
self._delete('subnets', neutron_obj['id'])
# Assert the subnet still exists in OVNDB
self.assertIsNotNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the subnet is now deleted from OVNDB
self.assertIsNone(self._find_subnet_row_by_id(neutron_obj['id']))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
neutron_obj['id']))
def test_router(self):
obj_name = 'routertest'
with mock.patch.object(self._l3_ovn_client, 'create_router'):
neutron_obj = self._create_router(obj_name)
# Assert the router doesn't exist in OVN
self.assertIsNone(self._find_router_row_by_name(obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the router was now created
ovn_obj = self._find_router_row_by_name(obj_name)
self.assertIsNotNone(ovn_obj)
self.assertEqual(
neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Update
new_obj_name = 'routertest_updated'
with mock.patch.object(self._l3_ovn_client, 'update_router'):
new_neutron_obj = self._update_router_name(neutron_obj['id'],
new_obj_name)
# Assert the revision numbers are out-of-sync
ovn_obj = self._find_router_row_by_name(obj_name)
self.assertNotEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the old name doesn't exist anymore in the OVNDB
self.assertIsNone(self._find_router_row_by_name(obj_name))
# Assert the router is now in sync
ovn_obj = self._find_router_row_by_name(new_obj_name)
self.assertEqual(
new_neutron_obj['revision_number'],
int(ovn_obj.external_ids[ovn_const.OVN_REV_NUM_EXT_ID_KEY]))
# > Delete
with mock.patch.object(self._l3_ovn_client, 'delete_router'):
self._delete('routers', new_neutron_obj['id'])
# Assert the router still exists in OVNDB
self.assertIsNotNone(self._find_router_row_by_name(new_obj_name))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the router is now deleted from OVNDB
self.assertIsNone(self._find_router_row_by_name(new_obj_name))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
new_neutron_obj['id']))
def test_security_group(self):
with mock.patch.object(self._ovn_client, 'create_security_group'):
neutron_obj = self._create_security_group()
# Assert the sg doesn't exist in OVN
self.assertIsNone(
self._find_security_group_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the sg was now created. We don't save the revision number
# in the Security Group because OVN doesn't support updating it;
# all we care about is whether it exists or not.
self.assertIsNotNone(
self._find_security_group_row_by_id(neutron_obj['id']))
# > Delete
with mock.patch.object(self._ovn_client, 'delete_security_group'):
self._delete('security-groups', neutron_obj['id'])
# Assert the sg still exists in OVNDB
self.assertIsNotNone(
self._find_security_group_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the sg is now deleted from OVNDB
self.assertIsNone(
self._find_security_group_row_by_id(neutron_obj['id']))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
neutron_obj['id']))
def test_security_group_rule(self):
neutron_sg = self._create_security_group()
neutron_net = self._create_network('network1')
self._create_port('portsgtest', neutron_net['id'],
security_groups=[neutron_sg['id']])
with mock.patch.object(self._ovn_client, 'create_security_group_rule'):
neutron_obj = self._create_security_group_rule(neutron_sg['id'])
# Assert the sg rule doesn't exist in OVN
self.assertIsNone(
self._find_security_group_rule_row_by_id(neutron_obj['id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the sg rule was now created. We don't save the revision number
# for the Security Group Rule because OVN doesn't support updating it;
# all we care about is whether it exists or not.
self.assertIsNotNone(
self._find_security_group_rule_row_by_id(neutron_obj['id']))
# > Delete
# FIXME(lucasagomes): Maintenance thread fixing deleted
# security group rules is currently broken due to:
# https://bugs.launchpad.net/networking-ovn/+bug/1756123
def test_router_port(self):
neutron_net = self._create_network('networktest', external=True)
neutron_subnet = self._create_subnet('subnettest', neutron_net['id'])
neutron_router = self._create_router('routertest')
with mock.patch.object(self._l3_ovn_client, 'create_router_port'):
with mock.patch('neutron.db.ovn_revision_numbers_db.'
'bump_revision'):
neutron_obj = self._add_router_interface(neutron_router['id'],
neutron_subnet['id'])
# Assert the router port doesn't exist in OVN
self.assertIsNone(
self._find_router_port_row_by_port_id(neutron_obj['port_id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the router port was now created
self.assertIsNotNone(
self._find_router_port_row_by_port_id(neutron_obj['port_id']))
# > Delete
with mock.patch.object(self._l3_ovn_client, 'delete_router_port'):
self._remove_router_interface(neutron_router['id'],
neutron_subnet['id'])
# Assert the router port still exists in OVNDB
self.assertIsNotNone(
self._find_router_port_row_by_port_id(neutron_obj['port_id']))
# Call the maintenance thread to fix the problem
self.maint.check_for_inconsistencies()
# Assert the router port is now deleted from OVNDB
self.assertIsNone(
self._find_router_port_row_by_port_id(neutron_obj['port_id']))
# Assert the revision number no longer exists
self.assertIsNone(db_rev.get_revision_row(
self.context,
neutron_obj['port_id']))
def test_check_metadata_ports(self):
ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', True,
group='ovn')
neutron_net = self._create_network('network1')
metadata_port = self._ovn_client._find_metadata_port(
self.context, neutron_net['id'])
# Assert the metadata port exists
self.assertIsNotNone(metadata_port)
# Delete the metadata port
self._delete('ports', metadata_port['id'])
metadata_port = self._ovn_client._find_metadata_port(
self.context, neutron_net['id'])
# Assert the metadata port is gone
self.assertIsNone(metadata_port)
# Call the maintenance thread to fix the problem, it will raise
# NeverAgain so that the job only runs once at startup
self.assertRaises(periodics.NeverAgain,
self.maint.check_metadata_ports)
metadata_port = self._ovn_client._find_metadata_port(
self.context, neutron_net['id'])
# Assert the metadata port was re-created
self.assertIsNotNone(metadata_port)
def test_check_metadata_ports_not_enabled(self):
ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', False,
group='ovn')
with mock.patch.object(self._ovn_client,
'create_metadata_port') as mock_create_port:
self.assertRaises(periodics.NeverAgain,
self.maint.check_metadata_ports)
# Assert create_metadata_port() wasn't called since metadata
# is not enabled
self.assertFalse(mock_create_port.called)
def test_check_for_port_security_unknown_address(self):
neutron_net = self._create_network('network1')
neutron_port = self._create_port('port1', neutron_net['id'])
# Let's force disabling port security for the LSP
self.nb_api.lsp_set_port_security(neutron_port['id'], []).execute(
check_error=True)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that port security is now disabled but the 'unknown'
# is not set in the addresses column
self.assertFalse(ovn_port['port_security'])
self.assertNotIn('unknown', ovn_port['addresses'])
# Call the maintenance task to fix the problem. Note that
# NeverAgain is raised so it only runs once at start up
self.assertRaises(periodics.NeverAgain,
self.maint.check_for_port_security_unknown_address)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that 'unknown' was set in the addresses column for
# the port
self.assertFalse(ovn_port['port_security'])
self.assertIn('unknown', ovn_port['addresses'])
# Now the other way around, let's set port_security in the OVN
# table while the 'unknown' address is set in the addresses column
self.nb_api.lsp_set_port_security(
neutron_port['id'], ovn_port['addresses']).execute(
check_error=True)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
self.assertTrue(ovn_port['port_security'])
self.assertIn('unknown', ovn_port['addresses'])
# Call the maintenance task to fix the problem. Note that
# NeverAgain is raised so it only runs once at start up
self.assertRaises(periodics.NeverAgain,
self.maint.check_for_port_security_unknown_address)
ovn_port = self.nb_api.db_find(
'Logical_Switch_Port', ('name', '=', neutron_port['id'])).execute(
check_error=True)[0]
# Assert that 'unknown' was removed from the addresses column
# for the port
self.assertTrue(ovn_port['port_security'])
self.assertNotIn('unknown', ovn_port['addresses'])

neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py

@@ -0,0 +1,432 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.common import utils as n_utils
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.tests.functional import base
class TestPortBinding(base.TestOVNFunctionalBase):
def setUp(self):
super(TestPortBinding, self).setUp()
self.ovs_host = 'ovs-host'
self.dpdk_host = 'dpdk-host'
self.invalid_dpdk_host = 'invalid-host'
self.vhu_mode = 'server'
self.add_fake_chassis(self.ovs_host)
self.add_fake_chassis(
self.dpdk_host,
external_ids={'datapath-type': 'netdev',
'iface-types': 'dummy,dummy-internal,dpdkvhostuser'})
self.add_fake_chassis(
self.invalid_dpdk_host,
external_ids={'datapath-type': 'netdev',
'iface-types': 'dummy,dummy-internal,geneve,vxlan'})
self.n1 = self._make_network(self.fmt, 'n1', True)
res = self._create_subnet(self.fmt, self.n1['network']['id'],
'10.0.0.0/24')
self.deserialize(self.fmt, res)
def _create_or_update_port(self, port_id=None, hostname=None):
if port_id is None:
port_data = {
'port': {'network_id': self.n1['network']['id'],
'tenant_id': self._tenant_id}}
if hostname:
port_data['port']['device_id'] = uuidutils.generate_uuid()
port_data['port']['device_owner'] = 'compute:None'
port_data['port']['binding:host_id'] = hostname
port_req = self.new_create_request('ports', port_data, self.fmt)
port_res = port_req.get_response(self.api)
p = self.deserialize(self.fmt, port_res)
port_id = p['port']['id']
else:
port_data = {
'port': {'device_id': uuidutils.generate_uuid(),
'device_owner': 'compute:None',
'binding:host_id': hostname}}
port_req = self.new_update_request('ports', port_data, port_id,
self.fmt)
port_res = port_req.get_response(self.api)
self.deserialize(self.fmt, port_res)
return port_id
def _verify_vif_details(self, port_id, expected_host_name,
expected_vif_type, expected_vif_details):
port_req = self.new_show_request('ports', port_id)
port_res = port_req.get_response(self.api)
p = self.deserialize(self.fmt, port_res)
self.assertEqual(expected_host_name, p['port']['binding:host_id'])
self.assertEqual(expected_vif_type, p['port']['binding:vif_type'])
self.assertEqual(expected_vif_details,
p['port']['binding:vif_details'])
def test_port_binding_create_port(self):
port_id = self._create_or_update_port(hostname=self.ovs_host)
self._verify_vif_details(port_id, self.ovs_host, 'ovs',
{'port_filter': True})
port_id = self._create_or_update_port(hostname=self.dpdk_host)
expected_vif_details = {'port_filter': False,
'vhostuser_mode': self.vhu_mode,
'vhostuser_ovs_plug': True}
expected_vif_details['vhostuser_socket'] = (
utils.ovn_vhu_sockpath(cfg.CONF.ovn.vhost_sock_dir, port_id))
self._verify_vif_details(port_id, self.dpdk_host, 'vhostuser',
expected_vif_details)
port_id = self._create_or_update_port(hostname=self.invalid_dpdk_host)
self._verify_vif_details(port_id, self.invalid_dpdk_host, 'ovs',
{'port_filter': True})
def test_port_binding_update_port(self):
port_id = self._create_or_update_port()
self._verify_vif_details(port_id, '', 'unbound', {})
port_id = self._create_or_update_port(port_id=port_id,
hostname=self.ovs_host)
self._verify_vif_details(port_id, self.ovs_host, 'ovs',
{'port_filter': True})
port_id = self._create_or_update_port(port_id=port_id,
hostname=self.dpdk_host)
expected_vif_details = {'port_filter': False,
'vhostuser_mode': self.vhu_mode,
'vhostuser_ovs_plug': True}
expected_vif_details['vhostuser_socket'] = (
utils.ovn_vhu_sockpath(cfg.CONF.ovn.vhost_sock_dir, port_id))
self._verify_vif_details(port_id, self.dpdk_host, 'vhostuser',
expected_vif_details)
port_id = self._create_or_update_port(port_id=port_id,
hostname=self.invalid_dpdk_host)
self._verify_vif_details(port_id, self.invalid_dpdk_host, 'ovs',
{'port_filter': True})
class TestPortBindingOverTcp(TestPortBinding):
def get_ovsdb_server_protocol(self):
return 'tcp'
# TODO(mjozefcz): This test class hangs during execution.
class TestPortBindingOverSsl(TestPortBinding):
def get_ovsdb_server_protocol(self):
return 'ssl'
class TestNetworkMTUUpdate(base.TestOVNFunctionalBase):
def setUp(self):
super(TestNetworkMTUUpdate, self).setUp()
self._ovn_client = self.mech_driver._ovn_client
self.n1 = self._make_network(self.fmt, 'n1', True)
res = self._create_subnet(self.fmt, self.n1['network']['id'],
'10.0.0.0/24')
self.sub = self.deserialize(self.fmt, res)
def test_update_network_mtu(self):
mtu_value = self.n1['network']['mtu'] - 100
dhcp_options = (
self.mech_driver._ovn_client._nb_idl.get_subnet_dhcp_options(
self.sub['subnet']['id'])
)
self.assertNotEqual(
int(dhcp_options['subnet']['options']['mtu']),
mtu_value)
data = {'network': {'mtu': mtu_value}}
req = self.new_update_request(
'networks', data, self.n1['network']['id'], self.fmt)
req.get_response(self.api)
dhcp_options = (
self.mech_driver._ovn_client._nb_idl.get_subnet_dhcp_options(
self.sub['subnet']['id'])
)
self.assertEqual(
int(dhcp_options['subnet']['options']['mtu']),
mtu_value)
def test_no_update_network_mtu(self):
mtu_value = self.n1['network']['mtu']
base_revision = db_rev.get_revision_row(
self.context,
self.sub['subnet']['id'])
data = {'network': {'mtu': mtu_value}}
req = self.new_update_request(
'networks', data, self.n1['network']['id'], self.fmt)
req.get_response(self.api)
second_revision = db_rev.get_revision_row(
self.context,
self.sub['subnet']['id'])
self.assertEqual(
base_revision.updated_at,
second_revision.updated_at)


@mock.patch('neutron.plugins.ml2.drivers.ovn.mech_driver.'
'ovsdb.ovn_client.OVNClient._is_virtual_port_supported',
lambda *args: True)
class TestVirtualPorts(base.TestOVNFunctionalBase):
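    """Check the lifecycle of OVN virtual ports.

    A port becomes (or stops being) virtual when its fixed IP address is
    added to (or removed from) the allowed_address_pairs of other ports,
    which then show up as its virtual parents.
    """
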
def setUp(self):
super(TestVirtualPorts, self).setUp()
self._ovn_client = self.mech_driver._ovn_client
self.n1 = self._make_network(self.fmt, 'n1', True)
res = self._create_subnet(self.fmt, self.n1['network']['id'],
'10.0.0.0/24')
self.sub = self.deserialize(self.fmt, res)
def _create_port(self, fixed_ip=None, allowed_address=None):
port_data = {
'port': {'network_id': self.n1['network']['id'],
'tenant_id': self._tenant_id}}
if fixed_ip:
port_data['port']['fixed_ips'] = [{'ip_address': fixed_ip}]
if allowed_address:
port_data['port']['allowed_address_pairs'] = [
{'ip_address': allowed_address}]
port_req = self.new_create_request('ports', port_data, self.fmt)
port_res = port_req.get_response(self.api)
self.assertEqual(201, port_res.status_int)
return self.deserialize(self.fmt, port_res)['port']
def _update_allowed_address_pair(self, port_id, data):
port_data = {
'port': {'allowed_address_pairs': data}}
port_req = self.new_update_request('ports', port_data, port_id,
self.fmt)
port_res = port_req.get_response(self.api)
self.assertEqual(200, port_res.status_int)
return self.deserialize(self.fmt, port_res)['port']
def _set_allowed_address_pair(self, port_id, ip):
return self._update_allowed_address_pair(port_id, [{'ip_address': ip}])
def _unset_allowed_address_pair(self, port_id):
return self._update_allowed_address_pair(port_id, [])
def _find_port_row(self, port_id):
for row in self.nb_api._tables['Logical_Switch_Port'].rows.values():
if row.name == port_id:
return row
def _is_ovn_port_type(self, port_id, port_type):
ovn_vport = self._find_port_row(port_id)
return port_type == ovn_vport.type
def _check_port_type(self, port_id, type):
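        # Poll until the Logical_Switch_Port type in the OVN NB database
        # converges to the expected value; the update is asynchronous.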
check = functools.partial(self._is_ovn_port_type, port_id, type)
n_utils.wait_until_true(check, timeout=10)
def test_virtual_port_created_before(self):
virt_port = self._create_port()
virt_ip = virt_port['fixed_ips'][0]['ip_address']
# Create the master port with the VIP address already set in
# the allowed_address_pairs field
master = self._create_port(allowed_address=virt_ip)
# Assert the virt port has the type virtual and master is set
# as parent
self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertEqual(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
        # Create the backup parent port
backup = self._create_port(allowed_address=virt_ip)
# Assert the virt port now also includes the backup port as a parent
self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertIn(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
self.assertIn(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
def test_virtual_port_update_address_pairs(self):
master = self._create_port()
backup = self._create_port()
virt_port = self._create_port()
virt_ip = virt_port['fixed_ips'][0]['ip_address']
# Assert the virt port does not yet have the type virtual (no
# address pairs were set yet)
        self._check_port_type(virt_port['id'], '')
ovn_vport = self._find_port_row(virt_port['id'])
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY,
ovn_vport.options)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY,
ovn_vport.options)
# Set the virt IP to the allowed address pairs of the master port
self._set_allowed_address_pair(master['id'], virt_ip)
# Assert the virt port is now updated
        self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertEqual(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Set the virt IP to the allowed address pairs of the backup port
self._set_allowed_address_pair(backup['id'], virt_ip)
# Assert the virt port now includes the backup port as a parent
        self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertIn(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
self.assertIn(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Remove the address pairs from the master port
self._unset_allowed_address_pair(master['id'])
# Assert the virt port now only has the backup port as a parent
        self._check_port_type(virt_port['id'], ovn_const.LSP_TYPE_VIRTUAL)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertEqual(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Remove the address pairs from the backup port
self._unset_allowed_address_pair(backup['id'])
# Assert the virt port is not type virtual anymore and the virtual
# port options are cleared
        self._check_port_type(virt_port['id'], '')
ovn_vport = self._find_port_row(virt_port['id'])
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY,
ovn_vport.options)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY,
ovn_vport.options)
def test_virtual_port_created_after(self):
master = self._create_port(fixed_ip='10.0.0.11')
backup = self._create_port(fixed_ip='10.0.0.12')
virt_ip = '10.0.0.55'
# Set the virt IP to the master and backup ports *before* creating
# the virtual port
self._set_allowed_address_pair(master['id'], virt_ip)
self._set_allowed_address_pair(backup['id'], virt_ip)
virt_port = self._create_port(fixed_ip=virt_ip)
# Assert the virtual port has been created with the
# right type and parents
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(ovn_const.LSP_TYPE_VIRTUAL, ovn_vport.type)
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertIn(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
self.assertIn(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
def test_virtual_port_delete_parents(self):
master = self._create_port()
backup = self._create_port()
virt_port = self._create_port()
virt_ip = virt_port['fixed_ips'][0]['ip_address']
# Assert the virt port does not yet have the type virtual (no
# address pairs were set yet)
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual("", ovn_vport.type)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY,
ovn_vport.options)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY,
ovn_vport.options)
        # Set allowed address pairs to the master and backup ports
self._set_allowed_address_pair(master['id'], virt_ip)
self._set_allowed_address_pair(backup['id'], virt_ip)
# Assert the virtual port is correct
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(ovn_const.LSP_TYPE_VIRTUAL, ovn_vport.type)
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertIn(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
self.assertIn(
backup['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Delete the backup port
self._delete('ports', backup['id'])
# Assert the virt port now only has the master port as a parent
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual(ovn_const.LSP_TYPE_VIRTUAL, ovn_vport.type)
self.assertEqual(
virt_ip,
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY])
self.assertEqual(
master['id'],
ovn_vport.options[ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY])
# Delete the master port
self._delete('ports', master['id'])
# Assert the virt port is not type virtual anymore and the virtual
# port options are cleared
ovn_vport = self._find_port_row(virt_port['id'])
self.assertEqual("", ovn_vport.type)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_PARENTS_KEY,
ovn_vport.options)
self.assertNotIn(ovn_const.LSP_OPTIONS_VIRTUAL_IP_KEY,
ovn_vport.options)


@@ -5,3 +5,5 @@
# process, which may cause wedges in the gate later.
psycopg2
psutil>=1.1.1,<3.2.2
PyMySQL>=0.6.2 # MIT License


@@ -0,0 +1,65 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import threading

from ovsdbapp.backend.ovs_idl import event
from ovsdbapp.tests.functional.schema.ovn_southbound import event as test_event


class WaitForCrLrpPortBindingEvent(event.RowEvent):
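    """Wait for Port_Binding rows of 'cr-lrp-' ports to be created.

    A threading.Event is kept per logical port name; wait() blocks until
    the matching row is created or the timeout expires.
    """
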
event_name = 'WaitForCrLrpPortBindingEvent'
PREFIX = 'cr-lrp-'
TABLE = 'Port_Binding'
def __init__(self, timeout=5):
self.logical_port_events = collections.defaultdict(threading.Event)
self.timeout = timeout
super(WaitForCrLrpPortBindingEvent, self).__init__(
(self.ROW_CREATE,), 'Port_Binding', None)
def match_fn(self, event, row, old=None):
return row.logical_port.startswith(self.PREFIX)
def run(self, event, row, old):
self.logical_port_events[row.logical_port].set()
def wait(self, logical_port_name):
wait_val = self.logical_port_events[logical_port_name].wait(
self.timeout)
del self.logical_port_events[logical_port_name]
return wait_val


class WaitForCreatePortBindingEvent(test_event.WaitForPortBindingEvent):
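    """Wait for a Port_Binding row to be created and keep the matched row."""
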
event_name = 'WaitForCreatePortBindingEvent'
def run(self, event, row, old):
self.row = row
super(WaitForCreatePortBindingEvent, self).run(event, row, old)


class WaitForUpdatePortBindingEvent(test_event.WaitForPortBindingEvent):
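    """Wait for an update of the Port_Binding row matching the given
    logical port and MAC.
    """
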
event_name = 'WaitForUpdatePortBindingEvent'
def __init__(self, port, mac, timeout=5):
# Call the super of the superclass to avoid passing CREATE event type
# to the superclass.
super(test_event.WaitForPortBindingEvent, self).__init__(
(self.ROW_UPDATE,),
'Port_Binding',
(('logical_port', '=', port),
('mac', '=', mac)),
timeout=timeout)


@@ -0,0 +1,33 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
from ovsdbapp.backend.ovs_idl import connection

from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor


class OVNIdlConnectionFixture(fixtures.Fixture):
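    """Fixture wrapping an OVN IDL connection that is stopped on cleanup."""
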
def __init__(self, idl=None, constr=None, schema=None, timeout=60):
self.idl = idl or ovsdb_monitor.BaseOvnIdl.from_server(
constr, schema)
self.connection = connection.Connection(
idl=self.idl, timeout=timeout)
def _setUp(self):
self.addCleanup(self.stop)
self.connection.start()
def stop(self):
self.connection.stop()


@@ -0,0 +1,240 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from distutils import spawn
import os

import fixtures
import psutil
import tenacity

from neutron._i18n import _
from neutron.agent.linux import utils


class DaemonProcessFixture(fixtures.Fixture):
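    """Base fixture for OVN daemons that keep their runtime files
    (pidfile, unixctl socket, logs) under temp_dir.
    """
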
def __init__(self, temp_dir):
super(DaemonProcessFixture, self).__init__()
self.temp_dir = temp_dir
def _get_pid_from_pidfile(self, pidfile):
with open(os.path.join(self.temp_dir, pidfile), 'r') as pidfile_f:
pid = pidfile_f.read().strip()
try:
return int(pid)
except ValueError:
raise RuntimeError(
"Pidfile %(pidfile)s contains %(pid)s that "
"is not a pid" % {'pidfile': pidfile, 'pid': pid}
)


class OvnNorthd(DaemonProcessFixture):
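    """Fixture that starts an ovn-northd process and stops it on cleanup."""
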
def __init__(self, temp_dir, ovn_nb_db, ovn_sb_db, protocol='unix',
debug=True):
super(OvnNorthd, self).__init__(temp_dir)
self.ovn_nb_db = ovn_nb_db
self.ovn_sb_db = ovn_sb_db
self.protocol = protocol
self.unixctl_path = os.path.join(self.temp_dir, 'ovn_northd.ctl')
self.log_file_path = os.path.join(self.temp_dir, 'ovn_northd.log')
self.debug = debug
if self.protocol == 'ssl':
self.private_key = os.path.join(self.temp_dir, 'ovn-privkey.pem')
self.certificate = os.path.join(self.temp_dir, 'ovn-cert.pem')
self.ca_cert = os.path.join(self.temp_dir, 'controllerca',
'cacert.pem')
def _setUp(self):
self.addCleanup(self.stop)
self.start()
def start(self):
# start the ovn-northd
ovn_northd_cmd = [
spawn.find_executable('ovn-northd'), '-vconsole:off',
'--detach',
'--ovnnb-db=%s' % self.ovn_nb_db,
'--ovnsb-db=%s' % self.ovn_sb_db,
'--no-chdir',
'--unixctl=%s' % self.unixctl_path,
'--log-file=%s' % (self.log_file_path)]
if self.protocol == 'ssl':
ovn_northd_cmd.append('--private-key=%s' % self.private_key)
ovn_northd_cmd.append('--certificate=%s' % self.certificate)
ovn_northd_cmd.append('--ca-cert=%s' % self.ca_cert)
if self.debug:
ovn_northd_cmd.append('--verbose')
obj, _ = utils.create_process(ovn_northd_cmd)
obj.communicate()
def stop(self):
try:
stop_cmd = ['ovs-appctl', '-t', self.unixctl_path, 'exit']
utils.execute(stop_cmd)
except Exception:
pass


class OvsdbServer(DaemonProcessFixture):
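    """Fixture that manages ovsdb-server processes for the OVN NB/SB
    databases used by the functional tests.
    """
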
def __init__(self, temp_dir, ovs_dir, ovn_nb_db=True, ovn_sb_db=False,
protocol='unix', debug=True):
super(OvsdbServer, self).__init__(temp_dir)
self.ovs_dir = ovs_dir
self.ovn_nb_db = ovn_nb_db
self.ovn_sb_db = ovn_sb_db
# The value of the protocol must be unix or tcp or ssl
self.protocol = protocol
self.ovsdb_server_processes = []
self.private_key = os.path.join(self.temp_dir, 'ovn-privkey.pem')
self.certificate = os.path.join(self.temp_dir, 'ovn-cert.pem')
self.ca_cert = os.path.join(self.temp_dir, 'controllerca',
'cacert.pem')
self.debug = debug
def _setUp(self):
if self.ovn_nb_db:
self.ovsdb_server_processes.append(
{'db_path': os.path.join(self.temp_dir, 'ovn_nb.db'),
'schema_path': os.path.join(self.ovs_dir, 'ovn-nb.ovsschema'),
'remote_path': os.path.join(self.temp_dir, 'ovnnb_db.sock'),
'protocol': self.protocol,
'remote_ip': '127.0.0.1',
'remote_port': '0',
'pidfile': 'ovn-nb.pid',
'unixctl_path': os.path.join(self.temp_dir, 'ovnnb_db.ctl'),
'log_file_path': os.path.join(self.temp_dir, 'ovn_nb.log'),
'db_type': 'nb',
'connection': 'db:OVN_Northbound,NB_Global,connections',
'ctl_cmd': 'ovn-nbctl'})
if self.ovn_sb_db:
self.ovsdb_server_processes.append(
{'db_path': os.path.join(self.temp_dir, 'ovn_sb.db'),
'schema_path': os.path.join(self.ovs_dir, 'ovn-sb.ovsschema'),
'remote_path': os.path.join(self.temp_dir, 'ovnsb_db.sock'),
'protocol': self.protocol,
'remote_ip': '127.0.0.1',
'remote_port': '0',
'pidfile': 'ovn-sb.pid',
'unixctl_path': os.path.join(self.temp_dir, 'ovnsb_db.ctl'),
'log_file_path': os.path.join(self.temp_dir, 'ovn_sb.log'),
'db_type': 'sb',
'connection': 'db:OVN_Southbound,SB_Global,connections',
'ctl_cmd': 'ovn-sbctl'})
self.addCleanup(self.stop)
self.start()
def _init_ovsdb_pki(self):
os.chdir(self.temp_dir)
pki_init_cmd = [spawn.find_executable('ovs-pki'), 'init',
'-d', self.temp_dir, '-l',
os.path.join(self.temp_dir, 'pki.log'), '--force']
utils.execute(pki_init_cmd)
pki_req_sign = [spawn.find_executable('ovs-pki'), 'req+sign', 'ovn',
'controller', '-d', self.temp_dir, '-l',
os.path.join(self.temp_dir, 'pki.log'), '--force']
utils.execute(pki_req_sign)
def delete_dbs(self):
for ovsdb in self.ovsdb_server_processes:
try:
os.remove(ovsdb['db_path'])
except OSError:
pass
def start(self):
pki_done = False
for ovsdb_process in self.ovsdb_server_processes:
# create the db from the schema using ovsdb-tool
ovsdb_tool_cmd = [spawn.find_executable('ovsdb-tool'),
'create', ovsdb_process['db_path'],
ovsdb_process['schema_path']]
utils.execute(ovsdb_tool_cmd)
# start the ovsdb-server
ovsdb_server_cmd = [
spawn.find_executable('ovsdb-server'), '-vconsole:off',
'--detach',
'--pidfile=%s' % os.path.join(
self.temp_dir, ovsdb_process['pidfile']),
'--log-file=%s' % (ovsdb_process['log_file_path']),
'--remote=punix:%s' % (ovsdb_process['remote_path']),
'--remote=%s' % (ovsdb_process['connection']),
                '--unixctl=%s' % (ovsdb_process['unixctl_path'])]
if ovsdb_process['protocol'] == 'ssl':
if not pki_done:
pki_done = True
self._init_ovsdb_pki()
ovsdb_server_cmd.append('--private-key=%s' % self.private_key)
ovsdb_server_cmd.append('--certificate=%s' % self.certificate)
ovsdb_server_cmd.append('--ca-cert=%s' % self.ca_cert)
ovsdb_server_cmd.append(ovsdb_process['db_path'])
if self.debug:
ovsdb_server_cmd.append('--verbose')
obj, _ = utils.create_process(ovsdb_server_cmd)
obj.communicate()
conn_cmd = [spawn.find_executable(ovsdb_process['ctl_cmd']),
'--db=unix:%s' % ovsdb_process['remote_path'],
'set-connection',
'p%s:%s:%s' % (ovsdb_process['protocol'],
ovsdb_process['remote_port'],
ovsdb_process['remote_ip']),
'--', 'set', 'connection', '.',
'inactivity_probe=60000']
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.1),
stop=tenacity.stop_after_delay(3), reraise=True)
def _set_connection():
utils.execute(conn_cmd)
@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=0.1),
stop=tenacity.stop_after_delay(10),
reraise=True)
def get_ovsdb_remote_port_retry(pid):
process = psutil.Process(pid)
for connect in process.connections():
if connect.status == 'LISTEN':
return connect.laddr[1]
raise Exception(_("Could not find LISTEN port."))
if ovsdb_process['protocol'] != 'unix':
_set_connection()
pid = self._get_pid_from_pidfile(ovsdb_process['pidfile'])
ovsdb_process['remote_port'] = \
get_ovsdb_remote_port_retry(pid)
def stop(self):
for ovsdb_process in self.ovsdb_server_processes:
try:
stop_cmd = ['ovs-appctl', '-t', ovsdb_process['unixctl_path'],
'exit']
utils.execute(stop_cmd)
except Exception:
pass
def get_ovsdb_connection_path(self, db_type='nb'):
for ovsdb_process in self.ovsdb_server_processes:
if ovsdb_process['db_type'] == db_type:
if ovsdb_process['protocol'] == 'unix':
return 'unix:' + ovsdb_process['remote_path']
else:
return '%s:%s:%s' % (ovsdb_process['protocol'],
ovsdb_process['remote_ip'],
ovsdb_process['remote_port'])


@@ -574,7 +574,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
        return self.deserialize(fmt, res)

    def _api_for_resource(self, resource):
        if resource in ['networks', 'subnets', 'ports', 'subnetpools',
                        'security-groups']:
            return self.api
        else:
            return self.ext_api