From 21e5cbff56cf46370c3c16a48af3d0c991414893 Mon Sep 17 00:00:00 2001 From: Omer Date: Wed, 20 Oct 2021 14:50:30 +0200 Subject: [PATCH] Use tobiko curl module in Octavia validators So far, we used a raw ssh command which uses curl in order to make sure the LB members are balanced (in the Octavia validators module). This patch uses the Tobiko curl module instead, a change which will make the maintenance of Octavia modules easier in the future. To make the use of Tobiko curl possible, there has to be added a new waiter method which waits until the members are reachable. Change-Id: I98bd593422d7f7c8dde805fe0eb75293b5598dbe --- tobiko/openstack/octavia/__init__.py | 3 +- tobiko/openstack/octavia/_validators.py | 70 +++++++++---------- tobiko/openstack/octavia/_waiters.py | 57 +++++++++++++-- tobiko/openstack/stacks/__init__.py | 1 - tobiko/openstack/stacks/_octavia.py | 5 -- tobiko/tests/faults/octavia/test_faults.py | 39 ++++------- tobiko/tests/faults/octavia/test_services.py | 27 +++---- tobiko/tests/scenario/octavia/test_traffic.py | 50 ++++--------- 8 files changed, 135 insertions(+), 117 deletions(-) diff --git a/tobiko/openstack/octavia/__init__.py b/tobiko/openstack/octavia/__init__.py index 67ab047c2..93252933b 100644 --- a/tobiko/openstack/octavia/__init__.py +++ b/tobiko/openstack/octavia/__init__.py @@ -33,7 +33,8 @@ get_amphoras_compute_nodes = _client.get_amphoras_compute_nodes # Waiters wait_for_status = _waiters.wait_for_status -wait_for_active_members_and_lb = _waiters.wait_for_active_members_and_lb +wait_for_active_and_functional_members_and_lb = ( + _waiters.wait_for_active_and_functional_members_and_lb) wait_for_lb_to_be_updated_and_active = ( _waiters.wait_for_lb_to_be_updated_and_active) diff --git a/tobiko/openstack/octavia/_validators.py b/tobiko/openstack/octavia/_validators.py index e9179f9ea..3b36b0904 100644 --- a/tobiko/openstack/octavia/_validators.py +++ b/tobiko/openstack/octavia/_validators.py @@ -14,64 +14,64 @@ # under the License. from __future__ import absolute_import -import time +import collections +import typing from oslo_log import log -import netaddr import tobiko -from tobiko.shell import sh +from tobiko.shell import curl +from tobiko.shell import ssh LOG = log.getLogger(__name__) -CURL_OPTIONS = "-f --connect-timeout 2 -g" -def request(client_stack, ip_address, protocol, port, ssh_client=None): - ssh_client = ssh_client or client_stack.ssh_client - - if netaddr.IPAddress(ip_address) == 6: - ip_address = f"[{ip_address}]" - - cmd = f"curl {CURL_OPTIONS} {protocol.lower()}://{ip_address}:{port}/id" - - return sh.ssh_execute(ssh_client, cmd).stdout - - -def check_members_balanced(pool_stack, client_stack, - members_count, - loadbalancer_vip, loadbalancer_protocol, - loadbalancer_port, ssh_client=None): +def check_members_balanced(members_count: int, + ip_address: str, + protocol: str, + port: int, + lb_algorithm: str = None, + requests_count: int = 10, + connect_timeout: tobiko.Seconds = 2., + interval: tobiko.Seconds = 1, + ssh_client: ssh.SSHClientFixture = None) -> ( + typing.Dict[str, int]): """Check if traffic is properly balanced between members.""" test_case = tobiko.get_test_case() - replies = {} - - for _ in range(members_count * 10): - content = request( - client_stack, loadbalancer_vip, - loadbalancer_protocol, loadbalancer_port, ssh_client) - - if content not in replies: - replies[content] = 0 + replies: typing.Dict[str, int] = collections.defaultdict(lambda: 0) + for attempt in tobiko.retry(count=members_count * requests_count, + interval=interval): + content = curl.execute_curl(hostname=ip_address, + scheme=protocol, + port=port, + path='id', + connect_timeout=connect_timeout, + ssh_client=ssh_client).strip() replies[content] += 1 - # wait one second (required when using cirros' nc fake webserver) - time.sleep(1) + if attempt.is_last: + break + else: + raise RuntimeError('Broken retry loop') - LOG.debug("Replies from load balancer: {}".format(replies)) + LOG.debug(f"Replies counts from load balancer: {replies}") # assert that 'members_count' servers replied - test_case.assertEqual(members_count, len(replies), - 'The number of detected active members:{} is not ' - 'as expected:{}'.format(len(replies), members_count)) + missing_members_count = members_count - len(replies) + test_case.assertEqual(0, missing_members_count, + f'Missing replies from {missing_members_count} "' + '"members.') - if pool_stack.lb_algorithm == 'ROUND_ROBIN': + if lb_algorithm == 'ROUND_ROBIN': # assert that requests have been fairly dispatched (each server # received the same number of requests) test_case.assertEqual(1, len(set(replies.values())), 'The number of requests served by each member is' ' different and not as expected by used ' 'ROUND_ROBIN algorithm.') + + return replies diff --git a/tobiko/openstack/octavia/_waiters.py b/tobiko/openstack/octavia/_waiters.py index 422ddb0da..a35d60936 100644 --- a/tobiko/openstack/octavia/_waiters.py +++ b/tobiko/openstack/octavia/_waiters.py @@ -19,6 +19,7 @@ from oslo_log import log import tobiko from tobiko.openstack import octavia from tobiko import config +from tobiko.shell import sh LOG = log.getLogger(__name__) @@ -68,19 +69,67 @@ def wait_for_status(status_key, status, get_client, object_id, f"from '{response[status_key]}' to '{status}'...") -def wait_for_active_members_and_lb(members, pool_id, loadbalancer_id): - for member_id in members: +def wait_for_members_to_be_reachable(members, + lb_protocol: str, + lb_port: int, + interval: tobiko.Seconds = None, + timeout: tobiko.Seconds = None, + count: int = 10): + + # Wait for members to be reachable from localhost + last_reached_id = 0 + for attempt in tobiko.retry(timeout=timeout, + count=count, + interval=interval): + try: + for member in members[last_reached_id:]: + octavia.check_members_balanced( + members_count=1, + ip_address=member.server_stack.ip_address, + protocol=lb_protocol, + port=lb_port, + requests_count=1) + last_reached_id += 1 # prevent retrying same member again + except sh.ShellCommandFailed: + LOG.info("Waiting for members to have HTTP service available...") + else: + break + + if attempt.is_last: + break + else: + raise RuntimeError("Members couldn't be reached!") + + +def wait_for_active_and_functional_members_and_lb( + members, + pool_id: str, + lb_protocol: str, + lb_port: int, + loadbalancer_id: str, + interval: tobiko.Seconds = None, + timeout: tobiko.Seconds = None): + + # Wait for members to have an ACTIVE provisioning status + for member_stack in members: octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS, status=octavia.ACTIVE, get_client=octavia.get_member, - object_id=pool_id, member_id=member_id) + object_id=pool_id, + member_id=member_stack.member_id) - # Wait for LB is provisioned and ACTIVE + # Wait for LB to have an ACTIVE provisioning status octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS, status=octavia.ACTIVE, get_client=octavia.get_loadbalancer, object_id=loadbalancer_id) + wait_for_members_to_be_reachable(members=members, + lb_protocol=lb_protocol, + lb_port=lb_port, + timeout=timeout, + interval=interval) + def wait_for_lb_to_be_updated_and_active(loadbalancer_id): octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS, diff --git a/tobiko/openstack/stacks/__init__.py b/tobiko/openstack/stacks/__init__.py index 68cf2f532..be7be18bf 100644 --- a/tobiko/openstack/stacks/__init__.py +++ b/tobiko/openstack/stacks/__init__.py @@ -86,7 +86,6 @@ OctaviaListenerStackFixture = _octavia.OctaviaListenerStackFixture OctaviaPoolStackFixture = _octavia.OctaviaPoolStackFixture OctaviaMemberServerStackFixture = _octavia.OctaviaMemberServerStackFixture OctaviaServerStackFixture = _octavia.OctaviaServerStackFixture -OctaviaClientServerStackFixture = _octavia.OctaviaClientServerStackFixture OctaviaOtherServerStackFixture = _octavia.OctaviaOtherServerStackFixture OctaviaOtherMemberServerStackFixture = ( _octavia.OctaviaOtherMemberServerStackFixture) diff --git a/tobiko/openstack/stacks/_octavia.py b/tobiko/openstack/stacks/_octavia.py index d9a3098c7..6d73f64a2 100644 --- a/tobiko/openstack/stacks/_octavia.py +++ b/tobiko/openstack/stacks/_octavia.py @@ -184,11 +184,6 @@ class OctaviaMemberServerStackFixture(heat.HeatStackFixture): ][0] -class OctaviaClientServerStackFixture(_cirros.CirrosServerStackFixture): - network_stack = tobiko.required_setup_fixture( - OctaviaVipNetworkStackFixture) - - class OctaviaOtherServerStackFixture( OctaviaServerStackFixture): pass diff --git a/tobiko/tests/faults/octavia/test_faults.py b/tobiko/tests/faults/octavia/test_faults.py index 897eb1e95..1949e7c9a 100644 --- a/tobiko/tests/faults/octavia/test_faults.py +++ b/tobiko/tests/faults/octavia/test_faults.py @@ -22,7 +22,6 @@ from tobiko.openstack import keystone from tobiko.openstack import octavia from tobiko.openstack import stacks from tobiko import tripleo -from tobiko.shell import sh LOG = log.getLogger(__name__) @@ -55,9 +54,6 @@ class OctaviaBasicFaultTest(testtools.TestCase): member2_stack = tobiko.required_setup_fixture( stacks.OctaviaOtherMemberServerStackFixture) - client_stack = tobiko.required_setup_fixture( - stacks.OctaviaClientServerStackFixture) - members_count = 2 def setUp(self): @@ -79,9 +75,11 @@ class OctaviaBasicFaultTest(testtools.TestCase): # Send traffic octavia.check_members_balanced( - self.pool_stack, self.client_stack, self.members_count, - self.loadbalancer_stack.loadbalancer_vip, - self.listener_stack.lb_protocol, self.listener_stack.lb_port) + members_count=self.members_count, + ip_address=self.loadbalancer_stack.floating_ip_address, + lb_algorithm=self.pool_stack.lb_algorithm, + protocol=self.listener_stack.lb_protocol, + port=self.listener_stack.lb_port) def test_reboot_amphora_compute_node(self): amphora_compute_hosts = octavia.get_amphoras_compute_nodes( @@ -102,25 +100,18 @@ class OctaviaBasicFaultTest(testtools.TestCase): f' ACTIVE') # Wait for Octavia objects' provisioning status to be ACTIVE - octavia.wait_for_active_members_and_lb( - members=[self.member1_stack.member_id, - self.member2_stack.member_id], + octavia.wait_for_active_and_functional_members_and_lb( + members=[self.member1_stack, + self.member2_stack], pool_id=self.pool_stack.pool_id, + lb_protocol=self.listener_stack.lb_protocol, + lb_port=self.listener_stack.lb_port, loadbalancer_id=self.loadbalancer_stack.loadbalancer_id) - # Reach members before verifying Octavia functionality - curl_member = "curl -f --connect-timeout 2 -g " - - sh.ssh_execute( - self.client_stack.ssh_client, - f'{curl_member} + {self.member1_stack.server_stack.ip_address}') - - sh.ssh_execute( - self.client_stack.ssh_client, - f'{curl_member} + {self.member2_stack.server_stack.ip_address}') - # Verify Octavia functionality octavia.check_members_balanced( - self.pool_stack, self.client_stack, self.members_count, - self.loadbalancer_stack.loadbalancer_vip, - self.listener_stack.lb_protocol, self.listener_stack.lb_port) + members_count=self.members_count, + ip_address=self.loadbalancer_stack.floating_ip_address, + lb_algorithm=self.pool_stack.lb_algorithm, + protocol=self.listener_stack.lb_protocol, + port=self.listener_stack.lb_port) diff --git a/tobiko/tests/faults/octavia/test_services.py b/tobiko/tests/faults/octavia/test_services.py index cc0cd4df3..3c8485e91 100644 --- a/tobiko/tests/faults/octavia/test_services.py +++ b/tobiko/tests/faults/octavia/test_services.py @@ -64,9 +64,6 @@ class OctaviaServicesFaultTest(testtools.TestCase): member2_stack = tobiko.required_setup_fixture( stacks.OctaviaOtherMemberServerStackFixture) - client_stack = tobiko.required_setup_fixture( - stacks.OctaviaClientServerStackFixture) - members_count = 2 list_octavia_active_units = ('systemctl list-units ' + @@ -94,9 +91,11 @@ class OctaviaServicesFaultTest(testtools.TestCase): # Sending initial traffic before we stop octavia services octavia.check_members_balanced( - self.pool_stack, self.client_stack, self.members_count, - self.loadbalancer_stack.loadbalancer_vip, - self.listener_stack.lb_protocol, self.listener_stack.lb_port) + members_count=self.members_count, + ip_address=self.loadbalancer_stack.floating_ip_address, + lb_algorithm=self.pool_stack.lb_algorithm, + protocol=self.listener_stack.lb_protocol, + port=self.listener_stack.lb_port) def test_services_fault(self): # excluded_services are the services which will be stopped @@ -180,9 +179,11 @@ class OctaviaServicesFaultTest(testtools.TestCase): self.assertTrue(service not in octavia_active_units, err_msg) octavia.check_members_balanced( - self.pool_stack, self.client_stack, self.members_count, - self.loadbalancer_stack.loadbalancer_vip, - self.listener_stack.lb_protocol, self.listener_stack.lb_port) + members_count=self.members_count, + ip_address=self.loadbalancer_stack.floating_ip_address, + lb_algorithm=self.pool_stack.lb_algorithm, + protocol=self.listener_stack.lb_protocol, + port=self.listener_stack.lb_port) def _start_octavia_main_services( self, controllers: typing.List[OpenStackTopologyNode] = None): @@ -209,6 +210,8 @@ class OctaviaServicesFaultTest(testtools.TestCase): self._make_sure_octavia_services_are_active(controller) octavia.check_members_balanced( - self.pool_stack, self.client_stack, self.members_count, - self.loadbalancer_stack.loadbalancer_vip, - self.listener_stack.lb_protocol, self.listener_stack.lb_port) + members_count=self.members_count, + ip_address=self.loadbalancer_stack.floating_ip_address, + lb_algorithm=self.pool_stack.lb_algorithm, + protocol=self.listener_stack.lb_protocol, + port=self.listener_stack.lb_port) diff --git a/tobiko/tests/scenario/octavia/test_traffic.py b/tobiko/tests/scenario/octavia/test_traffic.py index a2910d895..de7dd6157 100644 --- a/tobiko/tests/scenario/octavia/test_traffic.py +++ b/tobiko/tests/scenario/octavia/test_traffic.py @@ -45,46 +45,26 @@ class OctaviaBasicTrafficScenarioTest(testtools.TestCase): member2_stack = tobiko.required_setup_fixture( stacks.OctaviaOtherMemberServerStackFixture) - client_stack = tobiko.required_setup_fixture( - stacks.OctaviaClientServerStackFixture) - members_count = 2 def setUp(self): # pylint: disable=no-member super(OctaviaBasicTrafficScenarioTest, self).setUp() - self.loadbalancer_vip = self.loadbalancer_stack.loadbalancer_vip - self.loadbalancer_port = self.listener_stack.lb_port - self.loadbalancer_protocol = self.listener_stack.lb_protocol - - octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS, - status=octavia.ACTIVE, - get_client=octavia.get_member, - object_id=self.pool_stack.pool_id, - member_id=self.member1_stack.member_id) - - octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS, - status=octavia.ACTIVE, - get_client=octavia.get_member, - object_id=self.pool_stack.pool_id, - member_id=self.member2_stack.member_id) - - # Wait for LB is provisioned and ACTIVE - octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS, - status=octavia.ACTIVE, - get_client=octavia.get_loadbalancer, - object_id=( - self.loadbalancer_stack.loadbalancer_id)) - - @property - def loadbalancer(self): - return self.loadbalancer_stack + # Wait for Octavia objects' provisioning status to be ACTIVE + # and reachable + octavia.wait_for_active_and_functional_members_and_lb( + members=[self.member1_stack, + self.member2_stack], + pool_id=self.pool_stack.pool_id, + lb_protocol=self.listener_stack.lb_protocol, + lb_port=self.listener_stack.lb_port, + loadbalancer_id=self.loadbalancer_stack.loadbalancer_id) def test_traffic(self): - octavia.check_members_balanced(self.pool_stack, - self.client_stack, - self.members_count, - self.loadbalancer_vip, - self.loadbalancer_protocol, - self.loadbalancer_port) + octavia.check_members_balanced( + members_count=self.members_count, + ip_address=self.loadbalancer_stack.floating_ip_address, + lb_algorithm=self.pool_stack.lb_algorithm, + protocol=self.listener_stack.lb_protocol, + port=self.listener_stack.lb_port)