Setup base neutron network driver

There are common methods that all neutron based network drivers will most
likely implement the same way.  To prevent duplicate code, a common base driver
for all neutron drivers has been created. In the process of splitting this out,
cleaning up of some existing code was done as well.

The network driver's plug_network and unplug_network methods took an
amphora_id as a parameter, but it was always assumed to be the compute_id.
This parameter has been changed to compute_id.

The octavia interface network model originally had just a single ip_address,
but to more accurately reflect what neutron and probably other networking
as a services will return, this has been changed to fixed_ips because
interfaces and ports can have multiple ip addresses.

Other cleanup includes calling the network drivers own get_network, get_subnet,
and get_port methods instead of calling the neutron client's show_network,
show_subnet, and show_port methods.  Also, all of these changes required some
test changes as well.

Change-Id: Ie6ebc5bc8babe8562c280ba12a1feab21b4ff3f9
This commit is contained in:
Brandon Logan 2015-08-03 23:02:29 -05:00
parent 71e5725a7f
commit c7617ff992
10 changed files with 688 additions and 464 deletions

View File

@ -92,7 +92,7 @@ class AbstractNetworkDriver(object):
:param load_balancer: octavia.common.data_models.LoadBalancer instance
:return: octavia.common.data_models.VIP
:raises: AllocateVIPException, PortNotFound, NetworkNotFound
:raises: AllocateVIPException, PortNotFound, SubnetNotFound
"""
pass
@ -119,7 +119,7 @@ class AbstractNetworkDriver(object):
:return: dict consisting of amphora_id as key and bind_ip as value.
bind_ip is the ip that the amphora should listen on to
receive traffic to load balance.
:raises: PlugVIPException
:raises: PlugVIPException, PortNotFound
"""
pass
@ -138,10 +138,10 @@ class AbstractNetworkDriver(object):
pass
@abc.abstractmethod
def plug_network(self, amphora_id, network_id, ip_address=None):
def plug_network(self, compute_id, network_id, ip_address=None):
"""Connects an existing amphora to an existing network.
:param amphora_id: id of an amphora in the compute service
:param compute_id: id of an amphora in the compute service
:param network_id: id of a network
:param ip_address: ip address to attempt to be assigned to interface
:return: octavia.network.data_models.Interface instance
@ -149,27 +149,27 @@ class AbstractNetworkDriver(object):
"""
@abc.abstractmethod
def unplug_network(self, amphora_id, network_id, ip_address=None):
def unplug_network(self, compute_id, network_id, ip_address=None):
"""Disconnects an existing amphora from an existing network.
If ip_address is not specificed, all the interfaces plugged on
network_id should be unplugged.
:param amphora_id: id of an amphora in the compute service
:param compute_id: id of an amphora in the compute service
:param network_id: id of a network
:param ip_address: specific ip_address to unplug
:return: None
:raises: UnplugNetworkException, AmphoraNotFound, NetworkNotFound
:raises: UnplugNetworkException, AmphoraNotFound, NetworkNotFound,
NetworkException
"""
pass
@abc.abstractmethod
def get_plugged_networks(self, amphora_id):
def get_plugged_networks(self, compute_id):
"""Retrieves the current plugged networking configuration.
:param amphora_id: id of an amphora in the compute service
:param compute_id: id of an amphora in the compute service
:return: [octavia.network.data_models.Instance]
:raises: AmphoraNotFound
"""
def update_vip(self, load_balancer):

View File

@ -17,13 +17,13 @@ from octavia.common import data_models
class Interface(data_models.BaseDataModel):
def __init__(self, id=None, amphora_id=None, network_id=None,
ip_address=None, port_id=None):
def __init__(self, id=None, compute_id=None, network_id=None,
fixed_ips=None, port_id=None):
self.id = id
self.amphora_id = amphora_id
self.compute_id = compute_id
self.network_id = network_id
self.port_id = port_id
self.ip_address = ip_address
self.fixed_ips = fixed_ips
class Delta(data_models.BaseDataModel):

View File

