Remove python-neutronclient

This patch replaces the deprecated library with openstacksdk. It also
deprecates the use of the [service_auth] section of the configuration
for authentication with Neutron. In a future release these settings
need to be part of the [neutron] configuration section.

Update needed on OVN provider side:
https://review.opendev.org/c/openstack/ovn-octavia-provider/+/870514

Story: 2010509
Task: 47104
Change-Id: I686cfdef78de927fa4bc1921c15e8d5853fd2ef9
This commit is contained in:
Tom Weininger 2022-12-20 17:08:29 +01:00
parent 8d70a80505
commit 3e6fd13bfb
16 changed files with 1217 additions and 1207 deletions

View File

@ -321,6 +321,16 @@ function octavia_configure {
iniset $OCTAVIA_CONF service_auth cafile $SSL_BUNDLE_FILE
iniset $OCTAVIA_CONF service_auth memcached_servers $SERVICE_HOST:11211
# neutron
iniset $OCTAVIA_CONF neutron auth_url $KEYSTONE_SERVICE_URI
iniset $OCTAVIA_CONF neutron auth_type password
iniset $OCTAVIA_CONF neutron username $OCTAVIA_USERNAME
iniset $OCTAVIA_CONF neutron password $OCTAVIA_PASSWORD
iniset $OCTAVIA_CONF neutron user_domain_name $OCTAVIA_USER_DOMAIN_NAME
iniset $OCTAVIA_CONF neutron project_name $OCTAVIA_PROJECT_NAME
iniset $OCTAVIA_CONF neutron project_domain_name $OCTAVIA_PROJECT_DOMAIN_NAME
iniset $OCTAVIA_CONF neutron cafile $SSL_BUNDLE_FILE
# Setting other required default options
iniset $OCTAVIA_CONF controller_worker amphora_driver ${OCTAVIA_AMPHORA_DRIVER}
iniset $OCTAVIA_CONF controller_worker compute_driver ${OCTAVIA_COMPUTE_DRIVER}

View File

@ -12,9 +12,11 @@
from cinderclient import client as cinder_client
from glanceclient import client as glance_client
from neutronclient.neutron import client as neutron_client
from keystoneauth1 import session
from keystoneauth1 import token_endpoint
from novaclient import api_versions
from novaclient import client as nova_client
import openstack
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
@ -25,7 +27,6 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
GLANCE_VERSION = '2'
NEUTRON_VERSION = '2.0'
NOVA_VERSION = '2.15'
CINDER_VERSION = '3'
@ -73,38 +74,20 @@ class NeutronAuth(object):
neutron_client = None
@classmethod
def get_neutron_client(cls, region, service_name=None, endpoint=None,
endpoint_type='publicURL', insecure=False,
ca_cert=None):
"""Create neutron client object.
:param region: The region of the service
:param service_name: The name of the neutron service in the catalog
:param endpoint: The endpoint of the service
:param endpoint_type: The endpoint_type of the service
:param insecure: Turn off certificate validation
:param ca_cert: CA Cert file path
:return: a Neutron Client object.
:raises Exception: if the client cannot be created
"""
ksession = keystone.KeystoneSession()
def get_neutron_client(cls):
"""Create neutron client object."""
ksession = keystone.KeystoneSession('neutron')
if not cls.neutron_client:
kwargs = {'region_name': region,
'session': ksession.get_session(),
'endpoint_type': endpoint_type,
'insecure': insecure}
if service_name:
kwargs['service_name'] = service_name
if endpoint:
kwargs['endpoint_override'] = endpoint
if ca_cert:
kwargs['ca_cert'] = ca_cert
try:
cls.neutron_client = neutron_client.Client(
NEUTRON_VERSION, **kwargs)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception("Error creating Neutron client.")
sess = ksession.get_session()
kwargs = {}
if CONF.neutron.endpoint_override:
kwargs['network_endpoint_override'] = (
CONF.neutron.endpoint_override)
conn = openstack.connection.Connection(
session=sess, **kwargs)
cls.neutron_client = conn
return cls.neutron_client
@classmethod
@ -113,26 +96,23 @@ class NeutronAuth(object):
It's possible that the token in the context is a trust scoped
which can't be used to initialize a keystone session.
We directly use the token and endpoint_url to initialize neutron
client.
"""
neutron_endpoint = CONF.neutron.endpoint
if not neutron_endpoint:
session = keystone.KeystoneSession().get_session()
endpoint_data = session.get_endpoint_data(
sess = keystone.KeystoneSession('neutron').get_session()
neutron_endpoint = CONF.neutron.endpoint_override
if neutron_endpoint is None:
endpoint_data = sess.get_endpoint_data(
service_type='network', interface=CONF.neutron.endpoint_type,
region_name=CONF.neutron.region_name)
neutron_endpoint = endpoint_data.catalog_url
kwargs = {
'token': context.auth_token,
'endpoint_url': neutron_endpoint,
'insecure': CONF.neutron.insecure,
'ca_cert': CONF.neutron.ca_certificates_file
}
user_auth = token_endpoint.Token(neutron_endpoint, context.auth_token)
user_sess = session.Session(auth=user_auth)
return neutron_client.Client(NEUTRON_VERSION, **kwargs)
conn = openstack.connection.Connection(
session=user_sess, oslo_conf=CONF)
return conn.network
class GlanceAuth(object):

View File

@ -37,10 +37,6 @@ from octavia import version
LOG = logging.getLogger(__name__)
EXTRA_LOG_LEVEL_DEFAULTS = [
'neutronclient.v2_0.client=INFO',
]
core_opts = [
cfg.HostnameOpt('host', default=utils.get_hostname(),
sample_default='<server-hostname.example.com>',
@ -89,7 +85,7 @@ api_opts = [
'octavia.api.drivers entrypoint.'),
default={'amphora': 'The Octavia Amphora driver.',
'octavia': 'Deprecated alias of the Octavia Amphora '
'driver.',
'driver.',
}),
cfg.StrOpt('default_provider_driver', default='amphora',
help=_('Default provider driver.')),
@ -747,21 +743,27 @@ cinder_opts = [
]
neutron_opts = [
cfg.StrOpt('service_name',
help=_('The name of the neutron service in the '
'keystone catalog')),
cfg.StrOpt('endpoint', help=_('A new endpoint to override the endpoint '
'in the keystone catalog.')),
cfg.StrOpt('region_name',
help=_('Region in Identity service catalog to use for '
'communication with the OpenStack services.')),
cfg.StrOpt('endpoint_type', default='publicURL',
help=_('Endpoint interface in identity service to use')),
'in the keystone catalog.'),
deprecated_for_removal=True,
deprecated_reason=_('The endpoint_override option defined by '
'keystoneauth1 is the new name for this '
'option.'),
deprecated_since='2023.2/Bobcat'),
cfg.StrOpt('endpoint_type', help=_('Endpoint interface in identity '
'service to use'),
deprecated_for_removal=True,
deprecated_reason=_('This option was replaced by the '
'valid_interfaces option defined by '
'keystoneauth.'),
deprecated_since='2023.2/Bobcat'),
cfg.StrOpt('ca_certificates_file',
help=_('CA certificates file path')),
cfg.BoolOpt('insecure',
default=False,
help=_('Disable certificate validation on SSL connections ')),
help=_('CA certificates file path'),
deprecated_for_removal=True,
deprecated_reason=_('The cafile option defined by '
'keystoneauth1 is the new name for this '
'option.'),
deprecated_since='2023.2/Bobcat'),
]
glance_opts = [
@ -902,8 +904,16 @@ _SQL_CONNECTION_DEFAULT = 'sqlite://'
db_options.set_defaults(cfg.CONF, connection=_SQL_CONNECTION_DEFAULT,
max_pool_size=10, max_overflow=20, pool_timeout=10)
ks_loading.register_auth_conf_options(cfg.CONF, constants.SERVICE_AUTH)
ks_loading.register_session_conf_options(cfg.CONF, constants.SERVICE_AUTH)
def register_ks_options(group):
ks_loading.register_auth_conf_options(cfg.CONF, group)
ks_loading.register_session_conf_options(cfg.CONF, group)
ks_loading.register_adapter_conf_options(cfg.CONF, group,
include_deprecated=False)
register_ks_options(constants.SERVICE_AUTH)
register_ks_options('neutron')
def register_cli_opts():
@ -911,6 +921,31 @@ def register_cli_opts():
logging.register_options(cfg.CONF)
def handle_neutron_deprecations():
# Apply neutron deprecated options to their new setting if needed
# Basicaly: if the value of the deprecated option is not the default:
# * convert it to a valid "new" value if needed
# * set it as the default for the new option
# Thus [neutron].<new_option> has an higher precedence than
# [neutron].<deprecated_option>
loc = cfg.CONF.get_location('endpoint', 'neutron')
if loc and loc.location != cfg.Locations.opt_default:
cfg.CONF.set_default('endpoint_override', cfg.CONF.neutron.endpoint,
'neutron')
loc = cfg.CONF.get_location('endpoint_type', 'neutron')
if loc and loc.location != cfg.Locations.opt_default:
endpoint_type = cfg.CONF.neutron.endpoint_type.replace('URL', '')
cfg.CONF.set_default('valid_interfaces', [endpoint_type],
'neutron')
loc = cfg.CONF.get_location('ca_certificates_file', 'neutron')
if loc and loc.location != cfg.Locations.opt_default:
cfg.CONF.set_default('cafile', cfg.CONF.neutron.ca_certificates_file,
'neutron')
def init(args, **kwargs):
register_cli_opts()
cfg.CONF(args=args, project='octavia',
@ -920,14 +955,20 @@ def init(args, **kwargs):
setup_remote_debugger()
validate.check_default_ciphers_prohibit_list_conflict()
# Override default auth_type for plugins with the default from service_auth
auth_type = cfg.CONF.service_auth.auth_type
cfg.CONF.set_default('auth_type', auth_type, 'neutron')
handle_neutron_deprecations()
def setup_logging(conf):
"""Sets up the logging options for a log with supplied name.
:param conf: a cfg.ConfOpts object
"""
logging.set_defaults(default_log_levels=logging.get_default_log_levels() +
EXTRA_LOG_LEVEL_DEFAULTS)
ll = logging.get_default_log_levels()
logging.set_defaults(default_log_levels=ll)
product_name = "octavia"
logging.setup(conf, product_name)
LOG.info("Logging enabled!")

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1 import exceptions as ks_exceptions
from keystoneauth1 import loading as ks_loading
from keystonemiddleware import auth_token
from oslo_config import cfg
@ -32,14 +33,17 @@ class KeystoneSession(object):
self._auth = None
self.section = section
ks_loading.register_auth_conf_options(cfg.CONF, self.section)
ks_loading.register_session_conf_options(cfg.CONF, self.section)
def get_session(self):
def get_session(self, auth=None):
"""Initializes a Keystone session.
:return: a Keystone Session object
"""
if auth:
# Do not use the singleton with custom auth params
return ks_loading.load_session_from_conf_options(
cfg.CONF, self.section, auth=auth)
if not self._session:
self._session = ks_loading.load_session_from_conf_options(
cfg.CONF, self.section, auth=self.get_auth())
@ -48,8 +52,57 @@ class KeystoneSession(object):
def get_auth(self):
if not self._auth:
self._auth = ks_loading.load_auth_from_conf_options(
cfg.CONF, self.section)
try:
self._auth = ks_loading.load_auth_from_conf_options(
cfg.CONF, self.section)
except ks_exceptions.auth_plugins.MissingRequiredOptions as e:
if self.section == constants.SERVICE_AUTH:
raise e
# NOTE(gthiemonge): MissingRequiredOptions is raised: there is
# one or more missing auth options in the config file. It may
# be due to the migration from python-neutronclient to
# openstacksdk.
# With neutronclient, most of the auth settings were in
# [service_auth] with a few overrides in [neutron],
# but with openstacksdk, we have all the auth settings in the
# [neutron] section. In order to support smooth upgrades, in
# case those options are missing, we override the undefined
# options with the existing settings from [service_auth].
# This code should be removed when all the deployment tools set
# the correct options in [neutron]
# The config options are lazily registered/loaded by keystone,
# it means that we cannot get/set them before invoking
# 'load_auth_from_conf_options' on 'service_auth'.
ks_loading.load_auth_from_conf_options(
cfg.CONF, constants.SERVICE_AUTH)
config = getattr(cfg.CONF, self.section)
for opt in config:
# For each option in the [neutron] section, get its setting
# location, if the location is 'opt_default' or
# 'set_default', it means that the option is not configured
# in the config file, it should be replaced with the one
# from [service_auth]
loc = cfg.CONF.get_location(opt, self.section)
if not loc or loc.location in (cfg.Locations.opt_default,
cfg.Locations.set_default):
if hasattr(cfg.CONF.service_auth, opt):
cur_value = getattr(config, opt)
value = getattr(cfg.CONF.service_auth, opt)
if value != cur_value:
log_value = (value if opt != "password"
else "<hidden>")
LOG.debug("Overriding [%s].%s with '%s'",
self.section, opt, log_value)
cfg.CONF.set_override(opt, value, self.section)
# Now we can call load_auth_from_conf_options for this specific
# service with the newly defined options.
self._auth = ks_loading.load_auth_from_conf_options(
cfg.CONF, self.section)
return self._auth
def get_service_user_id(self):

View File

@ -14,9 +14,9 @@
import ipaddress
import time
from neutronclient.common import exceptions as neutron_client_exceptions
from novaclient import exceptions as nova_client_exceptions
from octavia_lib.common import constants as lib_consts
import openstack.exceptions as os_exceptions
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver as stevedore_driver
@ -84,14 +84,15 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
def _plug_amphora_vip(self, amphora, subnet):
# We need a vip port owned by Octavia for Act/Stby and failover
try:
port = {constants.PORT: {
port = {
constants.NAME: 'octavia-lb-vrrp-' + amphora.id,
constants.NETWORK_ID: subnet.network_id,
constants.FIXED_IPS: [{'subnet_id': subnet.id}],
constants.ADMIN_STATE_UP: True,
constants.DEVICE_OWNER: OCTAVIA_OWNER}}
new_port = self.neutron_client.create_port(port)
new_port = utils.convert_port_dict_to_model(new_port)
constants.DEVICE_OWNER: OCTAVIA_OWNER,
}
new_port = self.network_proxy.create_port(**port)
new_port = utils.convert_port_to_model(new_port)
LOG.debug('Created vip port: %(port_id)s for amphora: %(amp)s',
{'port_id': new_port.id, 'amp': amphora.id})
@ -112,7 +113,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
LOG.exception(message)
try:
if new_port:
self.neutron_client.delete_port(new_port.id)
self.network_proxy.delete_port(new_port.id)
LOG.debug('Deleted base (VRRP) port %s due to plug_port '
'failure.', new_port.id)
except Exception:
@ -126,7 +127,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
def _add_vip_address_pairs(self, port_id, vip_address_list):
try:
self._add_allowed_address_pairs_to_port(port_id, vip_address_list)
except neutron_client_exceptions.PortNotFoundClient as e:
except os_exceptions.ResourceNotFound as e:
raise base.PortNotFound(str(e))
except Exception as e:
message = _('Error adding allowed address pair(s) {ips} '
@ -138,10 +139,8 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
def _get_lb_security_group(self, load_balancer_id):
sec_grp_name = common_utils.get_vip_security_group_name(
load_balancer_id)
sec_grps = self.neutron_client.list_security_groups(name=sec_grp_name)
if sec_grps and sec_grps.get(constants.SECURITY_GROUPS):
return sec_grps.get(constants.SECURITY_GROUPS)[0]
return None
sec_grp = self.network_proxy.find_security_group(sec_grp_name)
return sec_grp
def _get_ethertype_for_ip(self, ip):
address = ipaddress.ip_address(ip)
@ -152,8 +151,8 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
return 'IPv6' if net.version == 6 else 'IPv4'
def _update_security_group_rules(self, load_balancer, sec_grp_id):
rules = self.neutron_client.list_security_group_rules(
security_group_id=sec_grp_id)
rules = tuple(self.network_proxy.security_group_rules(
security_group_id=sec_grp_id))
updated_ports = []
listener_peer_ports = []
@ -192,11 +191,11 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
# port_range_max and min will be the same since this driver is
# responsible for creating these rules
old_ports = []
for rule in rules.get('security_group_rules', []):
for rule in rules:
# Don't remove egress rules and don't confuse other protocols with
# None ports with the egress rules. VRRP uses protocol 51 and 112
if (rule.get('direction') == 'egress' or
rule.get('protocol').upper() not in
rule.get('protocol').upper() not in
[constants.PROTOCOL_TCP, constants.PROTOCOL_UDP,
lib_consts.PROTOCOL_SCTP]):
continue
@ -206,7 +205,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
add_ports = set(updated_ports) - set(old_ports)
del_ports = set(old_ports) - set(updated_ports)
for rule in rules.get('security_group_rules', []):
for rule in rules:
if (rule.get('protocol', '') and
rule.get('protocol', '').upper() in
[constants.PROTOCOL_TCP, constants.PROTOCOL_UDP,
@ -215,8 +214,8 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
rule.get('remote_ip_prefix')) in del_ports):
rule_id = rule.get(constants.ID)
try:
self.neutron_client.delete_security_group_rule(rule_id)
except neutron_client_exceptions.NotFound:
self.network_proxy.delete_security_group_rule(rule_id)
except os_exceptions.ResourceNotFound:
LOG.info("Security group rule %s not found, will assume "
"it is already deleted.", rule_id)
@ -247,7 +246,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
constants.VRRP_PROTOCOL_NUM,
direction='ingress',
ethertype=primary_ethertype)
except neutron_client_exceptions.Conflict:
except os_exceptions.ConflictException:
# It's ok if this rule already exists
pass
except Exception as e:
@ -257,7 +256,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
self._create_security_group_rule(
sec_grp_id, constants.AUTH_HEADER_PROTOCOL_NUMBER,
direction='ingress', ethertype=primary_ethertype)
except neutron_client_exceptions.Conflict:
except os_exceptions.ConflictException:
# It's ok if this rule already exists
pass
except Exception as e:
@ -284,10 +283,10 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
attempts = 0
while attempts <= CONF.networking.max_retries:
try:
self.neutron_client.delete_security_group(sec_grp)
self.network_proxy.delete_security_group(sec_grp)
LOG.info("Deleted security group %s", sec_grp)
return
except neutron_client_exceptions.NotFound:
except os_exceptions.ResourceNotFound:
LOG.info("Security group %s not found, will assume it is "
"already deleted", sec_grp)
return
@ -304,31 +303,33 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
def _delete_security_group(self, vip, port):
if self.sec_grp_enabled:
sec_grp = self._get_lb_security_group(vip.load_balancer.id)
try:
lb_id = vip.load_balancer.id
except AttributeError:
sec_grp = None
else:
sec_grp = self._get_lb_security_group(lb_id)
if sec_grp:
sec_grp_id = sec_grp.get(constants.ID)
sec_grp_id = sec_grp.id
LOG.info(
"Removing security group %(sg)s from port %(port)s",
{'sg': sec_grp_id, constants.PORT: vip.port_id})
raw_port = None
try:
if port:
raw_port = self.neutron_client.show_port(port.id)
raw_port = self.network_proxy.get_port(port.id)
except Exception:
LOG.warning('Unable to get port information for port '
'%s. Continuing to delete the security '
'group.', port.id)
if raw_port:
sec_grps = raw_port.get(
constants.PORT, {}).get(constants.SECURITY_GROUPS, [])
if sec_grp_id in sec_grps:
sec_grps = raw_port.security_group_ids
if sec_grps and sec_grp_id in sec_grps:
sec_grps.remove(sec_grp_id)
port_update = {constants.PORT: {
constants.SECURITY_GROUPS: sec_grps}}
try:
self.neutron_client.update_port(port.id,
port_update)
except neutron_client_exceptions.PortNotFoundClient:
self.network_proxy.update_port(
port.id, security_group_ids=sec_grps)
except os_exceptions.ResourceNotFound:
LOG.warning('Unable to update port information '
'for port %s. Continuing to delete '
'the security group since port not '
@ -348,7 +349,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
try:
LOG.warning('Deleting extra port %s on security '
'group %s...', port_id, sec_grp_id)
self.neutron_client.delete_port(port_id)
self.network_proxy.delete_port(port_id)
except Exception:
LOG.warning('Failed to delete extra port %s on '
'security group %s.',
@ -361,13 +362,17 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
This can happen if a failover has occurred.
"""
for amphora in vip.load_balancer.amphorae:
try:
self.neutron_client.delete_port(amphora.vrrp_port_id)
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient):
LOG.debug('VIP instance port %s already deleted. Skipping.',
amphora.vrrp_port_id)
try:
for amphora in vip.load_balancer.amphorae:
try:
self.network_proxy.delete_port(amphora.vrrp_port_id)
except os_exceptions.ResourceNotFound:
LOG.debug(
'VIP instance port %s already deleted. Skipping.',
amphora.vrrp_port_id)
except AttributeError as ex:
LOG.warning(f"Cannot delete port from amphorae. Object does not "
f"exist ({ex!r})")
try:
port = self.get_port(vip.port_id)
@ -381,9 +386,8 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
if port and port.device_owner == OCTAVIA_OWNER:
try:
self.neutron_client.delete_port(vip.port_id)
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient):
self.network_proxy.delete_port(vip.port_id)
except os_exceptions.ResourceNotFound:
LOG.debug('VIP port %s already deleted. Skipping.',
vip.port_id)
except Exception as e:
@ -546,29 +550,29 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
project_id_key = 'tenant_id'
# It can be assumed that network_id exists
port = {constants.PORT: {
port = {
constants.NAME: 'octavia-lb-' + load_balancer.id,
constants.NETWORK_ID: load_balancer.vip.network_id,
constants.ADMIN_STATE_UP: False,
'device_id': 'lb-{0}'.format(load_balancer.id),
constants.DEVICE_OWNER: OCTAVIA_OWNER,
project_id_key: load_balancer.project_id}}
project_id_key: load_balancer.project_id}
if fixed_ips:
port[constants.PORT][constants.FIXED_IPS] = fixed_ips
port[constants.FIXED_IPS] = fixed_ips
try:
new_port = self.neutron_client.create_port(port)
new_port = self.network_proxy.create_port(**port)
except Exception as e:
message = _('Error creating neutron port on network '
'{network_id} due to {e}.').format(
network_id=load_balancer.vip.network_id, e=str(e))
network_id=load_balancer.vip.network_id, e=repr(e))
LOG.exception(message)
raise base.AllocateVIPException(
message,
orig_msg=getattr(e, constants.MESSAGE, None),
orig_code=getattr(e, constants.STATUS_CODE, None),
)
new_port = utils.convert_port_dict_to_model(new_port)
new_port = utils.convert_port_to_model(new_port)
return self._port_to_vip(new_port, load_balancer, octavia_owned=True)
def unplug_aap_port(self, vip, amphora, subnet):
@ -585,11 +589,11 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
except Exception:
pass
try:
aap_update = {constants.PORT: {
aap_update = {
constants.ALLOWED_ADDRESS_PAIRS: []
}}
self.neutron_client.update_port(interface.port_id,
aap_update)
}
self.network_proxy.update_port(interface.port_id,
**aap_update)
except Exception as e:
message = _('Error unplugging VIP. Could not clear '
'allowed address pairs from port '
@ -601,9 +605,8 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
try:
port = self.get_port(amphora.vrrp_port_id)
if port.name.startswith('octavia-lb-vrrp-'):
self.neutron_client.delete_port(amphora.vrrp_port_id)
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient):
self.network_proxy.delete_port(amphora.vrrp_port_id)
except base.PortNotFound:
pass
except Exception as e:
LOG.error('Failed to delete port. Resources may still be in '
@ -704,11 +707,10 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
for port in ports:
try:
self.neutron_client.update_port(
port.id, {constants.PORT: {'dns_name': ''}})
self.network_proxy.update_port(
port.id, dns_name='')
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient) as e:
except os_exceptions.ResourceNotFound as e:
raise base.PortNotFound() from e
def plug_port(self, amphora, port):
@ -824,15 +826,15 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
for port in ports:
try:
neutron_port = self.neutron_client.show_port(
port.id).get(constants.PORT)
neutron_port = self.network_proxy.get_port(
port.id)
device_id = neutron_port['device_id']
start = int(time.time())
while device_id:
time.sleep(CONF.networking.retry_interval)
neutron_port = self.neutron_client.show_port(
port.id).get(constants.PORT)
neutron_port = self.network_proxy.get_port(
port.id)
device_id = neutron_port['device_id']
timed_out = int(time.time()) - start >= port_detach_timeout
@ -843,8 +845,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
(port.id, device_id, port_detach_timeout))
raise base.TimeoutException(message)
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient):
except os_exceptions.ResourceNotFound:
pass
def delete_port(self, port_id):
@ -854,9 +855,8 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
:returns: None
"""
try:
self.neutron_client.delete_port(port_id)
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient):
self.network_proxy.delete_port(port_id)
except os_exceptions.ResourceNotFound:
LOG.debug('VIP instance port %s already deleted. Skipping.',
port_id)
except Exception as e:
@ -870,10 +870,9 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
:returns: None
"""
try:
self.neutron_client.update_port(
port_id, {constants.PORT: {constants.ADMIN_STATE_UP: state}})
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient) as e:
self.network_proxy.update_port(
port_id, admin_state_up=state)
except os_exceptions.ResourceNotFound as e:
raise base.PortNotFound(str(e))
except Exception as e:
raise exceptions.NetworkServiceError(net_error=str(e))
@ -912,11 +911,11 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
if security_group_ids:
port[constants.SECURITY_GROUPS] = security_group_ids
new_port = self.neutron_client.create_port({constants.PORT: port})
new_port = self.network_proxy.create_port(**port)
LOG.debug('Created port: %(port)s', {constants.PORT: new_port})
return utils.convert_port_dict_to_model(new_port)
return utils.convert_port_to_model(new_port)
except Exception as e:
message = _('Error creating a port on network '
'{network_id} due to {error}.').format(
@ -933,14 +932,15 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
"""
try:
if self.sec_grp_enabled and sg_name:
sec_grps = self.neutron_client.list_security_groups(
name=sg_name)
if sec_grps and sec_grps.get(constants.SECURITY_GROUPS):
sg_dict = sec_grps.get(constants.SECURITY_GROUPS)[0]
return utils.convert_security_group_dict_to_model(sg_dict)
message = _('Security group {name} not found.').format(
name=sg_name)
raise base.SecurityGroupNotFound(message)
sec_grps = self.network_proxy.security_groups(name=sg_name)
try:
sg = next(sec_grps)
return utils.convert_security_group_to_model(sg)
except StopIteration:
# pylint: disable=raise-missing-from
message = _('Security group {name} not found.').format(
name=sg_name)
raise base.SecurityGroupNotFound(message)
return None
except base.SecurityGroupNotFound:
raise

View File

@ -12,7 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutronclient.common import exceptions as neutron_client_exceptions
from openstack.connection import Connection
import openstack.exceptions as os_exceptions
from openstack.network.v2._proxy import Proxy
from oslo_config import cfg
from oslo_log import log as logging
@ -23,33 +25,28 @@ from octavia.network import base
from octavia.network import data_models as network_models
from octavia.network.drivers.neutron import utils
LOG = logging.getLogger(__name__)
DNS_INT_EXT_ALIAS = 'dns-integration'
SEC_GRP_EXT_ALIAS = 'security-group'
QOS_EXT_ALIAS = 'qos'
CONF_GROUP = 'neutron'
CONF = cfg.CONF
class BaseNeutronDriver(base.AbstractNetworkDriver):
def __init__(self):
self.neutron_client = clients.NeutronAuth.get_neutron_client(
endpoint=CONF.neutron.endpoint,
region=CONF.neutron.region_name,
endpoint_type=CONF.neutron.endpoint_type,
service_name=CONF.neutron.service_name,
insecure=CONF.neutron.insecure,
ca_cert=CONF.neutron.ca_certificates_file
)
self.network_proxy: Proxy = self.os_connection.network
self._check_extension_cache = {}
self.sec_grp_enabled = self._check_extension_enabled(SEC_GRP_EXT_ALIAS)
self.dns_integration_enabled = self._check_extension_enabled(
DNS_INT_EXT_ALIAS)
self._qos_enabled = self._check_extension_enabled(QOS_EXT_ALIAS)
self.project_id = self.neutron_client.get_auth_info().get(
'auth_tenant_id')
self.project_id = self.os_connection.current_project_id
@property
def os_connection(self) -> Connection:
return clients.NeutronAuth.get_neutron_client()
def _check_extension_enabled(self, extension_alias):
if extension_alias in self._check_extension_cache:
@ -60,12 +57,11 @@ class BaseNeutronDriver(base.AbstractNetworkDriver):
'status': 'enabled' if status else 'disabled'
})
else:
try:
self.neutron_client.show_extension(extension_alias)
if self.network_proxy.find_extension(extension_alias):
LOG.debug('Neutron extension %(ext)s found enabled',
{'ext': extension_alias})
self._check_extension_cache[extension_alias] = True
except neutron_client_exceptions.NotFound:
else:
LOG.debug('Neutron extension %(ext)s is not enabled',
{'ext': extension_alias})
self._check_extension_cache[extension_alias] = False
@ -123,64 +119,51 @@ class BaseNeutronDriver(base.AbstractNetworkDriver):
fixed_ips=fixed_ips)
def _add_allowed_address_pairs_to_port(self, port_id, ip_address_list):
aap = {
'port': {
'allowed_address_pairs': [
{'ip_address': ip} for ip in ip_address_list
]
}
}
self.neutron_client.update_port(port_id, aap)
aap = [{'ip_address': ip} for ip in ip_address_list]
self.network_proxy.update_port(port_id,
allowed_address_pairs=aap)
def _add_security_group_to_port(self, sec_grp_id, port_id):
port_update = {'port': {'security_groups': [sec_grp_id]}}
# Note: Neutron accepts the SG even if it already exists
try:
self.neutron_client.update_port(port_id, port_update)
except neutron_client_exceptions.PortNotFoundClient as e:
self.network_proxy.update_port(
port_id, security_groups=[sec_grp_id])
except os_exceptions.NotFoundException as e:
raise base.PortNotFound(str(e))
except Exception as e:
raise base.NetworkException(str(e))
def _get_ports_by_security_group(self, sec_grp_id):
all_ports = self.neutron_client.list_ports(project_id=self.project_id)
filtered_ports = []
for port in all_ports.get('ports', []):
if sec_grp_id in port.get('security_groups', []):
filtered_ports.append(port)
all_ports = self.network_proxy.ports(project_id=self.project_id)
filtered_ports = [
p for p in all_ports if (p.security_group_ids and
sec_grp_id in p.security_group_ids)]
return filtered_ports
def _create_security_group(self, name):
new_sec_grp = {'security_group': {'name': name}}
sec_grp = self.neutron_client.create_security_group(new_sec_grp)
return sec_grp['security_group']
sec_grp = self.network_proxy.create_security_group(name=name)
return sec_grp
def _create_security_group_rule(self, sec_grp_id, protocol,
direction='ingress', port_min=None,
port_max=None, ethertype='IPv6',
cidr=None):
rule = {
'security_group_rule': {
'security_group_id': sec_grp_id,
'direction': direction,
'protocol': protocol,
'port_range_min': port_min,
'port_range_max': port_max,
'ethertype': ethertype,
'remote_ip_prefix': cidr,
}
'security_group_id': sec_grp_id,
'direction': direction,
'protocol': protocol,
'port_range_min': port_min,
'port_range_max': port_max,
'ethertype': ethertype,
'remote_ip_prefix': cidr,
}
self.neutron_client.create_security_group_rule(rule)
self.network_proxy.create_security_group_rule(**rule)
def apply_qos_on_port(self, qos_id, port_id):
body = {
'port':
{'qos_policy_id': qos_id}
}
try:
self.neutron_client.update_port(port_id, body)
except neutron_client_exceptions.PortNotFoundClient as e:
self.network_proxy.update_port(port_id, qos_policy_id=qos_id)
except os_exceptions.ResourceNotFound as e:
raise base.PortNotFound(str(e))
except Exception as e:
raise base.NetworkException(str(e))
@ -188,26 +171,26 @@ class BaseNeutronDriver(base.AbstractNetworkDriver):
def get_plugged_networks(self, compute_id):
# List neutron ports associated with the Amphora
try:
ports = self.neutron_client.list_ports(device_id=compute_id)
ports = self.network_proxy.ports(device_id=compute_id)
except Exception:
LOG.debug('Error retrieving plugged networks for compute '
'device %s.', compute_id)
ports = {'ports': []}
return [self._port_to_octavia_interface(
compute_id, port) for port in ports['ports']]
ports = tuple()
return [self._port_to_octavia_interface(compute_id, port) for port in
ports]
def _get_resource(self, resource_type, resource_id, context=None):
neutron_client = self.neutron_client
network = self.network_proxy
if context and not CONF.networking.allow_invisible_resource_usage:
neutron_client = clients.NeutronAuth.get_user_neutron_client(
network = clients.NeutronAuth.get_user_neutron_client(
context)
try:
resource = getattr(neutron_client, 'show_%s' %
resource_type)(resource_id)
return getattr(utils, 'convert_%s_dict_to_model' %
resource = getattr(
network, f"get_{resource_type}")(resource_id)
return getattr(utils, 'convert_%s_to_model' %
resource_type)(resource)
except neutron_client_exceptions.NotFound as e:
except os_exceptions.ResourceNotFound as e:
message = _('{resource_type} not found '
'({resource_type} id: {resource_id}).').format(
resource_type=resource_type, resource_id=resource_id)
@ -228,20 +211,25 @@ class BaseNeutronDriver(base.AbstractNetworkDriver):
If unique_item set to True, only the first resource is returned.
"""
try:
resource = getattr(self.neutron_client, 'list_%ss' %
resource_type)(**filters)
resources = getattr(
self.network_proxy, f"{resource_type}s")(**filters)
conversion_function = getattr(
utils,
'convert_%s_dict_to_model' % resource_type)
if not resource['%ss' % resource_type]:
# no items found
raise neutron_client_exceptions.NotFound()
if unique_item:
return conversion_function(resource['%ss' % resource_type][0])
'convert_%s_to_model' % resource_type)
try:
# get first item to see if there is at least one resource
res_list = [conversion_function(next(resources))]
except StopIteration:
# pylint: disable=raise-missing-from
raise os_exceptions.NotFoundException(
f'No resource of type {resource_type} found that matches '
f'given filter criteria: {filters}.')
return list(map(conversion_function,
resource['%ss' % resource_type]))
except neutron_client_exceptions.NotFound as e:
if unique_item:
return res_list[0]
return res_list + [conversion_function(r) for r in resources]
except os_exceptions.NotFoundException as e:
message = _('{resource_type} not found '
'({resource_type} Filters: {filters}.').format(
resource_type=resource_type, filters=filters)
@ -300,10 +288,10 @@ class BaseNeutronDriver(base.AbstractNetworkDriver):
fixed_ips.append(new_fixed_ip_dict)
body = {'port': {'fixed_ips': fixed_ips}}
try:
updated_port = self.neutron_client.update_port(port_id, body)
return utils.convert_port_dict_to_model(updated_port)
updated_port = self.network_proxy.update_port(
port_id, fixed_ips=fixed_ips)
return utils.convert_port_to_model(updated_port)
except Exception as e:
raise base.NetworkException(str(e))
@ -315,9 +303,9 @@ class BaseNeutronDriver(base.AbstractNetworkDriver):
if fixed_ip.subnet_id != subnet_id
]
body = {'port': {'fixed_ips': fixed_ips}}
try:
updated_port = self.neutron_client.update_port(port_id, body)
return utils.convert_port_dict_to_model(updated_port)
updated_port = self.network_proxy.update_port(
port_id, fixed_ips=fixed_ips)
return utils.convert_port_to_model(updated_port)
except Exception as e:
raise base.NetworkException(str(e))

