Browse Source

Scatter Octavia's test logic to tobiko framework

So far every test was forced to have its own logic by duplicating
Octavia's scenario test logic (test_traffic.py).

This patch scatters the Octavia's test_traffic logic into tobiko
framework.

It creates several new files in tobiko/tests/scenario/octavia:
        expections.py - Octavia's scenarios exceptions.
        octavia_base.py - Octavia's BaseTests.
        validators.py - Octavia's traffic validators.
        waiters.py - Octavia's waiters.

No additional Octavia logic was added.

Change-Id: I7d6d6d7c54bae0ba02f08a187b8b16a4757e46c8
changes/39/774639/2
oschwart 6 months ago
parent
commit
5c11625189
  1. 3
      tobiko/openstack/stacks/__init__.py
  2. 11
      tobiko/openstack/stacks/_octavia.py
  3. 26
      tobiko/tests/scenario/octavia/exceptions.py
  4. 21
      tobiko/tests/scenario/octavia/octavia_base.py
  5. 188
      tobiko/tests/scenario/octavia/test_traffic.py
  6. 89
      tobiko/tests/scenario/octavia/validators.py
  7. 105
      tobiko/tests/scenario/octavia/waiters.py

3
tobiko/openstack/stacks/__init__.py

@ -82,3 +82,6 @@ OctaviaListenerStackFixture = _octavia.OctaviaListenerStackFixture
OctaviaMemberServerStackFixture = _octavia.OctaviaMemberServerStackFixture
OctaviaServerStackFixture = _octavia.OctaviaServerStackFixture
OctaviaClientServerStackFixture = _octavia.OctaviaClientServerStackFixture
OctaviaOtherServerStackFixture = _octavia.OctaviaOtherServerStackFixture
OctaviaOtherMemberServerStackFixture = (
_octavia.OctaviaOtherMemberServerStackFixture)

11
tobiko/openstack/stacks/_octavia.py

@ -150,3 +150,14 @@ class OctaviaMemberServerStackFixture(heat.HeatStackFixture):
class OctaviaClientServerStackFixture(_cirros.CirrosServerStackFixture):
network_stack = tobiko.required_setup_fixture(
OctaviaVipNetworkStackFixture)
class OctaviaOtherServerStackFixture(
OctaviaServerStackFixture):
pass
class OctaviaOtherMemberServerStackFixture(
OctaviaMemberServerStackFixture):
server_stack = tobiko.required_setup_fixture(
OctaviaOtherServerStackFixture)

26
tobiko/tests/scenario/octavia/exceptions.py

@ -0,0 +1,26 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import tobiko
class RequestException(tobiko.TobikoException):
message = ("Error while sending request to server "
"(command was '{command}'): {error}")
class TimeoutException(tobiko.TobikoException):
message = "Timeout exception: {reason}"

21
tobiko/tests/scenario/octavia/octavia_base.py

@ -0,0 +1,21 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from tobiko.tests.scenario.octavia import validators
class OctaviaTest(validators.Validators):
pass

188
tobiko/tests/scenario/octavia/test_traffic.py

