neutron-lbaas/neutron_lbaas/tests/tempest/v2/api/test_pools_admin.py

169 lines
6.7 KiB
Python

# Copyright 2015 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Rackspace Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as ex
from neutron_lbaas.tests.tempest.v2.api import base
# Module-level listener port counter shared by every test in this module.
# TestPools.increment_protocol_port() bumps it at the start of each
# _prepare_and_create_pool() call, so the first port actually used is 81.
PROTOCOL_PORT = 80
class TestPools(base.BaseAdminTestCase):
    """
    Tests the following operations in the Neutron-LBaaS API using the
    REST client for Pools:

        list pools
        create pool
        get pool
        update pool
        delete pool
    """

    @classmethod
    def resource_setup(cls):
        """
        Create the network, subnet and load balancer shared by the tests.

        Raises cls.skipException when the 'lbaasv2' network extension is
        not enabled.
        """
        super(TestPools, cls).resource_setup()
        if not utils.is_extension_enabled('lbaasv2', 'network'):
            msg = "lbaas extension not enabled."
            raise cls.skipException(msg)
        network_name = data_utils.rand_name('network-')
        cls.network = cls.create_network(network_name)
        cls.subnet = cls.create_subnet(cls.network)
        cls.load_balancer = cls._create_load_balancer(
            tenant_id=cls.subnet.get('tenant_id'),
            vip_subnet_id=cls.subnet.get('id'))

    def increment_protocol_port(self):
        """Advance the module-level PROTOCOL_PORT counter by one."""
        global PROTOCOL_PORT
        PROTOCOL_PORT += 1

    def _prepare_and_create_pool(self, protocol=None, lb_algorithm=None,
                                 listener_id=None, cleanup=True, **kwargs):
        """
        Create a pool, creating a listener for it first when none is given.

        :param protocol: pool protocol; falls back to self.pool_protocol.
        :param lb_algorithm: falls back to 'ROUND_ROBIN'.
        :param listener_id: existing listener to attach to. When falsy, a
            new listener is created on the class load balancer using the
            current PROTOCOL_PORT value.
        :param cleanup: when true, register deletion of the pool with
            addCleanup.
        :param kwargs: forwarded to _create_pool, and also to
            _create_listener when a listener is created here.
        :returns: the response of _create_pool.
        """
        # The counter is bumped on every call, even when a listener_id is
        # supplied, so each listener created here gets a distinct port.
        self.increment_protocol_port()
        if not protocol:
            protocol = self.pool_protocol
        if not lb_algorithm:
            lb_algorithm = 'ROUND_ROBIN'
        if not listener_id:
            listener = self._create_listener(
                loadbalancer_id=self.load_balancer.get('id'),
                protocol=self.listener_protocol,
                protocol_port=PROTOCOL_PORT, **kwargs)
            listener_id = listener.get('id')
        response = self._create_pool(protocol=protocol,
                                     lb_algorithm=lb_algorithm,
                                     listener_id=listener_id,
                                     **kwargs)
        if cleanup:
            self.addCleanup(self._delete_pool, response['id'])
        return response

    def _assert_session_persistence(self, pool_id, expected):
        """
        Fetch the pool and check every key of ``expected`` against its
        'session_persistence' attribute.
        """
        pool = self.pools_client.get_pool(pool_id)
        result_sp = pool.get('session_persistence')
        # Report a missing attribute as a test failure, not a TypeError
        # raised by subscripting None below.
        self.assertIsNotNone(result_sp)
        for key, value in expected.items():
            self.assertEqual(value, result_sp[key])

    @decorators.skip_because(bug="1468457")
    @decorators.attr(type='negative')
    def test_create_pool_using_empty_tenant_field(self):
        """Test create pool with empty tenant field should fail"""
        self.assertRaises(ex.BadRequest, self._create_pool,
                          protocol=self.pool_protocol,
                          tenant_id="",
                          lb_algorithm='ROUND_ROBIN')

    @decorators.skip_because(bug="1468457")
    def test_create_pool_missing_tenant_id_for_other_tenant(self):
        """
        Test create pool with a missing tenant id field. Verify
        tenant_id does not match when creating pool vs.
        pool (admin client)
        """
        new_pool = self._prepare_and_create_pool()
        pool = self.pools_client.get_pool(new_pool.get('id'))
        pool_tenant = pool['tenant_id']
        self.assertNotEqual(pool_tenant, self.subnet['tenant_id'])

    @decorators.skip_because(bug="1468457")
    def test_create_pool_missing_tenant_id_for_admin(self):
        """
        Test create pool with a missing tenant id field. Verify
        tenant_id matches when creating pool vs. pool (admin client)
        """
        new_pool = self._prepare_and_create_pool()
        pool = self.pools_client.get_pool(new_pool.get('id'))
        # Compare the create response against a fresh GET so that the
        # assertion checks two independent values and can actually fail.
        self.assertEqual(new_pool.get('tenant_id'), pool['tenant_id'])

    @decorators.skip_because(bug="1468457")
    @decorators.attr(type='smoke')
    def test_create_pool_for_another_tenant(self):
        """Test create pool for other tenant field"""
        tenant = 'deffb4d7c0584e89a8ec99551565713c'
        new_pool = self._prepare_and_create_pool(
            tenant_id=tenant)
        pool = self.pools_client.get_pool(new_pool.get('id'))
        pool_tenant = pool.get('tenant_id')
        self.assertEqual(pool_tenant, tenant)

    @decorators.attr(type='smoke')
    def test_update_pool_sesssion_persistence_app_cookie(self):
        """Test update admin pool's session persistence"""
        new_pool = self._prepare_and_create_pool()
        session_persistence = {"type": "APP_COOKIE",
                               "cookie_name": "my_cookie"}
        self._update_pool(new_pool.get('id'),
                          session_persistence=session_persistence)
        self._assert_session_persistence(new_pool.get('id'),
                                         session_persistence)

    def test_update_pool_sesssion_persistence_app_to_http(self):
        """
        Test update admin pool's session persistence type from
        app cookie to http cookie
        """
        new_pool = self._prepare_and_create_pool()
        session_persistence = {"type": "APP_COOKIE",
                               "cookie_name": "my_cookie"}
        self._update_pool(new_pool.get('id'),
                          session_persistence=session_persistence)
        self._assert_session_persistence(new_pool.get('id'),
                                         session_persistence)

        self._update_pool(new_pool.get('id'),
                          session_persistence={"type": "HTTP_COOKIE"})
        # After switching to HTTP_COOKIE the cookie name set earlier is
        # expected to read back as None.
        session_persistence = {"type": "HTTP_COOKIE",
                               "cookie_name": None}
        self._assert_session_persistence(new_pool.get('id'),
                                         session_persistence)

    @decorators.attr(type='smoke')
    def test_delete_pool(self):
        """Test delete admin pool"""
        # cleanup=False: the test deletes the pool itself, so no deletion
        # is registered with addCleanup.
        new_pool = self._prepare_and_create_pool(cleanup=False)
        pool = self.pools_client.get_pool(new_pool.get('id'))
        self._test_provisioning_status_if_exists(new_pool, pool)
        self.assertEqual(new_pool, pool)
        self._delete_pool(new_pool.get('id'))
        self.assertRaises(ex.NotFound, self.pools_client.get_pool,
                          new_pool.get('id'))