View File

@ -13,113 +13,97 @@
# under the License.
from octavia.common import constants
from openstack.network.v2.network_ip_availability import NetworkIPAvailability
from octavia.network import data_models as network_models
def convert_subnet_dict_to_model(subnet_dict):
subnet = subnet_dict.get('subnet', subnet_dict)
subnet_hrs = subnet.get('host_routes', [])
def convert_subnet_to_model(subnet):
host_routes = [network_models.HostRoute(nexthop=hr.get('nexthop'),
destination=hr.get('destination'))
for hr in subnet_hrs]
return network_models.Subnet(id=subnet.get(constants.ID),
name=subnet.get(constants.NAME),
network_id=subnet.get('network_id'),
project_id=subnet.get(constants.TENANT_ID),
gateway_ip=subnet.get('gateway_ip'),
cidr=subnet.get('cidr'),
ip_version=subnet.get('ip_version'),
host_routes=host_routes
)
for hr in subnet.host_routes] if subnet.host_routes else []
return network_models.Subnet(
id=subnet.id,
name=subnet.name,
network_id=subnet.network_id,
project_id=subnet.project_id,
gateway_ip=subnet.gateway_ip,
cidr=subnet.cidr,
ip_version=subnet.ip_version,
host_routes=host_routes,
)
def convert_port_dict_to_model(port_dict):
port = port_dict.get('port', port_dict)
fixed_ips = [network_models.FixedIP(subnet_id=fixed_ip.get('subnet_id'),
ip_address=fixed_ip.get('ip_address'))
for fixed_ip in port.get('fixed_ips', [])]
def convert_port_to_model(port):
if port.get('fixed_ips'):
fixed_ips = [convert_fixed_ip_dict_to_model(fixed_ip)
for fixed_ip in port.fixed_ips]
else:
fixed_ips = []
return network_models.Port(
id=port.get(constants.ID),
name=port.get(constants.NAME),
device_id=port.get('device_id'),
device_owner=port.get('device_owner'),
mac_address=port.get('mac_address'),
network_id=port.get('network_id'),
status=port.get('status'),
project_id=port.get(constants.TENANT_ID),
admin_state_up=port.get('admin_state_up'),
id=port.id,
name=port.name,
device_id=port.device_id,
device_owner=port.device_owner,
mac_address=port.mac_address,
network_id=port.network_id,
status=port.status,
project_id=port.project_id,
admin_state_up=port.is_admin_state_up,
fixed_ips=fixed_ips,
qos_policy_id=port.get('qos_policy_id'),
security_group_ids=port.get(constants.SECURITY_GROUPS, [])
qos_policy_id=port.qos_policy_id,
security_group_ids=port.security_group_ids
)
def convert_network_dict_to_model(network_dict):
nw = network_dict.get('network', network_dict)
def convert_network_to_model(nw):
return network_models.Network(
id=nw.get(constants.ID),
name=nw.get(constants.NAME),
subnets=nw.get('subnets'),
project_id=nw.get(constants.TENANT_ID),
admin_state_up=nw.get('admin_state_up'),
mtu=nw.get('mtu'),
provider_network_type=nw.get('provider:network_type'),
provider_physical_network=nw.get('provider:physical_network'),
provider_segmentation_id=nw.get('provider:segmentation_id'),
router_external=nw.get('router:external'),
port_security_enabled=nw.get('port_security_enabled')
id=nw.id,
name=nw.name,
subnets=nw.subnet_ids,
project_id=nw.project_id,
admin_state_up=nw.is_admin_state_up,
mtu=nw.mtu,
provider_network_type=nw.provider_network_type,
provider_physical_network=nw.provider_physical_network,
provider_segmentation_id=nw.provider_segmentation_id,
router_external=nw.is_router_external,
port_security_enabled=nw.is_port_security_enabled,
)
def convert_fixed_ip_dict_to_model(fixed_ip_dict):
fixed_ip = fixed_ip_dict.get('fixed_ip', fixed_ip_dict)
def convert_fixed_ip_dict_to_model(fixed_ip: dict):
return network_models.FixedIP(subnet_id=fixed_ip.get('subnet_id'),
ip_address=fixed_ip.get('ip_address'))
def convert_qos_policy_dict_to_model(qos_policy_dict):
qos_policy = qos_policy_dict.get('policy', qos_policy_dict)
return network_models.QosPolicy(id=qos_policy.get(constants.ID))
def convert_qos_policy_to_model(qos_policy):
return network_models.QosPolicy(id=qos_policy.id)
# We can't use "floating_ip" because we need to match the neutron client method
def convert_floatingip_dict_to_model(floating_ip_dict):
floating_ip = floating_ip_dict.get('floatingip', floating_ip_dict)
return network_models.FloatingIP(
id=floating_ip.get(constants.ID),
description=floating_ip.get(constants.DESCRIPTION),
project_id=floating_ip.get(constants.PROJECT_ID,
floating_ip.get(constants.TENANT_ID)),
status=floating_ip.get('status'),
router_id=floating_ip.get('router_id'),
port_id=floating_ip.get('port_id'),
floating_network_id=floating_ip.get('floating_network_id'),
floating_ip_address=floating_ip.get('floating_ip_address'),
fixed_ip_address=floating_ip.get('fixed_ip_address'),
fixed_port_id=floating_ip.get('fixed_port_id')
)
def convert_network_ip_availability_dict_to_model(
network_ip_availability_dict):
nw_ip_avail = network_ip_availability_dict.get(
'network_ip_availability', network_ip_availability_dict)
ip_avail = network_models.Network_IP_Availability.from_dict(nw_ip_avail)
ip_avail.subnet_ip_availability = nw_ip_avail.get('subnet_ip_availability')
def convert_network_ip_availability_to_model(
nw_ip_avail: NetworkIPAvailability):
ip_avail = network_models.Network_IP_Availability(
network_id=nw_ip_avail.network_id,
tenant_id=nw_ip_avail.tenant_id,
project_id=nw_ip_avail.project_id,
network_name=nw_ip_avail.network_name, total_ips=nw_ip_avail.total_ips,
used_ips=nw_ip_avail.used_ips,
subnet_ip_availability=nw_ip_avail.subnet_ip_availability)
return ip_avail
def convert_security_group_dict_to_model(security_group_dict):
sg_rule_ids = [rule.get(constants.ID) for rule in
security_group_dict.get(constants.SECURITY_GROUP_RULES, [])]
def convert_security_group_to_model(security_group):
if security_group.security_group_rules:
sg_rule_ids = [rule['id'] for rule in
security_group.security_group_rules]
else:
sg_rule_ids = []
return network_models.SecurityGroup(
id=security_group_dict.get(constants.ID),
project_id=security_group_dict.get(
constants.PROJECT_ID,
security_group_dict.get(constants.TENANT_ID)),
name=security_group_dict.get(constants.NAME),
description=security_group_dict.get(constants.DESCRIPTION),
id=security_group.id,
project_id=security_group.project_id,
name=security_group.name,
description=security_group.description,
security_group_rule_ids=sg_rule_ids,
tags=security_group_dict.get(constants.TAGS, []),
stateful=security_group_dict.get('stateful'))
tags=security_group.tags,
stateful=security_group.stateful)

