WIP Removing python-neutronclient code

Depends-On: https://review.opendev.org/c/openstack/octavia/+/866327
Change-Id: I985b24e4a6db962b1e73eeae69a8c96f4b0760ae
This commit is contained in:
Gregory Thiemonge 2023-01-16 03:31:03 -05:00
parent 9c9085eaab
commit 4ad1eaf5ad
8 changed files with 353 additions and 291 deletions

View File

@ -10,11 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1 import exceptions as ks_exceptions
from keystoneauth1 import loading as ks_loading
from neutronclient.common import exceptions as n_exc
from neutronclient.neutron import client as neutron_client
from octavia_lib.api.drivers import exceptions as driver_exceptions
import openstack
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
@ -25,8 +25,6 @@ from ovn_octavia_provider.i18n import _
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
NEUTRON_VERSION = '2.0'
class KeystoneSession():
@ -35,8 +33,6 @@ class KeystoneSession():
self._auth = None
self.section = section
ks_loading.register_auth_conf_options(cfg.CONF, self.section)
ks_loading.register_session_conf_options(cfg.CONF, self.section)
@property
def session(self):
@ -52,8 +48,55 @@ class KeystoneSession():
@property
def auth(self):
if not self._auth:
self._auth = ks_loading.load_auth_from_conf_options(
cfg.CONF, self.section)
try:
self._auth = ks_loading.load_auth_from_conf_options(
cfg.CONF, self.section)
except ks_exceptions.auth_plugins.MissingRequiredOptions as e:
if self.section == constants.SERVICE_AUTH:
raise e
# NOTE(gthiemonge): MissingRequiredOptions is raised: there is
# one or more missing auth options in the config file. It may
# be due to the migration from python-neutronclient to
# openstacksdk.
# With neutronclient, most of the auth settings were in
# [service_auth] with a few overrides in [neutron],
# but with openstacksdk, we have all the auth settings in the
# [neutron] section. In order to support smooth upgrades, in
# case those options are missing, we override the undefined
# options with the existing settings from [service_auth].
# This code should be removed when all the deployment tools set
# the correct options in [neutron]
# The config options are lazily registered/loaded by keystone,
# it means that we cannot get/set them before invoking
# 'load_auth_from_conf_options' on 'service_auth'.
ks_loading.load_auth_from_conf_options(
cfg.CONF, constants.SERVICE_AUTH)
config = getattr(cfg.CONF, self.section)
for opt in config:
# For each option in the [neutron] section, get its setting
# location, if the location is 'opt_default' or
# 'set_default', it means that the option is not configured
# in the config file, it should be replaced with the one
# from [service_auth]
loc = cfg.CONF.get_location(opt, self.section)
if not loc or loc.location in (cfg.Locations.opt_default,
cfg.Locations.set_default):
if hasattr(cfg.CONF.service_auth, opt):
cur_value = getattr(config, opt)
value = getattr(cfg.CONF.service_auth, opt)
if value != cur_value:
LOG.debug("Overriding [%s].%s with '%s'",
self.section, opt, value)
cfg.CONF.set_override(opt, value, self.section)
# Now we can call load_auth_from_conf_options for this specific
# service with the newly defined options.
self._auth = ks_loading.load_auth_from_conf_options(
cfg.CONF, self.section)
return self._auth
@ -68,34 +111,18 @@ class Singleton(type):
class NeutronAuth(metaclass=Singleton):
def __init__(self, region, service_name=None, endpoint=None,
endpoint_type='publicURL', insecure=False,
ca_cert=None):
"""Create neutron client object.
:param region: The region of the service
:param service_name: The name of the neutron service in the catalog
:param endpoint: The endpoint of the service
:param endpoint_type: The endpoint_type of the service
:param insecure: Turn off certificate validation
:param ca_cert: CA Cert file path
:return: a Neutron Client object.
:raises Exception: if the client cannot be created
"""
ksession = KeystoneSession()
kwargs = {'region_name': region,
'session': ksession.session,
'endpoint_type': endpoint_type,
'insecure': insecure}
if service_name:
kwargs['service_name'] = service_name
if endpoint:
kwargs['endpoint_override'] = endpoint
if ca_cert:
kwargs['ca_cert'] = ca_cert
def __init__(self):
"""Create neutron client object."""
try:
self.neutron_client = neutron_client.Client(
NEUTRON_VERSION, **kwargs)
ksession = KeystoneSession('neutron')
kwargs = {}
if CONF.neutron.endpoint_override:
kwargs['network_endpoint_override'] = (
CONF.neutron.endpoint_override)
self.network_proxy = openstack.connection.Connection(
session=ksession.session, **kwargs).network
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception("Error creating Neutron client.")
@ -103,16 +130,9 @@ class NeutronAuth(metaclass=Singleton):
def get_neutron_client():
try:
return NeutronAuth(
endpoint=CONF.neutron.endpoint,
region=CONF.neutron.region_name,
endpoint_type=CONF.neutron.endpoint_type,
service_name=CONF.neutron.service_name,
insecure=CONF.neutron.insecure,
ca_cert=CONF.neutron.ca_certificates_file,
).neutron_client
except n_exc.NeutronClientException as e:
msg = _('Cannot inialize Neutron Client. Exception: %s. '
return NeutronAuth().network_proxy
except Exception as e:
msg = _('Cannot initialize OpenStackSDK. Exception: %s. '
'Please verify Neutron service configuration '
'in Octavia API configuration.') % e
raise driver_exceptions.DriverError(

View File

@ -79,40 +79,92 @@ ovn_opts = [
]
neutron_opts = [
cfg.StrOpt('service_name',
help=_('The name of the neutron service in the '
'keystone catalog')),
cfg.StrOpt('endpoint', help=_('A new endpoint to override the endpoint '
'in the keystone catalog.')),
cfg.StrOpt('region_name',
help=_('Region in Identity service catalog to use for '
'communication with the OpenStack services.')),
cfg.StrOpt('endpoint_type', default='publicURL',
help=_('Endpoint interface in identity service to use')),
'in the keystone catalog.'),
deprecated_for_removal=True,
deprecated_reason=_('The endpoint_override option defined by '
'keystoneauth1 is the new name for this '
'option.'),
deprecated_since='Antelope'),
cfg.StrOpt('endpoint_type', help=_('Endpoint interface in identity '
'service to use'),
deprecated_for_removal=True,
deprecated_reason=_('This option was replaced by the '
'valid_interfaces option defined by '
'keystoneauth.'),
deprecated_since='Antelope'),
cfg.StrOpt('ca_certificates_file',
help=_('CA certificates file path')),
cfg.BoolOpt('insecure',
default=False,
help=_('Disable certificate validation on SSL connections ')),
help=_('CA certificates file path'),
deprecated_for_removal=True,
deprecated_reason=_('The cafile option defined by '
'keystoneauth1 is the new name for this '
'option.'),
deprecated_since='Antelope'),
]
def handle_neutron_deprecations():
# Apply neutron deprecated options to their new setting if needed
# Basicaly: if the value of the deprecated option is not the default:
# * convert it to a valid "new" value if needed
# * set it as the default for the new option
# Thus [neutron].<new_option> has an higher precedence than
# [neutron].<deprecated_option>
loc = cfg.CONF.get_location('endpoint', 'neutron')
if loc and loc.location != cfg.Locations.opt_default:
cfg.CONF.set_default('endpoint_override', cfg.CONF.neutron.endpoint,
'neutron')
loc = cfg.CONF.get_location('endpoint_type', 'neutron')
if loc and loc.location != cfg.Locations.opt_default:
endpoint_type = cfg.CONF.neutron.endpoint_type.replace('URL', '')
cfg.CONF.set_default('valid_interfaces', [endpoint_type],
'neutron')
loc = cfg.CONF.get_location('ca_certificates_file', 'neutron')
if loc and loc.location != cfg.Locations.opt_default:
cfg.CONF.set_default('cafile', cfg.CONF.neutron.ca_certificates_file,
'neutron')
def register_opts():
# NOTE (froyo): just to not try to re-register options already done
# by Neutron, specially in test scope, that will get a DuplicateOptError
missing_opts = ovn_opts
missing_ovn_opts = ovn_opts
try:
neutron_registered_opts = [opt for opt in cfg.CONF.ovn]
missing_opts = [opt for opt in ovn_opts
if opt.name not in neutron_registered_opts]
missing_ovn_opts = [opt for opt in ovn_opts
if opt.name not in neutron_registered_opts]
except cfg.NoSuchOptError:
LOG.info('Not found any opts under group ovn registered by Neutron')
cfg.CONF.register_opts(missing_opts, group='ovn')
cfg.CONF.register_opts(neutron_opts, group='neutron')
# Do the same for neutron options that have been already registered by
# Octavia
missing_neutron_opts = neutron_opts
try:
neutron_registered_opts = [opt for opt in cfg.CONF.neutron]
missing_neutron_opts = [opt for opt in neutron_opts
if opt.name not in neutron_registered_opts]
except cfg.NoSuchOptError:
LOG.info('Not found any opts under group neutron')
cfg.CONF.register_opts(missing_ovn_opts, group='ovn')
cfg.CONF.register_opts(missing_neutron_opts, group='neutron')
ks_loading.register_auth_conf_options(cfg.CONF, 'service_auth')
ks_loading.register_session_conf_options(cfg.CONF, 'service_auth')
ks_loading.register_auth_conf_options(cfg.CONF, 'neutron')
ks_loading.register_session_conf_options(cfg.CONF, 'neutron')
ks_loading.register_adapter_conf_options(cfg.CONF, 'neutron',
include_deprecated=False)
# Override default auth_type for plugins with the default from service_auth
auth_type = cfg.CONF.service_auth.auth_type
cfg.CONF.set_default('auth_type', auth_type, 'neutron')
handle_neutron_deprecations()
def list_opts():
return [

View File

@ -461,15 +461,18 @@ class OvnProviderDriver(driver_base.ProviderDriver):
# TODO(gthiemonge): implement additional_vip_dicts
try:
port = self._ovn_helper.create_vip_port(
project_id, lb_id, vip_dict)['port']
project_id, lb_id, vip_dict)
vip_dict[constants.VIP_PORT_ID] = port['id']
vip_dict[constants.VIP_ADDRESS] = (
port['fixed_ips'][0]['ip_address'])
except Exception as e:
kwargs = {}
if hasattr(e, 'message'):
kwargs = {'user_fault_string': e.message,
'operator_fault_string': e.message}
for attr in ('details', 'message'):
if hasattr(e, attr):
value = getattr(e, attr)
kwargs = {'user_fault_string': value,
'operator_fault_string': value}
break
raise driver_exceptions.DriverError(
**kwargs)
return vip_dict, []

