Merge "Remove DDT tests from tempest plugin" into stable/queens

This commit is contained in:
Zuul 2019-06-26 04:01:05 +00:00 committed by Gerrit Code Review
commit 39800b427f
5 changed files with 0 additions and 672 deletions

View File

@ -1,228 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Rackspace Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest import test
import testscenarios
from neutron_lbaas.tests.tempest.v2.api import base
CONF = config.CONF
# Use local tempest conf if one is available.
# This usually means we're running tests outside of devstack
if os.path.exists('./tests/tempest/etc/dev_tempest.conf'):
CONF.set_config_path('./tests/tempest/etc/dev_tempest.conf')
class AdminStateTests(testscenarios.TestWithScenarios,
base.BaseTestCase):
"""
Scenario Tests(admin_state_up tests):
This class supplies the resource set up methods and the check
operating status methods for the admin_sate_up tests.
"""
@classmethod
def resource_setup(cls):
super(AdminStateTests, cls).resource_setup()
if not test.is_extension_enabled("lbaasv2", "network"):
msg = "lbaas extension not enabled."
raise cls.skipException(msg)
network_name = data_utils.rand_name('network-')
cls.network = cls.create_network(network_name)
cls.subnet = cls.create_subnet(cls.network)
cls.tenant_id = cls.subnet.get('tenant_id')
cls.subnet_id = cls.subnet.get('id')
cls.protocol = 'HTTP'
cls.port = 8081
cls.lb_algorithm = 'ROUND_ROBIN'
cls.address = '127.0.0.1'
@classmethod
def resource_setup_load_balancer(cls, admin_state_up_flag):
cls.create_lb_kwargs = {'tenant_id': cls.tenant_id,
'vip_subnet_id': cls.subnet_id,
'admin_state_up': admin_state_up_flag}
cls.load_balancer = cls._create_active_load_balancer(
**cls.create_lb_kwargs)
cls.load_balancer_id = cls.load_balancer['id']
@classmethod
def resource_setup_listener(cls, admin_state_up_flag):
cls.create_listener_kwargs = {'loadbalancer_id': cls.load_balancer_id,
'protocol': cls.protocol,
'protocol_port': cls.port,
'admin_state_up': admin_state_up_flag
}
cls.listener = cls._create_listener(
**cls.create_listener_kwargs)
cls.listener_id = cls.listener['id']
@classmethod
def resource_setup_pool(cls, admin_state_up_flag):
cls.create_pool_kwargs = {'protocol': cls.protocol,
'lb_algorithm': cls.lb_algorithm,
'listener_id': cls.listener_id,
'admin_state_up': admin_state_up_flag
}
cls.pool = cls._create_pool(
**cls.create_pool_kwargs)
cls.pool_id = cls.pool['id']
@classmethod
def resource_setup_member(cls, admin_state_up_flag):
cls.create_member_kwargs = {'address': cls.address,
'protocol_port': cls.port,
'subnet_id': cls.subnet_id,
'admin_state_up': admin_state_up_flag}
cls.member = cls._create_member(
cls.pool_id, **cls.create_member_kwargs)
cls.member_id = cls.member['id']
@classmethod
def resource_set_health_monitor(cls, admin_state_up_flag, creator):
cls.create_hm_kwargs = {'type': cls.protocol,
'delay': 3,
'max_retries': 10,
'timeout': 5,
'pool_id': cls.pool_id,
'admin_state_up': admin_state_up_flag}
cls.health_monitor = creator(**cls.create_hm_kwargs)
cls.health_monitor_id = cls.health_monitor['id']
@classmethod
def resource_cleanup(cls):
super(AdminStateTests, cls).resource_cleanup()
def check_lb_operating_status(self,
load_balancer,
listeners=None,
pools=None,
members=None):
if bool(load_balancer) and self.load_balancer.get('admin_state_up'):
self.assertEqual(
load_balancer.get('operating_status'), 'ONLINE')
return True
elif bool(load_balancer):
self.assertEqual(
load_balancer.get('operating_status'), 'DISABLED')
if bool(listeners):
self.assertEqual(listeners[0].
get('operating_status'), 'DISABLED')
if bool(pools):
self.assertEqual(pools[0].
get('operating_status'), 'DISABLED')
if bool(members):
self.assertEqual(members[0].
get('operating_status'), 'DISABLED')
return False
def check_listener_operating_status(self,
listeners,
pools=None,
members=None):
if bool(listeners) and self.listener.get('admin_state_up'):
self.assertEqual(listeners[0].
get('operating_status'), 'ONLINE')
return True
elif bool(listeners):
self.assertEqual(listeners[0].
get('operating_status'), 'DISABLED')
if bool(pools):
self.assertEqual(pools[0].
get('operating_status'), 'DISABLED')
if bool(members):
self.assertEqual(members[0].
get('operating_status'), 'DISABLED')
return False
def check_pool_operating_status(self,
pools,
members=None):
if bool(pools) and self.pool.get('admin_state_up'):
self.assertEqual(pools[0].
get('operating_status'), 'ONLINE')
return True
elif bool(pools):
self.assertEqual(pools[0].
get('operating_status'), 'DISABLED')
if bool(members):
self.assertEqual(members[0].
get('operating_status'), 'DISABLED')
return False
def check_member_operating_status(self, members):
if bool(members) and self.member.get('admin_state_up'):
self.assertEqual(members[0].
get('operating_status'), 'ONLINE')
return True
elif bool(members):
self.assertEqual(members[0].
get('operating_status'), 'DISABLED')
return False
def check_health_monitor_provisioning_status(self, health_monitor):
if bool(health_monitor) and self.health_monitor.get('admin_state_up'):
self.assertEqual(health_monitor.get('provisioning_status'),
'ACTIVE')
return True
elif bool(health_monitor):
self.assertEqual(health_monitor.get('provisioning_status'),
'DISABLED')
return False
def check_operating_status(self):
statuses = (self.load_balancers_client.
get_load_balancer_status_tree
(self.load_balancer_id))
load_balancer = statuses['loadbalancer']
listeners = load_balancer['listeners']
pools = None
members = None
health_monitor = None
if bool(listeners):
pools = listeners[0]['pools']
if bool(pools):
members = pools[0]['members']
health_monitor = pools[0]['healthmonitor']
if self.check_lb_operating_status(load_balancer,
listeners,
pools,
members):
if self.check_listener_operating_status(listeners,
pools,
members):
if self.check_pool_operating_status(pools,
members):
self.check_member_operating_status(members)
self.check_health_monitor_provisioning_status(
health_monitor)

