Add Active/standby amphora fault tests

So far Octavia QE had only one fault test which tested a failover.

This patch adds other tests which commit disruptive operations
and forces the amphora provider LB to run a failover process.
Finally, each test verifies the Octavia functionality.

The tests which are being added:
    * test_kill_amphora_agent
    * test_stop_keepalived
    * test_stop_haproxy

All of them will be run only if the Octavia topology is
ACTIVE_STANDBY.

Change-Id: I2ecab0973bc3467f4841991d6626166c319c96ac
This commit is contained in:
Omer 2021-11-04 16:55:07 +01:00 committed by Federico Ressi
parent d519f43417
commit e5193d86db
1 changed files with 135 additions and 0 deletions

View File

@ -14,13 +14,17 @@
# under the License.
from __future__ import absolute_import
import typing
import testtools
from oslo_log import log
import tobiko
from tobiko.openstack import keystone
from tobiko.openstack import octavia
from tobiko.openstack import neutron
from tobiko.openstack import stacks
from tobiko.shell import ssh
from tobiko.shell import sh
from tobiko import tripleo
@ -46,6 +50,8 @@ class OctaviaBasicFaultTest(testtools.TestCase):
listener_stack = tobiko.required_fixture(
stacks.HttpRoundRobinAmphoraIpv4Listener)
amphora_ssh_client: typing.Optional[ssh.SSHClientFixture] = None
def setUp(self):
# pylint: disable=no-member
super(OctaviaBasicFaultTest, self).setUp()
@ -82,6 +88,15 @@ class OctaviaBasicFaultTest(testtools.TestCase):
if attempt.is_last:
raise
# Attach a FIP to the LB
fip = self._set_fip_to_amphora()
# Get an ssh_client and execute the command on the Amphora
self.amphora_ssh_client = ssh.ssh_client(
host=fip['floating_ip_address'],
username='cloud-user',
connection_timeout=10)
def test_reboot_amphora_compute_node(self):
amphora_compute_host = octavia.get_amphora_compute_node(
loadbalancer_id=self.loadbalancer_stack.loadbalancer_id,
@ -134,3 +149,123 @@ class OctaviaBasicFaultTest(testtools.TestCase):
f"{attempt.elapsed_time} seconds")
if attempt.is_last:
raise
def test_kill_amphora_agent(self):
"""Kill the MASTER amphora agent
This test kills the amphora agent on the MASTER amphora.
Killing the amphora agent will cause a failover.
Octavia's functionality will be verified afterwards.
"""
self._skip_if_not_active_standby()
# Finding the amphora agent pid and kill it
amp_agent_pid_command = (
"ps -ef | awk '/amphora/{print $2}' | head -n 1")
amp_agent_pid = sh.execute(
amp_agent_pid_command, ssh_client=self.amphora_ssh_client,
sudo=True).stdout.strip()
sh.execute(f'kill -9 {amp_agent_pid}',
ssh_client=self.amphora_ssh_client,
sudo=True)
self._wait_for_failover_and_test_functionality()
def test_stop_keepalived(self):
"""Stop keepalived on MASTER amphora
This test stops keepalived on the MASTER amphora.
Stopping keepalived on the amphora will cause a failover.
Octavia's functionality will be verified afterwards.
"""
self._skip_if_not_active_standby()
sh.stop_systemd_unit(
unit='octavia-keepalived',
ssh_client=self.amphora_ssh_client,
sudo=True)
self._wait_for_failover_and_test_functionality()
def test_stop_haproxy(self):
"""Stop haproxy on MASTER amphora
This test stops haproxy on the MASTER amphora.
Stopping haproxy on the amphora will cause a failover.
Octavia's functionality will be verified afterwards.
"""
self._skip_if_not_active_standby()
haproxy_service = sh.list_systemd_units(
'haproxy-*',
ssh_client=self.amphora_ssh_client)
sh.stop_systemd_unit(
unit=haproxy_service,
ssh_client=self.amphora_ssh_client,
sudo=True)
self._wait_for_failover_and_test_functionality()
def _skip_if_not_active_standby(self):
"""Skip the test if Octavia doesn't use Active/standby topology
"""
if len(octavia.list_amphorae(
self.loadbalancer_stack.loadbalancer_id)) > 2:
skipping_stmt = 'Skipping the test as it requires ' \
'Active/standby topology.'
LOG.info(skipping_stmt)
self.skipTest(skipping_stmt)
def _wait_for_failover_and_test_functionality(self):
"""Wait for failover to end and test Octavia functionality"""
self.loadbalancer_stack.wait_for_update_loadbalancer()
self.loadbalancer_stack.wait_for_active_loadbalancer()
LOG.debug(f'Load Balancer {self.loadbalancer_stack.loadbalancer_id} is'
f' ACTIVE')
# Wait for Octavia objects' provisioning status to be ACTIVE
self.listener_stack.wait_for_active_members()
# Verify Octavia functionality
octavia.check_members_balanced(
pool_id=self.listener_stack.pool_id,
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_algorithm=self.listener_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port)
def _set_fip_to_amphora(self):
"""Set a FIP to the LB management network port
This method sets a FIP to the LB management network port, which will
allow us afterwards to ssh the MASTER/SINGLE Amphora.
"""
amphora = octavia.get_master_amphora(
amphorae=octavia.list_amphorae(
self.loadbalancer_stack.loadbalancer_id),
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_port=self.listener_stack.lb_port,
lb_protocol=self.listener_stack.lb_protocol)
# Finding the loadbalancer's management port
lb_network_ip = amphora['lb_network_ip']
port = neutron.find_port(device_id=amphora['compute_id'],
fixed_ips=[f'ip_address={lb_network_ip}'])
# Ensure there is a floating IP which is set to the LB management port
try:
floating_ip = neutron.find_floating_ip(port_id=port['id'])
except tobiko.ObjectNotFound:
floating_ip = neutron.create_floating_ip(port_id=port['id'])
return floating_ip