Merge "Scatter Octavia's test logic to tobiko framework"

This commit is contained in:
Zuul 2021-02-15 12:31:02 +00:00 committed by Gerrit Code Review
commit 596bb01340
7 changed files with 277 additions and 166 deletions

View File

@ -82,3 +82,6 @@ OctaviaListenerStackFixture = _octavia.OctaviaListenerStackFixture
OctaviaMemberServerStackFixture = _octavia.OctaviaMemberServerStackFixture
OctaviaServerStackFixture = _octavia.OctaviaServerStackFixture
OctaviaClientServerStackFixture = _octavia.OctaviaClientServerStackFixture
OctaviaOtherServerStackFixture = _octavia.OctaviaOtherServerStackFixture
OctaviaOtherMemberServerStackFixture = (
_octavia.OctaviaOtherMemberServerStackFixture)

View File

@ -150,3 +150,14 @@ class OctaviaMemberServerStackFixture(heat.HeatStackFixture):
class OctaviaClientServerStackFixture(_cirros.CirrosServerStackFixture):
network_stack = tobiko.required_setup_fixture(
OctaviaVipNetworkStackFixture)
class OctaviaOtherServerStackFixture(
OctaviaServerStackFixture):
pass
class OctaviaOtherMemberServerStackFixture(
OctaviaMemberServerStackFixture):
server_stack = tobiko.required_setup_fixture(
OctaviaOtherServerStackFixture)

View File

@ -0,0 +1,26 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import tobiko
class RequestException(tobiko.TobikoException):
message = ("Error while sending request to server "
"(command was '{command}'): {error}")
class TimeoutException(tobiko.TobikoException):
message = "Timeout exception: {reason}"

View File

@ -0,0 +1,21 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from tobiko.tests.scenario.octavia import validators
class OctaviaTest(validators.Validators):
pass

View File