@ -14,48 +14,14 @@
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
import tobiko
from tobiko import config
from tobiko.openstack import keystone
from tobiko.openstack import octavia
from tobiko.openstack import stacks
from tobiko.shell import ssh
from tobiko.shell import sh
from tobiko.tests import base
LOG = log.getLogger(__name__)
CONF = config.CONF
CURL_OPTIONS = "-f --connect-timeout 2 -g"
class OctaviaOtherServerStackFixture(
stacks.OctaviaServerStackFixture):
pass
class OctaviaOtherMemberServerStackFixture(
stacks.OctaviaMemberServerStackFixture):
server_stack = tobiko.required_setup_fixture(
OctaviaOtherServerStackFixture)
class RequestException(tobiko.TobikoException):
message = ("Error while sending request to server "
"(command was '{command}'): {error}")
class TimeoutException(tobiko.TobikoException):
message = "Timeout exception: {reason}"
from tobiko.tests.scenario.octavia import waiters, octavia_base
@keystone.skip_if_missing_service(name='octavia')
class OctaviaBasicTrafficScenarioTest(base.TobikoTest):
class OctaviaBasicTrafficScenarioTest(octavia_base.OctaviaTest):
"""Octavia traffic scenario test.
Create a load balancer with 2 members that run a server application,
@ -72,7 +38,7 @@ class OctaviaBasicTrafficScenarioTest(base.TobikoTest):
stacks.OctaviaMemberServerStackFixture)
member2_stack = tobiko.required_setup_fixture(
OctaviaOtherMemberServerStackFixture)
stacks.OctaviaOtherMemberServerStackFixture)
client_stack = tobiko.required_setup_fixture(
stacks.OctaviaClientServerStackFixture)
@ -88,139 +54,29 @@ class OctaviaBasicTrafficScenarioTest(base.TobikoTest):
self.loadbalancer_protocol = self.listener_stack.lb_protocol
# Wait for members
self._check_member(self.member1_stack)
self._check_member(self.member2_stack)
waiters.wait_for_member_functional(self.client_stack,
self.listener_stack,
self.member1_stack, self.request)
waiters.wait_for_member_functional(self.client_stack,
self.listener_stack,
self.member2_stack, self.request)
# Check if load balancer is functional
self._check_loadbalancer()
def _request(self, client_stack, server_ip_address, protocol, server_port):
"""Perform a request on a server.
Returns the response in case of success, throws an RequestException
otherwise.
"""
if ':' in server_ip_address:
# Add square brackets around IPv6 address to please curl
server_ip_address = "[{}]".format(server_ip_address)
cmd = "curl {} {}://{}:{}/id".format(
CURL_OPTIONS, protocol.lower(), server_ip_address, server_port)
ssh_client = ssh.ssh_client(
client_stack.floating_ip_address,
username=client_stack.image_fixture.username)
ret = sh.ssh_execute(ssh_client, cmd)
if ret.exit_status != 0:
raise RequestException(command=cmd,
error=ret.stderr)
return ret.stdout
def _wait_resource_operating_status(self, resource_type, operating_status,
resource_get, *args):
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
res = resource_get(*args)
if res['operating_status'] == operating_status:
return
time.sleep(CONF.tobiko.octavia.check_interval)
waiters.wait_for_loadbalancer_functional(self.loadbalancer_stack,
self.client_stack,
self.loadbalancer_vip,
self.loadbalancer_protocol,
self.loadbalancer_port,
self.request)
raise TimeoutException(
reason=("Cannot get operating_status '{}' from {} {} "
"within the timeout period.".format(operating_status,
resource_type, args)))
@property
def loadbalancer(self):
return self.loadbalancer_stack
def _wait_lb_operating_status(self, lb_id, operating_status):
LOG.debug("Wait for loadbalancer {} to have '{}' "
"operating_status".format(lb_id, operating_status))
self._wait_resource_operating_status("loadbalancer",
operating_status,
octavia.get_loadbalancer,
lb_id)
def _wait_for_request_data(self, client_stack, server_ip_address,
server_protocol, server_port):
"""Wait until a request on a server succeeds
Throws a TimeoutException after CONF.tobiko.octavia.check_timeout
if the server doesn't reply.
"""
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
try:
ret = self._request(client_stack, server_ip_address,
server_protocol, server_port)
except Exception as e:
LOG.warning("Received exception {} while performing a "
"request".format(e))
else:
return ret
time.sleep(CONF.tobiko.octavia.check_interval)
raise TimeoutException(
reason=("Cannot get data from {} on port {} with "
"protocol {} within the timeout period.".format(
server_ip_address, server_port,
server_protocol)))
def _check_loadbalancer(self):
"""Wait until the load balancer is functional."""
# Check load balancer status
loadbalancer_id = self.loadbalancer_stack.loadbalancer_id
self._wait_lb_operating_status(loadbalancer_id, 'ONLINE')
self._wait_for_request_data(self.client_stack,
def test_traffic(self):
self.check_members_balanced(self.listener_stack,
self.client_stack,
self.members_count,
self.loadbalancer_vip,
self.loadbalancer_protocol,
self.loadbalancer_port)
def _check_member(self, member_stack):
"""Wait until a member server is functional."""
member_ip = member_stack.server_stack.floating_ip_address
member_port = member_stack.application_port
member_protocol = self.listener_stack.pool_protocol
self._wait_for_request_data(self.client_stack, member_ip,
member_protocol, member_port)
def _check_members_balanced(self):
"""Check if traffic is properly balanced between members."""
replies = {}
for _ in range(self.members_count*10):
content = self._request(
self.client_stack, self.loadbalancer_vip,
self.loadbalancer_protocol, self.loadbalancer_port)
if content not in replies:
replies[content] = 0
replies[content] += 1
# wait one second (required when using cirros' nc fake webserver)
time.sleep(1)
LOG.debug("Replies from load balancer: {}".format(
replies))
# assert that 'members_count' servers replied
self.assertEqual(
self.members_count, len(replies),
'The number of detected active members:{} is not '
'as expected:{}'.format(len(replies), self.members_count))
if self.listener_stack.lb_algorithm == 'ROUND_ROBIN':
# assert that requests have been fairly dispatched (each server
# received the same number of requests)
self.assertEqual(
1, len(set(replies.values())),
'The number of requests served by each member is '
'different and not as expected by used ROUND_ROBIN algorithm.')
def test_traffic(self):
self._check_members_balanced()

89
tobiko/tests/scenario/octavia/validators.py

@ -0,0 +1,89 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
from tobiko.shell import ssh
from tobiko.shell import sh
from tobiko.tests import base
from tobiko.tests.scenario.octavia.exceptions import RequestException
LOG = log.getLogger(__name__)
CURL_OPTIONS = "-f --connect-timeout 2 -g"
class Validators(base.TobikoTest):
def request(self, client_stack, server_ip_address, protocol, server_port):
"""Perform a request on a server.
Returns the response in case of success, throws an RequestException
otherwise.
"""
if ':' in server_ip_address:
# Add square brackets around IPv6 address to please curl
server_ip_address = "[{}]".format(server_ip_address)
cmd = "curl {} {}://{}:{}/id".format(
CURL_OPTIONS, protocol.lower(), server_ip_address, server_port)
ssh_client = ssh.ssh_client(
client_stack.floating_ip_address,
username=client_stack.image_fixture.username)
ret = sh.ssh_execute(ssh_client, cmd)
if ret.exit_status != 0:
raise RequestException(command=cmd,
error=ret.stderr)
return ret.stdout
def check_members_balanced(self, listener_stack, client_stack,
members_count,
loadbalancer_vip, loadbalancer_protocol,
loadbalancer_port):
"""Check if traffic is properly balanced between members."""
replies = {}
for _ in range(members_count * 10):
content = self.request(
client_stack, loadbalancer_vip,
loadbalancer_protocol, loadbalancer_port)
if content not in replies:
replies[content] = 0
replies[content] += 1
# wait one second (required when using cirros' nc fake webserver)
time.sleep(1)
LOG.debug("Replies from load balancer: {}".format(replies))
# assert that 'members_count' servers replied
self.assertEqual(members_count, len(replies),
'The number of detected active members:{} is not '
'as expected:{}'.format(len(replies), members_count))
if listener_stack.lb_algorithm == 'ROUND_ROBIN':
# assert that requests have been fairly dispatched (each server
# received the same number of requests)
self.assertEqual(1, len(set(replies.values())),
'The number of requests served by each member is '
'different and not as expected by used '
'ROUND_ROBIN algorithm.')

105
tobiko/tests/scenario/octavia/waiters.py

@ -0,0 +1,105 @@
# Copyright (c) 2021 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import time
from oslo_log import log
from tobiko import config
from tobiko.openstack import octavia
from tobiko.tests.scenario.octavia import exceptions
LOG = log.getLogger(__name__)
CONF = config.CONF
def wait_resource_operating_status(resource_type, operating_status,
resource_get, *args):
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
res = resource_get(*args)
if res['operating_status'] == operating_status:
return
time.sleep(CONF.tobiko.octavia.check_interval)
raise exceptions.TimeoutException(
reason=("Cannot get operating_status '{}' from {} {} "
"within the timeout period.".format(operating_status,
resource_type, args)))
def wait_lb_operating_status(lb_id, operating_status):
LOG.debug("Wait for loadbalancer {} to have '{}' "
"operating_status".format(lb_id, operating_status))
wait_resource_operating_status("loadbalancer",
operating_status,
octavia.get_loadbalancer,
lb_id)
def wait_for_request_data(client_stack, server_ip_address,
server_protocol, server_port, request_function):
"""Wait until a request on a server succeeds
Throws a TimeoutException after CONF.tobiko.octavia.check_timeout
if the server doesn't reply.
"""
start = time.time()
while time.time() - start < CONF.tobiko.octavia.check_timeout:
try:
ret = request_function(client_stack, server_ip_address,
server_protocol, server_port)
except Exception as e:
LOG.warning("Received exception {} while performing a "
"request".format(e))
else:
return ret
time.sleep(CONF.tobiko.octavia.check_interval)
raise exceptions.TimeoutException(
reason=("Cannot get data from {} on port {} with "
"protocol {} within the timeout period.".format(
server_ip_address, server_port, server_protocol)))
def wait_for_loadbalancer_functional(loadbalancer_stack, client_stack,
loadbalancer_vip, loadbalancer_protocol,
loadbalancer_port, request_function):
"""Wait until the load balancer is functional."""
# Check load balancer status
loadbalancer_id = loadbalancer_stack.loadbalancer_id
wait_lb_operating_status(loadbalancer_id, 'ONLINE')
wait_for_request_data(client_stack, loadbalancer_vip,
loadbalancer_protocol, loadbalancer_port,
request_function)
def wait_for_member_functional(client_stack, listener_stack, member_stack,
request_function):
"""Wait until a member server is functional."""
member_ip = member_stack.server_stack.floating_ip_address
member_port = member_stack.application_port
member_protocol = listener_stack.pool_protocol
wait_for_request_data(client_stack, member_ip, member_protocol,
member_port, request_function)
Loading…
Cancel
Save