Fixes failover flow with namespace driver

This patch updates the haproxy service scripts to handle the case
where the network interfaces have not yet been plugged.  This can
occur in a failover situation.
This patch also makes sure we don't move the management lan interface
into the network namespace.

Closes-Bug: #1509706
Closes-Bug: #1577963
Change-Id: I04d267bd3cdedca11f0350c5255086233cba14ec
This commit is contained in:
Michael Johnson 2016-05-03 22:30:42 +00:00
parent d30d18656e
commit 7ba33e6ee4
26 changed files with 521 additions and 134 deletions

View File

@ -280,6 +280,8 @@ Use the following options in the /etc/octavia/octavia.conf file.
- (IntOpt) The maximum attempts to retry an action with the networking service.
* - ``retry_interval`` = ``1``
- (IntOpt) Seconds to wait before retrying an action with the networking service.
* - ``port_detach_timeout`` = ``300``
- (IntOpt) Seconds to wait for a port to detach from an amphora.
* - **[neutron]**
-
* - ``ca_certificates_file`` = ``None``

View File

@ -101,6 +101,8 @@
# max_retries = 15
# Seconds to wait before retrying an action with the networking service.
# retry_interval = 1
# The maximum time to wait, in seconds, for a port to detach from an amphora
# port_detach_timeout = 300
[haproxy_amphora]
# base_path = /var/lib/octavia

View File

@ -69,14 +69,24 @@ def plug_vip(vip, subnet_cidr, gateway, mac_address, vrrp_ip=None):
return flask.make_response(flask.jsonify(dict(
message="Invalid VIP")), 400)
interface = _interface_by_mac(mac_address)
primary_interface = "{interface}".format(interface=interface)
secondary_interface = "{interface}:0".format(interface=interface)
# Check if the interface is already in the network namespace
# Do not attempt to re-plug the VIP if it is already in the
# network namespace
if _netns_interface_exists(mac_address):
return flask.make_response(flask.jsonify(dict(
message="Interface already exists")), 409)
# This is the interface prior to moving into the netns
default_netns_interface = _interface_by_mac(mac_address)
# Always put the VIP interface as eth1
primary_interface = consts.NETNS_PRIMARY_INTERFACE
secondary_interface = "{interface}:0".format(interface=primary_interface)
# We need to setup the netns network directory so that the ifup
# commands used here and in the startup scripts "sees" the right
# interfaces and scripts.
interface_file_path = util.get_network_interface_file(interface)
interface_file_path = util.get_network_interface_file(primary_interface)
os.makedirs('/etc/netns/' + consts.AMPHORA_NAMESPACE)
shutil.copytree('/etc/network',
'/etc/netns/{}/network'.format(consts.AMPHORA_NAMESPACE),
@ -109,7 +119,7 @@ def plug_vip(vip, subnet_cidr, gateway, mac_address, vrrp_ip=None):
with os.fdopen(os.open(interface_file_path, flags, mode),
'w') as text_file:
text = template_vip.render(
interface=interface,
interface=primary_interface,
vip=vip,
vip_ipv6=ip.version is 6,
broadcast=broadcast,
@ -122,7 +132,7 @@ def plug_vip(vip, subnet_cidr, gateway, mac_address, vrrp_ip=None):
# Update the list of interfaces to add to the namespace
# This is used in the amphora reboot case to re-establish the namespace
_update_plugged_interfaces_file(interface, mac_address)
_update_plugged_interfaces_file(primary_interface, mac_address)
# Create the namespace
netns = pyroute2.NetNS(consts.AMPHORA_NAMESPACE, flags=os.O_CREAT)
@ -130,8 +140,9 @@ def plug_vip(vip, subnet_cidr, gateway, mac_address, vrrp_ip=None):
with pyroute2.IPRoute() as ipr:
# Move the interfaces into the namespace
idx = ipr.link_lookup(ifname=primary_interface)[0]
ipr.link('set', index=idx, net_ns_fd=consts.AMPHORA_NAMESPACE)
idx = ipr.link_lookup(ifname=default_netns_interface)[0]
ipr.link('set', index=idx, net_ns_fd=consts.AMPHORA_NAMESPACE,
IFLA_IFNAME=primary_interface)
# bring interfaces up
_bring_if_down(primary_interface)
@ -142,10 +153,18 @@ def plug_vip(vip, subnet_cidr, gateway, mac_address, vrrp_ip=None):
return flask.make_response(flask.jsonify(dict(
message="OK",
details="VIP {vip} plugged on interface {interface}".format(
vip=vip, interface=interface))), 202)
vip=vip, interface=primary_interface))), 202)
def plug_network(mac_address, fixed_ips):
# Check if the interface is already in the network namespace
# Do not attempt to re-plug the network if it is already in the
# network namespace
if _netns_interface_exists(mac_address):
return flask.make_response(flask.jsonify(dict(
message="Interface already exists")), 409)
# This is the interface as it was initially plugged into the
# default network namespace, this will likely always be eth1
default_netns_interface = _interface_by_mac(mac_address)
@ -283,3 +302,12 @@ def _update_plugged_interfaces_file(interface, mac_address):
if mac_address not in inf_list:
text_file.write("{mac_address} {interface}\n".format(
mac_address=mac_address, interface=interface))
def _netns_interface_exists(mac_address):
with pyroute2.NetNS(consts.AMPHORA_NAMESPACE, flags=os.O_CREAT) as netns:
for link in netns.get_links():
for attr in link['attrs']:
if attr[0] == 'IFLA_ADDRESS' and attr[1] == mac_address:
return True
return False

View File