View File

@ -12,6 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from octavia_lib.common import constants as lib_constants
from openstack.network.v2.floating_ip import FloatingIP
from openstack.network.v2.network import Network
from openstack.network.v2.network_ip_availability import NetworkIPAvailability
from openstack.network.v2.port import Port
from openstack.network.v2.security_group import SecurityGroup
from openstack.network.v2.subnet import Subnet
from octavia.common import constants
@ -43,16 +49,16 @@ MOCK_MAC_ADDR = 'fe:16:3e:00:95:5c'
MOCK_MAC_ADDR2 = 'fe:16:3e:00:95:5d'
MOCK_PROJECT_ID = 'mock-project-1'
MOCK_HOST_ROUTES = []
MOCK_SUBNET = {'subnet': {'id': MOCK_SUBNET_ID,
'network_id': MOCK_NETWORK_ID,
'name': MOCK_SUBNET_NAME,
'tenant_id': MOCK_PROJECT_ID,
'gateway_ip': MOCK_GATEWAY_IP,
'cidr': MOCK_CIDR,
'ip_version': MOCK_IP_VERSION,
'host_routes': MOCK_HOST_ROUTES}}
MOCK_SUBNET2 = {'subnet': {'id': MOCK_SUBNET_ID2,
'network_id': MOCK_NETWORK_ID2}}
MOCK_SUBNET = Subnet(**{'id': MOCK_SUBNET_ID,
'network_id': MOCK_NETWORK_ID,
'name': MOCK_SUBNET_NAME,
'tenant_id': MOCK_PROJECT_ID,
'gateway_ip': MOCK_GATEWAY_IP,
'cidr': MOCK_CIDR,
'ip_version': MOCK_IP_VERSION,
'host_routes': MOCK_HOST_ROUTES})
MOCK_SUBNET2 = Subnet(**{'id': MOCK_SUBNET_ID2,
'network_id': MOCK_NETWORK_ID2})
MOCK_HOST_ROUTES = []
MOCK_NOVA_INTERFACE = MockNovaInterface()
@ -69,7 +75,7 @@ MOCK_DEVICE_ID2 = 'Moctavia124'
MOCK_SECURITY_GROUP_ID = 'security-group-1'
MOCK_SECURITY_GROUP_NAME = 'SecurityGroup1'
MOCK_SECURITY_GROUP = {
MOCK_SECURITY_GROUP = SecurityGroup(**{
"id": MOCK_SECURITY_GROUP_ID,
"name": MOCK_SECURITY_GROUP_NAME,
"tenant_id": MOCK_PROJECT_ID,
@ -113,7 +119,7 @@ MOCK_SECURITY_GROUP = {
"created_at": "2020-03-12T20:43:31Z",
"updated_at": "2020-03-12T20:44:48Z",
"revision_number": 3,
"project_id": MOCK_PROJECT_ID}
"project_id": MOCK_PROJECT_ID})
MOCK_ADMIN_STATE_UP = True
MOCK_STATUS = 'ACTIVE'
@ -122,59 +128,60 @@ MOCK_NETWORK_TYPE = 'flat'
MOCK_SEGMENTATION_ID = 1
MOCK_ROUTER_EXTERNAL = False
MOCK_NEUTRON_PORT = {'port': {'network_id': MOCK_NETWORK_ID,
'device_id': MOCK_DEVICE_ID,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_PORT_ID,
'name': MOCK_PORT_NAME,
'tenant_id': MOCK_PROJECT_ID,
'admin_state_up': MOCK_ADMIN_STATE_UP,
'status': MOCK_STATUS,
'mac_address': MOCK_MAC_ADDR,
'fixed_ips': [{'ip_address': MOCK_IP_ADDRESS,
'subnet_id': MOCK_SUBNET_ID}],
'security_groups': [MOCK_SECURITY_GROUP_ID]}}
MOCK_NEUTRON_PORT = Port(**{'network_id': MOCK_NETWORK_ID,
'device_id': MOCK_DEVICE_ID,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_PORT_ID,
'name': MOCK_PORT_NAME,
'tenant_id': MOCK_PROJECT_ID,
'admin_state_up': MOCK_ADMIN_STATE_UP,
'status': MOCK_STATUS,
'mac_address': MOCK_MAC_ADDR,
'fixed_ips': [{'ip_address': MOCK_IP_ADDRESS,
'subnet_id': MOCK_SUBNET_ID}],
'security_groups': [MOCK_SECURITY_GROUP_ID]})
MOCK_NEUTRON_QOS_POLICY_ID = 'mock-qos-id'
MOCK_QOS_POLICY_ID1 = 'qos1-id'
MOCK_QOS_POLICY_ID2 = 'qos2-id'
MOCK_NEUTRON_PORT2 = {'port': {'network_id': MOCK_NETWORK_ID2,
'device_id': MOCK_DEVICE_ID2,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_PORT_ID2,
'name': MOCK_PORT_NAME2,
'tenant_id': MOCK_PROJECT_ID,
'admin_state_up': MOCK_ADMIN_STATE_UP,
'status': MOCK_STATUS,
'mac_address': MOCK_MAC_ADDR2,
'fixed_ips': [{'ip_address': MOCK_IP_ADDRESS2,
'subnet_id': MOCK_SUBNET_ID2}]}}
MOCK_NEUTRON_PORT2 = Port(**{'network_id': MOCK_NETWORK_ID2,
'device_id': MOCK_DEVICE_ID2,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_PORT_ID2,
'name': MOCK_PORT_NAME2,
'tenant_id': MOCK_PROJECT_ID,
'admin_state_up': MOCK_ADMIN_STATE_UP,
'status': MOCK_STATUS,
'mac_address': MOCK_MAC_ADDR2,
'fixed_ips': [{'ip_address': MOCK_IP_ADDRESS2,
'subnet_id': MOCK_SUBNET_ID2}]})
MOCK_NETWORK = {'network': {'id': MOCK_NETWORK_ID,
'name': MOCK_NETWORK_NAME,
'tenant_id': MOCK_PROJECT_ID,
'admin_state_up': MOCK_ADMIN_STATE_UP,
'subnets': [MOCK_SUBNET_ID],
'mtu': MOCK_MTU,
'provider:network_type': 'flat',
'provider:physical_network': MOCK_NETWORK_NAME,
'provider:segmentation_id': MOCK_SEGMENTATION_ID,
'router:external': MOCK_ROUTER_EXTERNAL}}
MOCK_FIXED_IP = {'fixed_ip': {'subnet_id': MOCK_SUBNET_ID,
'ip_address': MOCK_IP_ADDRESS}}
MOCK_NETWORK = Network(**{'id': MOCK_NETWORK_ID,
'name': MOCK_NETWORK_NAME,
'project_id': MOCK_PROJECT_ID,
'admin_state_up': MOCK_ADMIN_STATE_UP,
'subnet_ids': [MOCK_SUBNET_ID],
'mtu': MOCK_MTU,
'provider_network_type': 'flat',
'provider_physical_network': MOCK_NETWORK_NAME,
'provider_segmentation_id': MOCK_SEGMENTATION_ID,
'router_external': MOCK_ROUTER_EXTERNAL,
'port_security_enabled': False})
MOCK_FIXED_IP = {'subnet_id': MOCK_SUBNET_ID,
'ip_address': MOCK_IP_ADDRESS}
MOCK_FLOATING_IP_ID = 'floating-ip-1'
MOCK_FLOATING_IP_DESC = 'TestFloatingIP1'
MOCK_ROUTER_ID = 'mock-router-1'
MOCK_FLOATING_IP = {'floatingip': {'id': MOCK_FLOATING_IP_ID,
'description': MOCK_FLOATING_IP_DESC,
'tenant_id': MOCK_PROJECT_ID,
'status': MOCK_STATUS,
'port_id': MOCK_PORT_ID,
'router_id': MOCK_ROUTER_ID,
'floating_network_id': MOCK_NETWORK_ID,
'floating_ip_address': MOCK_IP_ADDRESS,
'fixed_ip_address': MOCK_IP_ADDRESS2,
'fixed_port_id': MOCK_PORT_ID2}}
MOCK_FLOATING_IP = FloatingIP(**{'id': MOCK_FLOATING_IP_ID,
'description': MOCK_FLOATING_IP_DESC,
'tenant_id': MOCK_PROJECT_ID,
'status': MOCK_STATUS,
'port_id': MOCK_PORT_ID,
'router_id': MOCK_ROUTER_ID,
'floating_network_id': MOCK_NETWORK_ID,
'floating_ip_address': MOCK_IP_ADDRESS,
'fixed_ip_address': MOCK_IP_ADDRESS2,
'fixed_port_id': MOCK_PORT_ID2})
MOCK_AMP_ID1 = 'amp1-id'
MOCK_AMP_ID2 = 'amp2-id'
@ -205,17 +212,17 @@ MOCK_MANAGEMENT_INTERFACE2.net_id = MOCK_MANAGEMENT_NET_ID
MOCK_MANAGEMENT_INTERFACE2.port_id = MOCK_MANAGEMENT_PORT_ID2
MOCK_MANAGEMENT_INTERFACE2.fixed_ips = MOCK_MANAGEMENT_FIXED_IPS2
MOCK_MANAGEMENT_PORT1 = {'port': {'network_id': MOCK_MANAGEMENT_NET_ID,
'device_id': MOCK_AMP_COMPUTE_ID1,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_MANAGEMENT_PORT_ID1,
'fixed_ips': MOCK_MANAGEMENT_FIXED_IPS1}}
MOCK_MANAGEMENT_PORT1 = Port(**{'network_id': MOCK_MANAGEMENT_NET_ID,
'device_id': MOCK_AMP_COMPUTE_ID1,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_MANAGEMENT_PORT_ID1,
'fixed_ips': MOCK_MANAGEMENT_FIXED_IPS1})
MOCK_MANAGEMENT_PORT2 = {'port': {'network_id': MOCK_MANAGEMENT_NET_ID,
'device_id': MOCK_AMP_COMPUTE_ID2,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_MANAGEMENT_PORT_ID2,
'fixed_ips': MOCK_MANAGEMENT_FIXED_IPS2}}
MOCK_MANAGEMENT_PORT2 = Port(**{'network_id': MOCK_MANAGEMENT_NET_ID,
'device_id': MOCK_AMP_COMPUTE_ID2,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_MANAGEMENT_PORT_ID2,
'fixed_ips': MOCK_MANAGEMENT_FIXED_IPS2})
MOCK_VIP_SUBNET_ID = 'vip-subnet-1'
MOCK_VIP_SUBNET_ID2 = 'vip-subnet-2'
@ -242,17 +249,17 @@ MOCK_VRRP_INTERFACE2.net_id = MOCK_VIP_NET_ID
MOCK_VRRP_INTERFACE2.port_id = MOCK_VRRP_PORT_ID2
MOCK_VRRP_INTERFACE2.fixed_ips = MOCK_VRRP_FIXED_IPS2
MOCK_VRRP_PORT1 = {'port': {'network_id': MOCK_VIP_NET_ID,
'device_id': MOCK_AMP_COMPUTE_ID1,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_VRRP_PORT_ID1,
'fixed_ips': MOCK_VRRP_FIXED_IPS1}}
MOCK_VRRP_PORT1 = Port(**{'network_id': MOCK_VIP_NET_ID,
'device_id': MOCK_AMP_COMPUTE_ID1,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_VRRP_PORT_ID1,
'fixed_ips': MOCK_VRRP_FIXED_IPS1})
MOCK_VRRP_PORT2 = {'port': {'network_id': MOCK_VIP_NET_ID,
'device_id': MOCK_AMP_COMPUTE_ID2,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_VRRP_PORT_ID2,
'fixed_ips': MOCK_VRRP_FIXED_IPS2}}
MOCK_VRRP_PORT2 = Port(**{'network_id': MOCK_VIP_NET_ID,
'device_id': MOCK_AMP_COMPUTE_ID2,
'device_owner': MOCK_DEVICE_OWNER,
'id': MOCK_VRRP_PORT_ID2,
'fixed_ips': MOCK_VRRP_FIXED_IPS2})
MOCK_NETWORK_TOTAL_IPS = 254
MOCK_NETWORK_USED_IPS = 0
@ -262,13 +269,13 @@ MOCK_SUBNET_IP_AVAILABILITY = [{'used_ips': MOCK_SUBNET_USED_IPS,
'subnet_id': MOCK_SUBNET_ID,
'total_ips': MOCK_SUBNET_TOTAL_IPS}]
MOCK_NETWORK_IP_AVAILABILITY = {'network_ip_availability': (
{'network_id': MOCK_NETWORK_ID,
'tenant_id': MOCK_PROJECT_ID,
'network_name': MOCK_NETWORK_NAME,
'total_ips': MOCK_NETWORK_TOTAL_IPS,
'used_ips': MOCK_NETWORK_USED_IPS,
'subnet_ip_availability': MOCK_SUBNET_IP_AVAILABILITY})}
MOCK_NETWORK_IP_AVAILABILITY = NetworkIPAvailability(
**{'network_id': MOCK_NETWORK_ID,
'tenant_id': MOCK_PROJECT_ID,
'network_name': MOCK_NETWORK_NAME,
'total_ips': MOCK_NETWORK_TOTAL_IPS,
'used_ips': MOCK_NETWORK_USED_IPS,
'subnet_ip_availability': MOCK_SUBNET_IP_AVAILABILITY})
INVALID_LISTENER_POOL_PROTOCOL_MAP = {
constants.PROTOCOL_HTTP: [constants.PROTOCOL_HTTPS,

View File

@ -13,7 +13,6 @@ from unittest import mock
import cinderclient.v3
import glanceclient.v2
import neutronclient.v2_0
import novaclient.v2
from oslo_config import cfg
@ -62,44 +61,6 @@ class TestNovaAuth(base.TestCase):
self.assertIs(bc1, bc2)
class TestNeutronAuth(base.TestCase):
def setUp(self):
# Reset the session and client
clients.NeutronAuth.neutron_client = None
keystone._SESSION = None
super().setUp()
@mock.patch('keystoneauth1.session.Session', mock.Mock())
def test_get_neutron_client(self):
# There should be no existing client
self.assertIsNone(
clients.NeutronAuth.neutron_client
)
# Mock out the keystone session and get the client
keystone._SESSION = mock.MagicMock()
bc1 = clients.NeutronAuth.get_neutron_client(
region=None, endpoint_type='publicURL')
# Our returned client should also be the saved client
self.assertIsInstance(
clients.NeutronAuth.neutron_client,
neutronclient.v2_0.client.Client
)
self.assertIs(
clients.NeutronAuth.neutron_client,
bc1
)
# Getting the session again should return the same object
bc2 = clients.NeutronAuth.get_neutron_client(
region="test-region", service_name="neutronEndpoint1",
endpoint="test-endpoint", endpoint_type='publicURL', insecure=True)
self.assertIs(bc1, bc2)
class TestGlanceAuth(base.TestCase):
def setUp(self):

View File

@ -0,0 +1,54 @@
# Copyright Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from unittest.mock import call
from keystoneauth1 import exceptions as ks_exceptions
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
import octavia.common.keystone as ks
import octavia.tests.unit.base as base
class TestKeystoneSession(base.TestCase):
@mock.patch("oslo_config.cfg.ConfigOpts.get_location", return_value=None)
@mock.patch("octavia.common.keystone.ks_loading"
".load_auth_from_conf_options")
@mock.patch("octavia.common.keystone.LOG")
def test_get_auth_neutron_override(self, mock_log, mock_load_auth,
mock_get_location):
opt_mock = mock.MagicMock()
opt_mock.dest = "foo"
conf = oslo_fixture.Config(cfg.CONF)
conf.conf.service_auth.cafile = "bar"
mock_load_auth.side_effect = [
ks_exceptions.auth_plugins.MissingRequiredOptions(
[opt_mock]),
None,
None
]
sess = ks.KeystoneSession("neutron")
sess.get_auth()
mock_load_auth.assert_has_calls([call(cfg.CONF, 'neutron'),
call(cfg.CONF, 'service_auth'),
call(cfg.CONF, 'neutron')])
mock_log.debug.assert_has_calls(
[call("Overriding [%s].%s with '%s'", 'neutron', 'cafile',
'bar')]
)

View File

@ -13,11 +13,9 @@
# under the License.
from unittest import mock
from neutronclient.common import exceptions as neutron_client_exceptions
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from octavia.common import clients
from octavia.common import data_models
from octavia.network import base as network_base
from octavia.network import data_models as network_models
@ -26,6 +24,12 @@ from octavia.network.drivers.neutron import utils
from octavia.tests.common import constants as t_constants
from octavia.tests.common import data_model_helpers as dmh
from octavia.tests.unit import base
import openstack.exceptions as os_exceptions
from openstack.network.v2.network import Network
from openstack.network.v2.network_ip_availability import NetworkIPAvailability
from openstack.network.v2.port import Port
from openstack.network.v2.qos_policy import QoSPolicy
from openstack.network.v2.subnet import Subnet
class TestBaseNeutronNetworkDriver(base.TestCase):
@ -42,95 +46,88 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
def setUp(self):
super().setUp()
with mock.patch('octavia.common.clients.neutron_client.Client',
autospec=True) as neutron_client:
client = neutron_client(clients.NEUTRON_VERSION)
client.list_extensions.return_value = {
'extensions': [
{'alias': neutron_base.SEC_GRP_EXT_ALIAS}
]
}
with mock.patch('octavia.common.clients.openstack.connection.'
'Connection', autospec=True) as os_connection:
self._original_find_extension = (
os_connection.return_value.network.find_extension)
os_connection.return_value.network.find_extension = (
lambda x: 'alias' if x == neutron_base.SEC_GRP_EXT_ALIAS else
None)
self.k_session = mock.patch(
'keystoneauth1.session.Session').start()
self.driver = self._instantiate_partial_abc(
neutron_base.BaseNeutronDriver)
def test__check_extension_enabled(self):
show_extension = self.driver.neutron_client.show_extension
show_extension.side_effect = [None, neutron_client_exceptions.NotFound]
self.assertTrue(self.driver._check_extension_enabled('TEST1'))
self.assertFalse(self.driver._check_extension_enabled('TEST2'))
show_extension.assert_has_calls(
[mock.call('TEST1'), mock.call('TEST2')])
with mock.patch.object(self.driver.network_proxy, "find_extension",
side_effect=[True, False]) as show_extension:
self.assertTrue(self.driver._check_extension_enabled('TEST1'))
self.assertFalse(self.driver._check_extension_enabled('TEST2'))
show_extension.assert_has_calls(
[mock.call('TEST1'), mock.call('TEST2')])
def test__check_extension_enabled_cached(self):
show_extension = self.driver.neutron_client.show_extension
self.driver._check_extension_cache = {'TEST1': True, 'TEST2': False}
self.assertTrue(self.driver._check_extension_enabled('TEST1'))
self.assertFalse(self.driver._check_extension_enabled('TEST2'))
self.assertNotIn(mock.call('TEST1'), show_extension.mock_calls)
self.assertNotIn(mock.call('TEST2'), show_extension.mock_calls)
with mock.patch.object(self.driver.network_proxy, "find_extension",
) as show_extension:
self.driver._check_extension_cache = {'TEST1': True,
'TEST2': False}
self.assertTrue(self.driver._check_extension_enabled('TEST1'))
self.assertFalse(self.driver._check_extension_enabled('TEST2'))
self.assertNotIn(mock.call('TEST1'), show_extension.mock_calls)
self.assertNotIn(mock.call('TEST2'), show_extension.mock_calls)
def test__add_allowed_address_pair_to_port(self):
self.driver._add_allowed_address_pairs_to_port(
t_constants.MOCK_PORT_ID, [t_constants.MOCK_IP_ADDRESS])
expected_aap_dict = {
'port': {
'allowed_address_pairs': [
{'ip_address': t_constants.MOCK_IP_ADDRESS}]}}
self.driver.neutron_client.update_port.assert_has_calls([
mock.call(t_constants.MOCK_PORT_ID, expected_aap_dict)])
'allowed_address_pairs': [
{'ip_address': t_constants.MOCK_IP_ADDRESS}]}
self.driver.network_proxy.update_port.assert_has_calls([
mock.call(t_constants.MOCK_PORT_ID, **expected_aap_dict)])
def test__add_security_group_to_port(self):
self.driver._add_security_group_to_port(
t_constants.MOCK_SECURITY_GROUP_ID, t_constants.MOCK_PORT_ID)
expected_sg_dict = {
'port': {
'security_groups': [
t_constants.MOCK_SECURITY_GROUP_ID]}}
self.driver.neutron_client.update_port.assert_has_calls([
mock.call(t_constants.MOCK_PORT_ID, expected_sg_dict)])
'security_groups': [
t_constants.MOCK_SECURITY_GROUP_ID]}
self.driver.network_proxy.update_port.assert_has_calls([
mock.call(t_constants.MOCK_PORT_ID, **expected_sg_dict)])
def test__add_security_group_to_port_with_port_not_found(self):
self.driver.neutron_client.update_port.side_effect = (
neutron_client_exceptions.PortNotFoundClient)
self.driver.network_proxy.update_port.side_effect = (
os_exceptions.ResourceNotFound)
self.assertRaises(
network_base.PortNotFound,
self.driver._add_security_group_to_port,
t_constants.MOCK_SECURITY_GROUP_ID, t_constants.MOCK_PORT_ID)
def test__add_security_group_to_port_with_other_exception(self):
self.driver.neutron_client.update_port.side_effect = IOError
self.driver.network_proxy.update_port.side_effect = IOError
self.assertRaises(
network_base.NetworkException,
self.driver._add_security_group_to_port,
t_constants.MOCK_SECURITY_GROUP_ID, t_constants.MOCK_PORT_ID)
def test__get_ports_by_security_group(self):
self.driver.neutron_client.list_ports.return_value = {
"ports": [
t_constants.MOCK_NEUTRON_PORT['port'],
t_constants.MOCK_NEUTRON_PORT2['port']]
}
self.driver.network_proxy.ports.return_value = [
t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT2]
ports = self.driver._get_ports_by_security_group(
t_constants.MOCK_SECURITY_GROUP_ID)
self.assertEqual(1, len(ports))
self.assertIn(t_constants.MOCK_NEUTRON_PORT['port'], ports)
self.assertIn(t_constants.MOCK_NEUTRON_PORT, ports)
def test__create_security_group(self):
sg_return = self.driver._create_security_group(
t_constants.MOCK_SECURITY_GROUP_NAME)
expected_sec_grp_dict = {
'security_group': {
'name': t_constants.MOCK_SECURITY_GROUP_NAME}}
self.driver.neutron_client.create_security_group.assert_has_calls([
mock.call(expected_sec_grp_dict)])
'name': t_constants.MOCK_SECURITY_GROUP_NAME}
self.driver.network_proxy.create_security_group.assert_has_calls([
mock.call(**expected_sec_grp_dict)])
self.assertEqual(
sg_return,
self.driver.neutron_client.create_security_group()[
'security_group'])
self.driver.network_proxy.create_security_group())
def test__create_security_group_rule(self):
self.driver._create_security_group_rule(
@ -142,22 +139,21 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
ethertype=5,
cidr="10.0.0.0/24")
expected_sec_grp_rule_dict = {
'security_group_rule': {
'security_group_id': t_constants.MOCK_SECURITY_GROUP_ID,
'direction': 1,
'protocol': 2,
'port_range_min': 3,
'port_range_max': 4,
'ethertype': 5,
'remote_ip_prefix': '10.0.0.0/24'}}
self.driver.neutron_client.create_security_group_rule.assert_has_calls(
[mock.call(expected_sec_grp_rule_dict)])
'security_group_id': t_constants.MOCK_SECURITY_GROUP_ID,
'direction': 1,
'protocol': 2,
'port_range_min': 3,
'port_range_max': 4,
'ethertype': 5,
'remote_ip_prefix': '10.0.0.0/24'}
self.driver.network_proxy.create_security_group_rule.assert_has_calls(
[mock.call(**expected_sec_grp_rule_dict)])
def test__port_to_vip(self):
lb = dmh.generate_load_balancer_tree()
lb.vip.subnet_id = t_constants.MOCK_SUBNET_ID
lb.vip.ip_address = t_constants.MOCK_IP_ADDRESS
port = utils.convert_port_dict_to_model(t_constants.MOCK_NEUTRON_PORT)
port = utils.convert_port_to_model(t_constants.MOCK_NEUTRON_PORT)
vip, additional_vips = self.driver._port_to_vip(port, lb)
self.assertIsInstance(vip, data_models.Vip)
self.assertIsInstance(additional_vips, list)
@ -179,19 +175,19 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
self.assertIn('10.0.0.1', ips)
def test_get_plugged_networks(self):
list_ports = self.driver.neutron_client.list_ports
list_ports = self.driver.network_proxy.ports
list_ports.side_effect = TypeError
o_ifaces = self.driver.get_plugged_networks(
t_constants.MOCK_DEVICE_ID)
self.assertEqual(0, len(o_ifaces))
list_ports.side_effect = None
list_ports.reset_mock()
port1 = t_constants.MOCK_NEUTRON_PORT['port']
port1 = t_constants.MOCK_NEUTRON_PORT
port2 = {
'id': '4', 'network_id': '3', 'fixed_ips':
[{'ip_address': '10.0.0.2'}]
[{'ip_address': '10.0.0.2'}]
}
list_ports.return_value = {'ports': [port1, port2]}
list_ports.return_value = [port1, port2]
plugged_networks = self.driver.get_plugged_networks(
t_constants.MOCK_DEVICE_ID)
for pn in plugged_networks:
@ -207,10 +203,10 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
config = self.useFixture(oslo_fixture.Config(cfg.CONF))
config.config(group="networking", allow_invisible_resource_usage=True)
show_network = self.driver.neutron_client.show_network
show_network.return_value = {'network': {
show_network = self.driver.network_proxy.get_network
show_network.return_value = Network(**{
'id': t_constants.MOCK_NETWORK_ID,
'subnets': [t_constants.MOCK_SUBNET_ID]}}
'subnets': [t_constants.MOCK_SUBNET_ID]})
network = self.driver.get_network(t_constants.MOCK_NETWORK_ID)
self.assertIsInstance(network, network_models.Network)
self.assertEqual(t_constants.MOCK_NETWORK_ID, network.id)
@ -219,10 +215,10 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
@mock.patch("octavia.common.clients.NeutronAuth.get_user_neutron_client")
def test_get_user_network(self, neutron_client_mock):
show_network = neutron_client_mock.return_value.show_network
show_network.return_value = {'network': {
show_network = neutron_client_mock.return_value.get_network
show_network.return_value = Network(**{
'id': t_constants.MOCK_NETWORK_ID,
'subnets': [t_constants.MOCK_SUBNET_ID]}}
'subnets': [t_constants.MOCK_SUBNET_ID]})
network = self.driver.get_network(t_constants.MOCK_NETWORK_ID,
context=mock.ANY)
@ -236,11 +232,11 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
config = self.useFixture(oslo_fixture.Config(cfg.CONF))
config.config(group="networking", allow_invisible_resource_usage=True)
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = {'subnet': {
show_subnet = self.driver.network_proxy.get_subnet
show_subnet.return_value = Subnet(**{
'id': t_constants.MOCK_SUBNET_ID,
'gateway_ip': t_constants.MOCK_IP_ADDRESS,
'cidr': t_constants.MOCK_CIDR}}
'cidr': t_constants.MOCK_CIDR})
subnet = self.driver.get_subnet(t_constants.MOCK_SUBNET_ID)
self.assertIsInstance(subnet, network_models.Subnet)
self.assertEqual(t_constants.MOCK_SUBNET_ID, subnet.id)
@ -249,11 +245,11 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
@mock.patch("octavia.common.clients.NeutronAuth.get_user_neutron_client")
def test_get_user_subnet(self, neutron_client_mock):
show_subnet = neutron_client_mock.return_value.show_subnet
show_subnet.return_value = {'subnet': {
show_subnet = neutron_client_mock.return_value.get_subnet
show_subnet.return_value = Subnet(**{
'id': t_constants.MOCK_SUBNET_ID,
'gateway_ip': t_constants.MOCK_IP_ADDRESS,
'cidr': t_constants.MOCK_CIDR}}
'cidr': t_constants.MOCK_CIDR})
subnet = self.driver.get_subnet(t_constants.MOCK_SUBNET_ID,
context=mock.ANY)
@ -267,15 +263,15 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
config = self.useFixture(oslo_fixture.Config(cfg.CONF))
config.config(group="networking", allow_invisible_resource_usage=True)
show_port = self.driver.neutron_client.show_port
show_port.return_value = {'port': {
show_port = self.driver.network_proxy.get_port
show_port.return_value = Port(**{
'id': t_constants.MOCK_PORT_ID,
'mac_address': t_constants.MOCK_MAC_ADDR,
'network_id': t_constants.MOCK_NETWORK_ID,
'fixed_ips': [{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS
}]}}
}]})
port = self.driver.get_port(t_constants.MOCK_PORT_ID)
self.assertIsInstance(port, network_models.Port)
self.assertEqual(t_constants.MOCK_PORT_ID, port.id)
@ -290,15 +286,15 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
@mock.patch("octavia.common.clients.NeutronAuth.get_user_neutron_client")
def test_get_user_port(self, neutron_client_mock):
show_port = neutron_client_mock.return_value.show_port
show_port.return_value = {'port': {
show_port = neutron_client_mock.return_value.get_port
show_port.return_value = Port(**{
'id': t_constants.MOCK_PORT_ID,
'mac_address': t_constants.MOCK_MAC_ADDR,
'network_id': t_constants.MOCK_NETWORK_ID,
'fixed_ips': [{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS
}]}}
}]})
port = self.driver.get_port(t_constants.MOCK_PORT_ID, context=mock.ANY)
@ -314,11 +310,11 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
port.fixed_ips[0].ip_address)
def test_get_network_by_name(self):
list_network = self.driver.neutron_client.list_networks
list_network.return_value = {'networks': [{'network': {
list_network = self.driver.network_proxy.networks
list_network.return_value = iter([Network(**{
'id': t_constants.MOCK_NETWORK_ID,
'name': t_constants.MOCK_NETWORK_NAME,
'subnets': [t_constants.MOCK_SUBNET_ID]}}]}
'subnets': [t_constants.MOCK_SUBNET_ID]})])
network = self.driver.get_network_by_name(
t_constants.MOCK_NETWORK_NAME)
self.assertIsInstance(network, network_models.Network)
@ -327,7 +323,7 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
self.assertEqual(1, len(network.subnets))
self.assertEqual(t_constants.MOCK_SUBNET_ID, network.subnets[0])
# Negative
list_network.side_effect = neutron_client_exceptions.NotFound
list_network.side_effect = os_exceptions.ResourceNotFound
self.assertRaises(network_base.NetworkNotFound,
self.driver.get_network_by_name,
t_constants.MOCK_NETWORK_NAME)
@ -337,12 +333,12 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
t_constants.MOCK_NETWORK_NAME)
def test_get_subnet_by_name(self):
list_subnet = self.driver.neutron_client.list_subnets
list_subnet.return_value = {'subnets': [{'subnet': {
list_subnet = self.driver.network_proxy.subnets
list_subnet.return_value = iter([Subnet(**{
'id': t_constants.MOCK_SUBNET_ID,
'name': t_constants.MOCK_SUBNET_NAME,
'gateway_ip': t_constants.MOCK_IP_ADDRESS,
'cidr': t_constants.MOCK_CIDR}}]}
'cidr': t_constants.MOCK_CIDR})])
subnet = self.driver.get_subnet_by_name(t_constants.MOCK_SUBNET_NAME)
self.assertIsInstance(subnet, network_models.Subnet)
self.assertEqual(t_constants.MOCK_SUBNET_ID, subnet.id)
@ -350,7 +346,7 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
self.assertEqual(t_constants.MOCK_IP_ADDRESS, subnet.gateway_ip)
self.assertEqual(t_constants.MOCK_CIDR, subnet.cidr)
# Negative
list_subnet.side_effect = neutron_client_exceptions.NotFound
list_subnet.side_effect = os_exceptions.ResourceNotFound
self.assertRaises(network_base.SubnetNotFound,
self.driver.get_subnet_by_name,
t_constants.MOCK_SUBNET_NAME)
@ -360,8 +356,8 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
t_constants.MOCK_SUBNET_NAME)
def test_get_port_by_name(self):
list_port = self.driver.neutron_client.list_ports
list_port.return_value = {'ports': [{'port': {
list_port = self.driver.network_proxy.ports
list_port.return_value = iter([Port(**{
'id': t_constants.MOCK_PORT_ID,
'name': t_constants.MOCK_PORT_NAME,
'mac_address': t_constants.MOCK_MAC_ADDR,
@ -369,7 +365,7 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
'fixed_ips': [{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS
}]}}]}
}]})])
port = self.driver.get_port_by_name(t_constants.MOCK_PORT_NAME)
self.assertIsInstance(port, network_models.Port)
self.assertEqual(t_constants.MOCK_PORT_ID, port.id)
@ -383,7 +379,7 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
self.assertEqual(t_constants.MOCK_IP_ADDRESS,
port.fixed_ips[0].ip_address)
# Negative
list_port.side_effect = neutron_client_exceptions.NotFound
list_port.side_effect = os_exceptions.ResourceNotFound
self.assertRaises(network_base.PortNotFound,
self.driver.get_port_by_name,
t_constants.MOCK_PORT_NAME)
@ -393,8 +389,8 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
t_constants.MOCK_PORT_NAME)
def test_get_port_by_net_id_device_id(self):
list_port = self.driver.neutron_client.list_ports
list_port.return_value = {'ports': [{'port': {
list_port = self.driver.network_proxy.ports
list_port.return_value = iter([Port(**{
'id': t_constants.MOCK_PORT_ID,
'name': t_constants.MOCK_PORT_NAME,
'mac_address': t_constants.MOCK_MAC_ADDR,
@ -403,7 +399,7 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
'fixed_ips': [{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS
}]}}]}
}]})])
port = self.driver.get_port_by_net_id_device_id(
t_constants.MOCK_NETWORK_ID, t_constants.MOCK_DEVICE_ID)
self.assertIsInstance(port, network_models.Port)
@ -419,7 +415,7 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
self.assertEqual(t_constants.MOCK_IP_ADDRESS,
port.fixed_ips[0].ip_address)
# Negative
list_port.side_effect = neutron_client_exceptions.NotFound
list_port.side_effect = os_exceptions.ResourceNotFound
self.assertRaises(network_base.PortNotFound,
self.driver.get_port_by_net_id_device_id,
t_constants.MOCK_PORT_NAME,
@ -436,12 +432,10 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
The expected result is: only the first port is returned.
"""
list_port = self.driver.neutron_client.list_ports
list_port.return_value = {
'ports': [t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT2,
],
}
list_port = self.driver.network_proxy.ports
list_port.return_value = iter([t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT2,
])
port = self.driver.get_port_by_net_id_device_id(
t_constants.MOCK_NETWORK_ID, t_constants.MOCK_DEVICE_ID)
@ -458,7 +452,7 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
self.assertEqual(t_constants.MOCK_IP_ADDRESS,
port.fixed_ips[0].ip_address)
# Negative
list_port.side_effect = neutron_client_exceptions.NotFound
list_port.side_effect = os_exceptions.ResourceNotFound
self.assertRaises(network_base.PortNotFound,
self.driver.get_port_by_net_id_device_id,
t_constants.MOCK_PORT_NAME,
@ -471,12 +465,10 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
def test_get_multiple_ports_by_net_id_device_id(self):
"""Test _get_resources_by_filters, when result is not unique"""
list_port = self.driver.neutron_client.list_ports
list_port.return_value = {
'ports': [t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT2,
],
}
list_port = self.driver.network_proxy.ports
list_port.return_value = iter([t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT2,
])
ports = self.driver._get_resources_by_filters(
'port',
@ -495,10 +487,8 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
def test_get_unique_port_by_name(self):
"""Test _get_resources_by_filters, when result is unique"""
list_port = self.driver.neutron_client.list_ports
list_port.return_value = {
'ports': [t_constants.MOCK_NEUTRON_PORT]
}
list_port = self.driver.network_proxy.ports
list_port.return_value = iter([t_constants.MOCK_NEUTRON_PORT])
port = self.driver._get_resources_by_filters(
'port', unique_item=True, name=t_constants.MOCK_PORT_NAME)
@ -508,44 +498,54 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
def test_get_non_existing_port_by_name(self):
"""Test _get_resources_by_filters, when result is empty"""
list_port = self.driver.neutron_client.list_ports
list_port.return_value = {'ports': []}
list_port = self.driver.network_proxy.ports
list_port.return_value = iter([])
self.assertRaises(network_base.PortNotFound,
self.driver._get_resources_by_filters,
'port', unique_item=True, name='port1')
def test_get_qos_policy(self):
get_qos = self.driver.neutron_client.show_qos_policy
get_qos.return_value = {'policy': {
'id': t_constants.MOCK_NEUTRON_QOS_POLICY_ID}}
get_qos = self.driver.network_proxy.get_qos_policy
get_qos.return_value = QoSPolicy(**{
'id': t_constants.MOCK_NEUTRON_QOS_POLICY_ID})
qos = self.driver.get_qos_policy(
t_constants.MOCK_NEUTRON_QOS_POLICY_ID)
self.assertIsInstance(qos, network_models.QosPolicy)
self.assertEqual(t_constants.MOCK_NEUTRON_QOS_POLICY_ID,
qos.id)
get_qos.side_effect = neutron_client_exceptions.NotFound
get_qos.side_effect = os_exceptions.ResourceNotFound
self.assertRaises(network_base.QosPolicyNotFound,
self.driver.get_qos_policy,
t_constants.MOCK_NEUTRON_QOS_POLICY_ID)
get_qos.side_effect = neutron_client_exceptions.ServiceUnavailable
get_qos.side_effect = os_exceptions.SDKException
self.assertRaises(network_base.NetworkException,
self.driver.get_qos_policy,
t_constants.MOCK_NEUTRON_QOS_POLICY_ID)
def test_apply_qos_on_port(self):
update_port = self.driver.network_proxy.update_port
self.driver.apply_qos_on_port(
t_constants.MOCK_NEUTRON_QOS_POLICY_ID,
t_constants.MOCK_PORT_ID
)
update_port.assert_called_once_with(
t_constants.MOCK_PORT_ID,
qos_policy_id=t_constants.MOCK_NEUTRON_QOS_POLICY_ID)
def test_apply_or_undo_qos_on_port(self):
# The apply and undo qos function use the same "update_port" with
# neutron client. So testing them in one Uts.
update_port = self.driver.neutron_client.update_port
update_port.side_effect = neutron_client_exceptions.PortNotFoundClient
update_port = self.driver.network_proxy.update_port
update_port.side_effect = os_exceptions.ResourceNotFound
self.assertRaises(network_base.PortNotFound,
self.driver.apply_qos_on_port,
t_constants.MOCK_PORT_ID,
t_constants.MOCK_NEUTRON_QOS_POLICY_ID)
update_port.side_effect = neutron_client_exceptions.ServiceUnavailable
update_port.side_effect = os_exceptions.SDKException
self.assertRaises(network_base.NetworkException,
self.driver.apply_qos_on_port,
t_constants.MOCK_PORT_ID,
@ -553,12 +553,13 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
def test_get_network_ip_availability(self):
show_network_ip_availability = (
self.driver.neutron_client.show_network_ip_availability)
self.driver.network_proxy.get_network_ip_availability)
show_network_ip_availability.return_value = (
{'network_ip_availability': {
'network_id': t_constants.MOCK_NETWORK_ID,
'subnet_ip_availability': t_constants.MOCK_SUBNET_IP_AVAILABILITY
}})
NetworkIPAvailability(**{
'network_id': t_constants.MOCK_NETWORK_ID,
'subnet_ip_availability':
t_constants.MOCK_SUBNET_IP_AVAILABILITY
}))
ip_avail = self.driver.get_network_ip_availability(
network_models.Network(t_constants.MOCK_NETWORK_ID))
self.assertIsInstance(ip_avail, network_models.Network_IP_Availability)
@ -567,8 +568,8 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
ip_avail.subnet_ip_availability)
def test_plug_fixed_ip(self):
show_port = self.driver.neutron_client.show_port
show_port.return_value = {
show_port = self.driver.network_proxy.get_port
show_port.return_value = Port(**{
'id': t_constants.MOCK_PORT_ID,
'fixed_ips': [
{
@ -576,34 +577,31 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
'ip_address': t_constants.MOCK_IP_ADDRESS,
'subnet': None
}]
}
})
self.driver.plug_fixed_ip(t_constants.MOCK_PORT_ID,
t_constants.MOCK_SUBNET_ID2,
t_constants.MOCK_IP_ADDRESS2)
expected_body = {
'port': {
'fixed_ips': [
{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS,
'subnet': None
}, {
'subnet_id': t_constants.MOCK_SUBNET_ID2,
'ip_address': t_constants.MOCK_IP_ADDRESS2
}
]
}
'fixed_ips': [
{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS,
'subnet': None
}, {
'subnet_id': t_constants.MOCK_SUBNET_ID2,
'ip_address': t_constants.MOCK_IP_ADDRESS2
}
]
}
self.driver.neutron_client.update_port.assert_called_once_with(
self.driver.network_proxy.update_port.assert_called_once_with(
t_constants.MOCK_PORT_ID,
expected_body)
**expected_body)
def test_plug_fixed_ip_no_ip_address(self):
show_port = self.driver.neutron_client.show_port
show_port.return_value = {
show_port = self.driver.network_proxy.get_port
show_port.return_value = Port(**{
'id': t_constants.MOCK_PORT_ID,
'fixed_ips': [
{
@ -611,31 +609,27 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
'ip_address': t_constants.MOCK_IP_ADDRESS,
'subnet': None
}]
}
})
self.driver.plug_fixed_ip(t_constants.MOCK_PORT_ID,
t_constants.MOCK_SUBNET_ID2)
expected_body = {
'port': {
'fixed_ips': [
{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS,
'subnet': None
}, {
'subnet_id': t_constants.MOCK_SUBNET_ID2,
}
]
}
'fixed_ips': [
{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS,
'subnet': None
}, {
'subnet_id': t_constants.MOCK_SUBNET_ID2,
}
]
}
self.driver.neutron_client.update_port.assert_called_once_with(
t_constants.MOCK_PORT_ID,
expected_body)
self.driver.network_proxy.update_port.assert_called_once_with(
t_constants.MOCK_PORT_ID, **expected_body)
def test_plug_fixed_ip_exception(self):
show_port = self.driver.neutron_client.show_port
show_port = self.driver.network_proxy.get_port
show_port.return_value = {
'id': t_constants.MOCK_PORT_ID,
'fixed_ips': [
@ -646,7 +640,7 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
}]
}
self.driver.neutron_client.update_port.side_effect = Exception
self.driver.network_proxy.update_port.side_effect = Exception
self.assertRaises(network_base.NetworkException,
self.driver.plug_fixed_ip,
@ -654,8 +648,8 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
t_constants.MOCK_SUBNET_ID2)
def test_unplug_fixed_ip(self):
show_port = self.driver.neutron_client.show_port
show_port.return_value = {
show_port = self.driver.network_proxy.get_port
show_port.return_value = Port(**{
'id': t_constants.MOCK_PORT_ID,
'fixed_ips': [
{
@ -667,40 +661,33 @@ class TestBaseNeutronNetworkDriver(base.TestCase):
'ip_address': t_constants.MOCK_IP_ADDRESS2,
'subnet': None
}]
}
})
self.driver.unplug_fixed_ip(t_constants.MOCK_PORT_ID,
t_constants.MOCK_SUBNET_ID)
expected_body = {
'port': {
'fixed_ips': [
{
'subnet_id': t_constants.MOCK_SUBNET_ID2,
'ip_address': t_constants.MOCK_IP_ADDRESS2,
'subnet': None
}
]
}
}
self.driver.neutron_client.update_port.assert_called_once_with(
t_constants.MOCK_PORT_ID,
expected_body)
def test_unplug_fixed_ip_exception(self):
show_port = self.driver.neutron_client.show_port
show_port.return_value = {
'id': t_constants.MOCK_PORT_ID,
'fixed_ips': [
{
'subnet_id': t_constants.MOCK_SUBNET_ID,
'ip_address': t_constants.MOCK_IP_ADDRESS,
'subnet_id': t_constants.MOCK_SUBNET_ID2,
'ip_address': t_constants.MOCK_IP_ADDRESS2,
'subnet': None
}]
}
]
}
self.driver.network_proxy.update_port.assert_called_once_with(
t_constants.MOCK_PORT_ID,
**expected_body)
self.driver.neutron_client.update_port.side_effect = Exception
def test_unplug_fixed_ip_exception(self):
show_port = self.driver.network_proxy.get_port
show_port.return_value = Port(
device_id=t_constants.MOCK_PORT_ID,
fixed_ips=[(t_constants.MOCK_IP_ADDRESS,
t_constants.MOCK_SUBNET_ID)],
)
self.driver.network_proxy.update_port.side_effect = Exception
self.assertRaises(network_base.NetworkException,
self.driver.unplug_fixed_ip,

View File

@ -37,8 +37,8 @@ class TestNeutronUtils(base.TestCase):
if hay[key] is not None})
self.assertIn(newneedle, newhaystack)
def test_convert_subnet_dict_to_model(self):
model_obj = utils.convert_subnet_dict_to_model(
def test_convert_subnet_to_model(self):
model_obj = utils.convert_subnet_to_model(
t_constants.MOCK_SUBNET)
assert_dict = dict(
id=t_constants.MOCK_SUBNET_ID,
@ -52,8 +52,8 @@ class TestNeutronUtils(base.TestCase):
)
self._compare_ignore_value_none(model_obj.to_dict(), assert_dict)
def test_convert_port_dict_to_model(self):
model_obj = utils.convert_port_dict_to_model(
def test_convert_port_to_model(self):
model_obj = utils.convert_port_to_model(
t_constants.MOCK_NEUTRON_PORT)
assert_dict = dict(
id=t_constants.MOCK_PORT_ID,
@ -69,12 +69,12 @@ class TestNeutronUtils(base.TestCase):
security_group_ids=[],
)
self._compare_ignore_value_none(model_obj.to_dict(), assert_dict)
fixed_ips = t_constants.MOCK_NEUTRON_PORT['port']['fixed_ips']
fixed_ips = t_constants.MOCK_NEUTRON_PORT['fixed_ips']
for ip in model_obj.fixed_ips:
self._in_ignore_value_none(ip.to_dict(), fixed_ips)
def test_convert_network_dict_to_model(self):
model_obj = utils.convert_network_dict_to_model(
def test_convert_network_to_model(self):
model_obj = utils.convert_network_to_model(
t_constants.MOCK_NETWORK)
assert_dict = dict(
id=t_constants.MOCK_NETWORK_ID,
@ -86,11 +86,12 @@ class TestNeutronUtils(base.TestCase):
provider_network_type=t_constants.MOCK_NETWORK_TYPE,
provider_physical_network=t_constants.MOCK_NETWORK_NAME,
provider_segmentation_id=t_constants.MOCK_SEGMENTATION_ID,
router_external=t_constants.MOCK_ROUTER_EXTERNAL
router_external=t_constants.MOCK_ROUTER_EXTERNAL,
port_security_enabled=False,
)
model_dict = model_obj.to_dict()
model_dict['subnets'] = model_obj.subnets
self._compare_ignore_value_none(model_dict, assert_dict)
self._compare_ignore_value_none(assert_dict, model_dict)
def test_convert_fixed_ip_dict_to_model(self):
model_obj = utils.convert_fixed_ip_dict_to_model(
@ -99,31 +100,14 @@ class TestNeutronUtils(base.TestCase):
subnet_id=t_constants.MOCK_SUBNET_ID,
ip_address=t_constants.MOCK_IP_ADDRESS
)
self._compare_ignore_value_none(model_obj.to_dict(), assert_dict)
self._compare_ignore_value_none(assert_dict, model_obj.to_dict())
def test_convert_floatingip_dict_to_model(self):
model_obj = utils.convert_floatingip_dict_to_model(
t_constants.MOCK_FLOATING_IP)
assert_dict = dict(
id=t_constants.MOCK_FLOATING_IP_ID,
description=t_constants.MOCK_FLOATING_IP_DESC,
project_id=t_constants.MOCK_PROJECT_ID,
status=t_constants.MOCK_STATUS,
router_id=t_constants.MOCK_ROUTER_ID,
port_id=t_constants.MOCK_PORT_ID,
floating_network_id=t_constants.MOCK_NETWORK_ID,
network_id=t_constants.MOCK_NETWORK_ID,
floating_ip_address=t_constants.MOCK_IP_ADDRESS,
fixed_ip_address=t_constants.MOCK_IP_ADDRESS2,
fixed_port_id=t_constants.MOCK_PORT_ID2
)
self._compare_ignore_value_none(model_obj.to_dict(), assert_dict)
def test_convert_network_ip_availability_dict_to_model(self):
model_obj = utils.convert_network_ip_availability_dict_to_model(
def test_convert_network_ip_availability_to_model(self):
model_obj = utils.convert_network_ip_availability_to_model(
t_constants.MOCK_NETWORK_IP_AVAILABILITY)
assert_dict = dict(
network_id=t_constants.MOCK_NETWORK_ID,
project_id=t_constants.MOCK_PROJECT_ID,
tenant_id=t_constants.MOCK_PROJECT_ID,
network_name=t_constants.MOCK_NETWORK_NAME,
total_ips=t_constants.MOCK_NETWORK_TOTAL_IPS,

View File

@ -0,0 +1,23 @@
---
upgrade:
- |
Authentication settings for Neutron should be added
directly to the [neutron] section of the configuration now. The exact
settings depend on the `auth_type` used. Refer to
https://docs.openstack.org/keystoneauth/latest/plugin-options.html
for a list of possible options.
deprecations:
- |
In a future release Octavia will no longer take the authentication
settings for Neutron from the [service_auth] as a fallback. It will
require them to be in the [neutron] section. The *endpoint* option is now
deprecated and replaced by *endpoint_override*. Similarly, the
new name of the *endpoint_type* option is now *valid_interfaces* and
the new name of the *ca_certificates_file* option is now *cafile*.
Note that [service_auth]
settings will still be used for other services like Nova and Glance.
other:
- |
Replaced code that uses the deprecated python-neutronclient library with
code that uses openstacksdk and removed python-neutronclient as a
dependency.

View File

@ -16,9 +16,9 @@ requests>=2.23.0 # Apache-2.0
rfc3986>=1.2.0 # Apache-2.0
keystoneauth1>=3.4.0 # Apache-2.0
keystonemiddleware>=9.5.0 # Apache-2.0
python-neutronclient>=6.7.0 # Apache-2.0
WebOb>=1.8.2 # MIT
stevedore>=1.20.0 # Apache-2.0
openstacksdk>=0.103.0 # Apache-2.0
oslo.config>=6.8.0 # Apache-2.0
oslo.context>=2.22.0 # Apache-2.0
oslo.db[mysql]>=8.4.0 # Apache-2.0

View File

@ -330,7 +330,7 @@ Aside from the API, are there other ways a user will interact with this
feature? Keep in mind that 'user' in this context could mean either tenant or
operator.
* Does this change have an impact on python-neutronclient? What does the user
* Does this change have an impact on openstacksdk? What does the user
interface there look like?
Performance Impact