View File

@ -19,11 +19,11 @@ import threading
import netaddr
from neutron_lib import constants as n_const
from neutronclient.common import exceptions as n_exc
from octavia_lib.api.drivers import data_models as o_datamodels
from octavia_lib.api.drivers import driver_lib as o_driver_lib
from octavia_lib.api.drivers import exceptions as driver_exceptions
from octavia_lib.common import constants
import openstack
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
@ -138,11 +138,10 @@ class OvnProviderHelper():
# one when the network was created
neutron_client = clients.get_neutron_client()
meta_dhcp_port = neutron_client.list_ports(
meta_dhcp_port = neutron_client.ports(
network_id=network_id,
device_owner=n_const.DEVICE_OWNER_DISTRIBUTED)
if meta_dhcp_port['ports']:
return meta_dhcp_port['ports'][0]
return next(meta_dhcp_port, None)
def _get_nw_router_info_on_interface_event(self, lrp):
"""Get the Router and Network information on an interface event
@ -511,9 +510,9 @@ class OvnProviderHelper():
if lb and lb.vip_subnet_id:
neutron_client = clients.get_neutron_client()
try:
subnet = neutron_client.show_subnet(lb.vip_subnet_id)
vip_subnet_cidr = subnet['subnet']['cidr']
except n_exc.NotFound:
subnet = neutron_client.get_subnet(lb.vip_subnet_id)
vip_subnet_cidr = subnet.cidr
except openstack.exceptions.ResourceNotFound:
LOG.warning('Subnet %s not found while trying to '
'fetch its data.', lb.vip_subnet_id)
return None, None
@ -557,9 +556,9 @@ class OvnProviderHelper():
else:
neutron_client = clients.get_neutron_client()
try:
subnet = neutron_client.show_subnet(subnet_id)
ls_name = utils.ovn_name(subnet['subnet']['network_id'])
except n_exc.NotFound:
subnet = neutron_client.get_subnet(subnet_id)
ls_name = utils.ovn_name(subnet.network_id)
except openstack.exceptions.ResourceNotFound:
LOG.warning('Subnet %s not found while trying to '
'fetch its data.', subnet_id)
ls_name = None
@ -916,38 +915,38 @@ class OvnProviderHelper():
neutron_client = clients.get_neutron_client()
if loadbalancer.get(constants.VIP_PORT_ID):
# In case we don't have vip_network_id
port = neutron_client.show_port(
loadbalancer[constants.VIP_PORT_ID])['port']
for ip in port['fixed_ips']:
port = neutron_client.get_port(
loadbalancer[constants.VIP_PORT_ID])
for ip in port.fixed_ips:
if ip['ip_address'] == loadbalancer[constants.VIP_ADDRESS]:
subnet = neutron_client.show_subnet(
ip['subnet_id'])['subnet']
subnet = neutron_client.get_subnet(
ip['subnet_id'])
break
elif (loadbalancer.get(constants.VIP_NETWORK_ID) and
loadbalancer.get(constants.VIP_ADDRESS)):
ports = neutron_client.list_ports(
ports = neutron_client.ports(
network_id=loadbalancer[constants.VIP_NETWORK_ID])
for p in ports['ports']:
for ip in p['fixed_ips']:
for p in ports:
for ip in p.fixed_ips:
if ip['ip_address'] == loadbalancer[
constants.VIP_ADDRESS]:
port = p
subnet = neutron_client.show_subnet(
ip['subnet_id'])['subnet']
subnet = neutron_client.get_subnet(
ip['subnet_id'])
break
except Exception:
LOG.error('Cannot get info from neutron client')
LOG.exception(ovn_const.EXCEPTION_MSG, "creation of loadbalancer")
# Any Exception set the status to ERROR
if isinstance(port, dict):
if port:
try:
self.delete_vip_port(port.get('id'))
self.delete_vip_port(port.id)
LOG.warning("Deleting the VIP port %s since LB went into "
"ERROR state", str(port.get('id')))
"ERROR state", str(port.id))
except Exception:
LOG.exception("Error deleting the VIP port %s upon "
"loadbalancer %s creation failure",
str(port.get('id')),
str(port.id),
str(loadbalancer[constants.ID]))
status = {
constants.LOADBALANCERS: [
@ -963,7 +962,7 @@ class OvnProviderHelper():
external_ids = {
ovn_const.LB_EXT_IDS_VIP_KEY: loadbalancer[constants.VIP_ADDRESS],
ovn_const.LB_EXT_IDS_VIP_PORT_ID_KEY:
loadbalancer.get(constants.VIP_PORT_ID) or port['id'],
loadbalancer.get(constants.VIP_PORT_ID) or port.id,
'enabled': str(loadbalancer[constants.ADMIN_STATE_UP])}
# In case vip_fip was passed - use it.
vip_fip = loadbalancer.get(ovn_const.LB_EXT_IDS_VIP_FIP_KEY)
@ -992,9 +991,9 @@ class OvnProviderHelper():
# NOTE(froyo): This is the association of the lb to the VIP ls
# so this is executed right away
self._update_lb_to_ls_association(
ovn_lb, network_id=port['network_id'],
ovn_lb, network_id=port.network_id,
associate=True, update_ls_ref=True)
ls_name = utils.ovn_name(port['network_id'])
ls_name = utils.ovn_name(port.network_id)
ovn_ls = self.ovn_nbdb_api.ls_get(ls_name).execute(
check_error=True)
ovn_lr = self._find_lr_of_ls(ovn_ls, subnet.get('gateway_ip'))
@ -1050,15 +1049,15 @@ class OvnProviderHelper():
except Exception:
LOG.exception(ovn_const.EXCEPTION_MSG, "creation of loadbalancer")
# Any Exception set the status to ERROR
if isinstance(port, dict):
if port:
try:
self.delete_vip_port(port.get('id'))
self.delete_vip_port(port.id)
LOG.warning("Deleting the VIP port %s since LB went into "
"ERROR state", str(port.get('id')))
"ERROR state", str(port.id))
except Exception:
LOG.exception("Error deleting the VIP port %s upon "
"loadbalancer %s creation failure",
str(port.get('id')),
str(port.id),
str(loadbalancer[constants.ID]))
status = {
constants.LOADBALANCERS: [
@ -1771,13 +1770,13 @@ class OvnProviderHelper():
neutron_client = clients.get_neutron_client()
ovn_lr = None
try:
subnet = neutron_client.show_subnet(subnet_id)
ls_name = utils.ovn_name(subnet['subnet']['network_id'])
subnet = neutron_client.get_subnet(subnet_id)
ls_name = utils.ovn_name(subnet.network_id)
ovn_ls = self.ovn_nbdb_api.ls_get(ls_name).execute(
check_error=True)
ovn_lr = self._find_lr_of_ls(
ovn_ls, subnet['subnet'].get('gateway_ip'))
except n_exc.NotFound:
ovn_ls, subnet.gateway_ip)
except openstack.exceptions.ResourceNotFound:
pass
except idlutils.RowNotFound:
pass
@ -2028,49 +2027,46 @@ class OvnProviderHelper():
return meminf.split('_')[1]
def create_vip_port(self, project_id, lb_id, vip_d):
port = {'port': {'name': ovn_const.LB_VIP_PORT_PREFIX + str(lb_id),
'network_id': vip_d[constants.VIP_NETWORK_ID],
'fixed_ips': [{'subnet_id': vip_d['vip_subnet_id']}],
'admin_state_up': True,
'project_id': project_id}}
port = {'name': ovn_const.LB_VIP_PORT_PREFIX + str(lb_id),
'network_id': vip_d[constants.VIP_NETWORK_ID],
'fixed_ips': [{'subnet_id': vip_d['vip_subnet_id']}],
'admin_state_up': True,
'project_id': project_id}
try:
port['port']['fixed_ips'][0]['ip_address'] = (
port['fixed_ips'][0]['ip_address'] = (
vip_d[constants.VIP_ADDRESS])
except KeyError:
pass
neutron_client = clients.get_neutron_client()
try:
return neutron_client.create_port(port)
except n_exc.IpAddressAlreadyAllocatedClient as e:
return neutron_client.create_port(**port)
except openstack.exceptions.ConflictException as e:
# Sometimes the VIP is already created (race-conditions)
# Lets get the it from Neutron API.
ports = neutron_client.list_ports(
network_id=vip_d[constants.VIP_NETWORK_ID],
name=f'{ovn_const.LB_VIP_PORT_PREFIX}{lb_id}')
if not ports['ports']:
port = neutron_client.find_port(
name_or_id=f'{ovn_const.LB_VIP_PORT_PREFIX}{lb_id}',
network_id=vip_d[constants.VIP_NETWORK_ID])
if not port:
LOG.error('Cannot create/get LoadBalancer VIP port with '
'fixed IP: %s', vip_d[constants.VIP_ADDRESS])
raise e
# there should only be one port returned
port = ports['ports'][0]
LOG.debug('VIP Port already exists, uuid: %s', port['id'])
return {'port': port}
except n_exc.NeutronClientException as e:
LOG.debug('VIP Port already exists, uuid: %s', port.id)
return port
except openstack.exceptions.HttpException as e:
# NOTE (froyo): whatever other exception as e.g. Timeout
# we should try to ensure no leftover port remains
ports = neutron_client.list_ports(
network_id=vip_d[constants.VIP_NETWORK_ID],
name=f'{ovn_const.LB_VIP_PORT_PREFIX}{lb_id}')
if ports['ports']:
port = ports['ports'][0]
port = neutron_client.find_port(
name_or_id=f'{ovn_const.LB_VIP_PORT_PREFIX}{lb_id}',
network_id=vip_d[constants.VIP_NETWORK_ID])
if port:
LOG.debug('Leftover port %s has been found. Trying to '
'delete it', port['id'])
self.delete_vip_port(port['id'])
'delete it', port.id)
self.delete_vip_port(port.id)
raise e
@tenacity.retry(
retry=tenacity.retry_if_exception_type(
n_exc.NeutronClientException),
openstack.exceptions.HttpException),
wait=tenacity.wait_exponential(max=75),
stop=tenacity.stop_after_attempt(15),
reraise=True)
@ -2078,7 +2074,7 @@ class OvnProviderHelper():
neutron_client = clients.get_neutron_client()
try:
neutron_client.delete_port(port_id)
except n_exc.PortNotFoundClient:
except openstack.exceptions.ResourceNotFound:
LOG.warning("Port %s could not be found. Please "
"check Neutron logs. Perhaps port "
"was already deleted.", port_id)
@ -2118,9 +2114,9 @@ class OvnProviderHelper():
# Find out if member has FIP assigned.
neutron_client = clients.get_neutron_client()
try:
subnet = neutron_client.show_subnet(info['subnet_id'])
ls_name = utils.ovn_name(subnet['subnet']['network_id'])
except n_exc.NotFound:
subnet = neutron_client.get_subnet(info['subnet_id'])
ls_name = utils.ovn_name(subnet.network_id)
except openstack.exceptions.ResourceNotFound:
LOG.exception('Subnet %s not found while trying to '
'fetch its data.', info['subnet_id'])
return
@ -2175,15 +2171,14 @@ class OvnProviderHelper():
# We should call neutron API to do 'empty' update of the FIP.
# It will bump revision number and do recomputation of the FIP.
try:
fip_info = neutron_client.show_floatingip(
fip_info = neutron_client.get_ip(
fip.external_ids[ovn_const.OVN_FIP_EXT_ID_KEY])
empty_update = {
"floatingip": {
'description': fip_info['floatingip']['description']}}
neutron_client.update_floatingip(
'description': fip_info['description']}
neutron_client.update_ip(
fip.external_ids[ovn_const.OVN_FIP_EXT_ID_KEY],
empty_update)
except n_exc.NotFound:
**empty_update)
except openstack.exceptions.ResourceNotFound:
LOG.warning('Member %(member)s FIP %(fip)s not found in '
'Neutron. Cannot update it.',
{'member': info['id'],
@ -2192,12 +2187,12 @@ class OvnProviderHelper():
def _get_member_lsp(self, member_ip, member_subnet_id):
neutron_client = clients.get_neutron_client()
try:
member_subnet = neutron_client.show_subnet(member_subnet_id)
except n_exc.NotFound:
member_subnet = neutron_client.get_subnet(member_subnet_id)
except openstack.exceptions.ResourceNotFound:
LOG.exception('Subnet %s not found while trying to '
'fetch its data.', member_subnet_id)
return
ls_name = utils.ovn_name(member_subnet['subnet']['network_id'])
ls_name = utils.ovn_name(member_subnet.network_id)
try:
ls = self.ovn_nbdb_api.lookup('Logical_Switch', ls_name)
except idlutils.RowNotFound:
@ -2326,7 +2321,7 @@ class OvnProviderHelper():
'pool': pool_key})
return False
hm_source_ip = None
for fixed_ip in hm_port['fixed_ips']:
for fixed_ip in hm_port.fixed_ips:
if fixed_ip['subnet_id'] == member_subnet:
hm_source_ip = fixed_ip['ip_address']
break

View File

@ -57,28 +57,28 @@ class TestOvnOctaviaBase(base.TestOVNFunctionalBase,
self.fake_neutron_client = mock.MagicMock()
clients.get_neutron_client = mock.MagicMock()
clients.get_neutron_client.return_value = self.fake_neutron_client
self.fake_neutron_client.show_subnet = self._mock_show_subnet
self.fake_neutron_client.list_ports = self._mock_list_ports
self.fake_neutron_client.show_port = self._mock_show_port
self.fake_neutron_client.get_subnet = self._mock_get_subnet
self.fake_neutron_client.ports = self._mock_ports
self.fake_neutron_client.get_port = self._mock_get_port
self.fake_neutron_client.delete_port.return_value = True
self._local_net_cache = {}
self._local_cidr_cache = {}
self._local_port_cache = {'ports': []}
self.core_plugin = directory.get_plugin()
def _mock_show_subnet(self, subnet_id):
subnet = {}
subnet['network_id'] = self._local_net_cache[subnet_id]
subnet['cidr'] = self._local_cidr_cache[subnet_id]
return {'subnet': subnet}
def _mock_get_subnet(self, subnet_id):
subnet = mock.Mock(spec_set=['network_id', 'cidr'])
subnet.network_id = self._local_net_cache[subnet_id]
subnet.cidr = self._local_cidr_cache[subnet_id]
return subnet
def _mock_list_ports(self, **kwargs):
def _mock_ports(self, **kwargs):
return self._local_port_cache
def _mock_show_port(self, port_id):
def _mock_get_port(self, port_id):
for port in self._local_port_cache['ports']:
if port['id'] == port_id:
return {'port': port}
return port
def _create_provider_network(self):
e1 = self._make_network(self.fmt, 'e1', True,

View File

@ -16,79 +16,62 @@ from unittest import mock
from oslotest import base
from ovn_octavia_provider.common import clients
from ovn_octavia_provider.common import config
class TestKeystoneSession(base.BaseTestCase):
@mock.patch(
'keystoneauth1.loading.register_auth_conf_options')
@mock.patch(
'keystoneauth1.loading.register_session_conf_options')
def test_init(self, kl_rs, kl_ra):
clients.KeystoneSession()
kl_ra.assert_called_once_with(mock.ANY, 'service_auth')
kl_rs.assert_called_once_with(mock.ANY, 'service_auth')
@mock.patch(
'keystoneauth1.loading.load_session_from_conf_options')
def test_cached_session(self, kl):
ksession = clients.KeystoneSession()
@mock.patch(
'keystoneauth1.loading.load_auth_from_conf_options')
def test_cached_session(self, kl_auth, kl_session):
ksession = clients.KeystoneSession('neutron')
self.assertIs(
ksession.session,
ksession.session)
kl.assert_called_once_with(
mock.ANY, 'service_auth', auth=ksession.auth)
kl_session.assert_called_once_with(
mock.ANY, 'neutron', auth=ksession.auth)
@mock.patch(
'keystoneauth1.loading.load_auth_from_conf_options')
def test_cached_auth(self, kl):
ksession = clients.KeystoneSession()
ksession = clients.KeystoneSession('neutron')
self.assertIs(
ksession.auth,
ksession.auth)
kl.assert_called_once_with(mock.ANY, 'service_auth')
kl.assert_called_once_with(mock.ANY, 'neutron')
class TestNeutronAuth(base.BaseTestCase):
def setUp(self):
super().setUp()
config.register_opts()
self.mock_client = mock.patch(
'neutronclient.neutron.client.Client').start()
self.client_args = {
'endpoint': 'foo_endpoint',
'region': 'foo_region',
'endpoint_type': 'foo_endpoint_type',
'service_name': 'foo_service_name',
'insecure': 'foo_insecure',
'ca_cert': 'foo_ca_cert'}
'openstack.connection.Connection').start()
clients.Singleton._instances = {}
@mock.patch.object(clients, 'KeystoneSession')
def test_init(self, mock_ks):
clients.NeutronAuth(**self.client_args)
clients.NeutronAuth()
self.mock_client.assert_called_once_with(
'2.0',
endpoint_override=self.client_args['endpoint'],
region_name=self.client_args['region'],
endpoint_type=self.client_args['endpoint_type'],
service_name=self.client_args['service_name'],
insecure=self.client_args['insecure'],
ca_cert=self.client_args['ca_cert'],
session=mock_ks().session)
def test_singleton(self):
c1 = clients.NeutronAuth(**self.client_args)
c2 = clients.NeutronAuth(**self.client_args)
c1 = clients.NeutronAuth()
c2 = clients.NeutronAuth()
self.assertIs(c1, c2)
def test_singleton_exception(self):
mock_client = mock.Mock()
mock_client.network_proxy = 'foo'
with mock.patch(
'neutronclient.neutron.client.Client',
side_effect=[RuntimeError, 'foo', 'foo']) as n_cli:
'openstack.connection.Connection',
side_effect=[RuntimeError,
mock_client, mock_client]) as os_sdk:
self.assertRaises(
RuntimeError,
clients.NeutronAuth,
**self.client_args)
c2 = clients.NeutronAuth(**self.client_args)
c3 = clients.NeutronAuth(**self.client_args)
clients.NeutronAuth)
c2 = clients.NeutronAuth()
c3 = clients.NeutronAuth()
self.assertIs(c2, c3)
self.assertEqual(n_cli._mock_call_count, 2)
self.assertEqual(os_sdk._mock_call_count, 2)

View File

@ -86,6 +86,14 @@ class FakeResource(dict):
self._add_details(info)
class FakeOpenStackSDKResource(FakeResource):
def __getattr__(self, attr):
if attr in self._info:
return self._info[attr]
else:
raise AttributeError("No such attribute '{}'".format(attr))
class FakeOvsdbRow(FakeResource):
"""Fake one or more OVSDB rows."""
@ -165,8 +173,8 @@ class FakeSubnet():
# Overwrite default attributes.
subnet_attrs.update(attrs)
return FakeResource(info=copy.deepcopy(subnet_attrs),
loaded=True)
return FakeOpenStackSDKResource(info=copy.deepcopy(subnet_attrs),
loaded=True)
class FakeOVNPort():
@ -289,8 +297,8 @@ class FakePort():
# Overwrite default attributes.
port_attrs.update(attrs)
return FakeResource(info=copy.deepcopy(port_attrs),
loaded=True)
return FakeOpenStackSDKResource(info=copy.deepcopy(port_attrs),
loaded=True)
class FakeLB(data_models.LoadBalancer):

View File

@ -11,6 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import collections
import copy
from unittest import mock
@ -19,6 +20,7 @@ from neutronclient.common import exceptions as n_exc
from octavia_lib.api.drivers import data_models
from octavia_lib.api.drivers import exceptions
from octavia_lib.common import constants
import openstack
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import idlutils
@ -32,6 +34,10 @@ from ovn_octavia_provider.tests.unit import base as ovn_base
from ovn_octavia_provider.tests.unit import fakes
Port = collections.namedtuple('Port', 'id, name, network_id, fixed_ips',
defaults=[None, '', None, []])
class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
def setUp(self):
@ -54,11 +60,11 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
'cascade': False,
'vip_network_id': self.vip_network_id,
'admin_state_up': False}
self.ports = {'ports': [{
'fixed_ips': [{'ip_address': self.vip_address,
'subnet_id': uuidutils.generate_uuid()}],
'network_id': self.vip_network_id,
'id': self.port1_id}]}
self.ports = [Port(
fixed_ips=[{'ip_address': self.vip_address,
'subnet_id': uuidutils.generate_uuid()}],
network_id=self.vip_network_id,
id=self.port1_id)]
self.pool = {'id': self.pool_id,
'loadbalancer_id': self.loadbalancer_id,
'listener_id': self.listener_id,
@ -381,8 +387,9 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test__get_subnet_from_pool(self, net_cli):
net_cli.return_value.show_subnet.return_value = {
'subnet': {'cidr': '10.22.33.0/24'}}
net_cli.return_value.get_subnet.return_value = (
fakes.FakeSubnet.create_one_subnet(
attrs={'cidr': '10.22.33.0/24'}))
f = self.helper._get_subnet_from_pool
@ -533,7 +540,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_lb_create_disabled(self, net_cli):
self.lb['admin_state_up'] = False
net_cli.return_value.list_ports.return_value = self.ports
net_cli.return_value.ports.return_value = self.ports
status = self.helper.lb_create(self.lb)
self.assertEqual(status['loadbalancers'][0]['provisioning_status'],
constants.ACTIVE)
@ -551,7 +558,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_lb_create_enabled(self, net_cli):
self.lb['admin_state_up'] = True
net_cli.return_value.list_ports.return_value = self.ports
net_cli.return_value.ports.return_value = self.ports
status = self.helper.lb_create(self.lb)
self.assertEqual(status['loadbalancers'][0]['provisioning_status'],
constants.ACTIVE)
@ -576,7 +583,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
self._update_lb_to_ls_association.stop()
self.lb['admin_state_up'] = True
f_lr.return_value = self.router
net_cli.return_value.list_ports.return_value = self.ports
net_cli.return_value.ports.return_value = self.ports
self.helper._update_lb_to_lr_association.side_effect = [
idlutils.RowNotFound]
status = self.helper.lb_create(self.lb)
@ -603,7 +610,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_lb_create_selection_fields_not_supported(self, net_cli):
self.lb['admin_state_up'] = True
net_cli.return_value.list_ports.return_value = self.ports
net_cli.return_value.ports.return_value = self.ports
self.helper._are_selection_fields_supported = (
mock.Mock(return_value=False))
status = self.helper.lb_create(self.lb)
@ -622,8 +629,8 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_lb_create_selection_fields_not_supported_algo(self, net_cli):
self.lb['admin_state_up'] = True
net_cli.return_value.list_ports.return_value = self.ports
net_cli.return_value.show_subnet.return_value = {
net_cli.return_value.ports.return_value = self.ports
net_cli.return_value.get_subnet.return_value = {
'subnet': mock.MagicMock()}
self.pool['lb_algoritm'] = 'foo'
status = self.helper.lb_create(self.lb)
@ -653,7 +660,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
self.lb['protocol'] = protocol
self.lb[ovn_const.LB_EXT_IDS_LR_REF_KEY] = 'foo'
self.lb[ovn_const.LB_EXT_IDS_LS_REFS_KEY] = '{\"neutron-foo\": 1}'
net_cli.return_value.list_ports.return_value = self.ports
net_cli.return_value.ports.return_value = self.ports
status = self.helper.lb_create(self.lb, protocol=protocol)
self.assertEqual(status['loadbalancers'][0]['provisioning_status'],
constants.ACTIVE)
@ -683,8 +690,9 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_lb_create_neutron_client_exception(self, net_cli):
net_cli.return_value.list_ports.return_value = self.ports
net_cli.return_value.show_subnet.side_effect = [n_exc.NotFound]
net_cli.return_value.ports.return_value = self.ports
net_cli.return_value.get_subnet.side_effect = [
openstack.exceptions.ResourceNotFound]
status = self.helper.lb_create(self.lb)
self.assertEqual(status['loadbalancers'][0]['provisioning_status'],
constants.ERROR)
@ -695,13 +703,13 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch.object(ovn_helper.OvnProviderHelper, 'delete_vip_port')
def test_lb_create_exception(self, del_port, net_cli):
self.helper._find_ovn_lbs.side_effect = [RuntimeError]
net_cli.return_value.list_ports.return_value = self.ports
net_cli.return_value.ports.return_value = self.ports
status = self.helper.lb_create(self.lb)
self.assertEqual(status['loadbalancers'][0]['provisioning_status'],
constants.ERROR)
self.assertEqual(status['loadbalancers'][0]['operating_status'],
constants.ERROR)
del_port.assert_called_once_with(self.ports.get('ports')[0]['id'])
del_port.assert_called_once_with(self.ports[0].id)
del_port.side_effect = [Exception]
status = self.helper.lb_create(self.lb)
self.assertEqual(status['loadbalancers'][0]['provisioning_status'],
@ -1540,7 +1548,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_member_create(self, net_cli):
net_cli.return_value.show_subnet.side_effect = [
net_cli.return_value.get_subnet.side_effect = [
idlutils.RowNotFound, idlutils.RowNotFound]
self.ovn_lb.external_ids = mock.MagicMock()
status = self.helper.member_create(self.member)
@ -1566,7 +1574,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_member_create_lb_add_from_lr(self, net_cli, f_lr, folbpi):
fake_subnet = fakes.FakeSubnet.create_one_subnet()
net_cli.return_value.show_subnet.return_value = {'subnet': fake_subnet}
net_cli.return_value.get_subnet.return_value = fake_subnet
f_lr.return_value = self.router
pool_key = 'pool_%s' % self.pool_id
folbpi.return_value = (pool_key, self.ovn_lb)
@ -1583,10 +1591,10 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_member_create_lb_add_from_lr_no_ls(self, net_cli, f_lr, f_ls):
fake_subnet = fakes.FakeSubnet.create_one_subnet()
net_cli.return_value.show_subnet.return_value = {'subnet': fake_subnet}
net_cli.return_value.get_subnet.return_value = fake_subnet
self.ovn_lb.external_ids = mock.MagicMock()
(self.helper.ovn_nbdb_api.ls_get.return_value.
execute.side_effect) = [n_exc.NotFound]
execute.side_effect) = [openstack.exceptions.ResourceNotFound]
status = self.helper.member_create(self.member)
self.assertEqual(status['loadbalancers'][0]['provisioning_status'],
constants.ACTIVE)
@ -1615,7 +1623,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_member_create_lb_add_from_lr_retry(self, net_cli, f_lr, folbpi):
fake_subnet = fakes.FakeSubnet.create_one_subnet()
net_cli.return_value.show_subnet.return_value = {'subnet': fake_subnet}
net_cli.return_value.get_subnet.return_value = fake_subnet
f_lr.return_value = self.router
pool_key = 'pool_%s' % self.pool_id
folbpi.return_value = (pool_key, self.ovn_lb)
@ -1625,7 +1633,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
status = self.helper.member_create(self.member)
self.assertEqual(status['loadbalancers'][0]['provisioning_status'],
constants.ACTIVE)
f_lr.assert_called_once_with(self.network, fake_subnet['gateway_ip'])
f_lr.assert_called_once_with(self.network, fake_subnet.gateway_ip)
self.helper._update_lb_to_lr_association.assert_called_once_with(
self.ovn_lb, self.router)
self.helper._update_lb_to_lr_association_by_step \
@ -1635,7 +1643,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_member_create_listener(self, net_cli):
net_cli.return_value.show_subnet.side_effect = [idlutils.RowNotFound]
net_cli.return_value.get_subnet.side_effect = [idlutils.RowNotFound]
self.ovn_lb.external_ids = mock.MagicMock()
self.helper._get_pool_listeners.return_value = ['listener1']
status = self.helper.member_create(self.member)
@ -2629,8 +2637,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
attrs={'id': 'foo_subnet_id',
'name': 'foo_subnet_name',
'network_id': 'foo_network_id'})
net_cli.return_value.show_subnet.return_value = {
'subnet': subnet}
net_cli.return_value.get_subnet.return_value = subnet
self.helper._update_lb_to_ls_association(
self.ref_lb1, subnet_id=subnet.id,
associate=True, update_ls_ref=True)
@ -2771,7 +2778,8 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test__update_lb_to_ls_association_network_dis_net_not_found(
self, net_cli):
net_cli.return_value.show_subnet.side_effect = n_exc.NotFound
net_cli.return_value.get_subnet.side_effect = (
openstack.exceptions.ResourceNotFound)
self._update_lb_to_ls_association.stop()
self._get_lb_to_ls_association_commands.stop()
(self.helper.ovn_nbdb_api.ls_get.return_value.execute.
@ -2893,7 +2901,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_delete_vip_port_not_found(self, net_cli):
net_cli.return_value.delete_port.side_effect = (
[n_exc.PortNotFoundClient])
[openstack.exceptions.ResourceNotFound])
self.helper.delete_vip_port('foo')
@mock.patch('ovn_octavia_provider.helper.OvnProviderHelper.'
@ -3037,7 +3045,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
lb.external_ids = external_ids
self.mock_find_lb_pool_key.return_value = lb
self.helper.handle_member_dvr(info)
net_cli.show_subnet.assert_not_called()
net_cli.get_subnet.assert_not_called()
self.helper.ovn_nbdb_api.db_clear.assert_not_called()
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
@ -3072,7 +3080,8 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
'neutron:vip_fip': '11.11.11.11'}
lb.external_ids = external_ids
self.mock_find_lb_pool_key.return_value = lb
net_cli.return_value.show_subnet.side_effect = [n_exc.NotFound]
net_cli.return_value.get_subnet.side_effect = [
openstack.exceptions.ResourceNotFound]
self.helper.handle_member_dvr(info)
self.helper.ovn_nbdb_api.db_clear.assert_not_called()
@ -3104,10 +3113,9 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
'subnet_id': fake_port['fixed_ips'][0]['subnet_id'],
'action': action}
member_subnet = fakes.FakeSubnet.create_one_subnet()
member_subnet['id'] = self.member_subnet_id
member_subnet['network_id'] = 'foo'
net_cli.return_value.show_subnet.return_value = {
'subnet': member_subnet}
member_subnet.update({'id': self.member_subnet_id})
member_subnet.update({'network_id': 'foo'})
net_cli.return_value.get_subnet.return_value = member_subnet
fake_lsp = fakes.FakeOVNPort.from_neutron_port(
fake_port)
fake_ls = fakes.FakeOvsdbRow.create_one_ovsdb_row(
@ -3121,10 +3129,8 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
'external_ip': '22.22.22.22',
'external_ids': {
ovn_const.OVN_FIP_EXT_ID_KEY: 'fip_id'}})
fip_info = {
'floatingip': {
'description': 'bar'}}
net_cli.return_value.show_floatingip.return_value = fip_info
fip_info = {'description': 'bar'}
net_cli.return_value.get_ip.return_value = fip_info
self.helper.ovn_nbdb_api.db_find_rows.return_value.\
execute.return_value = [fake_nat]
external_ids = {
@ -3145,11 +3151,10 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
mock.ANY]
self.helper.ovn_nbdb_api.assert_has_calls(calls)
else:
(net_cli.return_value.show_floatingip.
(net_cli.return_value.get_ip.
assert_called_once_with('fip_id'))
(net_cli.return_value.update_floatingip.
assert_called_once_with('fip_id', {
'floatingip': {'description': 'bar'}}))
(net_cli.return_value.update_ip.
assert_called_once_with('fip_id', description='bar'))
self.helper.ovn_nbdb_api.db_clear.assert_not_called()
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
@ -3177,85 +3182,81 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
def test_create_vip_port_vip_selected(self):
expected_dict = {
'port': {'name': '%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id),
'fixed_ips': [{'subnet_id':
self.vip_dict['vip_subnet_id'],
'ip_address':'10.1.10.1'}],
'network_id': self.vip_dict['vip_network_id'],
'admin_state_up': True,
'project_id': self.project_id}}
'name': '%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id),
'fixed_ips': [{'subnet_id':
self.vip_dict['vip_subnet_id'],
'ip_address':'10.1.10.1'}],
'network_id': self.vip_dict['vip_network_id'],
'admin_state_up': True,
'project_id': self.project_id}
with mock.patch.object(clients, 'get_neutron_client') as net_cli:
self.vip_dict['vip_address'] = '10.1.10.1'
self.helper.create_vip_port(self.project_id,
self.loadbalancer_id,
self.vip_dict)
expected_call = [
mock.call().create_port(expected_dict)]
mock.call().create_port(**expected_dict)]
net_cli.assert_has_calls(expected_call)
def test_create_vip_port_vip_not_selected(self):
expected_dict = {
'port': {'name': '%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id),
'fixed_ips': [{'subnet_id':
self.vip_dict['vip_subnet_id']}],
'network_id': self.vip_dict['vip_network_id'],
'admin_state_up': True,
'project_id': self.project_id}}
'name': '%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id),
'fixed_ips': [{'subnet_id':
self.vip_dict['vip_subnet_id']}],
'network_id': self.vip_dict['vip_network_id'],
'admin_state_up': True,
'project_id': self.project_id}
with mock.patch.object(clients, 'get_neutron_client') as net_cli:
self.helper.create_vip_port(self.project_id,
self.loadbalancer_id,
self.vip_dict)
expected_call = [
mock.call().create_port(expected_dict)]
mock.call().create_port(**expected_dict)]
net_cli.assert_has_calls(expected_call)
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_create_vip_port_vip_selected_already_exist(self, net_cli):
net_cli.return_value.create_port.side_effect = [
n_exc.IpAddressAlreadyAllocatedClient]
net_cli.return_value.list_ports.return_value = {
'ports': [
{'name': 'ovn-lb-vip-' + self.loadbalancer_id,
'id': self.loadbalancer_id}]}
openstack.exceptions.ConflictException]
net_cli.return_value.find_port.return_value = (
Port(name='ovn-lb-vip-' + self.loadbalancer_id,
id=self.loadbalancer_id))
self.vip_dict['vip_address'] = '10.1.10.1'
ret = self.helper.create_vip_port(
self.project_id,
self.loadbalancer_id,
self.vip_dict)
expected = {
'port': {
'name': '%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id),
'id': self.loadbalancer_id}}
self.assertDictEqual(expected, ret)
self.assertEqual(
'%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id), ret.name)
self.assertEqual(self.loadbalancer_id, ret.id)
expected_call = [
mock.call().list_ports(
mock.call().find_port(
network_id='%s' % self.vip_dict['vip_network_id'],
name='%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id))]
name_or_id='%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id))]
net_cli.assert_has_calls(expected_call)
@mock.patch('ovn_octavia_provider.common.clients.get_neutron_client')
def test_create_vip_port_vip_selected_other_allocation_exist(
self, net_cli):
net_cli.return_value.create_port.side_effect = [
n_exc.IpAddressAlreadyAllocatedClient]
net_cli.return_value.list_ports.return_value = {
'ports': []}
openstack.exceptions.ConflictException]
net_cli.return_value.find_port.return_value = None
self.vip_dict['vip_address'] = '10.1.10.1'
self.assertRaises(
n_exc.IpAddressAlreadyAllocatedClient,
openstack.exceptions.ConflictException,
self.helper.create_vip_port,
self.project_id,
self.loadbalancer_id,
self.vip_dict)
expected_call = [
mock.call().list_ports(
mock.call().find_port(
network_id='%s' % self.vip_dict['vip_network_id'],
name='%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id))]
name_or_id='%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id))]
net_cli.assert_has_calls(expected_call)
self.helper._update_status_to_octavia.assert_not_called()
@ -3264,22 +3265,21 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
def test_create_vip_port_vip_neutron_client_other_exception(
self, del_port, net_cli):
net_cli.return_value.create_port.side_effect = [
n_exc.NeutronClientException]
net_cli.return_value.list_ports.return_value = {
'ports': [
{'name': 'ovn-lb-vip-' + self.loadbalancer_id,
'id': self.loadbalancer_id}]}
openstack.exceptions.HttpException]
net_cli.return_value.find_port.return_value = Port(
name='ovn-lb-vip-' + self.loadbalancer_id,
id=self.loadbalancer_id)
self.assertRaises(
n_exc.NeutronClientException,
openstack.exceptions.HttpException,
self.helper.create_vip_port,
self.project_id,
self.loadbalancer_id,
self.vip_dict)
expected_call = [
mock.call().list_ports(
mock.call().find_port(
network_id='%s' % self.vip_dict['vip_network_id'],
name='%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id))]
name_or_id='%s%s' % (ovn_const.LB_VIP_PORT_PREFIX,
self.loadbalancer_id))]
net_cli.assert_has_calls(expected_call)
del_port.assert_called_once_with(self.loadbalancer_id)
self.helper._update_status_to_octavia.assert_not_called()
@ -3377,7 +3377,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
self.ovn_hm_lb.protocol = [protocol]
folbpi.return_value = (pool_key, self.ovn_hm_lb)
uhm.return_value = True
net_cli.return_value.show_subnet.return_value = {'subnet': fake_subnet}
net_cli.return_value.get_subnet.return_value = {'subnet': fake_subnet}
status = self.helper.hm_create(self.health_monitor)
self.assertEqual(status['healthmonitors'][0]['provisioning_status'],
constants.ACTIVE)
@ -3496,7 +3496,8 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
pool_key = 'pool_%s' % self.pool_id
self.ovn_hm_lb.external_ids[pool_key] = self.member_line
folbpi.return_value = (pool_key, self.ovn_hm_lb)
net_cli.return_value.show_subnet.side_effect = [n_exc.NotFound]
net_cli.return_value.get_subnet.side_effect = [
openstack.exceptions.ResourceNotFound]
status = self.helper.hm_create(self.health_monitor)
self.assertEqual(status['healthmonitors'][0]['provisioning_status'],
constants.ERROR)
@ -3523,8 +3524,8 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
pool_key = 'pool_%s' % self.pool_id
self.ovn_hm_lb.external_ids[pool_key] = member_line
folbpi.return_value = (pool_key, self.ovn_hm_lb)
net_cli.return_value.show_subnet.return_value = {'subnet': fake_subnet}
net_cli.return_value.list_ports.return_value = {'ports': []}
net_cli.return_value.get_subnet.return_value = fake_subnet
net_cli.return_value.ports.return_value = iter(())
fake_lsp = fakes.FakeOVNPort.from_neutron_port(fake_port)
fake_ls = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={
@ -3557,7 +3558,7 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
pool_key = 'pool_%s' % self.pool_id
self.ovn_hm_lb.external_ids[pool_key] = member_line
folbpi.return_value = (pool_key, self.ovn_hm_lb)
net_cli.return_value.show_subnet.return_value = {'subnet': fake_subnet}
net_cli.return_value.get_subnet.return_value = fake_subnet
fake_lsp = fakes.FakeOVNPort.from_neutron_port(fake_port)
fake_ls = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={