diff --git a/tobiko/tests/faults/octavia/test_faults.py b/tobiko/tests/faults/octavia/test_faults.py index 488f6f4f7..06b49eb93 100644 --- a/tobiko/tests/faults/octavia/test_faults.py +++ b/tobiko/tests/faults/octavia/test_faults.py @@ -14,13 +14,17 @@ # under the License. from __future__ import absolute_import +import typing + import testtools from oslo_log import log import tobiko from tobiko.openstack import keystone from tobiko.openstack import octavia +from tobiko.openstack import neutron from tobiko.openstack import stacks +from tobiko.shell import ssh from tobiko.shell import sh from tobiko import tripleo @@ -46,6 +50,8 @@ class OctaviaBasicFaultTest(testtools.TestCase): listener_stack = tobiko.required_fixture( stacks.HttpRoundRobinAmphoraIpv4Listener) + amphora_ssh_client: typing.Optional[ssh.SSHClientFixture] = None + def setUp(self): # pylint: disable=no-member super(OctaviaBasicFaultTest, self).setUp() @@ -82,6 +88,15 @@ class OctaviaBasicFaultTest(testtools.TestCase): if attempt.is_last: raise + # Attach a FIP to the LB + fip = self._set_fip_to_amphora() + + # Get an ssh_client and execute the command on the Amphora + self.amphora_ssh_client = ssh.ssh_client( + host=fip['floating_ip_address'], + username='cloud-user', + connection_timeout=10) + def test_reboot_amphora_compute_node(self): amphora_compute_host = octavia.get_amphora_compute_node( loadbalancer_id=self.loadbalancer_stack.loadbalancer_id, @@ -134,3 +149,123 @@ class OctaviaBasicFaultTest(testtools.TestCase): f"{attempt.elapsed_time} seconds") if attempt.is_last: raise + + def test_kill_amphora_agent(self): + """Kill the MASTER amphora agent + + This test kills the amphora agent on the MASTER amphora. + Killing the amphora agent will cause a failover. + + Octavia's functionality will be verified afterwards. + """ + + self._skip_if_not_active_standby() + + # Finding the amphora agent pid and kill it + + amp_agent_pid_command = ( + "ps -ef | awk '/amphora/{print $2}' | head -n 1") + + amp_agent_pid = sh.execute( + amp_agent_pid_command, ssh_client=self.amphora_ssh_client, + sudo=True).stdout.strip() + + sh.execute(f'kill -9 {amp_agent_pid}', + ssh_client=self.amphora_ssh_client, + sudo=True) + + self._wait_for_failover_and_test_functionality() + + def test_stop_keepalived(self): + """Stop keepalived on MASTER amphora + + This test stops keepalived on the MASTER amphora. + Stopping keepalived on the amphora will cause a failover. + + Octavia's functionality will be verified afterwards. + """ + + self._skip_if_not_active_standby() + + sh.stop_systemd_unit( + unit='octavia-keepalived', + ssh_client=self.amphora_ssh_client, + sudo=True) + + self._wait_for_failover_and_test_functionality() + + def test_stop_haproxy(self): + """Stop haproxy on MASTER amphora + + This test stops haproxy on the MASTER amphora. + Stopping haproxy on the amphora will cause a failover. + + Octavia's functionality will be verified afterwards. + """ + + self._skip_if_not_active_standby() + + haproxy_service = sh.list_systemd_units( + 'haproxy-*', + ssh_client=self.amphora_ssh_client) + + sh.stop_systemd_unit( + unit=haproxy_service, + ssh_client=self.amphora_ssh_client, + sudo=True) + + self._wait_for_failover_and_test_functionality() + + def _skip_if_not_active_standby(self): + """Skip the test if Octavia doesn't use Active/standby topology + """ + if len(octavia.list_amphorae( + self.loadbalancer_stack.loadbalancer_id)) > 2: + skipping_stmt = 'Skipping the test as it requires ' \ + 'Active/standby topology.' + LOG.info(skipping_stmt) + self.skipTest(skipping_stmt) + + def _wait_for_failover_and_test_functionality(self): + """Wait for failover to end and test Octavia functionality""" + + self.loadbalancer_stack.wait_for_update_loadbalancer() + self.loadbalancer_stack.wait_for_active_loadbalancer() + + LOG.debug(f'Load Balancer {self.loadbalancer_stack.loadbalancer_id} is' + f' ACTIVE') + + # Wait for Octavia objects' provisioning status to be ACTIVE + self.listener_stack.wait_for_active_members() + + # Verify Octavia functionality + octavia.check_members_balanced( + pool_id=self.listener_stack.pool_id, + ip_address=self.loadbalancer_stack.floating_ip_address, + lb_algorithm=self.listener_stack.lb_algorithm, + protocol=self.listener_stack.lb_protocol, + port=self.listener_stack.lb_port) + + def _set_fip_to_amphora(self): + """Set a FIP to the LB management network port + + This method sets a FIP to the LB management network port, which will + allow us afterwards to ssh the MASTER/SINGLE Amphora. + """ + amphora = octavia.get_master_amphora( + amphorae=octavia.list_amphorae( + self.loadbalancer_stack.loadbalancer_id), + ip_address=self.loadbalancer_stack.floating_ip_address, + lb_port=self.listener_stack.lb_port, + lb_protocol=self.listener_stack.lb_protocol) + + # Finding the loadbalancer's management port + lb_network_ip = amphora['lb_network_ip'] + port = neutron.find_port(device_id=amphora['compute_id'], + fixed_ips=[f'ip_address={lb_network_ip}']) + # Ensure there is a floating IP which is set to the LB management port + try: + floating_ip = neutron.find_floating_ip(port_id=port['id']) + except tobiko.ObjectNotFound: + floating_ip = neutron.create_floating_ip(port_id=port['id']) + return floating_ip