Use tobiko curl module in Octavia validators

So far, we used a raw ssh command which uses curl in order to make
sure the LB members are balanced (in the Octavia validators module).

This patch uses the Tobiko curl module instead, a change which
will make the maintenance of Octavia modules easier in the future.

To make the use of Tobiko curl possible, there has to be added a new
waiter method which waits until the members are reachable.

Change-Id: I98bd593422d7f7c8dde805fe0eb75293b5598dbe
This commit is contained in:
Omer 2021-10-20 14:50:30 +02:00 committed by Federico Ressi
parent 5e3ed15e1b
commit 21e5cbff56
8 changed files with 135 additions and 117 deletions

View File

@ -33,7 +33,8 @@ get_amphoras_compute_nodes = _client.get_amphoras_compute_nodes
# Waiters
wait_for_status = _waiters.wait_for_status
wait_for_active_members_and_lb = _waiters.wait_for_active_members_and_lb
wait_for_active_and_functional_members_and_lb = (
_waiters.wait_for_active_and_functional_members_and_lb)
wait_for_lb_to_be_updated_and_active = (
_waiters.wait_for_lb_to_be_updated_and_active)

View File

@ -14,64 +14,64 @@
# under the License.
from __future__ import absolute_import
import time
import collections
import typing
from oslo_log import log
import netaddr
import tobiko
from tobiko.shell import sh
from tobiko.shell import curl
from tobiko.shell import ssh
LOG = log.getLogger(__name__)
CURL_OPTIONS = "-f --connect-timeout 2 -g"
def request(client_stack, ip_address, protocol, port, ssh_client=None):
ssh_client = ssh_client or client_stack.ssh_client
if netaddr.IPAddress(ip_address) == 6:
ip_address = f"[{ip_address}]"
cmd = f"curl {CURL_OPTIONS} {protocol.lower()}://{ip_address}:{port}/id"
return sh.ssh_execute(ssh_client, cmd).stdout
def check_members_balanced(pool_stack, client_stack,
members_count,
loadbalancer_vip, loadbalancer_protocol,
loadbalancer_port, ssh_client=None):
def check_members_balanced(members_count: int,
ip_address: str,
protocol: str,
port: int,
lb_algorithm: str = None,
requests_count: int = 10,
connect_timeout: tobiko.Seconds = 2.,
interval: tobiko.Seconds = 1,
ssh_client: ssh.SSHClientFixture = None) -> (
typing.Dict[str, int]):
"""Check if traffic is properly balanced between members."""
test_case = tobiko.get_test_case()
replies = {}
for _ in range(members_count * 10):
content = request(
client_stack, loadbalancer_vip,
loadbalancer_protocol, loadbalancer_port, ssh_client)
if content not in replies:
replies[content] = 0
replies: typing.Dict[str, int] = collections.defaultdict(lambda: 0)
for attempt in tobiko.retry(count=members_count * requests_count,
interval=interval):
content = curl.execute_curl(hostname=ip_address,
scheme=protocol,
port=port,
path='id',
connect_timeout=connect_timeout,
ssh_client=ssh_client).strip()
replies[content] += 1
# wait one second (required when using cirros' nc fake webserver)
time.sleep(1)
if attempt.is_last:
break
else:
raise RuntimeError('Broken retry loop')
LOG.debug("Replies from load balancer: {}".format(replies))
LOG.debug(f"Replies counts from load balancer: {replies}")
# assert that 'members_count' servers replied
test_case.assertEqual(members_count, len(replies),
'The number of detected active members:{} is not '
'as expected:{}'.format(len(replies), members_count))
missing_members_count = members_count - len(replies)
test_case.assertEqual(0, missing_members_count,
f'Missing replies from {missing_members_count} "'
'"members.')
if pool_stack.lb_algorithm == 'ROUND_ROBIN':
if lb_algorithm == 'ROUND_ROBIN':
# assert that requests have been fairly dispatched (each server
# received the same number of requests)
test_case.assertEqual(1, len(set(replies.values())),
'The number of requests served by each member is'
' different and not as expected by used '
'ROUND_ROBIN algorithm.')
return replies

View File