View File

@ -1,145 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib import decorators
import testscenarios
from neutron_lbaas.tests.tempest.v2.ddt import base_ddt
CONF = config.CONF
scenario_lb_T = ('lb_T', {'lb_flag': True})
scenario_lb_F = ('lb_F', {'lb_flag': False})
scenario_listener_T = ('listener_T', {'listener_flag': True})
scenario_listener_F = ('listener_F', {'listener_flag': False})
scenario_pool_T = ('pool_T', {'pool_flag': True})
scenario_pool_F = ('pool_F', {'pool_flag': False})
scenario_healthmonitor_T = ('healthmonitor_T', {'healthmonitor_flag': True})
scenario_healthmonitor_F = ('healthmonitor_F', {'healthmonitor_flag': False})
scenario_healthmonitor_to_flag_T = ('healthmonitor_to_flag_T', {
'healthmonitor_to_flag': True})
scenario_healthmonitor_to_flag_F = ('healthmonitor_to_flag_F', {
'healthmonitor_to_flag': False})
# The following command creates 16 unique scenarios
scenario_create_health_monitor = testscenarios.multiply_scenarios(
[scenario_lb_T, scenario_lb_F],
[scenario_listener_T, scenario_listener_F],
[scenario_pool_T, scenario_pool_F],
[scenario_healthmonitor_T, scenario_healthmonitor_F])
# The following command creates 32 unique scenarios
scenario_update_health_monitor = testscenarios.multiply_scenarios(
[scenario_healthmonitor_to_flag_T, scenario_healthmonitor_to_flag_F],
scenario_create_health_monitor)
class BaseHealthMonitorAdminStateTest(base_ddt.AdminStateTests):
@classmethod
def resource_setup(cls):
super(BaseHealthMonitorAdminStateTest, cls).resource_setup()
@classmethod
def resource_cleanup(cls):
super(BaseHealthMonitorAdminStateTest, cls).resource_cleanup()
def setUp(self):
"""Set up resources.
Including :load balancer, listener, and pool and
health_monitor with scenarios.
"""
super(BaseHealthMonitorAdminStateTest, self).setUp()
self.resource_setup_load_balancer(self.lb_flag)
self.addCleanup(self._delete_load_balancer, self.load_balancer_id)
self.resource_setup_listener(self.listener_flag)
self.addCleanup(self._delete_listener, self.listener_id)
self.resource_setup_pool(self.pool_flag)
self.addCleanup(self._delete_pool, self.pool_id)
self.resource_set_health_monitor(self.healthmonitor_flag,
self._create_health_monitor)
self.addCleanup(self._delete_health_monitor, self.health_monitor_id)
@classmethod
def resource_setup_listener(cls, admin_state_up_flag):
"""Set up resources for listener."""
(super(BaseHealthMonitorAdminStateTest, cls).
resource_setup_listener(admin_state_up_flag))
@classmethod
def resource_setup_pool(cls, admin_state_up_flag):
"""Set up resources for pool."""
(super(BaseHealthMonitorAdminStateTest, cls).
resource_setup_pool(admin_state_up_flag))
@classmethod
def resource_setup_load_balancer(cls, admin_state_up_flag):
"""Set up resources for load balancer."""
(super(BaseHealthMonitorAdminStateTest, cls).
resource_setup_load_balancer(admin_state_up_flag))
class CreateHealthMonitorAdminStateTest(BaseHealthMonitorAdminStateTest):
scenarios = scenario_create_health_monitor
"""
Tests the following operations in the Neutron-LBaaS API using the
REST client for health monitor with testscenarios, the goal is to test
the various admin_state_up boolean combinations and their expected
operating_status and provision_status results from the status tree.
create healthmonitor
"""
# @decorators.skip_because(bug="1449775")
def test_create_health_monitor_with_scenarios(self):
"""Test creating healthmonitor with 16 scenarios.
Compare the status tree before and after setting up admin_state_up flag
for health monitor.
"""
self.check_operating_status()
class UpdateHealthMonitorAdminStateTest(BaseHealthMonitorAdminStateTest):
scenarios = scenario_update_health_monitor
"""
Tests the following operations in the Neutron-LBaaS API using the
REST client for health monitor with testscenarios, the goal is to test
the various admin_state_up boolean combinations and their expected
operating_status and provision_status results from the status tree.
update healthmonitor
"""
@decorators.skip_because(bug="1449775")
def test_update_health_monitor_with_admin_state_up(self):
"""Test update a monitor.
Compare the status tree before and after setting the admin_state_up
flag for health_monitor.
"""
self.create_health_monitor_kwargs = {
'admin_state_up': self.healthmonitor_to_flag}
self.health_monitor = self._update_health_monitor(
self.health_monitor_id, **self.create_health_monitor_kwargs)
self.check_operating_status()