@ -53,9 +53,9 @@ haproxy_start()
# Re-add the namespace
ip netns add {{ amphora_nsname }} || true
# We need the plugged_interfaces file sorted to join the host interfaces
sort -k 1 /var/lib/octavia/plugged_interfaces > /var/lib/octavia/plugged_interfaces.sorted
sort -k 1 /var/lib/octavia/plugged_interfaces > /var/lib/octavia/plugged_interfaces.sorted || true
# Assign the interfaces into the namespace with the appropriate name
ip link | awk '{getline n; print $0,n}' | awk '{sub(":","",$2)} {print $17 " " $2}' | sort -k 1 | join -j 1 - /var/lib/octavia/plugged_interfaces.sorted | awk '{system("ip link set "$2" netns {{ amphora_nsname }} name "$3"")}'
ip link | awk '{getline n; print $0,n}' | awk '{sub(":","",$2)} {print $17 " " $2}' | sort -k 1 | join -j 1 - /var/lib/octavia/plugged_interfaces.sorted | awk '{system("ip link set "$2" netns {{ amphora_nsname }} name "$3"")}' || true
# Bring up all of the namespace interfaces
ip netns exec {{ amphora_nsname }} ifup -a || true

View File

@ -35,9 +35,9 @@ pre-start script
ip netns add {{ amphora_nsname }} || true
# We need the plugged_interfaces file sorted to join with the host
# interfaces
sort -k 1 /var/lib/octavia/plugged_interfaces > /var/lib/octavia/plugged_interfaces.sorted
sort -k 1 /var/lib/octavia/plugged_interfaces > /var/lib/octavia/plugged_interfaces.sorted || true
# Assign the interfaces into the namespace with the appropriate name
ip link | awk '{getline n; print $0,n}' | awk '{sub(":","",$2)} {print $17 " " $2}' | sort -k 1 | join -j 1 - /var/lib/octavia/plugged_interfaces.sorted | awk '{system("ip link set "$2" netns {{ amphora_nsname }} name "$3"")}'
ip link | awk '{getline n; print $0,n}' | awk '{sub(":","",$2)} {print $17 " " $2}' | sort -k 1 | join -j 1 - /var/lib/octavia/plugged_interfaces.sorted | awk '{system("ip link set "$2" netns {{ amphora_nsname }} name "$3"")}' || true
# Bring up all of the namespace interfaces
ip netns exec {{ amphora_nsname }} ifup -a || true
end script

View File

@ -135,7 +135,7 @@ class AmphoraLoadBalancerDriver(object):
"""
pass
def post_vip_plug(self, load_balancer, amphorae_network_config):
def post_vip_plug(self, amphora, load_balancer, amphorae_network_config):
"""Called after network driver has allocated and plugged the VIP
:param load_balancer: A load balancer that just had its vip allocated

View File

