test_integration.py: remove neutron dependency

Use neutron_lib.utils runtime instead of neutron manager in order to get pf_plugin
used in test.

Change-Id: I8adbb3dab99c54696a1fd02366dbe91832790850
This commit is contained in:
Flavio Fernandes 2020-09-17 14:28:41 -04:00
parent e5699cf86b
commit 898d8f1191

View File

@ -17,9 +17,13 @@ from ovn_octavia_provider.common import constants as ovn_const
from ovn_octavia_provider.common import utils from ovn_octavia_provider.common import utils
from ovn_octavia_provider.tests.functional import base as ovn_base from ovn_octavia_provider.tests.functional import base as ovn_base
from neutron import manager
from neutron_lib.api.definitions import floating_ip_port_forwarding as pf_def from neutron_lib.api.definitions import floating_ip_port_forwarding as pf_def
from neutron_lib.utils import runtime
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
LOG = logging.getLogger(__name__)
class TestOvnOctaviaProviderIntegration(ovn_base.TestOvnOctaviaBase): class TestOvnOctaviaProviderIntegration(ovn_base.TestOvnOctaviaBase):
@ -32,13 +36,28 @@ class TestOvnOctaviaProviderIntegration(ovn_base.TestOvnOctaviaBase):
cfg.CONF.set_override("service_plugins", list(svc_plugins)) cfg.CONF.set_override("service_plugins", list(svc_plugins))
if not self.pf_plugin: if not self.pf_plugin:
# OVN does not use RPC: disable it for port-forwarding tests # OVN does not use RPC: disable it for port-forwarding tests
self.pf_plugin = manager.NeutronManager.load_class_for_provider( self.pf_plugin = self._load_port_forwarding_class()
'neutron.service_plugins', 'port_forwarding')()
self.pf_plugin._rpc_notifications_required = False self.pf_plugin._rpc_notifications_required = False
self.assertIsNotNone(self.pf_plugin, self.assertIsNotNone(self.pf_plugin,
"TestOVNFunctionalBase is expected to have " "TestOVNFunctionalBase is expected to have "
"port forwarding plugin configured") "port forwarding plugin configured")
@staticmethod
def _load_port_forwarding_class():
"""Load port forwarding plugin
:returns: instance of plugin that is loaded
:raises ImportError: if fails to load plugin
"""
try:
loaded_class = runtime.load_class_by_alias_or_classname(
'neutron.service_plugins', 'port_forwarding')
return loaded_class()
except ImportError:
with excutils.save_and_reraise_exception():
LOG.error("Error loading port_forwarding plugin")
def _find_pf_lb(self, router_id, fip_id=None): def _find_pf_lb(self, router_id, fip_id=None):
result = [] result = []
for ovn_lb in self.nb_api.get_router_floatingip_lbs( for ovn_lb in self.nb_api.get_router_floatingip_lbs(