@ -19,6 +19,7 @@ from oslo_log import log
import tobiko
from tobiko.openstack import octavia
from tobiko import config
from tobiko.shell import sh
LOG = log.getLogger(__name__)
@ -68,19 +69,67 @@ def wait_for_status(status_key, status, get_client, object_id,
f"from '{response[status_key]}' to '{status}'...")
def wait_for_active_members_and_lb(members, pool_id, loadbalancer_id):
for member_id in members:
def wait_for_members_to_be_reachable(members,
lb_protocol: str,
lb_port: int,
interval: tobiko.Seconds = None,
timeout: tobiko.Seconds = None,
count: int = 10):
# Wait for members to be reachable from localhost
last_reached_id = 0
for attempt in tobiko.retry(timeout=timeout,
count=count,
interval=interval):
try:
for member in members[last_reached_id:]:
octavia.check_members_balanced(
members_count=1,
ip_address=member.server_stack.ip_address,
protocol=lb_protocol,
port=lb_port,
requests_count=1)
last_reached_id += 1 # prevent retrying same member again
except sh.ShellCommandFailed:
LOG.info("Waiting for members to have HTTP service available...")
else:
break
if attempt.is_last:
break
else:
raise RuntimeError("Members couldn't be reached!")
def wait_for_active_and_functional_members_and_lb(
members,
pool_id: str,
lb_protocol: str,
lb_port: int,
loadbalancer_id: str,
interval: tobiko.Seconds = None,
timeout: tobiko.Seconds = None):
# Wait for members to have an ACTIVE provisioning status
for member_stack in members:
octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.ACTIVE,
get_client=octavia.get_member,
object_id=pool_id, member_id=member_id)
object_id=pool_id,
member_id=member_stack.member_id)
# Wait for LB is provisioned and ACTIVE
# Wait for LB to have an ACTIVE provisioning status
octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.ACTIVE,
get_client=octavia.get_loadbalancer,
object_id=loadbalancer_id)
wait_for_members_to_be_reachable(members=members,
lb_protocol=lb_protocol,
lb_port=lb_port,
timeout=timeout,
interval=interval)
def wait_for_lb_to_be_updated_and_active(loadbalancer_id):
octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,

View File

@ -86,7 +86,6 @@ OctaviaListenerStackFixture = _octavia.OctaviaListenerStackFixture
OctaviaPoolStackFixture = _octavia.OctaviaPoolStackFixture
OctaviaMemberServerStackFixture = _octavia.OctaviaMemberServerStackFixture
OctaviaServerStackFixture = _octavia.OctaviaServerStackFixture
OctaviaClientServerStackFixture = _octavia.OctaviaClientServerStackFixture
OctaviaOtherServerStackFixture = _octavia.OctaviaOtherServerStackFixture
OctaviaOtherMemberServerStackFixture = (
_octavia.OctaviaOtherMemberServerStackFixture)

View File

@ -184,11 +184,6 @@ class OctaviaMemberServerStackFixture(heat.HeatStackFixture):
][0]
class OctaviaClientServerStackFixture(_cirros.CirrosServerStackFixture):
network_stack = tobiko.required_setup_fixture(
OctaviaVipNetworkStackFixture)
class OctaviaOtherServerStackFixture(
OctaviaServerStackFixture):
pass

View File

@ -22,7 +22,6 @@ from tobiko.openstack import keystone
from tobiko.openstack import octavia
from tobiko.openstack import stacks
from tobiko import tripleo
from tobiko.shell import sh
LOG = log.getLogger(__name__)
@ -55,9 +54,6 @@ class OctaviaBasicFaultTest(testtools.TestCase):
member2_stack = tobiko.required_setup_fixture(
stacks.OctaviaOtherMemberServerStackFixture)
client_stack = tobiko.required_setup_fixture(
stacks.OctaviaClientServerStackFixture)
members_count = 2
def setUp(self):
@ -79,9 +75,11 @@ class OctaviaBasicFaultTest(testtools.TestCase):
# Send traffic
octavia.check_members_balanced(
self.pool_stack, self.client_stack, self.members_count,
self.loadbalancer_stack.loadbalancer_vip,
self.listener_stack.lb_protocol, self.listener_stack.lb_port)
members_count=self.members_count,
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_algorithm=self.pool_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port)
def test_reboot_amphora_compute_node(self):
amphora_compute_hosts = octavia.get_amphoras_compute_nodes(
@ -102,25 +100,18 @@ class OctaviaBasicFaultTest(testtools.TestCase):
f' ACTIVE')
# Wait for Octavia objects' provisioning status to be ACTIVE
octavia.wait_for_active_members_and_lb(
members=[self.member1_stack.member_id,
self.member2_stack.member_id],
octavia.wait_for_active_and_functional_members_and_lb(
members=[self.member1_stack,
self.member2_stack],
pool_id=self.pool_stack.pool_id,
lb_protocol=self.listener_stack.lb_protocol,
lb_port=self.listener_stack.lb_port,
loadbalancer_id=self.loadbalancer_stack.loadbalancer_id)
# Reach members before verifying Octavia functionality
curl_member = "curl -f --connect-timeout 2 -g "
sh.ssh_execute(
self.client_stack.ssh_client,
f'{curl_member} + {self.member1_stack.server_stack.ip_address}')
sh.ssh_execute(
self.client_stack.ssh_client,
f'{curl_member} + {self.member2_stack.server_stack.ip_address}')
# Verify Octavia functionality
octavia.check_members_balanced(
self.pool_stack, self.client_stack, self.members_count,
self.loadbalancer_stack.loadbalancer_vip,
self.listener_stack.lb_protocol, self.listener_stack.lb_port)
members_count=self.members_count,
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_algorithm=self.pool_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port)

