Merge "Use tobiko curl module in Octavia validators"

commit d30994b00a
Author: Zuul, 2021-10-22 16:27:32 +00:00 (committed by Gerrit Code Review)
8 changed files with 135 additions and 117 deletions


@@ -33,7 +33,8 @@ get_amphoras_compute_nodes = _client.get_amphoras_compute_nodes
 # Waiters
 wait_for_status = _waiters.wait_for_status
-wait_for_active_members_and_lb = _waiters.wait_for_active_members_and_lb
+wait_for_active_and_functional_members_and_lb = (
+    _waiters.wait_for_active_and_functional_members_and_lb)
 wait_for_lb_to_be_updated_and_active = (
     _waiters.wait_for_lb_to_be_updated_and_active)


@@ -14,64 +14,64 @@
 # under the License.

 from __future__ import absolute_import

-import time
+import collections
+import typing

 from oslo_log import log
-import netaddr

 import tobiko
-from tobiko.shell import sh
+from tobiko.shell import curl
+from tobiko.shell import ssh


 LOG = log.getLogger(__name__)

-CURL_OPTIONS = "-f --connect-timeout 2 -g"
-
-
-def request(client_stack, ip_address, protocol, port, ssh_client=None):
-    ssh_client = ssh_client or client_stack.ssh_client
-
-    if netaddr.IPAddress(ip_address) == 6:
-        ip_address = f"[{ip_address}]"
-
-    cmd = f"curl {CURL_OPTIONS} {protocol.lower()}://{ip_address}:{port}/id"
-
-    return sh.ssh_execute(ssh_client, cmd).stdout
-
-
-def check_members_balanced(pool_stack, client_stack,
-                           members_count,
-                           loadbalancer_vip, loadbalancer_protocol,
-                           loadbalancer_port, ssh_client=None):
+def check_members_balanced(members_count: int,
+                           ip_address: str,
+                           protocol: str,
+                           port: int,
+                           lb_algorithm: str = None,
+                           requests_count: int = 10,
+                           connect_timeout: tobiko.Seconds = 2.,
+                           interval: tobiko.Seconds = 1,
+                           ssh_client: ssh.SSHClientFixture = None) -> (
+        typing.Dict[str, int]):
     """Check if traffic is properly balanced between members."""
     test_case = tobiko.get_test_case()

-    replies = {}
-
-    for _ in range(members_count * 10):
-        content = request(
-            client_stack, loadbalancer_vip,
-            loadbalancer_protocol, loadbalancer_port, ssh_client)
-
-        if content not in replies:
-            replies[content] = 0
-        replies[content] += 1
-
-        # wait one second (required when using cirros' nc fake webserver)
-        time.sleep(1)
+    replies: typing.Dict[str, int] = collections.defaultdict(lambda: 0)
+    for attempt in tobiko.retry(count=members_count * requests_count,
+                                interval=interval):
+        content = curl.execute_curl(hostname=ip_address,
+                                    scheme=protocol,
+                                    port=port,
+                                    path='id',
+                                    connect_timeout=connect_timeout,
+                                    ssh_client=ssh_client).strip()
+        replies[content] += 1
+        if attempt.is_last:
+            break
+    else:
+        raise RuntimeError('Broken retry loop')

-    LOG.debug("Replies from load balancer: {}".format(replies))
+    LOG.debug(f"Replies counts from load balancer: {replies}")

     # assert that 'members_count' servers replied
-    test_case.assertEqual(members_count, len(replies),
-                          'The number of detected active members:{} is not '
-                          'as expected:{}'.format(len(replies), members_count))
+    missing_members_count = members_count - len(replies)
+    test_case.assertEqual(0, missing_members_count,
+                          f'Missing replies from {missing_members_count} "'
+                          '"members.')

-    if pool_stack.lb_algorithm == 'ROUND_ROBIN':
+    if lb_algorithm == 'ROUND_ROBIN':
         # assert that requests have been fairly dispatched (each server
         # received the same number of requests)
         test_case.assertEqual(1, len(set(replies.values())),
                               'The number of requests served by each member is'
                               ' different and not as expected by used '
                               'ROUND_ROBIN algorithm.')
+
+    return replies

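The reworked check_members_balanced() above takes plain keyword arguments instead of Heat stack fixtures. A minimal usage sketch follows, assuming an OpenStack deployment reachable by tobiko; the address, protocol and port are placeholder values, not values taken from this change:

    from tobiko.openstack import octavia

    # Illustrative sketch only: probe a load balancer VIP and expect replies
    # from two distinct backend members.
    replies = octavia.check_members_balanced(
        members_count=2,              # distinct members expected to reply
        ip_address='203.0.113.10',    # placeholder VIP / floating IP address
        protocol='HTTP',
        port=80,
        lb_algorithm='ROUND_ROBIN',   # also asserts fair request dispatching
        requests_count=10)            # members_count * requests_count requests
    # 'replies' maps each reply body (the member id) to its request count.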

@@ -19,6 +19,7 @@ from oslo_log import log
 import tobiko
 from tobiko.openstack import octavia
 from tobiko import config
+from tobiko.shell import sh


 LOG = log.getLogger(__name__)
@@ -68,19 +69,67 @@ def wait_for_status(status_key, status, get_client, object_id,
                   f"from '{response[status_key]}' to '{status}'...")


-def wait_for_active_members_and_lb(members, pool_id, loadbalancer_id):
-    for member_id in members:
-        octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
-                                status=octavia.ACTIVE,
-                                get_client=octavia.get_member,
-                                object_id=pool_id, member_id=member_id)
-
-    # Wait for LB is provisioned and ACTIVE
-    octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
-                            status=octavia.ACTIVE,
-                            get_client=octavia.get_loadbalancer,
-                            object_id=loadbalancer_id)
+def wait_for_members_to_be_reachable(members,
+                                     lb_protocol: str,
+                                     lb_port: int,
+                                     interval: tobiko.Seconds = None,
+                                     timeout: tobiko.Seconds = None,
+                                     count: int = 10):
+
+    # Wait for members to be reachable from localhost
+    last_reached_id = 0
+    for attempt in tobiko.retry(timeout=timeout,
+                                count=count,
+                                interval=interval):
+        try:
+            for member in members[last_reached_id:]:
+                octavia.check_members_balanced(
+                    members_count=1,
+                    ip_address=member.server_stack.ip_address,
+                    protocol=lb_protocol,
+                    port=lb_port,
+                    requests_count=1)
+                last_reached_id += 1  # prevent retrying same member again
+        except sh.ShellCommandFailed:
+            LOG.info("Waiting for members to have HTTP service available...")
+        else:
+            break
+
+        if attempt.is_last:
+            break
+    else:
+        raise RuntimeError("Members couldn't be reached!")
+
+
+def wait_for_active_and_functional_members_and_lb(
+        members,
+        pool_id: str,
+        lb_protocol: str,
+        lb_port: int,
+        loadbalancer_id: str,
+        interval: tobiko.Seconds = None,
+        timeout: tobiko.Seconds = None):
+
+    # Wait for members to have an ACTIVE provisioning status
+    for member_stack in members:
+        octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
+                                status=octavia.ACTIVE,
+                                get_client=octavia.get_member,
+                                object_id=pool_id,
+                                member_id=member_stack.member_id)
+
+    # Wait for LB to have an ACTIVE provisioning status
+    octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
+                            status=octavia.ACTIVE,
+                            get_client=octavia.get_loadbalancer,
+                            object_id=loadbalancer_id)
+
+    wait_for_members_to_be_reachable(members=members,
+                                     lb_protocol=lb_protocol,
+                                     lb_port=lb_port,
+                                     timeout=timeout,
+                                     interval=interval)


 def wait_for_lb_to_be_updated_and_active(loadbalancer_id):
     octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,

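The new wait_for_active_and_functional_members_and_lb() waiter first waits for the members and the load balancer to reach an ACTIVE provisioning status and then calls wait_for_members_to_be_reachable(), which probes each member through check_members_balanced(). A rough sketch of how a test class is expected to wire it, mirroring the fault and traffic tests further below; the OctaviaLoadbalancerStackFixture name is an assumption, since the load balancer fixture class is not shown in this diff:

    import testtools

    import tobiko
    from tobiko.openstack import octavia
    from tobiko.openstack import stacks


    # Illustrative sketch only: fixture classes mirror the ones used by the
    # tests below; OctaviaLoadbalancerStackFixture is assumed to exist.
    class OctaviaWaiterSketchTest(testtools.TestCase):

        loadbalancer_stack = tobiko.required_setup_fixture(
            stacks.OctaviaLoadbalancerStackFixture)   # assumed fixture name
        listener_stack = tobiko.required_setup_fixture(
            stacks.OctaviaListenerStackFixture)
        pool_stack = tobiko.required_setup_fixture(
            stacks.OctaviaPoolStackFixture)
        member1_stack = tobiko.required_setup_fixture(
            stacks.OctaviaMemberServerStackFixture)
        member2_stack = tobiko.required_setup_fixture(
            stacks.OctaviaOtherMemberServerStackFixture)

        def test_members_and_lb_become_functional(self):
            octavia.wait_for_active_and_functional_members_and_lb(
                members=[self.member1_stack, self.member2_stack],
                pool_id=self.pool_stack.pool_id,
                lb_protocol=self.listener_stack.lb_protocol,
                lb_port=self.listener_stack.lb_port,
                loadbalancer_id=self.loadbalancer_stack.loadbalancer_id)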

@@ -86,7 +86,6 @@ OctaviaListenerStackFixture = _octavia.OctaviaListenerStackFixture
 OctaviaPoolStackFixture = _octavia.OctaviaPoolStackFixture
 OctaviaMemberServerStackFixture = _octavia.OctaviaMemberServerStackFixture
 OctaviaServerStackFixture = _octavia.OctaviaServerStackFixture
-OctaviaClientServerStackFixture = _octavia.OctaviaClientServerStackFixture
 OctaviaOtherServerStackFixture = _octavia.OctaviaOtherServerStackFixture
 OctaviaOtherMemberServerStackFixture = (
     _octavia.OctaviaOtherMemberServerStackFixture)


@@ -182,11 +182,6 @@ class OctaviaMemberServerStackFixture(heat.HeatStackFixture):
     ][0]


-class OctaviaClientServerStackFixture(_cirros.CirrosServerStackFixture):
-    network_stack = tobiko.required_setup_fixture(
-        OctaviaVipNetworkStackFixture)
-
-
 class OctaviaOtherServerStackFixture(
         OctaviaServerStackFixture):
     pass


@@ -22,7 +22,6 @@ from tobiko.openstack import keystone
 from tobiko.openstack import octavia
 from tobiko.openstack import stacks
 from tobiko import tripleo
-from tobiko.shell import sh


 LOG = log.getLogger(__name__)
@@ -55,9 +54,6 @@ class OctaviaBasicFaultTest(testtools.TestCase):
     member2_stack = tobiko.required_setup_fixture(
         stacks.OctaviaOtherMemberServerStackFixture)

-    client_stack = tobiko.required_setup_fixture(
-        stacks.OctaviaClientServerStackFixture)
-
     members_count = 2

     def setUp(self):
@@ -79,9 +75,11 @@ class OctaviaBasicFaultTest(testtools.TestCase):

         # Send traffic
         octavia.check_members_balanced(
-            self.pool_stack, self.client_stack, self.members_count,
-            self.loadbalancer_stack.loadbalancer_vip,
-            self.listener_stack.lb_protocol, self.listener_stack.lb_port)
+            members_count=self.members_count,
+            ip_address=self.loadbalancer_stack.floating_ip_address,
+            lb_algorithm=self.pool_stack.lb_algorithm,
+            protocol=self.listener_stack.lb_protocol,
+            port=self.listener_stack.lb_port)

     def test_reboot_amphora_compute_node(self):
         amphora_compute_hosts = octavia.get_amphoras_compute_nodes(
@@ -102,25 +100,18 @@ class OctaviaBasicFaultTest(testtools.TestCase):
                     f' ACTIVE')

         # Wait for Octavia objects' provisioning status to be ACTIVE
-        octavia.wait_for_active_members_and_lb(
-            members=[self.member1_stack.member_id,
-                     self.member2_stack.member_id],
+        octavia.wait_for_active_and_functional_members_and_lb(
+            members=[self.member1_stack,
+                     self.member2_stack],
             pool_id=self.pool_stack.pool_id,
+            lb_protocol=self.listener_stack.lb_protocol,
+            lb_port=self.listener_stack.lb_port,
             loadbalancer_id=self.loadbalancer_stack.loadbalancer_id)

-        # Reach members before verifying Octavia functionality
-        curl_member = "curl -f --connect-timeout 2 -g "
-        sh.ssh_execute(
-            self.client_stack.ssh_client,
-            f'{curl_member} + {self.member1_stack.server_stack.ip_address}')
-        sh.ssh_execute(
-            self.client_stack.ssh_client,
-            f'{curl_member} + {self.member2_stack.server_stack.ip_address}')
-
         # Verify Octavia functionality
         octavia.check_members_balanced(
-            self.pool_stack, self.client_stack, self.members_count,
-            self.loadbalancer_stack.loadbalancer_vip,
-            self.listener_stack.lb_protocol, self.listener_stack.lb_port)
+            members_count=self.members_count,
+            ip_address=self.loadbalancer_stack.floating_ip_address,
+            lb_algorithm=self.pool_stack.lb_algorithm,
+            protocol=self.listener_stack.lb_protocol,
+            port=self.listener_stack.lb_port)


@@ -64,9 +64,6 @@ class OctaviaServicesFaultTest(testtools.TestCase):
     member2_stack = tobiko.required_setup_fixture(
         stacks.OctaviaOtherMemberServerStackFixture)

-    client_stack = tobiko.required_setup_fixture(
-        stacks.OctaviaClientServerStackFixture)
-
     members_count = 2

     list_octavia_active_units = ('systemctl list-units ' +
@@ -94,9 +91,11 @@ class OctaviaServicesFaultTest(testtools.TestCase):

         # Sending initial traffic before we stop octavia services
         octavia.check_members_balanced(
-            self.pool_stack, self.client_stack, self.members_count,
-            self.loadbalancer_stack.loadbalancer_vip,
-            self.listener_stack.lb_protocol, self.listener_stack.lb_port)
+            members_count=self.members_count,
+            ip_address=self.loadbalancer_stack.floating_ip_address,
+            lb_algorithm=self.pool_stack.lb_algorithm,
+            protocol=self.listener_stack.lb_protocol,
+            port=self.listener_stack.lb_port)

     def test_services_fault(self):
         # excluded_services are the services which will be stopped
@@ -180,9 +179,11 @@ class OctaviaServicesFaultTest(testtools.TestCase):
            self.assertTrue(service not in octavia_active_units, err_msg)

        octavia.check_members_balanced(
-            self.pool_stack, self.client_stack, self.members_count,
-            self.loadbalancer_stack.loadbalancer_vip,
-            self.listener_stack.lb_protocol, self.listener_stack.lb_port)
+            members_count=self.members_count,
+            ip_address=self.loadbalancer_stack.floating_ip_address,
+            lb_algorithm=self.pool_stack.lb_algorithm,
+            protocol=self.listener_stack.lb_protocol,
+            port=self.listener_stack.lb_port)

     def _start_octavia_main_services(
             self, controllers: typing.List[OpenStackTopologyNode] = None):
@@ -209,6 +210,8 @@ class OctaviaServicesFaultTest(testtools.TestCase):
             self._make_sure_octavia_services_are_active(controller)

         octavia.check_members_balanced(
-            self.pool_stack, self.client_stack, self.members_count,
-            self.loadbalancer_stack.loadbalancer_vip,
-            self.listener_stack.lb_protocol, self.listener_stack.lb_port)
+            members_count=self.members_count,
+            ip_address=self.loadbalancer_stack.floating_ip_address,
+            lb_algorithm=self.pool_stack.lb_algorithm,
+            protocol=self.listener_stack.lb_protocol,
+            port=self.listener_stack.lb_port)


@@ -45,46 +45,26 @@ class OctaviaBasicTrafficScenarioTest(testtools.TestCase):
     member2_stack = tobiko.required_setup_fixture(
         stacks.OctaviaOtherMemberServerStackFixture)

-    client_stack = tobiko.required_setup_fixture(
-        stacks.OctaviaClientServerStackFixture)
-
     members_count = 2

     def setUp(self):
         # pylint: disable=no-member
         super(OctaviaBasicTrafficScenarioTest, self).setUp()

-        self.loadbalancer_vip = self.loadbalancer_stack.loadbalancer_vip
-        self.loadbalancer_port = self.listener_stack.lb_port
-        self.loadbalancer_protocol = self.listener_stack.lb_protocol
-
-        octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
-                                status=octavia.ACTIVE,
-                                get_client=octavia.get_member,
-                                object_id=self.pool_stack.pool_id,
-                                member_id=self.member1_stack.member_id)
-
-        octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
-                                status=octavia.ACTIVE,
-                                get_client=octavia.get_member,
-                                object_id=self.pool_stack.pool_id,
-                                member_id=self.member2_stack.member_id)
-
-        # Wait for LB is provisioned and ACTIVE
-        octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
-                                status=octavia.ACTIVE,
-                                get_client=octavia.get_loadbalancer,
-                                object_id=(
-                                    self.loadbalancer_stack.loadbalancer_id))
-
-    @property
-    def loadbalancer(self):
-        return self.loadbalancer_stack
+        # Wait for Octavia objects' provisioning status to be ACTIVE
+        # and reachable
+        octavia.wait_for_active_and_functional_members_and_lb(
+            members=[self.member1_stack,
+                     self.member2_stack],
+            pool_id=self.pool_stack.pool_id,
+            lb_protocol=self.listener_stack.lb_protocol,
+            lb_port=self.listener_stack.lb_port,
+            loadbalancer_id=self.loadbalancer_stack.loadbalancer_id)

     def test_traffic(self):
-        octavia.check_members_balanced(self.pool_stack,
-                                       self.client_stack,
-                                       self.members_count,
-                                       self.loadbalancer_vip,
-                                       self.loadbalancer_protocol,
-                                       self.loadbalancer_port)
+        octavia.check_members_balanced(
+            members_count=self.members_count,
+            ip_address=self.loadbalancer_stack.floating_ip_address,
+            lb_algorithm=self.pool_stack.lb_algorithm,
+            protocol=self.listener_stack.lb_protocol,
+            port=self.listener_stack.lb_port)