Merge "Generalize and refactor Octavia waiters module"

This commit is contained in:
Zuul 2021-04-21 13:37:22 +00:00 committed by Gerrit Code Review
commit f1b73d5ac2
9 changed files with 215 additions and 256 deletions

View File

@ -14,12 +14,30 @@
from __future__ import absolute_import from __future__ import absolute_import
from tobiko.openstack.octavia import _client from tobiko.openstack.octavia import _client
from tobiko.openstack.octavia import _waiters
from tobiko.openstack.octavia import _constants
from tobiko.openstack.octavia import _validators
from tobiko.openstack.octavia import _exceptions
OCTAVIA_CLIENT_CLASSSES = _client.OCTAVIA_CLIENT_CLASSSES OCTAVIA_CLIENT_CLASSSES = _client.OCTAVIA_CLIENT_CLASSSES
get_loadbalancer = _client.get_loadbalancer
get_octavia_client = _client.get_octavia_client get_octavia_client = _client.get_octavia_client
octavia_client = _client.octavia_client octavia_client = _client.octavia_client
OctaviaClientFixture = _client.OctaviaClientFixture OctaviaClientFixture = _client.OctaviaClientFixture
get_loadbalancer = _client.get_loadbalancer get_loadbalancer = _client.get_loadbalancer
get_member = _client.get_member
# Waiters
wait_for_status = _waiters.wait_for_status
# Validators
check_members_balanced = _validators.check_members_balanced
# Exceptions
RequestException = _exceptions.RequestException
TimeoutException = _exceptions.TimeoutException
# Constants
PROVISIONING_STATUS = _constants.PROVISIONING_STATUS
ACTIVE = _constants.ACTIVE
ERROR = _constants.ERROR

View File