@ -13,7 +13,6 @@
# under the License.
from neutronclient.common import exceptions as neutron_client_exceptions
from neutronclient.neutron import client as neutron_client
from novaclient import client as nova_client
from novaclient import exceptions as nova_client_exceptions
from oslo_log import log as logging
@ -23,73 +22,43 @@ from octavia.common import data_models
from octavia.common import keystone
from octavia.i18n import _LE, _LI
from octavia.network import base
from octavia.network import data_models as network_models
from octavia.network.drivers.neutron import base as neutron_base
from octavia.network.drivers.neutron import utils
LOG = logging.getLogger(__name__)
NEUTRON_VERSION = '2.0'
NOVA_VERSION = '2'
AAP_EXT_ALIAS = 'allowed-address-pairs'
SEC_GRP_EXT_ALIAS = 'security-group'
VIP_SECURITY_GRP_PREFIX = 'lb-'
class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
def __init__(self):
self.sec_grp_enabled = True
self.neutron_client = neutron_client.Client(
NEUTRON_VERSION, session=keystone.get_session())
self._check_extensions_loaded()
super(AllowedAddressPairsDriver, self).__init__()
self._check_aap_loaded()
self.nova_client = nova_client.Client(
NOVA_VERSION, session=keystone.get_session())
def _check_extensions_loaded(self):
extensions = self.neutron_client.list_extensions()
extensions = extensions.get('extensions')
aliases = [ext.get('alias') for ext in extensions]
def _check_aap_loaded(self):
aliases = [ext.get('alias') for ext in self._extensions]
if AAP_EXT_ALIAS not in aliases:
raise base.NetworkException(
'The {alias} extension is not enabled in neutron. This '
'driver cannot be used with the {alias} extension '
'disabled.'.format(alias=AAP_EXT_ALIAS))
if SEC_GRP_EXT_ALIAS not in aliases:
LOG.info(_LI('Neutron security groups are disabled. This driver'
'will not manage any security groups.'))
self.sec_grp_enabled = False
def _port_to_vip(self, port, load_balancer):
port = port['port']
fixed_ip = {}
for port_fixed_ip in port['fixed_ips']:
if port_fixed_ip['subnet_id'] == load_balancer.vip.subnet_id:
fixed_ip = port_fixed_ip
break
port_id = port['id']
return data_models.Vip(ip_address=fixed_ip.get('ip_address'),
subnet_id=fixed_ip.get('subnet_id'),
port_id=port_id,
load_balancer_id=load_balancer.id)
def _nova_interface_to_octavia_interface(self, amphora_id, nova_interface):
ip_address = nova_interface.fixed_ips[0]['ip_address']
return network_models.Interface(amphora_id=amphora_id,
network_id=nova_interface.net_id,
port_id=nova_interface.port_id,
ip_address=ip_address)
def _get_interfaces_to_unplug(self, interfaces, network_id,
ip_address=None):
ret = []
for interface_ in interfaces:
if interface_.net_id == network_id:
for interface in interfaces:
if interface.network_id == network_id:
if ip_address:
for fixed_ip in interface_.fixed_ips:
if ip_address == fixed_ip.get('ip_address'):
ret.append(interface_)
for fixed_ip in interface.fixed_ips:
if ip_address == fixed_ip.ip_address:
ret.append(interface)
else:
ret.append(interface_)
ret.append(interface)
return ret
def _get_plugged_interface(self, compute_id, network_id):
@ -112,14 +81,7 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
def _add_vip_address_pair(self, port_id, vip_address):
try:
aap = {
'port': {
'allowed_address_pairs': [
{'ip_address': vip_address}
]
}
}
self.neutron_client.update_port(port_id, aap)
self._add_allowed_address_pair_to_port(port_id, vip_address)
except neutron_client_exceptions.PortNotFoundClient as e:
raise base.PortNotFound(e.message)
except Exception:
@ -132,7 +94,7 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
def _get_lb_security_group(self, load_balancer_id):
sec_grp_name = VIP_SECURITY_GRP_PREFIX + load_balancer_id
sec_grps = self.neutron_client.list_security_groups(name=sec_grp_name)
if len(sec_grps.get('security_groups')):
if sec_grps and sec_grps.get('security_groups'):
return sec_grps.get('security_groups')[0]
def _update_security_group_rules(self, load_balancer, sec_grp_id):
@ -153,49 +115,40 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
self.neutron_client.delete_security_group_rule(rule.get('id'))
for port in add_ports:
rule = {
'security_group_rule': {
'security_group_id': sec_grp_id,
'direction': 'ingress',
'protocol': 'TCP',
'port_range_min': port,
'port_range_max': port
}
}
self.neutron_client.create_security_group_rule(rule)
self._create_security_group_rule(sec_grp_id, 'TCP', port_min=port,
port_max=port)
def _update_vip_security_group(self, load_balancer, vip):
sec_grp = self._get_lb_security_group(load_balancer.id)
if not sec_grp:
sec_grp_name = VIP_SECURITY_GRP_PREFIX + load_balancer.id
new_sec_grp = {'security_group': {'name': sec_grp_name}}
sec_grp = self.neutron_client.create_security_group(new_sec_grp)
sec_grp = sec_grp['security_group']
sec_grp = self._create_security_group(sec_grp_name)
self._update_security_group_rules(load_balancer, sec_grp.get('id'))
port_update = {'port': {'security_groups': [sec_grp.get('id')]}}
self._add_vip_security_group_to_port(load_balancer.id, vip.port_id)
def _add_vip_security_group_to_port(self, load_balancer_id, port_id):
sec_grp = self._get_lb_security_group(load_balancer_id)
try:
self.neutron_client.update_port(vip.port_id, port_update)
except neutron_client_exceptions.PortNotFoundClient as e:
raise base.PortNotFound(e.message)
except Exception as e:
self._add_security_group_to_port(sec_grp.get('id'), port_id)
except base.PortNotFound as e:
raise e
except base.NetworkException as e:
raise base.PlugVIPException(str(e))
def _add_vip_security_group_to_amphorae(self, load_balancer_id, amphora):
sec_grp = self._get_lb_security_group(load_balancer_id)
self.nova_client.servers.add_security_group(
amphora.compute_id, sec_grp.get('id'))
def deallocate_vip(self, vip):
port = self.neutron_client.show_port(vip.port_id)
try:
port = self.get_port(vip.port_id)
except base.PortNotFound:
msg = ("Can't deallocate VIP because the vip port {0} cannot be "
"found in neutron".format(vip.port_id))
raise base.VIPConfigurationNotFound(msg)
admin_tenant_id = keystone.get_session().get_project_id()
if port.get('port').get('tenant_id') != admin_tenant_id:
if port.tenant_id != admin_tenant_id:
LOG.info(_LI("Port {0} will not be deleted by Octavia as it was "
"not created by Octavia.").format(vip.port_id))
return
try:
self.neutron_client.delete_port(vip.port_id)
except neutron_client_exceptions.PortNotFoundClient as e:
raise base.VIPConfigurationNotFound(e.message)
except Exception:
message = _LE('Error deleting VIP port_id {port_id} from '
'neutron').format(port_id=vip.port_id)
@ -206,28 +159,26 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
if self.sec_grp_enabled:
self._update_vip_security_group(load_balancer, vip)
plugged_amphorae = []
try:
subnet = self.neutron_client.show_subnet(vip.subnet_id)
except Exception:
message = _LE('Error retrieving subnet {subnet_id}').format(
subnet_id=load_balancer.vip.subnet_id)
LOG.exception(message)
raise base.NetworkNotFound(message)
subnet = subnet['subnet']
subnet = self.get_subnet(vip.subnet_id)
for amphora in load_balancer.amphorae:
interface = self._get_plugged_interface(amphora.compute_id,
subnet['network_id'])
subnet.network_id)
if not interface:
interface = self._plug_amphora_vip(amphora.compute_id,
subnet['network_id'])
subnet.network_id)
self._add_vip_address_pair(interface.port_id, vip.ip_address)
if self.sec_grp_enabled:
self._add_vip_security_group_to_amphorae(
load_balancer.id, amphora)
self._add_vip_security_group_to_port(load_balancer.id,
interface.port_id)
vrrp_ip = None
for fixed_ip in interface.fixed_ips:
if fixed_ip.subnet_id == subnet.id:
vrrp_ip = fixed_ip.ip_address
break
plugged_amphorae.append(data_models.Amphora(
id=amphora.id,
compute_id=amphora.compute_id,
vrrp_ip=interface.ip_address,
vrrp_ip=vrrp_ip,
ha_ip=vip.ip_address,
vrrp_port_id=interface.port_id,
ha_port_id=vip.port_id))
@ -241,66 +192,45 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
if load_balancer.vip.port_id:
LOG.info(_LI('Port {port_id} already exists. Nothing to be '
'done.').format(port_id=load_balancer.vip.port_id))
try:
port = self.neutron_client.show_port(load_balancer.vip.port_id)
except neutron_client_exceptions.PortNotFoundClient as e:
raise base.PortNotFound(e.message)
except Exception:
message = _LE('Error retrieving info about port '
'{port_id}.').format(
port_id=load_balancer.vip.port_id)
LOG.exception(message)
raise base.AllocateVIPException(message)
port = self.get_port(load_balancer.vip.port_id)
return self._port_to_vip(port, load_balancer)
# Must retrieve the network_id from the subnet
try:
subnet = self.neutron_client.show_subnet(
load_balancer.vip.subnet_id)
except Exception:
message = _LE('Error retrieving subnet {subnet_id}').format(
subnet_id=load_balancer.vip.subnet_id)
LOG.exception(message)
raise base.NetworkNotFound(message)
subnet = subnet['subnet']
subnet = self.get_subnet(load_balancer.vip.subnet_id)
# It can be assumed that network_id exists
port = {'port': {'name': 'octavia-lb-' + load_balancer.id,
'network_id': subnet['network_id'],
'network_id': subnet.network_id,
'admin_state_up': False,
'device_id': '',
'device_owner': ''}}
try:
new_port = self.neutron_client.create_port(port)
except neutron_client_exceptions.NetworkNotFoundClient as e:
raise base.NetworkNotFound(e.message)
except Exception:
message = _LE('Error creating neutron port on network '
'{network_id}.').format(
network_id=subnet['network_id'])
network_id=subnet.network_id)
LOG.exception(message)
raise base.AllocateVIPException(message)
new_port = utils.convert_port_dict_to_model(new_port)
return self._port_to_vip(new_port, load_balancer)
def unplug_vip(self, load_balancer, vip):
try:
subnet = self.neutron_client.show_subnet(vip.subnet_id)
except Exception:
message = _LE('Error retrieving subnet {subnet_id}').format(
subnet_id=load_balancer.vip.subnet_id)
LOG.exception(message)
raise base.NetworkNotFound(message)
subnet = subnet['subnet']
subnet = self.get_subnet(vip.subnet_id)
except base.SubnetNotFound:
msg = ("Can't unplug vip because vip subnet {0} was not "
"found").format(vip.subnet_id)
raise base.PluggedVIPNotFound(msg)
for amphora in load_balancer.amphorae:
interface = self._get_plugged_interface(amphora.compute_id,
subnet['network_id'])
subnet.network_id)
if not interface:
# Thought about raising PluggedVIPNotFound exception but
# then that wouldn't evaluate all amphorae, so just continue
continue
try:
self.unplug_network(amphora.compute_id, subnet['network_id'])
self.unplug_network(amphora.compute_id, subnet.network_id)
except Exception:
pass
try:
@ -318,7 +248,7 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
def plug_network(self, compute_id, network_id, ip_address=None):
try:
interface_ = self.nova_client.servers.interface_attach(
interface = self.nova_client.servers.interface_attach(
server=compute_id, net_id=network_id, fixed_ip=ip_address,
port_id=None)
except nova_client_exceptions.NotFound as e:
@ -334,36 +264,14 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
LOG.exception(message)
raise base.PlugNetworkException(message)
return self._nova_interface_to_octavia_interface(compute_id,
interface_)
def get_plugged_networks(self, amphora_id):
try:
interfaces = self.nova_client.servers.interface_list(
server=amphora_id)
except Exception:
message = _LE('Error retrieving plugged networks for amphora '
'{amphora_id}.').format(amphora_id=amphora_id)
LOG.exception(message)
raise base.NetworkException(message)
return [self._nova_interface_to_octavia_interface(
amphora_id, interface_) for interface_ in interfaces]
return self._nova_interface_to_octavia_interface(compute_id, interface)
def unplug_network(self, compute_id, network_id, ip_address=None):
try:
interfaces = self.nova_client.servers.interface_list(
server=compute_id)
except nova_client_exceptions.NotFound as e:
raise base.AmphoraNotFound(e.message)
except Exception:
message = _LE('Error retrieving nova interfaces for amphora '
'(compute_id: {compute_id}) on network {network_id} '
'with ip {ip_address}.').format(
compute_id=compute_id,
network_id=network_id,
ip_address=ip_address)
LOG.exception(message)
raise base.NetworkException(message)
interfaces = self.get_plugged_networks(compute_id)
if not interfaces:
msg = ('Amphora with compute id {compute_id} does not have any '
'plugged networks').format(compute_id=compute_id)
raise base.AmphoraNotFound(msg)
unpluggers = self._get_interfaces_to_unplug(interfaces, network_id,
ip_address=ip_address)
try:
@ -391,54 +299,3 @@ class AllowedAddressPairsDriver(base.AbstractNetworkDriver):
def update_vip(self, load_balancer):
sec_grp = self._get_lb_security_group(load_balancer.id)
self._update_security_group_rules(load_balancer, sec_grp.get('id'))
def get_network(self, network_id):
try:
network = self.neutron_client.show_network(network_id)
return utils.convert_network_dict_to_model(network)
except neutron_client_exceptions.NotFound:
message = _LE('Network not found '
'(network id: {network_id}.').format(
network_id=network_id)
LOG.exception(message)
raise base.NetworkNotFound(message)
except Exception:
message = _LE('Error retrieving network '
'(network id: {network_id}.').format(
network_id=network_id)
LOG.exception(message)
raise base.NetworkException(message)
def get_subnet(self, subnet_id):
try:
subnet = self.neutron_client.show_subnet(subnet_id)
return utils.convert_subnet_dict_to_model(subnet)
except neutron_client_exceptions.NotFound:
message = _LE('Subnet not found '
'(subnet id: {subnet_id}.').format(
subnet_id=subnet_id)
LOG.exception(message)
raise base.SubnetNotFound(message)
except Exception:
message = _LE('Error retrieving subnet '
'(subnet id: {subnet_id}.').format(
subnet_id=subnet_id)
LOG.exception(message)
raise base.NetworkException(message)
def get_port(self, port_id):
try:
port = self.neutron_client.show_port(port_id)
return utils.convert_port_dict_to_model(port)
except neutron_client_exceptions.NotFound:
message = _LE('Port not found '
'(port id: {port_id}.').format(
port_id=port_id)
LOG.exception(message)
raise base.PortNotFound(message)
except Exception:
message = _LE('Error retrieving port '
'(port id: {port_id}.').format(
port_id=port_id)
LOG.exception(message)
raise base.NetworkException(message)

View File

@ -0,0 +1,175 @@
# Copyright 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutronclient.common import exceptions as neutron_client_exceptions
from neutronclient.neutron import client as neutron_client
from oslo_log import log as logging
from octavia.common import data_models
from octavia.common import keystone
from octavia.i18n import _LE, _LI
from octavia.network import base
from octavia.network import data_models as network_models
from octavia.network.drivers.neutron import utils
LOG = logging.getLogger(__name__)
NEUTRON_VERSION = '2.0'
SEC_GRP_EXT_ALIAS = 'security-group'
class BaseNeutronDriver(base.AbstractNetworkDriver):
def __init__(self):
self.sec_grp_enabled = True
self.neutron_client = neutron_client.Client(
NEUTRON_VERSION, session=keystone.get_session())
extensions = self.neutron_client.list_extensions()
self._extensions = extensions.get('extensions')
self._check_sec_grps()
def _check_sec_grps(self):
aliases = [ext.get('alias') for ext in self._extensions]
if SEC_GRP_EXT_ALIAS not in aliases:
LOG.info(_LI('Neutron security groups are disabled. This driver'
'will not manage any security groups.'))
self.sec_grp_enabled = False
def _port_to_vip(self, port, load_balancer):
fixed_ip = None
for port_fixed_ip in port.fixed_ips:
if port_fixed_ip.subnet_id == load_balancer.vip.subnet_id:
fixed_ip = port_fixed_ip
break
return data_models.Vip(ip_address=fixed_ip.ip_address,
subnet_id=fixed_ip.subnet_id,
port_id=port.id,
load_balancer_id=load_balancer.id)
def _nova_interface_to_octavia_interface(self, compute_id, nova_interface):
fixed_ips = [utils.convert_fixed_ip_dict_to_model(fixed_ip)
for fixed_ip in nova_interface.fixed_ips]
return network_models.Interface(compute_id=compute_id,
network_id=nova_interface.net_id,
port_id=nova_interface.port_id,
fixed_ips=fixed_ips)
def _port_to_octavia_interface(self, compute_id, port):
fixed_ips = [utils.convert_fixed_ip_dict_to_model(fixed_ip)
for fixed_ip in port.get('fixed_ips', [])]
return network_models.Interface(compute_id=compute_id,
network_id=port['network_id'],
port_id=port['id'],
fixed_ips=fixed_ips)
def _add_allowed_address_pair_to_port(self, port_id, ip_address):
aap = {
'port': {
'allowed_address_pairs': [
{'ip_address': ip_address}
]
}
}
self.neutron_client.update_port(port_id, aap)
def _add_security_group_to_port(self, sec_grp_id, port_id):
port_update = {'port': {'security_groups': [sec_grp_id]}}
try:
self.neutron_client.update_port(port_id, port_update)
except neutron_client_exceptions.PortNotFoundClient as e:
raise base.PortNotFound(e.message)
except Exception as e:
raise base.NetworkException(str(e))
def _create_security_group(self, name):
new_sec_grp = {'security_group': {'name': name}}
sec_grp = self.neutron_client.create_security_group(new_sec_grp)
return sec_grp['security_group']
def _create_security_group_rule(self, sec_grp_id, protocol,
direction='ingress', port_min=None,
port_max=None):
rule = {
'security_group_rule': {
'security_group_id': sec_grp_id,
'direction': direction,
'protocol': protocol,
'port_range_min': port_min,
'port_range_max': port_max
}
}
self.neutron_client.create_security_group_rule(rule)
def get_plugged_networks(self, compute_id):
# List neutron ports associated with the Amphora
try:
ports = self.neutron_client.list_ports(device_id=compute_id)
except Exception:
message = ('Error retrieving plugged networks for compute '
'device {compute_id}.').format(compute_id=compute_id)
LOG.debug(message)
ports = {'ports': []}
return [self._port_to_octavia_interface(
compute_id, port) for port in ports['ports']]
def get_network(self, network_id):
try:
network = self.neutron_client.show_network(network_id)
return utils.convert_network_dict_to_model(network)
except neutron_client_exceptions.NotFound:
message = _LE('Network not found '
'(network id: {network_id}.').format(
network_id=network_id)
LOG.exception(message)
raise base.NetworkNotFound(message)
except Exception:
message = _LE('Error retrieving network '
'(network id: {network_id}.').format(
network_id=network_id)
LOG.exception(message)
raise base.NetworkException(message)
def get_subnet(self, subnet_id):
try:
subnet = self.neutron_client.show_subnet(subnet_id)
return utils.convert_subnet_dict_to_model(subnet)
except neutron_client_exceptions.NotFound:
message = _LE('Subnet not found '
'(subnet id: {subnet_id}.').format(
subnet_id=subnet_id)
LOG.exception(message)
raise base.SubnetNotFound(message)
except Exception:
message = _LE('Error retrieving subnet '
'(subnet id: {subnet_id}.').format(
subnet_id=subnet_id)
LOG.exception(message)
raise base.NetworkException(message)
def get_port(self, port_id):
try:
port = self.neutron_client.show_port(port_id)
return utils.convert_port_dict_to_model(port)
except neutron_client_exceptions.NotFound:
message = _LE('Port not found '
'(port id: {port_id}.').format(
port_id=port_id)
LOG.exception(message)
raise base.PortNotFound(message)
except Exception:
message = _LE('Error retrieving port '
'(port id: {port_id}.').format(
port_id=port_id)
LOG.exception(message)
raise base.NetworkException(message)

View File

@ -21,7 +21,7 @@ LOG = logging.getLogger(__name__)
def convert_subnet_dict_to_model(subnet_dict):
subnet = subnet_dict.get('subnet') or subnet_dict
subnet = subnet_dict.get('subnet', subnet_dict)
return network_models.Subnet(id=subnet.get('id'), name=subnet.get('name'),
network_id=subnet.get('network_id'),
tenant_id=subnet.get('tenant_id'),
@ -32,7 +32,7 @@ def convert_subnet_dict_to_model(subnet_dict):
def convert_port_dict_to_model(port_dict):
port = port_dict.get('port') or port_dict
port = port_dict.get('port', port_dict)
fixed_ips = [network_models.FixedIP(subnet_id=fixed_ip.get('subnet_id'),
ip_address=fixed_ip.get('ip_address'))
for fixed_ip in port.get('fixed_ips', [])]
@ -51,7 +51,7 @@ def convert_port_dict_to_model(port_dict):
def convert_network_dict_to_model(network_dict):
nw = network_dict.get('network') or network_dict
nw = network_dict.get('network', network_dict)
return network_models.Network(
id=nw.get('id'),
name=nw.get('name'),
@ -64,3 +64,9 @@ def convert_network_dict_to_model(network_dict):
provider_segmentation_id=nw.get('provider:segmentation_id'),
router_external=nw.get('router:external')
)
def convert_fixed_ip_dict_to_model(fixed_ip_dict):
fixed_ip = fixed_ip_dict.get('fixed_ip', fixed_ip_dict)
return network_models.FixedIP(subnet_id=fixed_ip.get('subnet_id'),
ip_address=fixed_ip.get('ip_address'))

View File

@ -52,25 +52,25 @@ class NoopManager(object):
vip.ip_address)] = (load_balancer, vip,
'unplug_vip')
def plug_network(self, amphora_id, network_id, ip_address=None):
LOG.debug("Network %s no-op, plug_network amphora_id %s, network_id "
"%s, ip_address %s", self.__class__.__name__, amphora_id,
def plug_network(self, compute_id, network_id, ip_address=None):
LOG.debug("Network %s no-op, plug_network compute_id %s, network_id "
"%s, ip_address %s", self.__class__.__name__, compute_id,
network_id, ip_address)
self.networkconfigconfig[(amphora_id, network_id, ip_address)] = (
amphora_id, network_id, ip_address, 'plug_network')
self.networkconfigconfig[(compute_id, network_id, ip_address)] = (
compute_id, network_id, ip_address, 'plug_network')
def unplug_network(self, amphora_id, network_id, ip_address=None):
LOG.debug("Network %s no-op, unplug_network amphora_id %s, "
def unplug_network(self, compute_id, network_id, ip_address=None):
LOG.debug("Network %s no-op, unplug_network compute_id %s, "
"network_id %s",
self.__class__.__name__, amphora_id, network_id)
self.networkconfigconfig[(amphora_id, network_id, ip_address)] = (
amphora_id, network_id, ip_address, 'unplug_network')
self.__class__.__name__, compute_id, network_id)
self.networkconfigconfig[(compute_id, network_id, ip_address)] = (
compute_id, network_id, ip_address, 'unplug_network')
def get_plugged_networks(self, amphora_id):
def get_plugged_networks(self, compute_id):
LOG.debug("Network %s no-op, get_plugged_networks amphora_id %s",
self.__class__.__name__, amphora_id)
self.networkconfigconfig[amphora_id] = (
amphora_id, 'get_plugged_networks')
self.__class__.__name__, compute_id)
self.networkconfigconfig[compute_id] = (
compute_id, 'get_plugged_networks')
def update_vip(self, load_balancer):
LOG.debug("Network %s no-op, update_vip load_balancer %s",

View File

@ -0,0 +1,38 @@
# Copyright 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class MockNovaInterface(object):
net_id = None
port_id = None
fixed_ips = []
MOCK_NETWORK_ID = '1'
MOCK_SUBNET_ID = '2'
MOCK_PORT_ID = '3'
MOCK_COMPUTE_ID = '4'
MOCK_IP_ADDRESS = '10.0.0.1'
MOCK_CIDR = '10.0.0.0/24'
MOCK_MAC_ADDR = 'fe:16:3e:00:95:5c'
MOCK_NOVA_INTERFACE = MockNovaInterface()
MOCK_SUBNET = {'subnet': {'id': MOCK_SUBNET_ID, 'network_id': MOCK_NETWORK_ID}}
MOCK_NOVA_INTERFACE.net_id = MOCK_NETWORK_ID
MOCK_NOVA_INTERFACE.port_id = MOCK_PORT_ID
MOCK_NOVA_INTERFACE.fixed_ips = [{'ip_address': MOCK_IP_ADDRESS}]
MOCK_NEUTRON_PORT = {'port': {'network_id': MOCK_NETWORK_ID,
'device_id': MOCK_COMPUTE_ID,
'id': MOCK_PORT_ID,
'fixed_ips': [{'ip_address': MOCK_IP_ADDRESS,
'subnet_id': MOCK_SUBNET_ID}]}}

View File

@ -21,32 +21,10 @@ from octavia.common import data_models
from octavia.network import base as network_base
from octavia.network import data_models as network_models
from octavia.network.drivers.neutron import allowed_address_pairs
from octavia.network.drivers.neutron import base as neutron_base
from octavia.tests.common import data_model_helpers as dmh
from octavia.tests.unit import base
class MockNovaInterface(object):
net_id = None
port_id = None
fixed_ips = []
MOCK_NETWORK_ID = '1'
MOCK_SUBNET_ID = '2'
MOCK_PORT_ID = '3'
MOCK_COMPUTE_ID = '4'
MOCK_IP_ADDRESS = '10.0.0.1'
MOCK_CIDR = '10.0.0.0/24'
MOCK_MAC_ADDR = 'fe:16:3e:00:95:5c'
MOCK_NOVA_INTERFACE = MockNovaInterface()
MOCK_SUBNET = {'subnet': {'id': MOCK_SUBNET_ID, 'network_id': MOCK_NETWORK_ID}}
MOCK_NOVA_INTERFACE.net_id = MOCK_NETWORK_ID
MOCK_NOVA_INTERFACE.port_id = MOCK_PORT_ID
MOCK_NOVA_INTERFACE.fixed_ips = [{'ip_address': MOCK_IP_ADDRESS}]
MOCK_NEUTRON_PORT = {'port': {'network_id': MOCK_NETWORK_ID,
'id': MOCK_PORT_ID,
'fixed_ips': [{'ip_address': MOCK_IP_ADDRESS,
'subnet_id': MOCK_SUBNET_ID}]}}
from octavia.tests.unit.network.drivers.neutron import constants as n_constants
class TestAllowedAddressPairsDriver(base.TestCase):
@ -58,58 +36,31 @@ class TestAllowedAddressPairsDriver(base.TestCase):
with mock.patch('neutronclient.neutron.client.Client',
autospec=True) as neutron_client:
with mock.patch('novaclient.client.Client', autospec=True):
client = neutron_client(allowed_address_pairs.NEUTRON_VERSION)
client = neutron_client(neutron_base.NEUTRON_VERSION)
client.list_extensions.return_value = {
'extensions': [
{'alias': allowed_address_pairs.AAP_EXT_ALIAS},
{'alias': allowed_address_pairs.SEC_GRP_EXT_ALIAS}
{'alias': neutron_base.SEC_GRP_EXT_ALIAS}
]
}
self.k_session = mock.patch(
'octavia.common.keystone.get_session').start()
self.driver = allowed_address_pairs.AllowedAddressPairsDriver()
def test_check_extensions_loaded(self):
list_extensions = self.driver.neutron_client.list_extensions
list_extensions.return_value = {
'extensions': [{'alias': 'blah'}]}
def test_check_aap_loaded(self):
self.driver._extensions = [{'alias': 'blah'}]
self.assertRaises(network_base.NetworkException,
self.driver._check_extensions_loaded)
def test_port_to_vip(self):
lb = dmh.generate_load_balancer_tree()
lb.vip.subnet_id = MOCK_SUBNET_ID
vip = self.driver._port_to_vip(MOCK_NEUTRON_PORT, lb)
self.assertIsInstance(vip, data_models.Vip)
self.assertEqual(
MOCK_NEUTRON_PORT['port']['fixed_ips'][0]['ip_address'],
vip.ip_address)
self.assertEqual(
MOCK_NEUTRON_PORT['port']['fixed_ips'][0]['subnet_id'],
vip.subnet_id)
self.assertEqual(MOCK_NEUTRON_PORT['port']['id'], vip.port_id)
self.assertEqual(lb.id, vip.load_balancer_id)
def test_nova_interface_to_octavia_interface(self):
nova_interface = MockNovaInterface()
nova_interface.net_id = '1'
nova_interface.port_id = '2'
nova_interface.fixed_ips = [{'ip_address': '10.0.0.1'}]
interface = self.driver._nova_interface_to_octavia_interface(
'3', nova_interface)
self.assertEqual('1', interface.network_id)
self.assertEqual('2', interface.port_id)
self.assertEqual('10.0.0.1', interface.ip_address)
self.driver._check_aap_loaded)
def test_get_interfaces_to_unplug(self):
if1 = MockNovaInterface()
if1.net_id = 'if1-net'
if1 = network_models.Interface()
if1.network_id = 'if1-net'
if1.port_id = 'if1-port'
if1.fixed_ips = [{'ip_address': '10.0.0.1'}]
if2 = MockNovaInterface()
if2.net_id = 'if2-net'
if1.fixed_ips = [network_models.FixedIP(ip_address='10.0.0.1')]
if2 = network_models.Interface()
if2.network_id = 'if2-net'
if2.port_id = 'if2-port'
if2.fixed_ips = [{'ip_address': '11.0.0.1'}]
if2.fixed_ips = [network_models.FixedIP(ip_address='11.0.0.1')]
interfaces = [if1, if2]
unpluggers = self.driver._get_interfaces_to_unplug(
interfaces, 'if1-net')
@ -134,9 +85,6 @@ class TestAllowedAddressPairsDriver(base.TestCase):
show_port.return_value = {'port': {'tenant_id': admin_tenant_id}}
self.driver.deallocate_vip(vip)
delete_port = self.driver.neutron_client.delete_port
delete_port.side_effect = neutron_exceptions.PortNotFoundClient
self.assertRaises(network_base.VIPConfigurationNotFound,
self.driver.deallocate_vip, vip)
delete_port.side_effect = TypeError
self.assertRaises(network_base.DeallocateVIPException,
self.driver.deallocate_vip, vip)
@ -152,184 +100,273 @@ class TestAllowedAddressPairsDriver(base.TestCase):
self.driver.deallocate_vip(vip)
self.assertFalse(delete_port.called)
def test_plug_vip(self):
def test_deallocate_vip_when_vip_port_not_found(self):
vip = data_models.Vip(port_id='1')
admin_tenant_id = 'octavia'
session_mock = mock.MagicMock()
session_mock.get_project_id.return_value = admin_tenant_id
self.k_session.return_value = session_mock
show_port = self.driver.neutron_client.show_port
show_port.side_effect = neutron_exceptions.PortNotFoundClient
self.assertRaises(network_base.VIPConfigurationNotFound,
self.driver.deallocate_vip, vip)
def test_plug_vip_errors_when_nova_cant_find_network_to_attach(self):
lb = dmh.generate_load_balancer_tree()
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = {
'subnet': {
'id': lb.vip.subnet_id
}
}
list_security_groups = self.driver.neutron_client.list_security_groups
lsc_side_effect = [
None, {
'security_groups': [
{'id': 'lb-sec-grp1'}
]
}
]
list_security_groups.side_effect = lsc_side_effect
interface_attach = self.driver.nova_client.servers.interface_attach
interface_attach.side_effect = nova_exceptions.NotFound
lb = dmh.generate_load_balancer_tree()
self.assertRaises(network_base.PlugVIPException,
self.driver.plug_vip, lb, lb.vip)
interface_attach.side_effect = None
interface_attach.return_value = MOCK_NOVA_INTERFACE
def test_plug_vip_errors_when_neutron_cant_find_port_to_update(self):
lb = dmh.generate_load_balancer_tree()
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = {
'subnet': {
'id': lb.vip.subnet_id
}
}
list_security_groups = self.driver.neutron_client.list_security_groups
lsc_side_effect = [
None, {
'security_groups': [
{'id': 'lb-sec-grp1'}
]
}
]
list_security_groups.side_effect = lsc_side_effect
interface_attach = self.driver.nova_client.servers.interface_attach
interface_attach.return_value = n_constants.MOCK_NOVA_INTERFACE
update_port = self.driver.neutron_client.update_port
update_port.side_effect = neutron_exceptions.PortNotFoundClient
self.assertRaises(network_base.PortNotFound,
self.driver.plug_vip, lb, lb.vip)
update_port.side_effect = TypeError
self.assertRaises(network_base.PlugVIPException,
self.driver.plug_vip, lb, lb.vip)
update_port.side_effect = None
update_port.reset_mock()
def test_plug_vip(self):
lb = dmh.generate_load_balancer_tree()
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = {
'subnet': {
'id': lb.vip.subnet_id
}
}
interface_attach = self.driver.nova_client.servers.interface_attach
interface_attach.return_value = n_constants.MOCK_NOVA_INTERFACE
list_security_groups = self.driver.neutron_client.list_security_groups
list_security_groups.return_value = {
'security_groups': [
{'id': 'lb-sec-grp1'}
]
}
mock_ip = MOCK_NOVA_INTERFACE.fixed_ips[0].get('ip_address')
update_port = self.driver.neutron_client.update_port
expected_aap = {'port': {'allowed_address_pairs':
[{'ip_address': lb.vip.ip_address}]}}
interface_list = self.driver.nova_client.servers.interface_list
if1 = MOCK_NOVA_INTERFACE
if2 = MockNovaInterface()
if1 = n_constants.MOCK_NOVA_INTERFACE
if2 = n_constants.MockNovaInterface()
if2.net_id = '3'
if2.port_id = '4'
if2.fixed_ips = [{'ip_address': '10.0.0.2'}]
if1.fixed_ips = [{'ip_address': n_constants.MOCK_IP_ADDRESS,
'subnet_id': lb.vip.subnet_id}]
interface_list.return_value = [if1, if2]
amps = self.driver.plug_vip(lb, lb.vip)
self.assertEqual(3, update_port.call_count)
self.assertEqual(5, update_port.call_count)
update_port.assert_any_call(if1.port_id, expected_aap)
for amp in amps:
self.assertEqual(mock_ip, amp.vrrp_ip)
self.assertEqual(n_constants.MOCK_IP_ADDRESS, amp.vrrp_ip)
self.assertEqual(lb.vip.ip_address, amp.ha_ip)
self.assertIn(amp.id, [lb.amphorae[0].id, lb.amphorae[1].id])
def test_allocate_vip(self):
def test_allocate_vip_when_port_already_provided(self):
fake_lb_vip = data_models.Vip()
fake_lb = data_models.LoadBalancer(id='1', vip=fake_lb_vip)
self.assertRaises(network_base.AllocateVIPException,
self.driver.allocate_vip, fake_lb)
show_port = self.driver.neutron_client.show_port
show_port.return_value = MOCK_NEUTRON_PORT
fake_lb_vip = data_models.Vip(port_id=MOCK_NEUTRON_PORT['port']['id'],
subnet_id=MOCK_SUBNET_ID)
fake_lb = data_models.LoadBalancer(id='1', vip=fake_lb_vip)
vip = self.driver.allocate_vip(fake_lb)
self.assertIsInstance(vip, data_models.Vip)
self.assertEqual(
MOCK_NEUTRON_PORT['port']['fixed_ips'][0]['ip_address'],
vip.ip_address)
self.assertEqual(
MOCK_NEUTRON_PORT['port']['fixed_ips'][0]['subnet_id'],
vip.subnet_id)
self.assertEqual(MOCK_NEUTRON_PORT['port']['id'], vip.port_id)
self.assertEqual(fake_lb.id, vip.load_balancer_id)
create_port = self.driver.neutron_client.create_port
create_port.return_value = MOCK_NEUTRON_PORT
show_port.return_value = n_constants.MOCK_NEUTRON_PORT
fake_lb_vip = data_models.Vip(
subnet_id=MOCK_NEUTRON_PORT['port']['fixed_ips'][0]['subnet_id'])
port_id=n_constants.MOCK_PORT_ID,
subnet_id=n_constants.MOCK_SUBNET_ID)
fake_lb = data_models.LoadBalancer(id='1', vip=fake_lb_vip)
vip = self.driver.allocate_vip(fake_lb)
self.assertIsInstance(vip, data_models.Vip)
self.assertEqual(
MOCK_NEUTRON_PORT['port']['fixed_ips'][0]['ip_address'],
vip.ip_address)
self.assertEqual(
MOCK_NEUTRON_PORT['port']['fixed_ips'][0]['subnet_id'],
vip.subnet_id)
self.assertEqual(MOCK_NEUTRON_PORT['port']['id'], vip.port_id)
self.assertEqual(n_constants.MOCK_IP_ADDRESS, vip.ip_address)
self.assertEqual(n_constants.MOCK_SUBNET_ID, vip.subnet_id)
self.assertEqual(n_constants.MOCK_PORT_ID, vip.port_id)
self.assertEqual(fake_lb.id, vip.load_balancer_id)
def test_unplug_vip(self):
def test_allocate_vip_when_only_subnet_provided(self):
create_port = self.driver.neutron_client.create_port
create_port.return_value = n_constants.MOCK_NEUTRON_PORT
fake_lb_vip = data_models.Vip(subnet_id=n_constants.MOCK_SUBNET_ID)
fake_lb = data_models.LoadBalancer(id='1', vip=fake_lb_vip)
vip = self.driver.allocate_vip(fake_lb)
self.assertIsInstance(vip, data_models.Vip)
self.assertEqual(n_constants.MOCK_IP_ADDRESS, vip.ip_address)
self.assertEqual(n_constants.MOCK_SUBNET_ID, vip.subnet_id)
self.assertEqual(n_constants.MOCK_PORT_ID, vip.port_id)
self.assertEqual(fake_lb.id, vip.load_balancer_id)
def test_unplug_vip_errors_when_update_port_cant_find_port(self):
lb = dmh.generate_load_balancer_tree()
interface_list = self.driver.nova_client.servers.interface_list
interface_list.reset_mock()
self.driver.neutron_client.show_subnet.return_value = MOCK_SUBNET
if1 = MOCK_NOVA_INTERFACE
if2 = MockNovaInterface()
if2.net_id = '3'
if2.port_id = '4'
if2.fixed_ips = [{'ip_address': '10.0.0.2'}]
interface_list.return_value = [if1, if2]
list_ports = self.driver.neutron_client.list_ports
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = n_constants.MOCK_SUBNET
port1 = n_constants.MOCK_NEUTRON_PORT['port']
port2 = {
'id': '4', 'network_id': '3', 'fixed_ips':
[{'ip_address': '10.0.0.2'}]
}
list_ports.return_value = {'ports': [port1, port2]}
update_port = self.driver.neutron_client.update_port
update_port.side_effect = neutron_exceptions.PortNotFoundClient
self.assertRaises(network_base.UnplugVIPException,
self.driver.unplug_vip, lb, lb.vip)
def test_unplug_vip_errors_when_update_port_fails(self):
lb = dmh.generate_load_balancer_tree()
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = n_constants.MOCK_SUBNET
port1 = n_constants.MOCK_NEUTRON_PORT['port']
port2 = {
'id': '4', 'network_id': '3', 'fixed_ips':
[{'ip_address': '10.0.0.2'}]
}
list_ports = self.driver.neutron_client.list_ports
list_ports.return_value = {'ports': [port1, port2]}
update_port = self.driver.neutron_client.update_port
update_port.side_effect = TypeError
self.assertRaises(network_base.UnplugVIPException,
self.driver.unplug_vip, lb, lb.vip)
update_port.side_effect = None
update_port.reset_mock()
def test_unplug_vip_errors_when_vip_subnet_not_found(self):
lb = dmh.generate_load_balancer_tree()
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.side_effect = neutron_exceptions.NotFound
self.assertRaises(network_base.PluggedVIPNotFound,
self.driver.unplug_vip, lb, lb.vip)
def test_unplug_vip(self):
lb = dmh.generate_load_balancer_tree()
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = n_constants.MOCK_SUBNET
update_port = self.driver.neutron_client.update_port
port1 = n_constants.MOCK_NEUTRON_PORT['port']
port2 = {
'id': '4', 'network_id': '3', 'fixed_ips':
[{'ip_address': '10.0.0.2'}]
}
list_ports = self.driver.neutron_client.list_ports
list_ports.return_value = {'ports': [port1, port2]}
self.driver.unplug_vip(lb, lb.vip)
self.assertEqual(len(lb.amphorae), update_port.call_count)
clear_aap = {'port': {'allowed_address_pairs': []}}
update_port.assert_has_calls([mock.call(if1.port_id, clear_aap),
mock.call(if1.port_id, clear_aap)])
update_port.assert_has_calls([mock.call(port1.get('id'), clear_aap),
mock.call(port1.get('id'), clear_aap)])
def test_plug_network(self):
amp_id = '1'
net_id = MOCK_NOVA_INTERFACE.net_id
def test_plug_network_when_compute_instance_cant_be_found(self):
net_id = n_constants.MOCK_NOVA_INTERFACE.net_id
interface_attach = self.driver.nova_client.servers.interface_attach
interface_attach.side_effect = nova_exceptions.NotFound(
404, message='Instance not found')
self.assertRaises(network_base.AmphoraNotFound,
self.driver.plug_network, amp_id, net_id)
self.driver.plug_network,
n_constants.MOCK_COMPUTE_ID, net_id)
def test_plug_network_when_network_cant_be_found(self):
net_id = n_constants.MOCK_NOVA_INTERFACE.net_id
interface_attach = self.driver.nova_client.servers.interface_attach
interface_attach.side_effect = nova_exceptions.NotFound(
404, message='Network not found')
self.assertRaises(network_base.NetworkException,
self.driver.plug_network, amp_id, net_id)
self.driver.plug_network,
n_constants.MOCK_COMPUTE_ID, net_id)
def test_plug_network_when_interface_attach_fails(self):
net_id = n_constants.MOCK_NOVA_INTERFACE.net_id
interface_attach = self.driver.nova_client.servers.interface_attach
interface_attach.side_effect = TypeError
self.assertRaises(network_base.PlugNetworkException,
self.driver.plug_network, amp_id, net_id)
interface_attach.side_effect = None
interface_attach.return_value = MOCK_NOVA_INTERFACE
oct_interface = self.driver.plug_network(amp_id, net_id)
self.assertEqual(MOCK_NOVA_INTERFACE.fixed_ips[0].get('ip_address'),
oct_interface.ip_address)
self.assertEqual(amp_id, oct_interface.amphora_id)
self.driver.plug_network,
n_constants.MOCK_COMPUTE_ID, net_id)
def test_plug_network(self):
net_id = n_constants.MOCK_NOVA_INTERFACE.net_id
interface_attach = self.driver.nova_client.servers.interface_attach
interface_attach.return_value = n_constants.MOCK_NOVA_INTERFACE
oct_interface = self.driver.plug_network(
n_constants.MOCK_COMPUTE_ID, net_id)
exp_ips = [fixed_ip.get('ip_address')
for fixed_ip in n_constants.MOCK_NOVA_INTERFACE.fixed_ips]
actual_ips = [fixed_ip.ip_address
for fixed_ip in oct_interface.fixed_ips]
self.assertEqual(exp_ips, actual_ips)
self.assertEqual(n_constants.MOCK_COMPUTE_ID, oct_interface.compute_id)
self.assertEqual(net_id, oct_interface.network_id)
def test_get_plugged_networks(self):
amp_id = '1'
interface_list = self.driver.nova_client.servers.interface_list
interface_list.side_effect = TypeError
self.assertRaises(network_base.NetworkException,
self.driver.get_plugged_networks, amp_id)
interface_list.side_effect = None
interface_list.reset_mock()
if1 = MOCK_NOVA_INTERFACE
if2 = MockNovaInterface()
if2.net_id = '3'
if2.port_id = '4'
if2.fixed_ips = [{'ip_address': '10.0.0.2'}]
interface_list.return_value = [if1, if2]
plugged_networks = self.driver.get_plugged_networks(amp_id)
for pn in plugged_networks:
self.assertIn(pn.port_id, [if1.port_id, if2.port_id])
self.assertIn(pn.network_id, [if1.net_id, if2.net_id])
self.assertIn(pn.ip_address, [if1.fixed_ips[0]['ip_address'],
if2.fixed_ips[0]['ip_address']])
def test_unplug_network(self):
amp_id = '1'
net_id = MOCK_NOVA_INTERFACE.net_id
interface_list = self.driver.nova_client.servers.interface_list
interface_list.side_effect = nova_exceptions.NotFound(404)
def test_unplug_network_when_compute_port_cant_be_found(self):
net_id = n_constants.MOCK_NOVA_INTERFACE.net_id
list_ports = self.driver.neutron_client.list_ports
list_ports.return_value = {'ports': []}
self.assertRaises(network_base.AmphoraNotFound,
self.driver.unplug_network, amp_id, net_id)
interface_list.side_effect = Exception
self.driver.unplug_network,
n_constants.MOCK_COMPUTE_ID, net_id)
def test_unplug_network_when_list_ports_fails(self):
net_id = n_constants.MOCK_NOVA_INTERFACE.net_id
list_ports = self.driver.neutron_client.list_ports
list_ports.side_effect = Exception
self.assertRaises(network_base.NetworkException,
self.driver.unplug_network, amp_id, net_id)
interface_list.side_effect = None
interface_list.reset_mock()
if1 = MockNovaInterface()
if1.net_id = 'if1-net'
if1.port_id = 'if1-port'
if1.fixed_ips = [{'ip_address': '10.0.0.1'}]
if2 = MockNovaInterface()
if2.net_id = 'if2-net'
if2.port_id = 'if2-port'
if2.fixed_ips = [{'ip_address': '11.0.0.1'}]
interface_list.return_value = [if1, if2]
self.driver.unplug_network,
n_constants.MOCK_COMPUTE_ID, net_id)
def test_unplug_network_when_interface_detach_fails(self):
list_ports = self.driver.neutron_client.list_ports
port1 = n_constants.MOCK_NEUTRON_PORT['port']
port2 = {
'id': '4', 'network_id': '3', 'fixed_ips':
[{'ip_address': '10.0.0.2'}]
}
list_ports.return_value = {'ports': [port1, port2]}
interface_detach = self.driver.nova_client.servers.interface_detach
interface_detach.side_effect = Exception
self.assertRaises(network_base.UnplugNetworkException,
self.driver.unplug_network, amp_id, if2.net_id)
interface_detach.side_effect = None
interface_detach.reset_mock()
self.driver.unplug_network(amp_id, if2.net_id)
interface_detach.assert_called_once_with(server=amp_id,
port_id=if2.port_id)
self.driver.unplug_network,
n_constants.MOCK_COMPUTE_ID,
port2.get('network_id'))
def test_unplug_network(self):
list_ports = self.driver.neutron_client.list_ports
port1 = n_constants.MOCK_NEUTRON_PORT['port']
port2 = {
'id': '4', 'network_id': '3', 'fixed_ips':
[{'ip_address': '10.0.0.2'}]
}
list_ports.return_value = {'ports': [port1, port2]}
interface_detach = self.driver.nova_client.servers.interface_detach
self.driver.unplug_network(n_constants.MOCK_COMPUTE_ID,
port2.get('network_id'))
interface_detach.assert_called_once_with(
server=n_constants.MOCK_COMPUTE_ID, port_id=port2.get('id'))
def test_update_vip(self):
listeners = [data_models.Listener(protocol_port=80),
@ -381,43 +418,3 @@ class TestAllowedAddressPairsDriver(base.TestCase):
self.driver.update_vip(lb)
delete_rule.assert_called_once_with('rule-22')
self.assertFalse(create_rule.called)
def test_get_network(self):
show_network = self.driver.neutron_client.show_network
show_network.return_value = {'network': {'id': MOCK_NETWORK_ID,
'subnets': [MOCK_SUBNET_ID]}}
network = self.driver.get_network(MOCK_NETWORK_ID)
self.assertIsInstance(network, network_models.Network)
self.assertEqual(MOCK_NETWORK_ID, network.id)
self.assertEqual(1, len(network.subnets))
self.assertEqual(MOCK_SUBNET_ID, network.subnets[0])
def test_get_subnet(self):
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = {'subnet': {'id': MOCK_SUBNET_ID,
'gateway_ip': MOCK_IP_ADDRESS,
'cidr': MOCK_CIDR}}
subnet = self.driver.get_subnet(MOCK_SUBNET_ID)
self.assertIsInstance(subnet, network_models.Subnet)
self.assertEqual(MOCK_SUBNET_ID, subnet.id)
self.assertEqual(MOCK_IP_ADDRESS, subnet.gateway_ip)
self.assertEqual(MOCK_CIDR, subnet.cidr)
def test_get_port(self):
show_port = self.driver.neutron_client.show_port
show_port.return_value = {'port': {'id': MOCK_PORT_ID,
'mac_address': MOCK_MAC_ADDR,
'network_id': MOCK_NETWORK_ID,
'fixed_ips': [{
'subnet_id': MOCK_SUBNET_ID,
'ip_address': MOCK_IP_ADDRESS
}]}}
port = self.driver.get_port(MOCK_PORT_ID)
self.assertIsInstance(port, network_models.Port)
self.assertEqual(MOCK_PORT_ID, port.id)
self.assertEqual(MOCK_MAC_ADDR, port.mac_address)
self.assertEqual(MOCK_NETWORK_ID, port.network_id)
self.assertEqual(1, len(port.fixed_ips))
self.assertIsInstance(port.fixed_ips[0], network_models.FixedIP)
self.assertEqual(MOCK_SUBNET_ID, port.fixed_ips[0].subnet_id)
self.assertEqual(MOCK_IP_ADDRESS, port.fixed_ips[0].ip_address)

View File

@ -0,0 +1,151 @@
# Copyright 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from octavia.common import data_models
from octavia.network import data_models as network_models
from octavia.network.drivers.neutron import base as neutron_base
from octavia.network.drivers.neutron import utils
from octavia.tests.common import data_model_helpers as dmh
from octavia.tests.unit import base
from octavia.tests.unit.network.drivers.neutron import constants as n_constants
class TestBaseNeutronNetworkDriver(base.TestCase):
def _instantiate_partial_abc(self, abclass):
if "__abstractmethods__" not in abclass.__dict__:
return abclass()
new_dict = abclass.__dict__.copy()
for abstractmethod in abclass.__abstractmethods__:
new_dict[abstractmethod] = lambda x, *args, **kw: (x, args, kw)
impl_class = type("partially_implemented_abc_%s" % abclass.__name__,
(abclass,), new_dict)
return impl_class()
def setUp(self):
super(TestBaseNeutronNetworkDriver, self).setUp()
with mock.patch('neutronclient.neutron.client.Client',
autospec=True) as neutron_client:
client = neutron_client(neutron_base.NEUTRON_VERSION)
client.list_extensions.return_value = {
'extensions': [
{'alias': neutron_base.SEC_GRP_EXT_ALIAS}
]
}
self.k_session = mock.patch(
'octavia.common.keystone.get_session').start()
self.driver = self._instantiate_partial_abc(
neutron_base.BaseNeutronDriver)
def test__port_to_vip(self):
lb = dmh.generate_load_balancer_tree()
lb.vip.subnet_id = n_constants.MOCK_SUBNET_ID
port = utils.convert_port_dict_to_model(n_constants.MOCK_NEUTRON_PORT)
vip = self.driver._port_to_vip(port, lb)
self.assertIsInstance(vip, data_models.Vip)
self.assertEqual(n_constants.MOCK_IP_ADDRESS, vip.ip_address)
self.assertEqual(n_constants.MOCK_SUBNET_ID, vip.subnet_id)
self.assertEqual(n_constants.MOCK_PORT_ID, vip.port_id)
self.assertEqual(lb.id, vip.load_balancer_id)
def test__nova_interface_to_octavia_interface(self):
nova_interface = n_constants.MockNovaInterface()
nova_interface.net_id = '1'
nova_interface.port_id = '2'
nova_interface.fixed_ips = [{'ip_address': '10.0.0.1'}]
interface = self.driver._nova_interface_to_octavia_interface(
'3', nova_interface)
self.assertEqual('1', interface.network_id)
self.assertEqual('2', interface.port_id)
ips = [fixed_ip.ip_address for fixed_ip in interface.fixed_ips]
self.assertIn('10.0.0.1', ips)
def test_get_plugged_networks(self):
list_ports = self.driver.neutron_client.list_ports
list_ports.side_effect = TypeError
o_ifaces = self.driver.get_plugged_networks(
n_constants.MOCK_COMPUTE_ID)
self.assertEqual(0, len(o_ifaces))
list_ports.side_effect = None
list_ports.reset_mock()
port1 = n_constants.MOCK_NEUTRON_PORT['port']
port2 = {
'id': '4', 'network_id': '3', 'fixed_ips':
[{'ip_address': '10.0.0.2'}]
}
list_ports.return_value = {'ports': [port1, port2]}
plugged_networks = self.driver.get_plugged_networks(
n_constants.MOCK_COMPUTE_ID)
for pn in plugged_networks:
self.assertIn(pn.port_id, [port1.get('id'), port2.get('id')])
self.assertIn(pn.network_id, [port1.get('network_id'),
port2.get('network_id')])
for fixed_ip in pn.fixed_ips:
self.assertIn(fixed_ip.ip_address,
[port1['fixed_ips'][0]['ip_address'],
port2['fixed_ips'][0]['ip_address']])
def test_sec_grps_extension_check(self):
self.driver._check_sec_grps()
self.assertTrue(self.driver.sec_grp_enabled)
self.driver._extensions = [{'alias': 'blah'}]
self.driver._check_sec_grps()
self.assertFalse(self.driver.sec_grp_enabled)
def test_get_network(self):
show_network = self.driver.neutron_client.show_network
show_network.return_value = {'network': {
'id': n_constants.MOCK_NETWORK_ID,
'subnets': [n_constants.MOCK_SUBNET_ID]}}
network = self.driver.get_network(n_constants.MOCK_NETWORK_ID)
self.assertIsInstance(network, network_models.Network)
self.assertEqual(n_constants.MOCK_NETWORK_ID, network.id)
self.assertEqual(1, len(network.subnets))
self.assertEqual(n_constants.MOCK_SUBNET_ID, network.subnets[0])
def test_get_subnet(self):
show_subnet = self.driver.neutron_client.show_subnet
show_subnet.return_value = {'subnet': {
'id': n_constants.MOCK_SUBNET_ID,
'gateway_ip': n_constants.MOCK_IP_ADDRESS,
'cidr': n_constants.MOCK_CIDR}}
subnet = self.driver.get_subnet(n_constants.MOCK_SUBNET_ID)
self.assertIsInstance(subnet, network_models.Subnet)
self.assertEqual(n_constants.MOCK_SUBNET_ID, subnet.id)
self.assertEqual(n_constants.MOCK_IP_ADDRESS, subnet.gateway_ip)
self.assertEqual(n_constants.MOCK_CIDR, subnet.cidr)
def test_get_port(self):
show_port = self.driver.neutron_client.show_port
show_port.return_value = {'port': {
'id': n_constants.MOCK_PORT_ID,
'mac_address': n_constants.MOCK_MAC_ADDR,
'network_id': n_constants.MOCK_NETWORK_ID,
'fixed_ips': [{
'subnet_id': n_constants.MOCK_SUBNET_ID,
'ip_address': n_constants.MOCK_IP_ADDRESS
}]}}
port = self.driver.get_port(n_constants.MOCK_PORT_ID)
self.assertIsInstance(port, network_models.Port)
self.assertEqual(n_constants.MOCK_PORT_ID, port.id)
self.assertEqual(n_constants.MOCK_MAC_ADDR, port.mac_address)
self.assertEqual(n_constants.MOCK_NETWORK_ID, port.network_id)
self.assertEqual(1, len(port.fixed_ips))
self.assertIsInstance(port.fixed_ips[0], network_models.FixedIP)
self.assertEqual(n_constants.MOCK_SUBNET_ID,
port.fixed_ips[0].subnet_id)
self.assertEqual(n_constants.MOCK_IP_ADDRESS,
port.fixed_ips[0].ip_address)

View File

@ -66,7 +66,7 @@ New data models:
* id
* network_id - (neutron subnet)
* amphora_id
* ip_address - (IPv4 or IPv6)
* fixed_ips
* class Delta
* amphora_id
@ -157,7 +157,7 @@ class AbstractNetworkDriver
* vip = instance of a VIP
* returns list of Amphora
* raises PlugVIPException
* raises PlugVIPException, PortNotFound
* unplug_vip(loadbalancer, vip)
@ -174,7 +174,7 @@ class AbstractNetworkDriver
connection of a load balancer.
* loadbalancer = instance of a data_models.LoadBalancer
* returns VIP instance
* raises AllocateVIPException, PortNotFound, NetworkNotFound
* raises AllocateVIPException, PortNotFound, SubnetNotFound
* deallocate_vip(vip)
@ -183,31 +183,31 @@ class AbstractNetworkDriver
* returns None
* raises DeallocateVIPException, VIPInUse, VIPConfigurationNotFound
* plug_network(amphora_id, network_id, ip_address=None)
* plug_network(compute_id, network_id, ip_address=None)
* Connects an existing amphora to an existing network.
* amphora = id of an amphora in the compute service
* compute_id = id of an amphora in the compute service
* network_id = id of the network to attach
* ip_address = ip address to attempt to be assigned to interface
* returns Interface instance
* raises PlugNetworkException, AmphoraNotFound, NetworkNotFound
* unplug_network(amphora_id, network_id, ip_address=None)
* unplug_network(compute_id, network_id, ip_address=None)
* Disconnects an existing amphora from an existing network. If ip_address
is not specified then all interfaces on that network will be unplugged.
* amphora = id of an amphora in the compute service to unplug
* compute_id = id of an amphora in the compute service to unplug
* network_id = id of network to unplug amphora
* ip_address = ip address of interface to unplug
* returns None
* raises UnplugNetworkException, AmphoraNotFound, NetworkNotFound
* raises UnplugNetworkException, AmphoraNotFound, NetworkNotFound,
NetworkException
* get_plugged_networks(amphora_id):
* get_plugged_networks(compute_id):
* Retrieves the current plugged networking configuration
* amphora_id = id of an amphora in the compute service
* compute_id = id of an amphora in the compute service
* returns = list of Instance instances
* raises AmphoraNotFound
* update_vip(loadbalancer):