@ -113,27 +113,32 @@ class HaproxyAmphoraLoadBalancerDriver(
def finalize_amphora(self, amphora):
pass
def post_vip_plug(self, load_balancer, amphorae_network_config):
for amp in load_balancer.amphorae:
if amp.status != constants.DELETED:
subnet = amphorae_network_config.get(amp.id).vip_subnet
# NOTE(blogan): using the vrrp port here because that
# is what the allowed address pairs network driver sets
# this particular port to. This does expose a bit of
# tight coupling between the network driver and amphora
# driver. We will need to revisit this to try and remove
# this tight coupling.
# NOTE (johnsom): I am loading the vrrp_ip into the
# net_info structure here so that I don't break
# compatibility with old amphora agent versions.
port = amphorae_network_config.get(amp.id).vrrp_port
net_info = {'subnet_cidr': subnet.cidr,
'gateway': subnet.gateway_ip,
'mac_address': port.mac_address,
'vrrp_ip': amp.vrrp_ip}
self.client.plug_vip(amp,
def post_vip_plug(self, amphora, load_balancer, amphorae_network_config):
if amphora.status != constants.DELETED:
subnet = amphorae_network_config.get(amphora.id).vip_subnet
# NOTE(blogan): using the vrrp port here because that
# is what the allowed address pairs network driver sets
# this particular port to. This does expose a bit of
# tight coupling between the network driver and amphora
# driver. We will need to revisit this to try and remove
# this tight coupling.
# NOTE (johnsom): I am loading the vrrp_ip into the
# net_info structure here so that I don't break
# compatibility with old amphora agent versions.
port = amphorae_network_config.get(amphora.id).vrrp_port
net_info = {'subnet_cidr': subnet.cidr,
'gateway': subnet.gateway_ip,
'mac_address': port.mac_address,
'vrrp_ip': amphora.vrrp_ip}
try:
self.client.plug_vip(amphora,
load_balancer.vip.ip_address,
net_info)
except exc.Conflict:
LOG.warning(_LW('VIP with MAC {mac} already exists on '
'amphora, skipping post_vip_plug').format(
mac=port.mac_address))
def post_network_plug(self, amphora, port):
fixed_ips = []
@ -143,7 +148,12 @@ class HaproxyAmphoraLoadBalancerDriver(
fixed_ips.append(ip)
port_info = {'mac_address': port.mac_address,
'fixed_ips': fixed_ips}
self.client.plug_network(amphora, port_info)
try:
self.client.plug_network(amphora, port_info)
except exc.Conflict:
LOG.warning(_LW('Network with MAC {mac} already exists on '
'amphora, skipping post_network_plug').format(
mac=port.mac_address))
def get_vrrp_interface(self, amphora):
return self.client.get_interface(amphora, amphora.vrrp_ip)['interface']

View File

@ -86,7 +86,7 @@ class NoopManager(object):
self.amphoraconfig[amphora.id, port.id] = (amphora.id, port.id,
'post_network_plug')
def post_vip_plug(self, load_balancer, amphorae_network_config):
def post_vip_plug(self, amphora, load_balancer, amphorae_network_config):
LOG.debug("Amphora %s no-op, post vip plug load balancer %s",
self.__class__.__name__, load_balancer.id)
self.amphoraconfig[(load_balancer.id, id(amphorae_network_config))] = (
@ -136,9 +136,10 @@ class NoopAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
self.driver.post_network_plug(amphora, port)
def post_vip_plug(self, load_balancer, amphorae_network_config):
def post_vip_plug(self, amphora, load_balancer, amphorae_network_config):
self.driver.post_vip_plug(load_balancer, amphorae_network_config)
self.driver.post_vip_plug(amphora,
load_balancer, amphorae_network_config)
def upload_cert_amp(self, amphora, pem_file):

View File

@ -86,7 +86,10 @@ networking_opts = [
'networking service.')),
cfg.IntOpt('retry_interval', default=1,
help=_('Seconds to wait before retrying an action with the '
'networking service.'))
'networking service.')),
cfg.IntOpt('port_detach_timeout', default=300,
help=_('Seconds to wait for a port to detach from an '
'amphora.'))
]
healthmanager_opts = [

View File

@ -325,3 +325,5 @@ FLOW_DOC_TITLES = {'AmphoraFlows': 'Amphora Flows',
'HealthMonitorFlows': 'Health Monitor Flows',
'L7PolicyFlows': 'Layer 7 Policy Flows',
'L7RuleFlows': 'Layer 7 Rule Flows'}
NETNS_PRIMARY_INTERFACE = 'eth1'

View File

@ -283,13 +283,6 @@ class AmphoraFlows(object):
failover_amphora_flow = linear_flow.Flow(
constants.FAILOVER_AMPHORA_FLOW)
failover_amphora_flow.add(
network_tasks.RetrievePortIDsOnAmphoraExceptLBNetwork(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA, provides=constants.PORTS))
failover_amphora_flow.add(network_tasks.FailoverPreparationForAmphora(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
# Delete the old amphora
failover_amphora_flow.add(
@ -303,6 +296,9 @@ class AmphoraFlows(object):
failover_amphora_flow.add(compute_tasks.ComputeDelete(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(network_tasks.WaitForPortDetach(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
@ -337,23 +333,25 @@ class AmphoraFlows(object):
provides=constants.AMPHORAE_NETWORK_CONFIG))
failover_amphora_flow.add(database_tasks.GetListenersFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.LISTENERS))
failover_amphora_flow.add(database_tasks.GetVipFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.VIP))
failover_amphora_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=(constants.LOADBALANCER, constants.LISTENERS)))
failover_amphora_flow.add(network_tasks.PlugPorts(
requires=(constants.AMPHORA, constants.PORTS)))
# Plug the VIP ports into the new amphora
failover_amphora_flow.add(network_tasks.PlugVIPPort(
requires=(constants.AMPHORA, constants.AMPHORAE_NETWORK_CONFIG)))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraPostVIPPlug(
requires=(constants.LOADBALANCER,
requires=(constants.AMPHORA, constants.LOADBALANCER,
constants.AMPHORAE_NETWORK_CONFIG)))
failover_amphora_flow.add(
network_tasks.GetMemberPorts(
requires=(constants.LOADBALANCER, constants.AMPHORA),
provides=constants.MEMBER_PORTS
))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraPostNetworkPlug(
rebind={constants.PORTS: constants.MEMBER_PORTS},
requires=(constants.AMPHORA, constants.PORTS)))
# Plug the member networks into the new amphora
failover_amphora_flow.add(network_tasks.CalculateDelta(
requires=(constants.LOADBALANCER),
provides=constants.DELTAS))
failover_amphora_flow.add(network_tasks.HandleNetworkDeltas(
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraePostNetworkPlug(
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)))
# Handle the amphora role and VRRP if necessary
if role == constants.ROLE_MASTER:

View File

@ -300,7 +300,7 @@ class LoadBalancerFlows(object):
new_LB_net_subflow.add(network_tasks.GetAmphoraeNetworkConfigs(
requires=constants.LOADBALANCER,
provides=constants.AMPHORAE_NETWORK_CONFIG))
new_LB_net_subflow.add(amphora_driver_tasks.AmphoraPostVIPPlug(
new_LB_net_subflow.add(amphora_driver_tasks.AmphoraePostVIPPlug(
requires=(constants.LOADBALANCER,
constants.AMPHORAE_NETWORK_CONFIG)))

View File

@ -223,17 +223,40 @@ class AmphoraePostNetworkPlug(BaseAmphoraTask):
class AmphoraPostVIPPlug(BaseAmphoraTask):
"""Task to notify the amphora post VIP plug."""
def execute(self, loadbalancer, amphorae_network_config):
def execute(self, amphora, loadbalancer, amphorae_network_config):
"""Execute post_vip_routine."""
self.amphora_driver.post_vip_plug(
loadbalancer, amphorae_network_config)
amphora, loadbalancer, amphorae_network_config)
LOG.debug("Notified amphora of vip plug")
def revert(self, result, amphora, loadbalancer, *args, **kwargs):
"""Handle a failed amphora vip plug notification."""
if isinstance(result, failure.Failure):
return
LOG.warning(_LW("Reverting post vip plug."))
self.amphora_repo.update(db_apis.get_session(), id=amphora.id,
status=constants.ERROR)
self.loadbalancer_repo.update(db_apis.get_session(),
id=loadbalancer.id,
provisioning_status=constants.ERROR)
class AmphoraePostVIPPlug(BaseAmphoraTask):
"""Task to notify the amphorae post VIP plug."""
def execute(self, loadbalancer, amphorae_network_config):
"""Execute post_vip_plug across the amphorae."""
amp_post_vip_plug = AmphoraPostVIPPlug()
for amphora in loadbalancer.amphorae:
amp_post_vip_plug.execute(amphora,
loadbalancer,
amphorae_network_config)
def revert(self, result, loadbalancer, *args, **kwargs):
"""Handle a failed amphora vip plug notification."""
if isinstance(result, failure.Failure):
return
LOG.warning(_LW("Reverting post vip plug."))
LOG.warning(_LW("Reverting amphorae post vip plug."))
self.loadbalancer_repo.update(db_apis.get_session(),
id=loadbalancer.id,
provisioning_status=constants.ERROR)

View File

@ -204,9 +204,13 @@ class GetMemberPorts(BaseNetworkTask):
continue
port.network = self.network_driver.get_network(port.network_id)
for fixed_ip in port.fixed_ips:
if amphora.lb_network_ip == fixed_ip.ip_address:
break
fixed_ip.subnet = self.network_driver.get_subnet(
fixed_ip.subnet_id)
member_ports.append(port)
# Only add the port to the list if the IP wasn't the mgmt IP
else:
member_ports.append(port)
return member_ports
@ -399,3 +403,47 @@ class PlugPorts(BaseNetworkTask):
'{compute_id}.'.format(port_id=port.id,
compute_id=amphora.compute_id))
self.network_driver.plug_port(amphora, port)
class PlugVIPPort(BaseNetworkTask):
"""Task to plug a VIP into a compute instance."""
def execute(self, amphora, amphorae_network_config):
vip_port = amphorae_network_config.get(amphora.id).vip_port
vrrp_port = amphorae_network_config.get(amphora.id).vrrp_port
LOG.debug('Plugging VIP VRRP port ID: {port_id} into compute '
'instance: {compute_id}.'.format(
port_id=vrrp_port.id, compute_id=amphora.compute_id))
self.network_driver.plug_port(amphora, vrrp_port)
LOG.debug('Plugging VIP port ID: {port_id} into compute instance: '
'{compute_id}.'.format(port_id=vip_port.id,
compute_id=amphora.compute_id))
self.network_driver.plug_port(amphora, vip_port)
def revert(self, result, amphora, amphorae_network_config,
*args, **kwargs):
vip_port = None
try:
vip_port = amphorae_network_config.get(amphora.id).vip_port
self.network_driver.unplug_port(amphora, vip_port)
except Exception:
LOG.warning(_LW('Failed to unplug vip port: {port} '
'from amphora: {amp}').format(port=vip_port.id,
amp=amphora.id))
vrrp_port = None
try:
vrrp_port = amphorae_network_config.get(amphora.id).vrrp_port
self.network_driver.unplug_port(amphora, vrrp_port)
except Exception:
LOG.warning(_LW('Failed to unplug vrrp port: {port} '
'from amphora: {amp}').format(port=vrrp_port.id,
amp=amphora.id))
class WaitForPortDetach(BaseNetworkTask):
"""Task to wait for the neutron ports to detach from an amphora."""
def execute(self, amphora):
LOG.debug('Waiting for ports to detach from amphora: '
'{amp_id}.'.format(amp_id=amphora.id))
self.network_driver.wait_for_port_detach(amphora)

View File

@ -75,6 +75,10 @@ class PluggedVIPNotFound(NetworkException):
pass
class TimeoutException(NetworkException):
pass
@six.add_metaclass(abc.ABCMeta)
class AbstractNetworkDriver(object):
"""This class defines the methods for a fully functional network driver.
@ -257,3 +261,18 @@ class AbstractNetworkDriver(object):
:raises: NotFound, NetworkNotFound, SubnetNotFound, PortNotFound
"""
pass
@abc.abstractmethod
def wait_for_port_detach(self, amphora):
"""Waits for the amphora ports device_id to be unset.
This method waits for the ports on an amphora device_id
parameter to be '' or None which signifies that nova has
finished detaching the port from the instance.
:param amphora: Amphora to wait for ports to detach.
:returns: None
:raises TimeoutException: Port did not detach in interval.
:raises PortNotFound: Port was not found by neutron.
"""
pass

View File

@ -83,13 +83,24 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
if interface.network_id == network_id:
return interface
def _plug_amphora_vip(self, compute_id, network_id):
def _plug_amphora_vip(self, amphora, network_id):
# We need a vip port owned by Octavia for Act/Stby and failover
try:
interface = self.plug_network(compute_id, network_id)
port = {'port': {'name': 'octavia-lb-vip-' + amphora.id,
'network_id': network_id,
'admin_state_up': True,
'device_owner': OCTAVIA_OWNER}}
new_port = self.neutron_client.create_port(port)
new_port = utils.convert_port_dict_to_model(new_port)
LOG.debug('Created vip port: {port_id} for amphora: {amp}'.format(
port_id=new_port.id, amp=amphora.id))
interface = self.plug_port(amphora, new_port)
except Exception:
message = _LE('Error plugging amphora (compute_id: {compute_id}) '
'into vip network {network_id}.').format(
compute_id=compute_id,
compute_id=amphora.compute_id,
network_id=network_id)
LOG.exception(message)
raise base.PlugVIPException(message)
@ -293,8 +304,8 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
interface = self._get_plugged_interface(amphora.compute_id,
subnet.network_id)
if not interface:
interface = self._plug_amphora_vip(amphora.compute_id,
subnet.network_id)
interface = self._plug_amphora_vip(amphora, subnet.network_id)
self._add_vip_address_pair(interface.port_id, vip.ip_address)
if self.sec_grp_enabled:
self._add_vip_security_group_to_port(load_balancer.id,
@ -457,22 +468,20 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
if self.dns_integration_enabled:
self.neutron_client.update_port(port.id,
{'port': {'device_id': '',
'dns_name': ''}})
else:
self.neutron_client.update_port(port.id,
{'port':
{'device_id': ''}})
{'port': {'dns_name': ''}})
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient):
raise base.PortNotFound()
def plug_port(self, amphora, port):
plugged_interface = None
try:
self.nova_client.servers.interface_attach(
interface = self.nova_client.servers.interface_attach(
server=amphora.compute_id, net_id=None,
fixed_ip=None, port_id=port.id)
plugged_interface = self._nova_interface_to_octavia_interface(
amphora.compute_id, interface)
except nova_client_exceptions.NotFound as e:
if 'Instance' in e.message:
raise base.AmphoraNotFound(e.message)
@ -483,6 +492,11 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
except nova_client_exceptions.Conflict:
LOG.info(_LI('Port %(portid)s is already plugged, '
'skipping') % {'portid': port.id})
plugged_interface = n_data_models.Interface(
compute_id=amphora.compute_id,
network_id=port.network_id,
port_id=port.id,
fixed_ips=port.fixed_ips)
except Exception:
message = _LE('Error plugging amphora (compute_id: '
'{compute_id}) into port '
@ -492,6 +506,8 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
LOG.exception(message)
raise base.PlugNetworkException(message)
return plugged_interface
def get_network_configs(self, loadbalancer):
vip_subnet = self.get_subnet(loadbalancer.vip.subnet_id)
vip_port = self.get_port(loadbalancer.vip.port_id)
@ -516,3 +532,54 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
ha_port=ha_port
)
return amp_configs
def wait_for_port_detach(self, amphora):
"""Waits for the amphora ports device_id to be unset.
This method waits for the ports on an amphora device_id
parameter to be '' or None which signifies that nova has
finished detaching the port from the instance.
:param amphora: Amphora to wait for ports to detach.
:returns: None
:raises TimeoutException: Port did not detach in interval.
:raises PortNotFound: Port was not found by neutron.
"""
interfaces = self.get_plugged_networks(compute_id=amphora.compute_id)
ports = []
port_detach_timeout = CONF.networking.port_detach_timeout
for interface_ in interfaces:
port = self.get_port(port_id=interface_.port_id)
ips = port.fixed_ips
lb_network = False
for ip in ips:
if ip.ip_address == amphora.lb_network_ip:
lb_network = True
if not lb_network:
ports.append(port)
for port in ports:
try:
neutron_port = self.neutron_client.show_port(
port.id).get('port')
device_id = neutron_port['device_id']
start = int(time.time())
while device_id:
time.sleep(CONF.networking.retry_interval)
neutron_port = self.neutron_client.show_port(
port.id).get('port')
device_id = neutron_port['device_id']
timed_out = int(time.time()) - start >= port_detach_timeout
if device_id and timed_out:
message = ('Port %s failed to detach (device_id %s) '
'within the required time (%s s).' %
(port.id, device_id, port_detach_timeout))
raise base.TimeoutException(message)
except (neutron_client_exceptions.NotFound,
neutron_client_exceptions.PortNotFoundClient):
pass

View File

@ -137,6 +137,10 @@ class NoopManager(object):
self.networkconfigconfig[(loadbalancer)] = (
loadbalancer, 'get_network_configs')
def wait_for_port_detach(self, amphora):
LOG.debug("failover %s no-op, wait_for_port_detach, amphora id %s",
self.__class__.__name__, amphora.id)
class NoopNetworkDriver(driver_base.AbstractNetworkDriver):
def __init__(self):
@ -185,3 +189,6 @@ class NoopNetworkDriver(driver_base.AbstractNetworkDriver):
def get_network_configs(self, loadbalancer):
return self.driver.get_network_configs(loadbalancer)
def wait_for_port_detach(self, amphora):
self.driver.wait_for_port_detach(amphora)

View File

@ -487,21 +487,35 @@ class ServerTestCase(base.TestCase):
handle.write.assert_any_call(six.b('TestT'))
handle.write.assert_any_call(six.b('est'))
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'plug._netns_interface_exists')
@mock.patch('netifaces.interfaces')
@mock.patch('netifaces.ifaddresses')
@mock.patch('pyroute2.IPRoute')
@mock.patch('pyroute2.NetNS')
@mock.patch('subprocess.check_output')
def test_plug_network(self, mock_check_output, mock_netns,
mock_pyroute2, mock_ifaddress, mock_interfaces):
mock_pyroute2, mock_ifaddress, mock_interfaces,
mock_int_exists):
port_info = {'mac_address': '123'}
test_int_num = random.randint(0, 9999)
mock_int_exists.return_value = False
netns_handle = mock_netns.return_value.__enter__.return_value
netns_handle.get_links.return_value = [0] * test_int_num
test_int_num = str(test_int_num)
# Interface already plugged
mock_int_exists.return_value = True
rv = self.app.post('/' + api_server.VERSION + "/plug/network",
content_type='application/json',
data=json.dumps(port_info))
self.assertEqual(409, rv.status_code)
self.assertEqual(dict(message="Interface already exists"),
json.loads(rv.data.decode('utf-8')))
mock_int_exists.return_value = False
# No interface at all
mock_interfaces.side_effect = [[]]
rv = self.app.post('/' + api_server.VERSION + "/plug/network",
@ -682,6 +696,8 @@ class ServerTestCase(base.TestCase):
'message': 'Error plugging network'},
json.loads(rv.data.decode('utf-8')))
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'plug._netns_interface_exists')
@mock.patch('netifaces.interfaces')
@mock.patch('netifaces.ifaddresses')
@mock.patch('pyroute2.IPRoute')
@ -690,9 +706,9 @@ class ServerTestCase(base.TestCase):
@mock.patch('subprocess.check_output')
@mock.patch('shutil.copytree')
@mock.patch('os.makedirs')
def test_plug_vip4(self, mock_makedirs, mock_copytree, mock_check_output,
def test_plug_VIP4(self, mock_makedirs, mock_copytree, mock_check_output,
mock_netns, mock_netns_create, mock_pyroute2,
mock_ifaddress, mock_interfaces):
mock_ifaddress, mock_interfaces, mock_int_exists):
subnet_info = {
'subnet_cidr': '203.0.113.0/24',
@ -710,6 +726,16 @@ class ServerTestCase(base.TestCase):
rv = self.app.post('/' + api_server.VERSION + '/plug/vip/error')
self.assertEqual(400, rv.status_code)
# Interface already plugged
mock_int_exists.return_value = True
rv = self.app.post('/' + api_server.VERSION + "/plug/vip/203.0.113.2",
content_type='application/json',
data=json.dumps(subnet_info))
self.assertEqual(409, rv.status_code)
self.assertEqual(dict(message="Interface already exists"),
json.loads(rv.data.decode('utf-8')))
mock_int_exists.return_value = False
# No interface at all
mock_interfaces.side_effect = [[]]
rv = self.app.post('/' + api_server.VERSION + "/plug/vip/203.0.113.2",
@ -736,8 +762,10 @@ class ServerTestCase(base.TestCase):
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
file_name = '/etc/netns/{}/network/interfaces.d/blah.cfg'.format(
consts.AMPHORA_NAMESPACE)
file_name = ('/etc/netns/{netns}/network/interfaces.d/'
'{netns_int}.cfg'.format(
netns=consts.AMPHORA_NAMESPACE,
netns_int=consts.NETNS_PRIMARY_INTERFACE))
m = self.useFixture(test_utils.OpenFixture(file_name)).mock_open
with mock.patch('os.open') as mock_open, mock.patch.object(
@ -760,15 +788,17 @@ class ServerTestCase(base.TestCase):
handle = m()
handle.write.assert_any_call(
'\n# Generated by Octavia agent\n'
'auto blah blah:0\n'
'iface blah inet dhcp\n\n'
'iface blah:0 inet static\n'
'auto {netns_int} {netns_int}:0\n'
'iface {netns_int} inet dhcp\n\n'
'iface {netns_int}:0 inet static\n'
'address 203.0.113.2\n'
'broadcast 203.0.113.255\n'
'netmask 255.255.255.0')
'netmask 255.255.255.0'.format(
netns_int=consts.NETNS_PRIMARY_INTERFACE))
mock_check_output.assert_called_with(
['ip', 'netns', 'exec', consts.AMPHORA_NAMESPACE,
'ifup', 'blah:0'], stderr=-2)
'ifup', '{netns_int}:0'.format(
netns_int=consts.NETNS_PRIMARY_INTERFACE)], stderr=-2)
mock_interfaces.side_effect = [['blah']]
mock_ifaddress.side_effect = [[netifaces.AF_LINK],
@ -845,8 +875,10 @@ class ServerTestCase(base.TestCase):
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
file_name = '/etc/netns/{}/network/interfaces.d/blah.cfg'.format(
consts.AMPHORA_NAMESPACE)
file_name = ('/etc/netns/{netns}/network/interfaces.d/'
'{netns_int}.cfg'.format(
netns=consts.AMPHORA_NAMESPACE,
netns_int=consts.NETNS_PRIMARY_INTERFACE))
m = self.useFixture(test_utils.OpenFixture(file_name)).mock_open
with mock.patch('os.open') as mock_open, mock.patch.object(
@ -868,15 +900,16 @@ class ServerTestCase(base.TestCase):
handle = m()
handle.write.assert_any_call(
'\n# Generated by Octavia agent\n'
'auto blah blah:0\n'
'iface blah inet6 auto\n\n'
'iface blah:0 inet6 static\n'
'auto {netns_int} {netns_int}:0\n'
'iface {netns_int} inet6 auto\n\n'
'iface {netns_int}:0 inet6 static\n'
'address 2001:0db8:0000:0000:0000:0000:0000:0002\n'
'broadcast 2001:0db8:ffff:ffff:ffff:ffff:ffff:ffff\n'
'netmask 32')
'netmask 32'.format(netns_int=consts.NETNS_PRIMARY_INTERFACE))
mock_check_output.assert_called_with(
['ip', 'netns', 'exec', consts.AMPHORA_NAMESPACE,
'ifup', 'blah:0'], stderr=-2)
'ifup', '{netns_int}:0'.format(
netns_int=consts.NETNS_PRIMARY_INTERFACE)], stderr=-2)
mock_interfaces.side_effect = [['blah']]
mock_ifaddress.side_effect = [[netifaces.AF_LINK],

View File

@ -496,12 +496,15 @@ class ServerTestCase(base.TestCase):
@mock.patch('pyroute2.IPRoute')
@mock.patch('pyroute2.NetNS')
@mock.patch('subprocess.check_output')
def test_plug_network(self, mock_check_output, mock_netns,
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'plug._netns_interface_exists')
def test_plug_network(self, mock_int_exists, mock_check_output, mock_netns,
mock_pyroute2, mock_ifaddress,
mock_interfaces):
port_info = {'mac_address': '123'}
test_int_num = random.randint(0, 9999)
mock_int_exists.return_value = False
netns_handle = mock_netns.return_value.__enter__.return_value
netns_handle.get_links.return_value = [0] * test_int_num
@ -639,8 +642,10 @@ class ServerTestCase(base.TestCase):
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
file_name = '/etc/netns/{0}/network/interfaces.d/blah.cfg'.format(
consts.AMPHORA_NAMESPACE)
file_name = ('/etc/netns/{netns}/network/interfaces.d/'
'{netns_int}.cfg'.format(
netns=consts.AMPHORA_NAMESPACE,
netns_int=consts.NETNS_PRIMARY_INTERFACE))
m = mock.mock_open()
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
@ -663,15 +668,17 @@ class ServerTestCase(base.TestCase):
handle = m()
handle.write.assert_any_call(
'\n# Generated by Octavia agent\n'
'auto blah blah:0\n'
'iface blah inet dhcp\n\n'
'iface blah:0 inet static\n'
'auto {netns_int} {netns_int}:0\n'
'iface {netns_int} inet dhcp\n\n'
'iface {netns_int}:0 inet static\n'
'address 203.0.113.2\n'
'broadcast 203.0.113.255\n'
'netmask 255.255.255.0')
'netmask 255.255.255.0'.format(
netns_int=consts.NETNS_PRIMARY_INTERFACE))
mock_check_output.assert_called_with(
['ip', 'netns', 'exec', 'amphora-haproxy', 'ifup',
'blah:0'], stderr=-2)
'{netns_int}:0'.format(
netns_int=consts.NETNS_PRIMARY_INTERFACE)], stderr=-2)
mock_interfaces.side_effect = [['blah']]
mock_ifaddress.side_effect = [[netifaces.AF_LINK],
@ -748,8 +755,10 @@ class ServerTestCase(base.TestCase):
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
file_name = '/etc/netns/{0}/network/interfaces.d/blah.cfg'.format(
consts.AMPHORA_NAMESPACE)
file_name = ('/etc/netns/{netns}/network/interfaces.d/'
'{netns_int}.cfg'.format(
netns=consts.AMPHORA_NAMESPACE,
netns_int=consts.NETNS_PRIMARY_INTERFACE))
m = mock.mock_open()
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
@ -772,15 +781,17 @@ class ServerTestCase(base.TestCase):
handle = m()
handle.write.assert_any_call(
'\n# Generated by Octavia agent\n'
'auto blah blah:0\n'
'iface blah inet6 auto\n\n'
'iface blah:0 inet6 static\n'
'auto {netns_int} {netns_int}:0\n'
'iface {netns_int} inet6 auto\n\n'
'iface {netns_int}:0 inet6 static\n'
'address 2001:0db8:0000:0000:0000:0000:0000:0002\n'
'broadcast 2001:0db8:ffff:ffff:ffff:ffff:ffff:ffff\n'
'netmask 32')
'netmask 32'.format(
netns_int=consts.NETNS_PRIMARY_INTERFACE))
mock_check_output.assert_called_with(
['ip', 'netns', 'exec', 'amphora-haproxy', 'ifup',
'blah:0'], stderr=-2)
'{netns_int}:0'.format(
netns_int=consts.NETNS_PRIMARY_INTERFACE)], stderr=-2)
mock_interfaces.side_effect = [['blah']]
mock_ifaddress.side_effect = [[netifaces.AF_LINK],

View File

@ -72,7 +72,7 @@ class TestPlug(base.TestCase):
mock_flask.jsonify.assert_any_call({
'message': 'OK',
'details': 'VIP {vip} plugged on interface {interface}'.format(
vip=FAKE_IP_IPV4, interface=FAKE_INTERFACE)
vip=FAKE_IP_IPV4, interface='eth1')
})
@mock.patch.object(plug, "flask")
@ -96,7 +96,7 @@ class TestPlug(base.TestCase):
mock_flask.jsonify.assert_any_call({
'message': 'OK',
'details': 'VIP {vip} plugged on interface {interface}'.format(
vip=FAKE_IP_IPV6_EXPANDED, interface=FAKE_INTERFACE)
vip=FAKE_IP_IPV6_EXPANDED, interface='eth1')
})
@mock.patch.object(plug, "flask")
@ -118,3 +118,17 @@ class TestPlug(base.TestCase):
mac_address=FAKE_MAC_ADDRESS
)
mock_flask.jsonify.assert_any_call({'message': 'Invalid VIP'})
@mock.patch('pyroute2.NetNS')
def test__netns_interface_exists(self, mock_netns):
netns_handle = mock_netns.return_value.__enter__.return_value
netns_handle.get_links.return_value = [{
'attrs': [['IFLA_ADDRESS', '123']]}]
# Interface is found in netns
self.assertTrue(plug._netns_interface_exists('123'))
# Interface is not found in netns
self.assertFalse(plug._netns_interface_exists('321'))