View File

@ -1,132 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
import testscenarios
from neutron_lbaas.tests.tempest.v2.ddt import base_ddt
CONF = config.CONF
"""
Tests the following operations in the Neutron-LBaaS API using the
REST client for Listeners:
|-----|------------------|------------------|-------------------------|
|S.No |Action |LB admin_state_up | Listener admin_state_up |
|-----|------------------|------------------|-------------------------|
| 1 | Create Listener | True | True |
| 2 | | True | False |
| 3 | | False | True |
| 4 | | False | False |
| 5 | Update Listener | True | True --> True |
| 6 | | True | True --> False |
| 7 | | True | False --> True |
| 8 | | True | False --> False |
| 9 | | False | True --> True |
| 10 | | False | True --> False |
| 11 | | False | False --> True |
| 12 | | False | False --> False |
|-----|------------------|------------------|-------------------------|
"""
# set up the scenarios
scenario_lb_T = ('lb_T', {'lb_flag': True})
scenario_lb_F = ('lb_F', {'lb_flag': False})
scenario_listener_T = ('listener_T', {'listener_flag': True})
scenario_listener_F = ('listener_F', {'listener_flag': False})
scenario_lis_to_flag_T = ('listener_to_flag_T', {'listener_to_flag': True})
scenario_lis_to_flag_F = ('listener_to_flag_F', {'listener_to_flag': False})
# The following command creates 4 unique scenarios
scenario_create_member = testscenarios.multiply_scenarios(
[scenario_lb_T, scenario_lb_F],
[scenario_listener_T, scenario_listener_F])
# The following command creates 8 unique scenarios
scenario_update_member = testscenarios.multiply_scenarios(
[scenario_lis_to_flag_T, scenario_lis_to_flag_F],
scenario_create_member)
class CreateListenerAdminStateTests(base_ddt.AdminStateTests):
scenarios = scenario_create_member
@classmethod
def resource_setup(cls):
super(CreateListenerAdminStateTests, cls).resource_setup()
@classmethod
def resource_cleanup(cls):
super(CreateListenerAdminStateTests, cls).resource_cleanup()
@classmethod
def setup_load_balancer(cls, **kwargs):
super(CreateListenerAdminStateTests,
cls).setup_load_balancer(**kwargs)
def test_create_listener_with_lb_and_listener_admin_states_up(self):
"""Test create a listener.
Create a listener with various combinations of
values for admin_state_up field of the listener and
the load-balancer.
"""
self.resource_setup_load_balancer(self.lb_flag)
self.resource_setup_listener(self.listener_flag)
self.check_operating_status()
self._delete_listener(self.listener_id)
self._delete_load_balancer(self.load_balancer_id)
class UpdateListenerAdminStateTests(base_ddt.AdminStateTests):
scenarios = scenario_update_member
@classmethod
def resource_setup(cls):
super(UpdateListenerAdminStateTests, cls).resource_setup()
@classmethod
def resource_cleanup(cls):
super(UpdateListenerAdminStateTests, cls).resource_cleanup()
@classmethod
def setup_load_balancer(cls, **kwargs):
super(UpdateListenerAdminStateTests,
cls).setup_load_balancer(**kwargs)
def test_update_listener_with_listener_admin_state_up(self):
"""Test updating a listener.
Update a listener with various combinations of
admin_state_up field of the listener and the
load-balancer.
"""
self.resource_setup_load_balancer(self.lb_flag)
self.resource_setup_listener(self.listener_flag)
self.check_operating_status()
self.listener = (self._update_listener(
self.listener_id,
name='new_name',
admin_state_up=self.listener_to_flag))
self.check_operating_status()
self._delete_listener(self.listener_id)
self._delete_load_balancer(self.load_balancer_id)

