Fix Amphora's SSH proxy creation

Move creation of floating IP address to a specific heat stack class.
Add a method in the listener stack class to get Amphora's SSH client

Change-Id: I98e03bb06e6a2ee470f476358f5429fa03db3121
This commit is contained in:
Federico Ressi 2022-05-12 17:08:07 +02:00
parent f27c8572d7
commit f27e5f0b41
12 changed files with 395 additions and 181 deletions

View File

@ -223,6 +223,8 @@ class HeatStackFixture(tobiko.SharedFixture):
# a new stack
self.ensure_quota_limits()
self.prepare_external_resources()
LOG.debug('Begin creating stack %r...', self.stack_name)
try:
stack_id: str = self.setup_client().stacks.create(
@ -253,6 +255,9 @@ class HeatStackFixture(tobiko.SharedFixture):
f"(name={self.stack_name}, id={stack.id}).")
return stack
def prepare_external_resources(self):
pass
_resources = None
@tobiko.fixture_property

View File

@ -65,7 +65,6 @@ skip_if_missing_networking_extensions = (
create_floating_ip = _floating_ip.create_floating_ip
delete_floating_ip = _floating_ip.delete_floating_ip
ensure_floating_ip = _floating_ip.ensure_floating_ip
get_floating_ip = _floating_ip.get_floating_ip
get_floating_ip_id = _floating_ip.get_floating_ip_id
find_floating_ip = _floating_ip.find_floating_ip
@ -109,7 +108,6 @@ NetworkIdType = _network.NetworkIdType
add_router_interface = _router.add_router_interface
create_router = _router.create_router
delete_router = _router.delete_router
ensure_router_interface = _router.ensure_router_interface
get_router = _router.get_router
get_router_id = _router.get_router_id
remove_router_interface = _router.remove_router_interface
@ -120,6 +118,7 @@ NoSuchRouter = _router.NoSuchRouter
create_subnet = _subnet.create_subnet
delete_subnet = _subnet.delete_subnet
ensure_subnet_gateway = _subnet.ensure_subnet_gateway
get_subnet = _subnet.get_subnet
get_subnet_id = _subnet.get_subnet_id
find_subnet = _subnet.find_subnet

View File

@ -19,7 +19,6 @@ import tobiko
from tobiko.openstack.neutron import _client
from tobiko.openstack.neutron import _network
from tobiko.openstack.neutron import _port
from tobiko.openstack.neutron import _router
FloatingIpType = typing.Dict[str, typing.Any]
@ -121,20 +120,5 @@ def update_floating_ip(floating_ip: FloatingIpIdType,
raise NoSuchFloatingIp(id=floating_ip_id) from ex
def ensure_floating_ip(fixed_ip_address: str,
device_id: str = None) \
-> FloatingIpType:
port = _port.find_port(device_id=device_id,
fixed_ips=[f'ip_address={fixed_ip_address}'])
try:
return find_floating_ip(port=port)
except tobiko.ObjectNotFound:
from tobiko.openstack import stacks
for fixed_ip in port['fixed_ips']:
_router.ensure_router_interface(subnet=fixed_ip['subnet_id'])
fixture = stacks.FloatingIpStackFixture(port=port)
return tobiko.setup_fixture(fixture).floating_ip_details
class NoSuchFloatingIp(tobiko.ObjectNotFound):
message = "No such floating IP found for {id!r}"

View File

@ -182,34 +182,5 @@ def remove_router_interface(router: RouterIdType,
raise tobiko.ObjectNotFound() from ex
def ensure_router_interface(subnet: _subnet.SubnetIdType,
router: RouterIdType = None,
client: _client.NeutronClientType = None):
if isinstance(subnet, str):
subnet = _subnet.get_subnet(subnet=subnet, client=client)
gateway_ip = subnet.get('gateway_ip')
if gateway_ip is not None:
try:
gateway_port = _port.find_port(fixed_ip=f'ip_address={gateway_ip}')
except _port.NoSuchPort:
pass
else:
if gateway_port['device_owner'] == 'network:router_interface':
return
if router is None:
from tobiko.openstack import stacks
router = stacks.get_router_id()
LOG.debug("Add router interface: subnet={subnet['id']}")
interface = add_router_interface(router=router,
subnet=subnet,
add_cleanup=False,
client=client)
interface_dump = json.dumps(interface, sort_keys=True, indent=4)
LOG.info(f"Added router interface:\n{interface_dump}")
subnet = _subnet.get_subnet(subnet=subnet, client=client)
assert subnet['gateway_ip']
class NoSuchRouter(tobiko.ObjectNotFound):
message = "No such router found for {id!r}"

View File

@ -115,8 +115,11 @@ def find_subnet(client: _client.NeutronClientType = None,
def update_subnet(subnet: SubnetIdType,
client: _client.NeutronClientType = None,
gateway_ip: typing.Union[str, netaddr.IPAddress] = None,
**params) -> SubnetType:
subnet_id = get_subnet_id(subnet)
if gateway_ip is not None:
params['gateway_ip'] = str(gateway_ip)
try:
return _client.neutron_client(client).update_subnet(
subnet_id, body={'subnet': params})['subnet']
@ -124,5 +127,20 @@ def update_subnet(subnet: SubnetIdType,
raise NoSuchSubnet(id=subnet_id) from ex
def ensure_subnet_gateway(subnet: SubnetIdType,
client: _client.NeutronClientType = None) \
-> SubnetType:
"""Make sure given subnet has a gateway IP"""
if isinstance(subnet, str):
subnet = get_subnet(subnet, client=client)
if not subnet.get('gateway_ip'):
cidr = netaddr.IPNetwork(subnet['cidr'])
gateway_ip = netaddr.IPAddress(cidr.first + 1)
subnet = update_subnet(subnet=subnet,
client=client,
gateway_ip=gateway_ip)
return subnet
class NoSuchSubnet(tobiko.ObjectNotFound):
message = "No such subnet found for {id!r}"

View File

@ -13,23 +13,34 @@
# under the License.
from __future__ import absolute_import
from tobiko.openstack.octavia import _amphora
from tobiko.openstack.octavia import _client
from tobiko.openstack.octavia import _waiters
from tobiko.openstack.octavia import _constants
from tobiko.openstack.octavia import _validators
from tobiko.openstack.octavia import _exceptions
from tobiko.openstack.octavia import _load_balancer
from tobiko.openstack.octavia import _validators
from tobiko.openstack.octavia import _waiters
AmphoraIdType = _amphora.AmphoraIdType
AmphoraType = _amphora.AmphoraType
get_amphora_id = _amphora.get_amphora_id
get_amphora = _amphora.get_amphora
get_amphora_compute_node = _amphora.get_amphora_compute_node
get_master_amphora = _amphora.get_master_amphora
list_amphorae = _amphora.list_amphorae
OCTAVIA_CLIENT_CLASSSES = _client.OCTAVIA_CLIENT_CLASSSES
get_octavia_client = _client.get_octavia_client
octavia_client = _client.octavia_client
OctaviaClientFixture = _client.OctaviaClientFixture
get_loadbalancer = _client.get_loadbalancer
OctaviaClientType = _client.OctaviaClientType
get_member = _client.get_member
list_members = _client.list_members
list_amphorae = _client.list_amphorae
get_amphora_compute_node = _client.get_amphora_compute_node
get_master_amphora = _client.get_master_amphora
LoadBalancerIdType = _load_balancer.LoadBalancerIdType
LoadBalancerType = _load_balancer.LoadBalancerType
get_load_balancer = _load_balancer.get_load_balancer
get_load_balancer_id = _load_balancer.get_load_balancer_id
# Waiters
wait_for_status = _waiters.wait_for_status

View File

@ -0,0 +1,126 @@
# Copyright 2022 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import typing
import tobiko
from tobiko.openstack.octavia import _client
from tobiko.openstack.octavia import _load_balancer
from tobiko.openstack import nova
from tobiko.openstack.octavia import _validators
from tobiko.openstack import topology
AmphoraType = typing.Dict[str, typing.Any]
AmphoraIdType = typing.Union[str, AmphoraType]
def get_amphora_id(amphora: AmphoraIdType) -> str:
if isinstance(amphora, str):
return amphora
else:
return amphora['id']
def get_amphora(amphora: AmphoraIdType,
client: _client.OctaviaClientType = None) -> AmphoraType:
amphora_id = get_amphora_id(amphora)
return _client.octavia_client(client).amphora_show(amphora_id)['amphora']
def list_amphorae(load_balancer: _load_balancer.LoadBalancerIdType = None,
client: _client.OctaviaClientType = None,
**params) \
-> tobiko.Selection[AmphoraType]:
if load_balancer is not None:
params['load_balancer_id'] = _load_balancer.get_load_balancer_id(
load_balancer)
amphorae = _client.octavia_client(client).amphora_list(
**params)['amphorae']
return tobiko.select(amphorae)
def get_amphora_compute_node(load_balancer: _load_balancer.LoadBalancerIdType,
port: int,
protocol: str,
ip_address: str,
client: _client.OctaviaClientType = None) -> (
topology.OpenStackTopologyNode):
"""Gets the compute node which hosts the LB amphora
This function finds the Overcloud compute node which
hosts the amphora. In case there are more than 1 amphora
(e.g. if the LB's topology is Active/standby), so the compute node which
hosts the master amphora will be returned.
:param load_balancer: the load balancer ID.
:param port: the load balancer port.
:param protocol: the load balancer protocol.
:param ip_address: the IP address of the load balancer
:param client: the Octavia client
:return: the compute node which hosts the Amphora.
"""
amphorae = list_amphorae(load_balancer)
amphora = get_master_amphora(amphorae=amphorae,
port=port,
protocol=protocol,
ip_address=ip_address,
client=client)
server = nova.get_server(amphora['compute_id'])
hostname = getattr(server, 'OS-EXT-SRV-ATTR:hypervisor_hostname')
return topology.get_openstack_node(hostname=hostname)
def get_master_amphora(amphorae: typing.Iterable[AmphoraType],
port: int,
protocol: str,
ip_address: str,
client=None) -> AmphoraType:
"""Gets the master Amphora in a High Available LB
(a loadbalancer which uses the Active/standby topology)
:param amphorae: The list of amphoras (each represented by
JSON).
:param port: the load balancer port.
:param protocol: the load balancer protocol.
:param ip_address: the IP address of the load balancer
:param client: the Octavia client
:return amphora (dict): JSON of the Master Amphora.
"""
amphorae = tobiko.select(amphorae)
try:
return amphorae.unique
except tobiko.MultipleObjectsFound:
# For a high available LB
pass
# Generate traffic on the LB so we can identify the current Master
_validators.check_members_balanced(ip_address=ip_address,
protocol=protocol,
port=port,
members_count=1,
requests_count=1)
# The amphora which has total_connections > 0 is the master.
# Backup amphora will always have total_connections == 0.
for amphora in amphorae:
amphora_stats = _client.octavia_client(client).amphora_stats_show(
amphora['id'])
for listener in list(amphora_stats.values())[0]:
if listener['total_connections'] > 0:
return amphora
raise ValueError("Master Amphora wasn't found!")

View File

@ -13,19 +13,22 @@
# under the License.
from __future__ import absolute_import
import typing
from octaviaclient.api.v2 import octavia
import tobiko
from tobiko.openstack import _client
from tobiko.openstack import keystone
from tobiko.openstack import nova
from tobiko.openstack.octavia import _validators
from tobiko.openstack import topology
OCTAVIA_CLIENT_CLASSSES = octavia.OctaviaAPI,
OctaviaClientType = typing.Union[octavia.OctaviaAPI,
'OctaviaClientFixture']
def get_octavia_endpoint(keystone_client=None):
return keystone.find_service_endpoint(name='octavia',
client=keystone_client)
@ -48,8 +51,8 @@ class OctaviaClientManager(_client.OpenstackClientManager):
CLIENTS = OctaviaClientManager()
def octavia_client(obj):
if not obj:
def octavia_client(obj: OctaviaClientType = None) -> octavia.OctaviaAPI:
if obj is None:
return get_octavia_client()
if isinstance(obj, OCTAVIA_CLIENT_CLASSSES):
@ -72,10 +75,6 @@ def get_octavia_client(session=None, shared=True, init_client=None,
return client.client
def get_loadbalancer(loadbalancer_id: str, client=None):
return octavia_client(client).load_balancer_show(lb_id=loadbalancer_id)
def get_member(pool_id: str, member_id: str, client=None):
return octavia_client(client).member_show(pool_id=pool_id,
member_id=member_id)
@ -83,78 +82,3 @@ def get_member(pool_id: str, member_id: str, client=None):
def list_members(pool_id: str, client=None):
return octavia_client(client).member_list(pool_id=pool_id)['members']
def list_amphorae(loadbalancer_id: str, client=None):
return octavia_client(client).amphora_list(
loadbalancer_id=loadbalancer_id)['amphorae']
def get_amphora_compute_node(loadbalancer_id: str,
lb_port: int,
lb_protocol: str,
ip_address: str) -> (
topology.OpenStackTopologyNode):
"""Gets the compute node which hosts the LB amphora
This function finds the Overcloud compute node which
hosts the amphora. In case there are more than 1 amphora
(e.g. if the LB's topology is Active/standby), so the compute node which
hosts the master amphora will be returned.
:param loadbalancer_id (str): The loadbalancer ID.
:param lb_port (int): The loadbalancer port.
:param lb_protocol (str): The loadbalancer protocol.
:param ip_address (str): The ip adress of the loadbalancer.
:return (TripleoTopologyNode): The compute node which hosts the Amphora.
"""
amphorae = list_amphorae(loadbalancer_id)
if len(amphorae) > 1: # For a high available LB
amphora = get_master_amphora(amphorae=amphorae,
lb_port=lb_port,
lb_protocol=lb_protocol,
ip_address=ip_address)
else:
amphora = amphorae[0]
server = nova.get_server(amphora['compute_id'])
hostname = getattr(server, 'OS-EXT-SRV-ATTR:hypervisor_hostname')
return topology.get_openstack_node(hostname=hostname)
def get_master_amphora(amphorae: dict,
lb_port: int,
lb_protocol: str,
ip_address: str,
client=None) -> dict:
"""Gets the master Amphora in a High Available LB
(a loadbalancer which uses the Active/standby topology)
:param amphorae (dict): The list of amphoras (each represented by
JSON).
:param lb_port (int): The loadbalancer port.
:param lb_protocol (str): The loadbalancer protocol.
:param ip_address (str): The ip adress of the loadbalancer.
:param client (optional): Any client with access to the Octavia APIs.
:return amphora (dict): JSON of the Master Amphora.
"""
# Generate traffic on the LB so we can identify the current Master
_validators.check_members_balanced(
ip_address=ip_address,
protocol=lb_protocol,
port=lb_port,
members_count=1,
requests_count=1)
# The amphora which has total_connections > 0 is the master.
# Backup amphora will always have total_connections == 0.
for amphora in amphorae:
amphora_stats = octavia_client(client).amphora_stats_show(
amphora['id'])
for listener in list(amphora_stats.values())[0]:
if listener['total_connections'] > 0:
return amphora
raise ValueError("Master Amphora wasn't found!")

View File

@ -0,0 +1,36 @@
# Copyright 2022 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import typing
from tobiko.openstack.octavia import _client
LoadBalancerType = typing.Dict[str, typing.Any]
LoadBalancerIdType = typing.Union[str, LoadBalancerType]
def get_load_balancer_id(load_balancer: LoadBalancerIdType) -> str:
if isinstance(load_balancer, str):
return load_balancer
else:
return load_balancer['id']
def get_load_balancer(load_balancer: LoadBalancerIdType,
client: _client.OctaviaClientType = None) \
-> LoadBalancerType:
load_balancer_id = get_load_balancer_id(load_balancer)
return _client.octavia_client(client).load_balancer_show(
load_balancer_id)

View File

@ -132,6 +132,11 @@ class ExternalNetworkStackFixture(heat.HeatStackFixture):
class RouterStackFixture(ExternalNetworkStackFixture):
def __init__(self,
neutron_client: neutron.NeutronClientType = None):
super(RouterStackFixture, self).__init__()
self._neutron_client = neutron_client
@property
def external_name(self) -> typing.Optional[str]:
return tobiko.tobiko_config().neutron.floating_network
@ -149,6 +154,44 @@ class RouterStackFixture(ExternalNetworkStackFixture):
requirements['router'] += 1
return requirements
def ensure_router_interface(self,
subnet: neutron.SubnetIdType):
ensure_router_interface(subnet=subnet,
router=self.router_id,
client=self.neutron_client)
@property
def neutron_client(self) -> neutron.NeutronClientType:
if self._neutron_client is None:
self._neutron_client = neutron.neutron_client()
return self._neutron_client
def ensure_router_interface(subnet: neutron.SubnetIdType,
router: neutron.RouterIdType = None,
client: neutron.NeutronClientType = None) \
-> neutron.PortType:
if router is None:
router = get_router_id()
router_id = neutron.get_router_id(router)
subnet_id = neutron.get_subnet_id(subnet)
try:
return neutron.find_port(fixed_ips=f"subnet_id={subnet_id}",
device_id=router_id)
except tobiko.ObjectNotFound:
pass
LOG.debug(f"Add router interface: subnet={subnet_id}")
subnet = neutron.ensure_subnet_gateway(subnet=subnet)
interface = neutron.add_router_interface(router=router,
subnet=subnet,
add_cleanup=False,
client=client)
interface_dump = json.dumps(interface, sort_keys=True, indent=4)
LOG.info(f"Added router interface:\n{interface_dump}")
return neutron.find_port(fixed_ips=f"subnet_id={subnet_id}",
device_id=router_id)
@neutron.skip_if_missing_networking_extensions('port-security')
class NetworkStackFixture(heat.HeatStackFixture):
@ -516,12 +559,15 @@ class FloatingIpStackFixture(heat.HeatStackFixture):
def __init__(self,
stack_name: str = None,
network: neutron.NetworkIdType = None,
port: neutron.PortIdType = None):
port: neutron.PortIdType = None,
device_id: str = None,
fixed_ip_address: str = None,
neutron_client: neutron.NeutronClientType = None):
self._network = network
self._port = port
if port is not None and stack_name is None:
stack_name = (f"{tobiko.get_object_name(self)}-"
f"{neutron.get_port_id(port)}")
self._neutron_client = neutron_client
self._device_id = device_id
self._fixed_ip_address = fixed_ip_address
super(FloatingIpStackFixture, self).__init__(stack_name=stack_name)
@property
@ -534,11 +580,50 @@ class FloatingIpStackFixture(heat.HeatStackFixture):
@property
def port(self) -> str:
port = self._port
if port is None:
raise ValueError(f"Undefined floating IP port ID for stack"
f" {self.stack_name}")
return neutron.get_port_id(port)
if isinstance(self._port, str):
return self._port
else:
return self.port_details['id']
@property
def port_details(self) -> neutron.PortType:
if self._port is None:
params: typing.Dict[str, typing.Any] = {}
device_id = self.device_id
if device_id is not None:
params['device_id'] = device_id
self._port = neutron.find_port(
client=self.neutron_client,
fixed_ips=[f'ip_address={self.fixed_ip_address}'],
**params)
elif isinstance(self._port, str):
self._port = neutron.get_port(self.port,
client=self.neutron_client)
assert isinstance(self._port, dict)
return self._port
@property
def fixed_ip_address(self) -> str:
if self._fixed_ip_address is None:
raise ValueError(
'Must specify at least a port or a fixed IP address')
return self._fixed_ip_address
@property
def device_id(self) -> typing.Optional[str]:
return self._device_id
def setup_stack_name(self) -> str:
stack_name = self.stack_name
if stack_name is None:
self.stack_name = stack_name = f"{self.fixture_name}-{self.port}"
return stack_name
def prepare_external_resources(self):
super().prepare_external_resources()
for fixed_ip in self.port_details['fixed_ips']:
self.router_stack.ensure_router_interface(
subnet=fixed_ip['subnet_id'])
@property
def network_details(self) -> neutron.NetworkType:
@ -546,8 +631,14 @@ class FloatingIpStackFixture(heat.HeatStackFixture):
@property
def router_details(self) -> neutron.RouterType:
return neutron.get_network(self.router_id)
return neutron.get_router(self.router_id)
@property
def floating_ip_details(self) -> neutron.FloatingIpType:
return neutron.get_floating_ip(self.floating_ip_id)
@property
def neutron_client(self) -> neutron.NeutronClientType:
if self._neutron_client is None:
self._neutron_client = self.router_stack.neutron_client
return self._neutron_client

View File

@ -15,16 +15,20 @@
# under the License.
from __future__ import absolute_import
import typing
from oslo_log import log
import tobiko
from tobiko import config
from tobiko.openstack import heat
from tobiko.openstack import octavia
from tobiko.openstack import neutron
from tobiko.openstack.stacks import _hot
from tobiko.openstack.stacks import _neutron
from tobiko.openstack.stacks import _ubuntu
from tobiko.shell import sh
from tobiko.shell import ssh
CONF = config.CONF
LOG = log.getLogger(__name__)
@ -59,7 +63,7 @@ class AmphoraIPv4LoadBalancerStack(heat.HeatStackFixture):
timeout: tobiko.Seconds = None):
octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.ACTIVE,
get_client=octavia.get_loadbalancer,
get_client=octavia.get_load_balancer,
object_id=self.loadbalancer_id,
timeout=timeout)
@ -67,7 +71,7 @@ class AmphoraIPv4LoadBalancerStack(heat.HeatStackFixture):
timeout: tobiko.Seconds = None):
octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.PENDING_UPDATE,
get_client=octavia.get_loadbalancer,
get_client=octavia.get_load_balancer,
object_id=self.loadbalancer_id,
timeout=timeout)
@ -229,6 +233,34 @@ class HttpRoundRobinAmphoraIpv4Listener(heat.HeatStackFixture):
def get_member_address(self, server_stack):
return str(server_stack.find_fixed_ip(ip_version=self.ip_version))
@property
def amphora(self) -> octavia.AmphoraType:
return octavia.get_master_amphora(
amphorae=octavia.list_amphorae(
self.loadbalancer.loadbalancer_id),
ip_address=self.loadbalancer.floating_ip_address,
port=self.lb_port,
protocol=self.lb_protocol)
_amphora_floating_ip_stack: typing.Optional[
'AmphoraFloatingIpStack'] = None
@property
def amphora_floating_ip(self) -> neutron.FloatingIpType:
if self._amphora_floating_ip_stack is None:
self._amphora_floating_ip_stack = AmphoraFloatingIpStack(
amphora=self.amphora)
return tobiko.setup_fixture(
self._amphora_floating_ip_stack).floating_ip_details
@property
def amphora_ssh_client(self) -> ssh.SSHClientType:
"""Get an ssh_client and execute the command on the Amphora"""
floating_ip = self.amphora_floating_ip
return ssh.ssh_client(host=floating_ip['floating_ip_address'],
username='cloud-user',
connection_timeout=10)
class HttpRoundRobinAmphoraIpv6Listener(HttpRoundRobinAmphoraIpv4Listener):
ip_version = 6
@ -274,3 +306,46 @@ class TcpSourceIpPortOvnIpv4Listener(HttpRoundRobinAmphoraIpv4Listener):
class TcpSourceIpPortOvnIpv6Listener(TcpSourceIpPortOvnIpv4Listener):
ip_version = 6
class AmphoraFloatingIpStack(_neutron.FloatingIpStackFixture):
def __init__(self,
amphora: octavia.AmphoraIdType = None,
stack_name: str = None,
network: neutron.NetworkIdType = None,
neutron_client: neutron.NeutronClientType = None,
octavia_client: octavia.OctaviaClientType = None):
super().__init__(stack_name=stack_name,
network=network,
neutron_client=neutron_client)
self._amphora = amphora
self._octavia_client = octavia_client
@property
def amphora(self) -> str:
return octavia.get_amphora_id(self.amphora_details)
@property
def amphora_details(self) -> octavia.AmphoraType:
if self._amphora is None:
raise ValueError('Amphora not specified')
if isinstance(self._amphora, str):
self._amphora = octavia.get_amphora(self._amphora)
assert isinstance(self._amphora, dict)
return self._amphora
def setup_stack_name(self) -> str:
stack_name = self.stack_name
if stack_name is None:
self.stack_name = stack_name = (
f"{self.fixture_name}-{self.amphora}")
return stack_name
@property
def fixed_ip_address(self) -> str:
return self.amphora_details['lb_network_ip']
@property
def device_id(self) -> typing.Optional[str]:
return self.amphora_details['compute_id']

View File

@ -14,15 +14,12 @@
# under the License.
from __future__ import absolute_import
import typing
import testtools
from oslo_log import log
import tobiko
from tobiko.openstack import keystone
from tobiko.openstack import octavia
from tobiko.openstack import neutron
from tobiko.openstack import stacks
from tobiko.shell import ssh
from tobiko.shell import sh
@ -50,8 +47,6 @@ class OctaviaBasicFaultTest(testtools.TestCase):
listener_stack = tobiko.required_fixture(
stacks.HttpRoundRobinAmphoraIpv4Listener)
amphora_ssh_client: typing.Optional[ssh.SSHClientFixture] = None
def setUp(self):
# pylint: disable=no-member
super(OctaviaBasicFaultTest, self).setUp()
@ -88,20 +83,15 @@ class OctaviaBasicFaultTest(testtools.TestCase):
if attempt.is_last:
raise
# Attach a FIP to the LB
fip = self._set_fip_to_amphora()
# Get an ssh_client and execute the command on the Amphora
self.amphora_ssh_client = ssh.ssh_client(
host=fip['floating_ip_address'],
username='cloud-user',
connection_timeout=10)
@property
def amphora_ssh_client(self) -> ssh.SSHClientType:
return self.listener_stack.amphora_ssh_client
def test_reboot_amphora_compute_node(self):
amphora_compute_host = octavia.get_amphora_compute_node(
loadbalancer_id=self.loadbalancer_stack.loadbalancer_id,
lb_port=self.listener_stack.lb_port,
lb_protocol=self.listener_stack.lb_protocol,
load_balancer=self.loadbalancer_stack.loadbalancer_id,
port=self.listener_stack.lb_port,
protocol=self.listener_stack.lb_protocol,
ip_address=self.loadbalancer_stack.floating_ip_address)
LOG.debug('Rebooting compute node...')
@ -245,19 +235,3 @@ class OctaviaBasicFaultTest(testtools.TestCase):
lb_algorithm=self.listener_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port)
def _set_fip_to_amphora(self):
"""Set a FIP to the LB management network port
This method sets a FIP to the LB management network port, which will
allow us afterwards to ssh the MASTER/SINGLE Amphora.
"""
amphora = octavia.get_master_amphora(
amphorae=octavia.list_amphorae(
self.loadbalancer_stack.loadbalancer_id),
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_port=self.listener_stack.lb_port,
lb_protocol=self.listener_stack.lb_protocol)
return neutron.ensure_floating_ip(
fixed_ip_address=amphora['lb_network_ip'],
device_id=amphora['compute_id'])