View File

@ -159,7 +159,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
amphorae_network_config.get().vip_subnet.cidr = FAKE_CIDR
amphorae_network_config.get().vip_subnet.gateway_ip = FAKE_GATEWAY
amphorae_network_config.get().vrrp_port = self.port
self.driver.post_vip_plug(self.lb, amphorae_network_config)
self.driver.post_vip_plug(self.amp, self.lb, amphorae_network_config)
self.driver.client.plug_vip.assert_called_once_with(
self.amp, self.lb.vip.ip_address, self.subnet_info)

View File

@ -115,7 +115,7 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
self.amphora.id, self.port.id)])
def test_post_vip_plug(self):
self.driver.post_vip_plug(self.load_balancer,
self.driver.post_vip_plug(self.amphora, self.load_balancer,
self.amphorae_net_configs)
expected_method_and_args = (self.load_balancer.id,
self.amphorae_net_configs,

View File

@ -264,12 +264,9 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertIn(constants.MEMBER_PORTS, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.VIP, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(12, len(amp_flow.provides))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(role=constants.ROLE_MASTER)
@ -286,12 +283,9 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertIn(constants.MEMBER_PORTS, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.VIP, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(12, len(amp_flow.provides))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(role=constants.ROLE_BACKUP)
@ -308,12 +302,9 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertIn(constants.MEMBER_PORTS, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.VIP, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(12, len(amp_flow.provides))
self.assertEqual(11, len(amp_flow.provides))
amp_flow = self.AmpFlow.get_failover_flow(role='BOGUSROLE')
@ -330,12 +321,9 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.COMPUTE_OBJ, amp_flow.provides)
self.assertIn(constants.LISTENERS, amp_flow.provides)
self.assertIn(constants.LOADBALANCER, amp_flow.provides)
self.assertIn(constants.MEMBER_PORTS, amp_flow.provides)
self.assertIn(constants.PORTS, amp_flow.provides)
self.assertIn(constants.VIP, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(12, len(amp_flow.provides))
self.assertEqual(11, len(amp_flow.provides))
def test_cert_rotate_amphora_flow(self, mock_get_net_driver):
self.AmpFlow = amphora_flows.AmphoraFlows()

View File

@ -59,7 +59,7 @@ class TestAmphoraDriverTasks(base.TestCase):
def setUp(self):
_LB_mock.amphorae = _amphora_mock
_LB_mock.amphorae = [_amphora_mock]
_LB_mock.id = LB_ID
super(TestAmphoraDriverTasks, self).setUp()
@ -307,10 +307,40 @@ class TestAmphoraDriverTasks(base.TestCase):
amphorae_net_config_mock = mock.Mock()
amphora_post_vip_plug_obj = amphora_driver_tasks.AmphoraPostVIPPlug()
amphora_post_vip_plug_obj.execute(_LB_mock, amphorae_net_config_mock)
amphora_post_vip_plug_obj.execute(_amphora_mock,
_LB_mock,
amphorae_net_config_mock)
mock_driver.post_vip_plug.assert_called_once_with(
_LB_mock, amphorae_net_config_mock)
_amphora_mock, _LB_mock, amphorae_net_config_mock)
# Test revert
amp = amphora_post_vip_plug_obj.revert(None, _amphora_mock, _LB_mock)
repo.LoadBalancerRepository.update.assert_called_once_with(
_session_mock,
id=LB_ID,
provisioning_status=constants.ERROR)
self.assertIsNone(amp)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_amphorae_post_vip_plug(self,
mock_loadbalancer_repo_update,
mock_driver,
mock_generate_uuid,
mock_log,
mock_get_session,
mock_listener_repo_get,
mock_listener_repo_update,
mock_amphora_repo_update):
amphorae_net_config_mock = mock.Mock()
amphora_post_vip_plug_obj = amphora_driver_tasks.AmphoraePostVIPPlug()
amphora_post_vip_plug_obj.execute(_LB_mock,
amphorae_net_config_mock)
mock_driver.post_vip_plug.assert_called_once_with(
_amphora_mock, _LB_mock, amphorae_net_config_mock)
# Test revert
amp = amphora_post_vip_plug_obj.revert(None, _LB_mock)

View File

@ -490,3 +490,35 @@ class TestNetworkTasks(base.TestCase):
mock_driver.plug_port.assert_any_call(amphora, port1)
mock_driver.plug_port.assert_any_call(amphora, port2)
def test_plug_vip_port(self, mock_get_net_driver):
mock_driver = mock.MagicMock()
mock_get_net_driver.return_value = mock_driver
vip_port = mock.MagicMock()
vrrp_port = mock.MagicMock()
amphorae_network_config = mock.MagicMock()
amphorae_network_config.get().vip_port = vip_port
amphorae_network_config.get().vrrp_port = vrrp_port
plugvipport = network_tasks.PlugVIPPort()
plugvipport.execute(self.amphora_mock, amphorae_network_config)
mock_driver.plug_port.assert_any_call(self.amphora_mock, vip_port)
mock_driver.plug_port.assert_any_call(self.amphora_mock, vrrp_port)
# test revert
plugvipport.revert(None, self.amphora_mock, amphorae_network_config)
mock_driver.unplug_port.assert_any_call(self.amphora_mock, vip_port)
mock_driver.unplug_port.assert_any_call(self.amphora_mock, vrrp_port)
def test_wait_for_port_detach(self, mock_get_net_driver):
mock_driver = mock.MagicMock()
mock_get_net_driver.return_value = mock_driver
amphora = o_data_models.Amphora(id=AMPHORA_ID,
lb_network_ip=IP_ADDRESS)
waitforportdetach = network_tasks.WaitForPortDetach()
waitforportdetach.execute(amphora)
mock_driver.wait_for_port_detach.assert_called_once_with(amphora)

View File

@ -17,6 +17,8 @@ import copy
import mock
from neutronclient.common import exceptions as neutron_exceptions
from novaclient.client import exceptions as nova_exceptions
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.common import clients
@ -52,6 +54,7 @@ class TestAllowedAddressPairsDriver(base.TestCase):
HA_PORT_ID = "8"
HA_IP = "12.0.0.2"
PORT_ID = uuidutils.generate_uuid()
DEVICE_ID = uuidutils.generate_uuid()
def setUp(self):
super(TestAllowedAddressPairsDriver, self).setUp()
@ -586,8 +589,7 @@ class TestAllowedAddressPairsDriver(base.TestCase):
lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,
ha_ip=self.HA_IP)
self.driver.failover_preparation(amphora)
port_update.assert_called_once_with(ports['ports'][1].get('id'),
{'port': {'device_id': ''}})
self.assertFalse(port_update.called)
self.driver.dns_integration_enabled = original_dns_integration_state
def test_failover_preparation_dns_integration(self):
@ -611,8 +613,7 @@ class TestAllowedAddressPairsDriver(base.TestCase):
ha_ip=self.HA_IP)
self.driver.failover_preparation(amphora)
port_update.assert_called_once_with(ports['ports'][1].get('id'),
{'port': {'dns_name': '',
'device_id': ''}})
{'port': {'dns_name': ''}})
self.driver.dns_integration_enabled = original_dns_integration_state
def _failover_show_port_side_effect(self, port_id):
@ -712,3 +713,71 @@ class TestAllowedAddressPairsDriver(base.TestCase):
expected_subnet_id = fake_subnet['subnet']['id']
self.assertEqual(expected_subnet_id, config.ha_subnet.id)
self.assertEqual(expected_subnet_id, config.vrrp_subnet.id)
@mock.patch('time.sleep')
def test_wait_for_port_detach(self, mock_sleep):
amphora = data_models.Amphora(
id=self.AMPHORA_ID, load_balancer_id=self.LB_ID,
compute_id=self.COMPUTE_ID, status=self.ACTIVE,
lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,
ha_ip=self.HA_IP)
ports = {"ports": [
{"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,
"ip_address": self.IP_ADDRESS_1}],
"id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1},
{"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,
"ip_address": self.IP_ADDRESS_2}],
"id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}]}
show_port_1_without_device_id = {"fixed_ips": [
{"subnet_id": self.SUBNET_ID_1, "ip_address": self.IP_ADDRESS_1}],
"id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1,
"device_id": ''}
show_port_2_with_device_id = {"fixed_ips": [
{"subnet_id": self.SUBNET_ID_2, "ip_address": self.IP_ADDRESS_2}],
"id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2,
"device_id": self.DEVICE_ID}
show_port_2_without_device_id = {"fixed_ips": [
{"subnet_id": self.SUBNET_ID_2, "ip_address": self.IP_ADDRESS_2}],
"id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2,
"device_id": None}
self.driver.neutron_client.list_ports.return_value = ports
port_mock = mock.MagicMock()
port_mock.get = mock.Mock(
side_effect=[show_port_1_without_device_id,
show_port_2_with_device_id,
show_port_2_with_device_id,
show_port_2_without_device_id])
self.driver.neutron_client.show_port.return_value = port_mock
self.driver.wait_for_port_detach(amphora)
self.assertEqual(1, mock_sleep.call_count)
@mock.patch('time.time')
@mock.patch('time.sleep')
def test_wait_for_port_detach_timeout(self, mock_sleep, mock_time):
mock_time.side_effect = [1, 2, 6]
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="networking", port_detach_timeout=5)
amphora = data_models.Amphora(
id=self.AMPHORA_ID, load_balancer_id=self.LB_ID,
compute_id=self.COMPUTE_ID, status=self.ACTIVE,
lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,
ha_ip=self.HA_IP)
ports = {"ports": [
{"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,
"ip_address": self.IP_ADDRESS_1}],
"id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1},
{"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,
"ip_address": self.IP_ADDRESS_2}],
"id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}]}
show_port_1_with_device_id = {"fixed_ips": [
{"subnet_id": self.SUBNET_ID_2, "ip_address": self.IP_ADDRESS_2}],
"id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2,
"device_id": self.DEVICE_ID}
self.driver.neutron_client.list_ports.return_value = ports
port_mock = mock.MagicMock()
port_mock.get = mock.Mock(
return_value=show_port_1_with_device_id)
self.driver.neutron_client.show_port.return_value = port_mock
self.assertRaises(network_base.TimeoutException,
self.driver.wait_for_port_detach,
amphora)