View File

@ -1,167 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
import testscenarios
from neutron_lbaas.tests.tempest.v2.ddt import base_ddt
CONF = config.CONF
"""
Tests the following operations in the Neutron-LBaaS API using the
REST client with various combinations of values for the
admin_state_up field of lb, listener, pool and member.
create member
update member
"""
# set up the scenarios
scenario_lb_T = ('lb_T', {'lb_flag': True})
scenario_lb_F = ('lb_F', {'lb_flag': False})
scenario_listener_T = ('listener_T', {'listener_flag': True})
scenario_listener_F = ('listener_F', {'listener_flag': False})
scenario_pool_T = ('pool_T', {'pool_flag': True})
scenario_pool_F = ('pool_F', {'pool_flag': False})
scenario_member_T = ('member_T', {'member_flag': True})
scenario_member_F = ('member_F', {'member_flag': False})
scenario_mem_to_flag_T = ('member_to_flag_T', {'member_to_flag': True})
scenario_mem_to_flag_F = ('member_to_flag_F', {'member_to_flag': False})
# The following command creates 16 unique scenarios
scenario_create_member = testscenarios.multiply_scenarios(
[scenario_lb_T, scenario_lb_F],
[scenario_listener_T, scenario_listener_F],
[scenario_pool_T, scenario_pool_F],
[scenario_member_T, scenario_member_F])
# The following command creates 32 unique scenarios
scenario_update_member = testscenarios.multiply_scenarios(
[scenario_mem_to_flag_T, scenario_mem_to_flag_F],
scenario_create_member)
class CreateMemberAdminStateTests(base_ddt.AdminStateTests):
scenarios = scenario_create_member
@classmethod
def resource_setup(cls):
super(CreateMemberAdminStateTests, cls).resource_setup()
@classmethod
def resource_cleanup(cls):
super(CreateMemberAdminStateTests, cls).resource_cleanup()
def setUp(self):
"""Set up load balancer, listener, pool and member."""
super(CreateMemberAdminStateTests, self).setUp()
self.resource_setup_load_balancer(self.lb_flag)
self.addCleanup(self._delete_load_balancer, self.load_balancer_id)
self.resource_setup_listener(self.listener_flag)
self.addCleanup(self._delete_listener, self.listener_id)
self.resource_setup_pool(self.pool_flag)
self.addCleanup(self._delete_pool, self.pool_id)
self.resource_setup_member(self.member_flag)
self.addCleanup(self._delete_member, self.pool_id, self.member_id)
@classmethod
def resource_setup_load_balancer(cls, admin_state_up_flag):
(super(CreateMemberAdminStateTests, cls).
resource_setup_load_balancer(admin_state_up_flag))
@classmethod
def resource_setup_listener(cls, admin_state_up_flag):
(super(CreateMemberAdminStateTests, cls).
resource_setup_listener(admin_state_up_flag))
@classmethod
def resource_setup_pool(cls, admin_state_up_flag):
(super(CreateMemberAdminStateTests, cls).
resource_setup_pool(admin_state_up_flag))
@classmethod
def resource_setup_member(cls, admin_state_up_flag):
(super(CreateMemberAdminStateTests, cls).
resource_setup_member(admin_state_up_flag))
def test_create_member_with_admin_state_up(self):
"""Test create a member. """
self.check_operating_status()
class UpdateMemberAdminStateTests(base_ddt.AdminStateTests):
scenarios = scenario_update_member
@classmethod
def resource_setup(cls):
super(UpdateMemberAdminStateTests, cls).resource_setup()
@classmethod
def resource_cleanup(cls):
super(UpdateMemberAdminStateTests, cls).resource_cleanup()
def setUp(self):
"""Set up load balancer, listener, pool and member resources."""
super(UpdateMemberAdminStateTests, self).setUp()
self.resource_setup_load_balancer(self.lb_flag)
self.addCleanup(self._delete_load_balancer, self.load_balancer_id)
self.resource_setup_listener(self.listener_flag)
self.addCleanup(self._delete_listener, self.listener_id)
self.resource_setup_pool(self.pool_flag)
self.addCleanup(self._delete_pool, self.pool_id)
self.resource_setup_member(self.member_flag)
self.addCleanup(self._delete_member, self.pool_id, self.member_id)
@classmethod
def resource_setup_load_balancer(cls, admin_state_up_flag):
(super(UpdateMemberAdminStateTests, cls).
resource_setup_load_balancer(admin_state_up_flag))
@classmethod
def resource_setup_listener(cls, admin_state_up_flag):
(super(UpdateMemberAdminStateTests, cls).
resource_setup_listener(admin_state_up_flag))
@classmethod
def resource_setup_pool(cls, admin_state_up_flag):
(super(UpdateMemberAdminStateTests, cls).
resource_setup_pool(admin_state_up_flag))
@classmethod
def resource_setup_member(cls, admin_state_up_flag):
(super(UpdateMemberAdminStateTests, cls).
resource_setup_member(admin_state_up_flag))
def test_update_member_with_admin_state_up(self):
"""Test update a member. """
self.create_member_kwargs = {'admin_state_up': self.member_to_flag}
self.member = self._update_member(self.pool_id,
self.member_id,
**self.create_member_kwargs)
self.check_operating_status()