@ -71,3 +71,8 @@ def get_octavia_client(session=None, shared=True, init_client=None,
def get_loadbalancer(loadbalancer_id, client=None): def get_loadbalancer(loadbalancer_id, client=None):
return octavia_client(client).load_balancer_show(lb_id=loadbalancer_id) return octavia_client(client).load_balancer_show(lb_id=loadbalancer_id)
def get_member(pool_id, member_id, client=None):
return octavia_client(client).member_show(pool_id=pool_id,
member_id=member_id)

View File

@ -12,10 +12,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from __future__ import absolute_import
from tobiko.tests.scenario.octavia import validators # Octavia attributes
PROVISIONING_STATUS = 'provisioning_status'
# Octavia provisioning status
class OctaviaTest(validators.Validators): ACTIVE = 'ACTIVE'
pass ERROR = 'ERROR'

View File

@ -0,0 +1,91 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
import tobiko
from tobiko.openstack import octavia
from tobiko.shell import ssh
from tobiko.shell import sh
LOG = log.getLogger(__name__)
CURL_OPTIONS = "-f --connect-timeout 2 -g"
def request(client_stack, server_ip_address, protocol, server_port):
"""Perform a request on a server.
Returns the response in case of success, throws an RequestException
otherwise.
"""
if ':' in server_ip_address:
# Add square brackets around IPv6 address to please curl
server_ip_address = "[{}]".format(server_ip_address)
cmd = "curl {} {}://{}:{}/id".format(
CURL_OPTIONS, protocol.lower(), server_ip_address, server_port)
ssh_client = ssh.ssh_client(
client_stack.floating_ip_address,
username=client_stack.image_fixture.username)
ret = sh.ssh_execute(ssh_client, cmd)
if ret.exit_status != 0:
raise octavia.RequestException(command=cmd,
error=ret.stderr)
return ret.stdout
def check_members_balanced(pool_stack, client_stack,
members_count,
loadbalancer_vip, loadbalancer_protocol,
loadbalancer_port):
"""Check if traffic is properly balanced between members."""
test_case = tobiko.get_test_case()
replies = {}
for _ in range(members_count * 10):
content = request(
client_stack, loadbalancer_vip,
loadbalancer_protocol, loadbalancer_port)
if content not in replies:
replies[content] = 0
replies[content] += 1
# wait one second (required when using cirros' nc fake webserver)
time.sleep(1)
LOG.debug("Replies from load balancer: {}".format(replies))
# assert that 'members_count' servers replied
test_case.assertEqual(members_count, len(replies),
'The number of detected active members:{} is not '
'as expected:{}'.format(len(replies), members_count))
if pool_stack.lb_algorithm == 'ROUND_ROBIN':
# assert that requests have been fairly dispatched (each server
# received the same number of requests)
test_case.assertEqual(1, len(set(replies.values())),
'The number of requests served by each member is'
' different and not as expected by used '
'ROUND_ROBIN algorithm.')

View File

@ -0,0 +1,68 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from oslo_log import log
import tobiko
from tobiko.openstack import octavia
from tobiko import config
LOG = log.getLogger(__name__)
CONF = config.CONF
def wait_for_status(status_key, status, get_client, object_id,
interval: tobiko.Seconds = None,
timeout: tobiko.Seconds = None,
error_ok=False, **kwargs):
"""Waits for an object to reach a specific status.
:param status_key: The key of the status field in the response.
Ex. provisioning_status
:param status: The status to wait for. Ex. "ACTIVE"
:param get_client: The tobiko client get method.
Ex. _client.get_loadbalancer
:param object_id: The id of the object to query.
:param interval: How often to check the status, in seconds.
:param timeout: The maximum time, in seconds, to check the status.
:param error_ok: When true, ERROR status will not raise an exception.
:raises TimeoutException: The object did not achieve the status or ERROR in
the check_timeout period.
:raises UnexpectedStatusException: The request returned an unexpected
response code.
"""
for attempt in tobiko.retry(timeout=timeout,
interval=interval,
default_timeout=(
CONF.tobiko.octavia.check_timeout),
default_interval=(
CONF.tobiko.octavia.check_interval)):
response = get_client(object_id, **kwargs)
if response[status_key] == status:
return response
if response[status_key] == octavia.ERROR and not error_ok:
message = ('{name} {field} was updated to an invalid state of '
'ERROR'.format(name=get_client.__name__,
field=status_key))
raise octavia.RequestException(message)
# it will raise tobiko.RetryTimeLimitError in case of timeout
attempt.check_limits()
LOG.debug(f"Waiting for {get_client.__name__} {status_key} to get "
f"from '{response[status_key]}' to '{status}'...")

View File

@ -14,14 +14,16 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
import testtools
import tobiko import tobiko
from tobiko.openstack import keystone from tobiko.openstack import keystone
from tobiko.openstack import octavia
from tobiko.openstack import stacks from tobiko.openstack import stacks
from tobiko.tests.scenario.octavia import waiters, octavia_base
@keystone.skip_if_missing_service(name='octavia') @keystone.skip_if_missing_service(name='octavia')
class OctaviaBasicTrafficScenarioTest(octavia_base.OctaviaTest): class OctaviaBasicTrafficScenarioTest(testtools.TestCase):
"""Octavia traffic scenario test. """Octavia traffic scenario test.
Create a load balancer with 2 members that run a server application, Create a load balancer with 2 members that run a server application,
@ -56,31 +58,31 @@ class OctaviaBasicTrafficScenarioTest(octavia_base.OctaviaTest):
self.loadbalancer_port = self.listener_stack.lb_port self.loadbalancer_port = self.listener_stack.lb_port
self.loadbalancer_protocol = self.listener_stack.lb_protocol self.loadbalancer_protocol = self.listener_stack.lb_protocol
# Wait for members octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
waiters.wait_for_member_functional(self.client_stack, status=octavia.ACTIVE,
self.pool_stack, get_client=octavia.get_member,
self.member1_stack, self.request) object_id=self.pool_stack.pool_id,
waiters.wait_for_member_functional(self.client_stack, member_id=self.member1_stack.member_id)
self.pool_stack,
self.member2_stack, self.request) octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.ACTIVE,
get_client=octavia.get_member,
object_id=self.pool_stack.pool_id,
member_id=self.member2_stack.member_id)
# Wait for LB is provisioned and ACTIVE # Wait for LB is provisioned and ACTIVE
waiters.wait_for_loadbalancer_is_active(self.loadbalancer_stack) octavia.wait_for_status(status_key=octavia.PROVISIONING_STATUS,
status=octavia.ACTIVE,
# Check if load balancer is functional get_client=octavia.get_loadbalancer,
waiters.wait_for_loadbalancer_functional(self.loadbalancer_stack, object_id=(
self.client_stack, self.loadbalancer_stack.loadbalancer_id))
self.loadbalancer_vip,
self.loadbalancer_protocol,
self.loadbalancer_port,
self.request)
@property @property
def loadbalancer(self): def loadbalancer(self):
return self.loadbalancer_stack return self.loadbalancer_stack
def test_traffic(self): def test_traffic(self):
self.check_members_balanced(self.pool_stack, octavia.check_members_balanced(self.pool_stack,
self.client_stack, self.client_stack,
self.members_count, self.members_count,
self.loadbalancer_vip, self.loadbalancer_vip,

View File

@ -1,89 +0,0 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
from tobiko.shell import ssh
from tobiko.shell import sh
from tobiko.tests import base
from tobiko.tests.scenario.octavia.exceptions import RequestException
LOG = log.getLogger(__name__)
CURL_OPTIONS = "-f --connect-timeout 2 -g"
class Validators(base.TobikoTest):
def request(self, client_stack, server_ip_address, protocol, server_port):
"""Perform a request on a server.
Returns the response in case of success, throws an RequestException
otherwise.
"""
if ':' in server_ip_address:
# Add square brackets around IPv6 address to please curl
server_ip_address = "[{}]".format(server_ip_address)
cmd = "curl {} {}://{}:{}/id".format(
CURL_OPTIONS, protocol.lower(), server_ip_address, server_port)
ssh_client = ssh.ssh_client(
client_stack.floating_ip_address,
username=client_stack.image_fixture.username)
ret = sh.ssh_execute(ssh_client, cmd)
if ret.exit_status != 0:
raise RequestException(command=cmd,
error=ret.stderr)
return ret.stdout
def check_members_balanced(self, pool_stack, client_stack,
members_count,
loadbalancer_vip, loadbalancer_protocol,
loadbalancer_port):
"""Check if traffic is properly balanced between members."""
replies = {}
for _ in range(members_count * 10):
content = self.request(
client_stack, loadbalancer_vip,
loadbalancer_protocol, loadbalancer_port)
if content not in replies:
replies[content] = 0
replies[content] += 1
# wait one second (required when using cirros' nc fake webserver)
time.sleep(1)
LOG.debug("Replies from load balancer: {}".format(replies))
# assert that 'members_count' servers replied
self.assertEqual(members_count, len(replies),
'The number of detected active members:{} is not '
'as expected:{}'.format(len(replies), members_count))
if pool_stack.lb_algorithm == 'ROUND_ROBIN':
# assert that requests have been fairly dispatched (each server
# received the same number of requests)
self.assertEqual(1, len(set(replies.values())),
'The number of requests served by each member is '
'different and not as expected by used '
'ROUND_ROBIN algorithm.')

View File

@ -1,136 +0,0 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
from tobiko import config
from tobiko.openstack import octavia
from tobiko.tests.scenario.octavia import exceptions
LOG = log.getLogger(__name__)
CONF = config.CONF
def wait_resource_operating_status(resource_type, operating_status,
resource_get, *args):
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
res = resource_get(*args)
if res['operating_status'] == operating_status:
return
time.sleep(CONF.tobiko.octavia.check_interval)
raise exceptions.TimeoutException(
reason=("Cannot get operating_status '{}' from {} {} "
"within the timeout period.".format(operating_status,
resource_type, args)))
def wait_lb_operating_status(lb_id, operating_status):
LOG.debug("Wait for loadbalancer {} to have '{}' "
"operating_status".format(lb_id, operating_status))
wait_resource_operating_status("loadbalancer",
operating_status,
octavia.get_loadbalancer,
lb_id)
def wait_resource_provisioning_status(resource_type, provisioning_status,
resource_get, *args):
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
res = resource_get(*args)
if res['provisioning_status'] == provisioning_status:
return
time.sleep(CONF.tobiko.octavia.check_interval)
raise exceptions.TimeoutException(
reason=("Cannot get provisioning_status '{}' from {} {} "
"within the timeout period.".format(provisioning_status,
resource_type, args)))
def wait_lb_provisioning_status(lb_id, provisioning_status):
LOG.debug("Wait for loadbalancer {} to have '{}' "
"provisioning_status".format(lb_id, provisioning_status))
wait_resource_provisioning_status("loadbalancer",
provisioning_status,
octavia.get_loadbalancer,
lb_id)
def wait_for_request_data(client_stack, server_ip_address,
server_protocol, server_port, request_function):
"""Wait until a request on a server succeeds
Throws a TimeoutException after CONF.tobiko.octavia.check_timeout
if the server doesn't reply.
"""
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
try:
ret = request_function(client_stack, server_ip_address,
server_protocol, server_port)
except Exception as e:
LOG.warning("Received exception {} while performing a "
"request".format(e))
else:
return ret
time.sleep(CONF.tobiko.octavia.check_interval)
raise exceptions.TimeoutException(
reason=("Cannot get data from {} on port {} with "
"protocol {} within the timeout period.".format(
server_ip_address, server_port, server_protocol)))
def wait_for_loadbalancer_is_active(loadbalancer_stack):
loadbalancer_id = loadbalancer_stack.loadbalancer_id
wait_lb_provisioning_status(loadbalancer_id, 'ACTIVE')
def wait_for_loadbalancer_functional(loadbalancer_stack, client_stack,
loadbalancer_vip, loadbalancer_protocol,
loadbalancer_port, request_function):
"""Wait until the load balancer is functional."""
# Check load balancer status
loadbalancer_id = loadbalancer_stack.loadbalancer_id
wait_lb_operating_status(loadbalancer_id, 'ONLINE')
wait_for_request_data(client_stack, loadbalancer_vip,
loadbalancer_protocol, loadbalancer_port,
request_function)
def wait_for_member_functional(client_stack, pool_stack, member_stack,
request_function):
"""Wait until a member server is functional."""
member_ip = member_stack.server_stack.floating_ip_address
member_port = member_stack.application_port
member_protocol = pool_stack.pool_protocol
wait_for_request_data(client_stack, member_ip, member_protocol,
member_port, request_function)