View File

@ -64,9 +64,6 @@ class OctaviaServicesFaultTest(testtools.TestCase):
member2_stack = tobiko.required_setup_fixture(
stacks.OctaviaOtherMemberServerStackFixture)
client_stack = tobiko.required_setup_fixture(
stacks.OctaviaClientServerStackFixture)
members_count = 2
list_octavia_active_units = ('systemctl list-units ' +
@ -94,9 +91,11 @@ class OctaviaServicesFaultTest(testtools.TestCase):
# Sending initial traffic before we stop octavia services
octavia.check_members_balanced(
self.pool_stack, self.client_stack, self.members_count,
self.loadbalancer_stack.loadbalancer_vip,
self.listener_stack.lb_protocol, self.listener_stack.lb_port)
members_count=self.members_count,
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_algorithm=self.pool_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port)
def test_services_fault(self):
# excluded_services are the services which will be stopped
@ -180,9 +179,11 @@ class OctaviaServicesFaultTest(testtools.TestCase):
self.assertTrue(service not in octavia_active_units, err_msg)
octavia.check_members_balanced(
self.pool_stack, self.client_stack, self.members_count,
self.loadbalancer_stack.loadbalancer_vip,
self.listener_stack.lb_protocol, self.listener_stack.lb_port)
members_count=self.members_count,
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_algorithm=self.pool_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port)
def _start_octavia_main_services(
self, controllers: typing.List[OpenStackTopologyNode] = None):
@ -209,6 +210,8 @@ class OctaviaServicesFaultTest(testtools.TestCase):
self._make_sure_octavia_services_are_active(controller)
octavia.check_members_balanced(
self.pool_stack, self.client_stack, self.members_count,
self.loadbalancer_stack.loadbalancer_vip,
self.listener_stack.lb_protocol, self.listener_stack.lb_port)
members_count=self.members_count,
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_algorithm=self.pool_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port)

View File

@ -45,46 +45,26 @@ class OctaviaBasicTrafficScenarioTest(testtools.TestCase):
member2_stack = tobiko.required_setup_fixture(
stacks.OctaviaOtherMemberServerStackFixture)
client_stack = tobiko.required_setup_fixture(
stacks.OctaviaClientServerStackFixture)
members_count = 2
def setUp(self):
# pylint: disable=no-member
super(OctaviaBasicTrafficScenarioTest, self).setUp()
self.loadbalancer_vip = self.loadbalancer_stack.loadbalancer_vip
self.loadbalancer_port = self.listener_stack.lb_port
self.loadbalancer_protocol = self.listener_stack.lb_protocol
octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.ACTIVE,
get_client=octavia.get_member,
object_id=self.pool_stack.pool_id,
member_id=self.member1_stack.member_id)
octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.ACTIVE,
get_client=octavia.get_member,
object_id=self.pool_stack.pool_id,
member_id=self.member2_stack.member_id)
# Wait for LB is provisioned and ACTIVE
octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.ACTIVE,
get_client=octavia.get_loadbalancer,
object_id=(
self.loadbalancer_stack.loadbalancer_id))
@property
def loadbalancer(self):
return self.loadbalancer_stack
# Wait for Octavia objects' provisioning status to be ACTIVE
# and reachable
octavia.wait_for_active_and_functional_members_and_lb(
members=[self.member1_stack,
self.member2_stack],
pool_id=self.pool_stack.pool_id,
lb_protocol=self.listener_stack.lb_protocol,
lb_port=self.listener_stack.lb_port,
loadbalancer_id=self.loadbalancer_stack.loadbalancer_id)
def test_traffic(self):
octavia.check_members_balanced(self.pool_stack,
self.client_stack,
self.members_count,
self.loadbalancer_vip,
self.loadbalancer_protocol,
self.loadbalancer_port)
octavia.check_members_balanced(
members_count=self.members_count,
ip_address=self.loadbalancer_stack.floating_ip_address,
lb_algorithm=self.pool_stack.lb_algorithm,
protocol=self.listener_stack.lb_protocol,
port=self.listener_stack.lb_port)