@ -14,48 +14,14 @@
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
import tobiko
from tobiko import config
from tobiko.openstack import keystone
from tobiko.openstack import octavia
from tobiko.openstack import stacks
from tobiko.shell import ssh
from tobiko.shell import sh
from tobiko.tests import base
LOG = log.getLogger(__name__)
CONF = config.CONF
CURL_OPTIONS = "-f --connect-timeout 2 -g"
class OctaviaOtherServerStackFixture(
stacks.OctaviaServerStackFixture):
pass
class OctaviaOtherMemberServerStackFixture(
stacks.OctaviaMemberServerStackFixture):
server_stack = tobiko.required_setup_fixture(
OctaviaOtherServerStackFixture)
class RequestException(tobiko.TobikoException):
message = ("Error while sending request to server "
"(command was '{command}'): {error}")
class TimeoutException(tobiko.TobikoException):
message = "Timeout exception: {reason}"
from tobiko.tests.scenario.octavia import waiters, octavia_base
@keystone.skip_if_missing_service(name='octavia')
class OctaviaBasicTrafficScenarioTest(base.TobikoTest):
class OctaviaBasicTrafficScenarioTest(octavia_base.OctaviaTest):
"""Octavia traffic scenario test.
Create a load balancer with 2 members that run a server application,
@ -72,7 +38,7 @@ class OctaviaBasicTrafficScenarioTest(base.TobikoTest):
stacks.OctaviaMemberServerStackFixture)
member2_stack = tobiko.required_setup_fixture(
OctaviaOtherMemberServerStackFixture)
stacks.OctaviaOtherMemberServerStackFixture)
client_stack = tobiko.required_setup_fixture(
stacks.OctaviaClientServerStackFixture)
@ -88,139 +54,29 @@ class OctaviaBasicTrafficScenarioTest(base.TobikoTest):
self.loadbalancer_protocol = self.listener_stack.lb_protocol
# Wait for members
self._check_member(self.member1_stack)
self._check_member(self.member2_stack)
waiters.wait_for_member_functional(self.client_stack,
self.listener_stack,
self.member1_stack, self.request)
waiters.wait_for_member_functional(self.client_stack,
self.listener_stack,
self.member2_stack, self.request)
# Check if load balancer is functional
self._check_loadbalancer()
waiters.wait_for_loadbalancer_functional(self.loadbalancer_stack,
self.client_stack,
self.loadbalancer_vip,
self.loadbalancer_protocol,
self.loadbalancer_port,
self.request)
def _request(self, client_stack, server_ip_address, protocol, server_port):
"""Perform a request on a server.
@property
def loadbalancer(self):
return self.loadbalancer_stack
Returns the response in case of success, throws an RequestException
otherwise.
"""
if ':' in server_ip_address:
# Add square brackets around IPv6 address to please curl
server_ip_address = "[{}]".format(server_ip_address)
cmd = "curl {} {}://{}:{}/id".format(
CURL_OPTIONS, protocol.lower(), server_ip_address, server_port)
ssh_client = ssh.ssh_client(
client_stack.floating_ip_address,
username=client_stack.image_fixture.username)
ret = sh.ssh_execute(ssh_client, cmd)
if ret.exit_status != 0:
raise RequestException(command=cmd,
error=ret.stderr)
return ret.stdout
def _wait_resource_operating_status(self, resource_type, operating_status,
resource_get, *args):
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
res = resource_get(*args)
if res['operating_status'] == operating_status:
return
time.sleep(CONF.tobiko.octavia.check_interval)
raise TimeoutException(
reason=("Cannot get operating_status '{}' from {} {} "
"within the timeout period.".format(operating_status,
resource_type, args)))
def _wait_lb_operating_status(self, lb_id, operating_status):
LOG.debug("Wait for loadbalancer {} to have '{}' "
"operating_status".format(lb_id, operating_status))
self._wait_resource_operating_status("loadbalancer",
operating_status,
octavia.get_loadbalancer,
lb_id)
def _wait_for_request_data(self, client_stack, server_ip_address,
server_protocol, server_port):
"""Wait until a request on a server succeeds
Throws a TimeoutException after CONF.tobiko.octavia.check_timeout
if the server doesn't reply.
"""
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
try:
ret = self._request(client_stack, server_ip_address,
server_protocol, server_port)
except Exception as e:
LOG.warning("Received exception {} while performing a "
"request".format(e))
else:
return ret
time.sleep(CONF.tobiko.octavia.check_interval)
raise TimeoutException(
reason=("Cannot get data from {} on port {} with "
"protocol {} within the timeout period.".format(
server_ip_address, server_port,
server_protocol)))
def _check_loadbalancer(self):
"""Wait until the load balancer is functional."""
# Check load balancer status
loadbalancer_id = self.loadbalancer_stack.loadbalancer_id
self._wait_lb_operating_status(loadbalancer_id, 'ONLINE')
self._wait_for_request_data(self.client_stack,
def test_traffic(self):
self.check_members_balanced(self.listener_stack,
self.client_stack,
self.members_count,
self.loadbalancer_vip,
self.loadbalancer_protocol,
self.loadbalancer_port)
def _check_member(self, member_stack):
"""Wait until a member server is functional."""
member_ip = member_stack.server_stack.floating_ip_address
member_port = member_stack.application_port
member_protocol = self.listener_stack.pool_protocol
self._wait_for_request_data(self.client_stack, member_ip,
member_protocol, member_port)
def _check_members_balanced(self):
"""Check if traffic is properly balanced between members."""
replies = {}
for _ in range(self.members_count*10):
content = self._request(
self.client_stack, self.loadbalancer_vip,
self.loadbalancer_protocol, self.loadbalancer_port)
if content not in replies:
replies[content] = 0
replies[content] += 1
# wait one second (required when using cirros' nc fake webserver)
time.sleep(1)
LOG.debug("Replies from load balancer: {}".format(
replies))
# assert that 'members_count' servers replied
self.assertEqual(
self.members_count, len(replies),
'The number of detected active members:{} is not '
'as expected:{}'.format(len(replies), self.members_count))
if self.listener_stack.lb_algorithm == 'ROUND_ROBIN':
# assert that requests have been fairly dispatched (each server
# received the same number of requests)
self.assertEqual(
1, len(set(replies.values())),
'The number of requests served by each member is '
'different and not as expected by used ROUND_ROBIN algorithm.')
def test_traffic(self):
self._check_members_balanced()

View File

@ -0,0 +1,89 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
from tobiko.shell import ssh
from tobiko.shell import sh
from tobiko.tests import base
from tobiko.tests.scenario.octavia.exceptions import RequestException
LOG = log.getLogger(__name__)
CURL_OPTIONS = "-f --connect-timeout 2 -g"
class Validators(base.TobikoTest):
def request(self, client_stack, server_ip_address, protocol, server_port):
"""Perform a request on a server.
Returns the response in case of success, throws an RequestException
otherwise.
"""
if ':' in server_ip_address:
# Add square brackets around IPv6 address to please curl
server_ip_address = "[{}]".format(server_ip_address)
cmd = "curl {} {}://{}:{}/id".format(
CURL_OPTIONS, protocol.lower(), server_ip_address, server_port)
ssh_client = ssh.ssh_client(
client_stack.floating_ip_address,
username=client_stack.image_fixture.username)
ret = sh.ssh_execute(ssh_client, cmd)
if ret.exit_status != 0:
raise RequestException(command=cmd,
error=ret.stderr)
return ret.stdout
def check_members_balanced(self, listener_stack, client_stack,
members_count,
loadbalancer_vip, loadbalancer_protocol,
loadbalancer_port):
"""Check if traffic is properly balanced between members."""
replies = {}
for _ in range(members_count * 10):
content = self.request(
client_stack, loadbalancer_vip,
loadbalancer_protocol, loadbalancer_port)
if content not in replies:
replies[content] = 0
replies[content] += 1
# wait one second (required when using cirros' nc fake webserver)
time.sleep(1)
LOG.debug("Replies from load balancer: {}".format(replies))
# assert that 'members_count' servers replied
self.assertEqual(members_count, len(replies),
'The number of detected active members:{} is not '
'as expected:{}'.format(len(replies), members_count))
if listener_stack.lb_algorithm == 'ROUND_ROBIN':
# assert that requests have been fairly dispatched (each server
# received the same number of requests)
self.assertEqual(1, len(set(replies.values())),
'The number of requests served by each member is '
'different and not as expected by used '
'ROUND_ROBIN algorithm.')

View File

@ -0,0 +1,105 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
from tobiko import config
from tobiko.openstack import octavia
from tobiko.tests.scenario.octavia import exceptions
LOG = log.getLogger(__name__)
CONF = config.CONF
def wait_resource_operating_status(resource_type, operating_status,
resource_get, *args):
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
res = resource_get(*args)
if res['operating_status'] == operating_status:
return
time.sleep(CONF.tobiko.octavia.check_interval)
raise exceptions.TimeoutException(
reason=("Cannot get operating_status '{}' from {} {} "
"within the timeout period.".format(operating_status,
resource_type, args)))
def wait_lb_operating_status(lb_id, operating_status):
LOG.debug("Wait for loadbalancer {} to have '{}' "
"operating_status".format(lb_id, operating_status))
wait_resource_operating_status("loadbalancer",
operating_status,
octavia.get_loadbalancer,
lb_id)
def wait_for_request_data(client_stack, server_ip_address,
server_protocol, server_port, request_function):
"""Wait until a request on a server succeeds
Throws a TimeoutException after CONF.tobiko.octavia.check_timeout
if the server doesn't reply.
"""
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
try:
ret = request_function(client_stack, server_ip_address,
server_protocol, server_port)
except Exception as e:
LOG.warning("Received exception {} while performing a "
"request".format(e))
else:
return ret
time.sleep(CONF.tobiko.octavia.check_interval)
raise exceptions.TimeoutException(
reason=("Cannot get data from {} on port {} with "
"protocol {} within the timeout period.".format(
server_ip_address, server_port, server_protocol)))
def wait_for_loadbalancer_functional(loadbalancer_stack, client_stack,
loadbalancer_vip, loadbalancer_protocol,
loadbalancer_port, request_function):
"""Wait until the load balancer is functional."""
# Check load balancer status
loadbalancer_id = loadbalancer_stack.loadbalancer_id
wait_lb_operating_status(loadbalancer_id, 'ONLINE')
wait_for_request_data(client_stack, loadbalancer_vip,
loadbalancer_protocol, loadbalancer_port,
request_function)
def wait_for_member_functional(client_stack, listener_stack, member_stack,
request_function):
"""Wait until a member server is functional."""
member_ip = member_stack.server_stack.floating_ip_address
member_port = member_stack.application_port
member_protocol = listener_stack.pool_protocol
wait_for_request_data(client_stack, member_ip, member_protocol,
member_port, request_function)