Add RBAC tests

Create new job magnum-tempest-plugin-tests-api-rbac
Also fix get_credentials calls
Depends-On: https://review.opendev.org/c/openstack/magnum/+/875625

Change-Id: I8f9e184881c2da2a0432e25f5f1f3ddcf178015a
This commit is contained in:
ricolin 2023-03-02 17:48:33 +08:00
parent 89cba9b96c
commit 718190b7da
14 changed files with 832 additions and 480 deletions

View File

@ -6,6 +6,7 @@
check:
jobs:
- magnum-tempest-plugin-tests-api
- magnum-tempest-plugin-tests-api-rbac
- magnum-tempest-plugin-tests-api-2023-1
- magnum-tempest-plugin-tests-api-zed
- magnum-tempest-plugin-tests-api-yoga
@ -38,6 +39,19 @@
nodeset: openstack-single-node-focal
override-checkout: stable/xena
- job:
name: magnum-tempest-plugin-tests-api-rbac
parent: magnum-tempest-plugin-tests-api
vars:
tempest_test_regex: ^magnum_tempest_plugin.tests.api.v1.rbac
devstack_localrc:
MAGNUM_ENFORCE_SCOPE: true
devstack_local_conf:
test-config:
$TEMPEST_CONFIG:
enforce_scope:
magnum: true
- job:
name: magnum-tempest-plugin-tests-api
parent: magnum-tempest-plugin-base

View File

@ -0,0 +1,275 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
from oslo_log import log as logging
from oslo_utils import uuidutils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
import testtools
from magnum_tempest_plugin.common import config
from magnum_tempest_plugin.common import datagen
from magnum_tempest_plugin.tests.api import base
HEADERS = {'OpenStack-API-Version': 'container-infra latest',
'Accept': 'application/json',
'Content-Type': 'application/json'}
# Please overwrite this ID map to specific IDs for tests
IDEMPOTENT_IDS = {
"test_delete_cluster_for_nonexisting_cluster": (
"208fabdb-f7b8-4bdd-8762-fdf88be0fbca"),
"test_update_cluster_for_nonexisting_cluster": (
"71c75c03-090c-4f33-9aad-f2919a3b63d2"),
"test_create_cluster_with_nonexisting_flavor": (
"5e4e68d7-6b0a-4548-a462-2f5c83004509"),
"test_create_cluster_with_zero_masters": (
"395288a7-dc5d-474b-ba2b-09158553d549"),
"test_create_cluster_with_zero_nodes": (
"6201ac61-fd28-43a9-824a-2d452b2fe5c5"),
"test_create_cluster_for_nonexisting_cluster_template": (
"1c81a8b2-de86-4494-a8a7-b194f4c1a1f9"),
"test_create_list_sign_delete_clusters": (
"82bdca24-d732-41d1-bfc0-983ec8229ca0"),
}
class ClusterTestTemplate(base.BaseTempestTest):
"""Tests template for cluster CRUD."""
LOG = logging.getLogger(__name__)
delete_template = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.clusters = []
@classmethod
def _setup_cluster_template(cls):
if config.Config.cluster_template_id:
_, cls.cluster_template = \
cls.ct_reader_client.get_cluster_template(
config.Config.cluster_template_id)
else:
model = datagen.valid_cluster_template()
_, cls.cluster_template = cls._create_cluster_template(model)
cls.delete_template = True
def setUp(self):
super().setUp()
# NOTE (dimtruck) by default tempest sets timeout to 20 mins.
# We need more time.
test_timeout = 3600
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
def tearDown(self):
try:
cluster_list = self.clusters[:]
for cluster_id in cluster_list:
self._delete_cluster(cluster_id)
self.clusters.remove(cluster_id)
finally:
super().tearDown()
@classmethod
def _create_cluster_template(cls, cm_model):
cls.LOG.debug('We will create a clustertemplate for %s', cm_model)
resp, model = cls.ct_member_client.post_cluster_template(
cm_model)
return resp, model
@classmethod
def _delete_cluster_template(cls, cm_id):
cls.LOG.debug('We will delete a clustertemplate for %s', cm_id)
resp, model = cls.ct_member_client.delete_cluster_template(
cm_id)
return resp, model
def _create_cluster(self, cluster_model):
self.LOG.debug('We will create cluster for %s', cluster_model)
resp, model = self.cluster_member_client.post_cluster(cluster_model)
self.LOG.debug('Response: %s', resp)
self.assertEqual(202, resp.status)
self.assertIsNotNone(model.uuid)
self.assertTrue(uuidutils.is_uuid_like(model.uuid))
self.clusters.append(model.uuid)
self.cluster_uuid = model.uuid
if config.Config.copy_logs:
self.addCleanup(self.copy_logs_handler(
lambda: list(
[self._get_cluster_by_id(model.uuid)[1].master_addresses,
self._get_cluster_by_id(model.uuid)[1].node_addresses]),
self.cluster_template.coe,
self.keypair))
timeout = config.Config.cluster_creation_timeout * 60
self.cluster_admin_client.wait_for_created_cluster(
model.uuid, delete_on_error=False, timeout=timeout)
return resp, model
def _delete_cluster(self, cluster_id):
self.LOG.debug('We will delete a cluster for %s', cluster_id)
resp, model = self.cluster_member_client.delete_cluster(cluster_id)
self.assertEqual(204, resp.status)
self.cluster_admin_client.wait_for_cluster_to_delete(cluster_id)
self.assertRaises(exceptions.NotFound,
self.cert_reader_client.get_cert,
cluster_id, headers=HEADERS)
return resp, model
def _get_cluster_by_id(self, cluster_id):
resp, model = self.cluster_reader_client.get_cluster(cluster_id)
return resp, model
# (dimtruck) Combining all these tests in one because
# they time out on the gate (2 hours not enough)
@testtools.testcase.attr('positive')
@testtools.testcase.attr('slow')
@decorators.idempotent_id(
IDEMPOTENT_IDS['test_create_list_sign_delete_clusters'])
def test_create_list_sign_delete_clusters(self):
gen_model = datagen.valid_cluster_data(
cluster_template_id=self.cluster_template.uuid, node_count=1)
# test cluster create
_, cluster_model = self._create_cluster(gen_model)
self.assertNotIn('status', cluster_model)
# test cluster list
resp, cluster_list_model = self.cluster_reader_client.list_clusters()
self.assertEqual(200, resp.status)
self.assertGreater(len(cluster_list_model.clusters), 0)
self.assertIn(
cluster_model.uuid, list([x['uuid']
for x in cluster_list_model.clusters]))
# test ca show
resp, cert_model = self.cert_reader_client.get_cert(
cluster_model.uuid, headers=HEADERS)
self.LOG.debug("cert resp: %s", resp)
self.assertEqual(200, resp.status)
self.assertEqual(cert_model.cluster_uuid, cluster_model.uuid)
self.assertIsNotNone(cert_model.pem)
self.assertIn('-----BEGIN CERTIFICATE-----', cert_model.pem)
self.assertIn('-----END CERTIFICATE-----', cert_model.pem)
# test ca sign
csr_sample = """-----BEGIN CERTIFICATE REQUEST-----
MIIByjCCATMCAQAwgYkxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlh
MRYwFAYDVQQHEw1Nb3VudGFpbiBWaWV3MRMwEQYDVQQKEwpHb29nbGUgSW5jMR8w
HQYDVQQLExZJbmZvcm1hdGlvbiBUZWNobm9sb2d5MRcwFQYDVQQDEw53d3cuZ29v
Z2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEApZtYJCHJ4VpVXHfV
IlstQTlO4qC03hjX+ZkPyvdYd1Q4+qbAeTwXmCUKYHThVRd5aXSqlPzyIBwieMZr
WFlRQddZ1IzXAlVRDWwAo60KecqeAXnnUK+5fXoTI/UgWshre8tJ+x/TMHaQKR/J
cIWPhqaQhsJuzZbvAdGA80BLxdMCAwEAAaAAMA0GCSqGSIb3DQEBBQUAA4GBAIhl
4PvFq+e7ipARgI5ZM+GZx6mpCz44DTo0JkwfRDf+BtrsaC0q68eTf2XhYOsq4fkH
Q0uA0aVog3f5iJxCa3Hp5gxbJQ6zV6kJ0TEsuaaOhEko9sdpCoPOnRBm2i/XRD2D
6iNh8f8z0ShGsFqjDgFHyF3o+lUyj+UC6H1QW7bn
-----END CERTIFICATE REQUEST-----
"""
cert_data_model = datagen.cert_data(cluster_model.uuid,
csr_data=csr_sample)
resp, cert_model = self.cert_member_client.post_cert(
cert_data_model, headers=HEADERS)
self.LOG.debug("cert resp: %s", resp)
self.assertEqual(201, resp.status)
self.assertEqual(cert_model.cluster_uuid, cluster_model.uuid)
self.assertIsNotNone(cert_model.pem)
self.assertIn('-----BEGIN CERTIFICATE-----', cert_model.pem)
self.assertIn('-----END CERTIFICATE-----', cert_model.pem)
# test cluster delete
self._delete_cluster(cluster_model.uuid)
self.clusters.remove(cluster_model.uuid)
@testtools.testcase.attr('negative')
@decorators.idempotent_id(
IDEMPOTENT_IDS['test_create_cluster_for_nonexisting_cluster_template'])
def test_create_cluster_for_nonexisting_cluster_template(self):
cm_id = 'this-does-not-exist'
gen_model = datagen.valid_cluster_data(cluster_template_id=cm_id)
self.assertRaises(
exceptions.BadRequest,
self.cluster_member_client.post_cluster, gen_model)
@testtools.testcase.attr('positive')
@testtools.testcase.attr('slow')
@decorators.idempotent_id(
IDEMPOTENT_IDS['test_create_cluster_with_zero_nodes'])
def test_create_cluster_with_zero_nodes(self):
gen_model = datagen.valid_cluster_data(
cluster_template_id=self.cluster_template.uuid, node_count=0)
# test cluster create
_, cluster_model = self._create_cluster(gen_model)
self.assertNotIn('status', cluster_model)
# test cluster delete
self._delete_cluster(cluster_model.uuid)
self.clusters.remove(cluster_model.uuid)
@testtools.testcase.attr('negative')
@decorators.idempotent_id(
IDEMPOTENT_IDS['test_create_cluster_with_zero_masters'])
def test_create_cluster_with_zero_masters(self):
uuid = self.cluster_template.uuid
gen_model = datagen.valid_cluster_data(cluster_template_id=uuid,
master_count=0)
self.assertRaises(
exceptions.BadRequest,
self.cluster_member_client.post_cluster, gen_model)
@testtools.testcase.attr('negative')
@decorators.idempotent_id(
IDEMPOTENT_IDS['test_create_cluster_with_nonexisting_flavor'])
def test_create_cluster_with_nonexisting_flavor(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, cluster_template = self._create_cluster_template(gen_model)
self.assertEqual(201, resp.status)
self.assertIsNotNone(cluster_template.uuid)
uuid = cluster_template.uuid
gen_model = datagen.valid_cluster_data(cluster_template_id=uuid)
gen_model.flavor_id = 'aaa'
self.assertRaises(exceptions.BadRequest,
self.cluster_member_client.post_cluster, gen_model)
resp, _ = self._delete_cluster_template(cluster_template.uuid)
self.assertEqual(204, resp.status)
@testtools.testcase.attr('negative')
@decorators.idempotent_id(
IDEMPOTENT_IDS['test_update_cluster_for_nonexisting_cluster'])
def test_update_cluster_for_nonexisting_cluster(self):
patch_model = datagen.cluster_name_patch_data()
self.assertRaises(
exceptions.NotFound,
self.cluster_member_client.patch_cluster, 'fooo', patch_model)
@testtools.testcase.attr('negative')
@decorators.idempotent_id(
IDEMPOTENT_IDS['test_delete_cluster_for_nonexisting_cluster'])
def test_delete_cluster_for_nonexisting_cluster(self):
self.assertRaises(
exceptions.NotFound,
self.cluster_member_client.delete_cluster, data_utils.rand_uuid())

View File

@ -0,0 +1,217 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
import testtools
from magnum_tempest_plugin.common import datagen
from magnum_tempest_plugin.tests.api import base
class ClusterTemplateTestTemplate(base.BaseTempestTest):
"""Template for tests for clustertemplate CRUD."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cluster_templates = []
def tearDown(self):
for cluster_template_id in self.cluster_templates:
self._delete_cluster_template(cluster_template_id)
self.cluster_templates.remove(cluster_template_id)
super().tearDown()
def _create_cluster_template(self, cmodel_model):
resp, model = \
self.ct_member_client.post_cluster_template(cmodel_model)
self.assertEqual(201, resp.status)
self.cluster_templates.append(model.uuid)
return resp, model
def _delete_cluster_template(self, model_id):
resp, model = \
self.ct_member_client.delete_cluster_template(model_id)
self.assertEqual(204, resp.status)
return resp, model
@testtools.testcase.attr('positive')
def test_list_cluster_templates(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
_, temp_model = self._create_cluster_template(gen_model)
resp, model = self.ct_reader_client.list_cluster_templates()
self.assertEqual(200, resp.status)
self.assertGreater(len(model.clustertemplates), 0)
self.assertIn(
temp_model.uuid,
list([x['uuid'] for x in model.clustertemplates]))
@testtools.testcase.attr('positive')
def test_create_cluster_template(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, model = self._create_cluster_template(gen_model)
@testtools.testcase.attr('positive')
def test_create_get_public_cluster_template(self):
gen_model = datagen.valid_cluster_template(is_public=True)
self.assertRaises(
exceptions.Forbidden,
self.ct_member_client.post_cluster_template, gen_model)
@testtools.testcase.attr('positive')
def test_update_cluster_template_public_by_uuid(self):
path = "/public"
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, old_model = self._create_cluster_template(gen_model)
patch_model = datagen.cluster_template_replace_patch_data(path,
value=True)
self.assertRaises(
exceptions.Forbidden,
self.ct_member_client.patch_cluster_template,
old_model.uuid, patch_model)
@testtools.testcase.attr('positive')
def test_update_cluster_template_by_uuid(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, old_model = self._create_cluster_template(gen_model)
patch_model = datagen.cluster_template_name_patch_data()
resp, new_model = self.ct_member_client.patch_cluster_template(
old_model.uuid, patch_model)
self.assertEqual(200, resp.status)
resp, model = \
self.ct_reader_client.get_cluster_template(new_model.uuid)
self.assertEqual(200, resp.status)
self.assertEqual(old_model.uuid, new_model.uuid)
self.assertEqual(model.name, new_model.name)
@testtools.testcase.attr('positive')
def test_delete_cluster_template_by_uuid(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, model = self._create_cluster_template(gen_model)
resp, _ = self.ct_member_client.delete_cluster_template(
model.uuid)
self.assertEqual(204, resp.status)
self.cluster_templates.remove(model.uuid)
@testtools.testcase.attr('positive')
def test_delete_cluster_template_by_name(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, model = self._create_cluster_template(gen_model)
resp, _ = self.ct_member_client.delete_cluster_template(
model.name)
self.assertEqual(204, resp.status)
self.cluster_templates.remove(model.uuid)
@testtools.testcase.attr('negative')
def test_get_cluster_template_by_uuid_404(self):
self.assertRaises(
exceptions.NotFound,
self.ct_reader_client.get_cluster_template,
data_utils.rand_uuid())
@testtools.testcase.attr('negative')
def test_update_cluster_template_404(self):
patch_model = datagen.cluster_template_name_patch_data()
self.assertRaises(
exceptions.NotFound,
self.ct_member_client.patch_cluster_template,
data_utils.rand_uuid(), patch_model)
@testtools.testcase.attr('negative')
def test_delete_cluster_template_404(self):
self.assertRaises(
exceptions.NotFound,
self.ct_member_client.delete_cluster_template,
data_utils.rand_uuid())
@testtools.testcase.attr('negative')
def test_get_cluster_template_by_name_404(self):
self.assertRaises(
exceptions.NotFound,
self.ct_reader_client.get_cluster_template, 'fooo')
@testtools.testcase.attr('negative')
def test_update_cluster_template_name_not_found(self):
patch_model = datagen.cluster_template_name_patch_data()
self.assertRaises(
exceptions.NotFound,
self.ct_member_client.patch_cluster_template,
'fooo', patch_model)
@testtools.testcase.attr('negative')
def test_delete_cluster_template_by_name_404(self):
self.assertRaises(
exceptions.NotFound,
self.ct_reader_client.get_cluster_template, 'fooo')
@testtools.testcase.attr('negative')
def test_create_cluster_template_missing_image(self):
gen_model = datagen.cluster_template_data_with_missing_image()
self.assertRaises(
exceptions.BadRequest,
self.ct_member_client.post_cluster_template, gen_model)
@testtools.testcase.attr('negative')
def test_create_cluster_template_missing_flavor(self):
gen_model = datagen.cluster_template_data_with_missing_flavor()
self.assertRaises(
exceptions.BadRequest,
self.ct_member_client.post_cluster_template, gen_model)
@testtools.testcase.attr('positive')
def test_create_cluster_template_missing_keypair(self):
gen_model = \
datagen.cluster_template_data_with_missing_keypair()
resp, model = self._create_cluster_template(gen_model)
@testtools.testcase.attr('negative')
def test_update_cluster_template_invalid_patch(self):
# get json object
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, old_model = self._create_cluster_template(gen_model)
self.assertRaises(
exceptions.BadRequest,
self.ct_member_client.patch_cluster_template,
data_utils.rand_uuid(), gen_model)
@testtools.testcase.attr('negative')
def test_create_cluster_template_invalid_network_driver(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
gen_model.network_driver = 'invalid_network_driver'
self.assertRaises(
exceptions.BadRequest,
self.ct_member_client.post_cluster_template, gen_model)
@testtools.testcase.attr('negative')
def test_create_cluster_template_invalid_volume_driver(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
gen_model.volume_driver = 'invalid_volume_driver'
self.assertRaises(
exceptions.BadRequest,
self.ct_member_client.post_cluster_template, gen_model)

View File

@ -0,0 +1,73 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from magnum_tempest_plugin.common import datagen
from magnum_tempest_plugin.tests.api import base
class ClusterTemplateAdminTestTemplate(base.BaseTempestTest):
"""Template for Tests for clustertemplate admin operations."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cluster_templates = []
def tearDown(self):
for cluster_template_id in self.cluster_templates:
self._delete_cluster_template(cluster_template_id)
self.cluster_templates.remove(cluster_template_id)
super().tearDown()
def _create_cluster_template(self, cmodel_model):
resp, model = \
self.ct_admin_client.post_cluster_template(cmodel_model)
self.assertEqual(201, resp.status)
self.cluster_templates.append(model.uuid)
return resp, model
def _delete_cluster_template(self, model_id):
resp, model = \
self.ct_admin_client.delete_cluster_template(model_id)
self.assertEqual(204, resp.status)
return resp, model
@testtools.testcase.attr('positive')
def test_create_get_public_cluster_template(self):
gen_model = datagen.valid_cluster_template(is_public=True)
resp, model = self._create_cluster_template(gen_model)
resp, model = \
self.ct_admin_client.get_cluster_template(model.uuid)
self.assertEqual(200, resp.status)
self.assertTrue(model.public)
@testtools.testcase.attr('positive')
def test_update_cluster_template_public_by_uuid(self):
path = "/public"
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, old_model = self._create_cluster_template(gen_model)
patch_model = datagen.cluster_template_replace_patch_data(path,
value=True)
resp, new_model = self.ct_admin_client.patch_cluster_template(
old_model.uuid, patch_model)
self.assertEqual(200, resp.status)
resp, model = self.ct_admin_client.get_cluster_template(
new_model.uuid)
self.assertEqual(200, resp.status)
self.assertTrue(model.public)

View File

@ -95,7 +95,7 @@ class BaseTempestTest(base.BaseMagnumTest):
elif "default" == type_of_creds:
creds = ic.get_primary_creds()
else:
creds = ic.self.get_credentials(type_of_creds)
creds = ic.get_credentials(type_of_creds)
keypair = None

View File

@ -0,0 +1,118 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import clients
from tempest import config
from tempest.lib import auth
from tempest.lib.common.utils import data_utils
from magnum_tempest_plugin.common import base
from magnum_tempest_plugin.common import config as magnum_config
CONF = config.CONF
class RbacBaseTests(base.BaseMagnumTest):
credentials = ['project_admin', 'project_member', 'project_reader']
identity_version = 'v3'
@classmethod
def skip_checks(cls):
super(RbacBaseTests, cls).skip_checks()
if not CONF.enforce_scope.magnum:
raise cls.skipException(
"Tempest is not configured to enforce_scope for magnum, "
"skipping RBAC tests. To enable these tests set "
"`tempest.conf [enforce_scope] magnum=True`."
)
@classmethod
def setUpClass(cls):
magnum_config.Config.setUp()
super(RbacBaseTests, cls).setUpClass()
@classmethod
def setup_clients(cls):
super(RbacBaseTests, cls).setup_clients()
# default to os_project_admin in base class
cls.persona = getattr(cls, f"os_{cls.credentials[0]}")
cls.magnum_clients = cls.persona.magnum_v1
cls.project_admin = cls.os_project_admin.magnum_v1
cls.project_reader = cls.os_project_reader.magnum_v1
cls.project_member = cls.os_project_member.magnum_v1
cls.ct_reader_client = cls.project_reader.ClusterTemplateClient()
cls.ct_member_client = cls.project_member.ClusterTemplateClient()
cls.ct_admin_client = cls.project_admin.ClusterTemplateClient()
cls.cluster_reader_client = cls.project_reader.ClusterClient()
cls.cluster_member_client = cls.project_member.ClusterClient()
cls.cluster_admin_client = cls.project_admin.ClusterClient()
cls.cert_reader_client = cls.project_reader.CertClient()
cls.cert_member_client = cls.project_member.CertClient()
cls.cert_admin_client = cls.project_admin.CertClient()
@classmethod
def setup_credentials(cls):
super().setup_credentials()
cls.os_primary = getattr(cls, f'os_{cls.credentials[0]}')
def do_request(self, method, expected_status=200, client=None, **payload):
if not client:
client = self.client
if isinstance(expected_status, type(Exception)):
self.assertRaises(expected_status,
getattr(client, method),
**payload)
else:
response, mode = getattr(client, method)(**payload)
self.assertEqual(response.status, expected_status)
return response, mode
def setup_user_client(self, project_id=None):
"""Set up project user with its own client.
This is useful for testing protection of resources in separate
projects.
Returns a client object and the user's ID.
"""
user_dict = {
'name': data_utils.rand_name('user'),
'password': data_utils.rand_password(),
}
user_id = self.os_system_admin.users_v3_client.create_user(
**user_dict)['user']['id']
self.addCleanup(self.os_system_admin.users_v3_client.delete_user,
user_id)
if not project_id:
project_id = self.os_system_admin.projects_client.create_project(
data_utils.rand_name())['project']['id']
self.addCleanup(
self.os_system_admin.projects_client.delete_project,
project_id)
member_role_id = self.os_system_admin.roles_v3_client.list_roles(
name='member')['roles'][0]['id']
self.os_system_admin.roles_v3_client.create_user_role_on_project(
project_id, user_id, member_role_id)
creds = auth.KeystoneV3Credentials(
user_id=user_id,
password=user_dict['password'],
project_id=project_id)
auth_provider = clients.get_auth_provider(creds)
creds = auth_provider.fill_credentials()
client = clients.Manager(credentials=creds)
return client

View File

@ -0,0 +1,49 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from magnum_tempest_plugin.common.templates import cluster
from magnum_tempest_plugin.tests.api.v1.rbac import base
IDEMPOTENT_IDS = {
"test_delete_cluster_for_nonexisting_cluster": (
"09589881-e3c1-4507-9ded-55994e46963f"),
"test_update_cluster_for_nonexisting_cluster": (
"145d02ee-a804-476a-948f-727099c3525d"),
"test_create_cluster_with_nonexisting_flavor": (
"a08f129e-b0cb-411b-9cc0-e6b97db3d6d7"),
"test_create_cluster_with_zero_masters": (
"9eee202d-77a0-43e0-b5e3-657ab5d46835"),
"test_create_cluster_with_zero_nodes": (
"bdfb3894-d970-4e35-95b3-fec3e8a717ae"),
"test_create_cluster_for_nonexisting_cluster_template": (
"f3be4b27-62f1-46d4-8c07-8b7315a2b4c3"),
"test_create_list_sign_delete_clusters": (
"b03cb130-d2ca-4721-8298-b491020d72db"),
}
class ClusterTest(base.RbacBaseTests,
cluster.ClusterTestTemplate,
metaclass=abc.ABCMeta):
"""Tests for cluster CRUD with RBAC."""
@classmethod
def tearDownClass(cls):
if cls.delete_template:
cls._delete_cluster_template(cls.cluster_template.uuid)
super(ClusterTest, cls).tearDownClass()
def setUp(self):
super(ClusterTest, self).setUp()
self._setup_cluster_template()

View File

@ -0,0 +1,21 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from magnum_tempest_plugin.common.templates import cluster_template
from magnum_tempest_plugin.tests.api.v1.rbac import base
class ClusterTemplateTest(base.RbacBaseTests,
cluster_template.ClusterTemplateTestTemplate,
metaclass=abc.ABCMeta):
"""Tests for clustertemplate CRUD with RBAC."""

View File

@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from magnum_tempest_plugin.common.templates import cluster_template_admin
from magnum_tempest_plugin.tests.api.v1.rbac import base
class ClusterTemplateAdminTest(
base.RbacBaseTests,
cluster_template_admin.ClusterTemplateAdminTestTemplate,
metaclass=abc.ABCMeta
):
"""Tests for clustertemplate admin operations with RBAC."""

View File

@ -10,36 +10,31 @@
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
from oslo_log import log as logging
from oslo_utils import uuidutils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
import testtools
from magnum_tempest_plugin.common import config
from magnum_tempest_plugin.common import datagen
from magnum_tempest_plugin.tests.api import base
from magnum_tempest_plugin.common.templates import cluster
IDEMPOTENT_IDS = {
"test_delete_cluster_for_nonexisting_cluster": (
"376c45ea-a857-11e9-9382-00224d6b7bc1"),
"test_update_cluster_for_nonexisting_cluster": (
"33bd0416-a857-11e9-9382-00224d6b7bc1"),
"test_create_cluster_with_nonexisting_flavor": (
"2cf16528-a857-11e9-9382-00224d6b7bc1"),
"test_create_cluster_with_zero_masters": (
"29c6c5f0-a857-11e9-9382-00224d6b7bc1"),
"test_create_cluster_with_zero_nodes": (
"262eb132-a857-11e9-9382-00224d6b7bc1"),
"test_create_cluster_for_nonexisting_cluster_template": (
"11c293da-a857-11e9-9382-00224d6b7bc1"),
"test_create_list_sign_delete_clusters": (
"44158a8c-a856-11e9-9382-00224d6b7bc1"),
}
HEADERS = {'OpenStack-API-Version': 'container-infra latest',
'Accept': 'application/json',
'Content-Type': 'application/json'}
class ClusterTest(base.BaseTempestTest):
class ClusterTest(cluster.ClusterTestTemplate):
"""Tests for cluster CRUD."""
LOG = logging.getLogger(__name__)
delete_template = False
def __init__(self, *args, **kwargs):
super(ClusterTest, self).__init__(*args, **kwargs)
self.clusters = []
@classmethod
def setUpClass(cls):
super(ClusterTest, cls).setUpClass()
@ -54,24 +49,27 @@ class ClusterTest(base.BaseTempestTest):
type_of_creds='default',
request_type='cluster_template'
)
cls.ct_reader_client = cls.cluster_template_client
cls.ct_member_client = cls.cluster_template_client
cls.ct_admin_client = cls.cluster_template_client
(cls.cluster_client, _) = cls.get_clients_with_existing_creds(
creds=cls.creds,
type_of_creds='default',
request_type='cluster'
)
cls.cluster_reader_client = cls.cluster_client
cls.cluster_member_client = cls.cluster_client
cls.cluster_admin_client = cls.cluster_client
(cls.cert_client, _) = cls.get_clients_with_existing_creds(
creds=cls.creds,
type_of_creds='default',
request_type='cert'
)
cls.cert_reader_client = cls.cert_client
cls.cert_member_client = cls.cert_client
cls.cert_admin_client = cls.cert_client
if config.Config.cluster_template_id:
_, cls.cluster_template = cls.cluster_template_client.\
get_cluster_template(config.Config.cluster_template_id)
else:
model = datagen.valid_cluster_template()
_, cls.cluster_template = cls._create_cluster_template(model)
cls.delete_template = True
cls._setup_cluster_template()
except Exception:
raise
@ -84,201 +82,3 @@ class ClusterTest(base.BaseTempestTest):
cls.keypairs_client.delete_keypair(config.Config.keypair_name)
super(ClusterTest, cls).tearDownClass()
def setUp(self):
super(ClusterTest, self).setUp()
# NOTE (dimtruck) by default tempest sets timeout to 20 mins.
# We need more time.
test_timeout = 3600
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
def tearDown(self):
try:
cluster_list = self.clusters[:]
for cluster_id in cluster_list:
self._delete_cluster(cluster_id)
self.clusters.remove(cluster_id)
finally:
super(ClusterTest, self).tearDown()
@classmethod
def _create_cluster_template(cls, cm_model):
cls.LOG.debug('We will create a clustertemplate for %s', cm_model)
resp, model = cls.cluster_template_client.post_cluster_template(
cm_model)
return resp, model
@classmethod
def _delete_cluster_template(cls, cm_id):
cls.LOG.debug('We will delete a clustertemplate for %s', cm_id)
resp, model = cls.cluster_template_client.delete_cluster_template(
cm_id)
return resp, model
def _create_cluster(self, cluster_model):
self.LOG.debug('We will create cluster for %s', cluster_model)
resp, model = self.cluster_client.post_cluster(cluster_model)
self.LOG.debug('Response: %s', resp)
self.assertEqual(202, resp.status)
self.assertIsNotNone(model.uuid)
self.assertTrue(uuidutils.is_uuid_like(model.uuid))
self.clusters.append(model.uuid)
self.cluster_uuid = model.uuid
if config.Config.copy_logs:
self.addCleanup(self.copy_logs_handler(
lambda: list(
[self._get_cluster_by_id(model.uuid)[1].master_addresses,
self._get_cluster_by_id(model.uuid)[1].node_addresses]),
self.cluster_template.coe,
self.keypair))
timeout = config.Config.cluster_creation_timeout * 60
self.cluster_client.wait_for_created_cluster(model.uuid,
delete_on_error=False,
timeout=timeout)
return resp, model
def _delete_cluster(self, cluster_id):
self.LOG.debug('We will delete a cluster for %s', cluster_id)
resp, model = self.cluster_client.delete_cluster(cluster_id)
self.assertEqual(204, resp.status)
self.cluster_client.wait_for_cluster_to_delete(cluster_id)
self.assertRaises(exceptions.NotFound, self.cert_client.get_cert,
cluster_id, headers=HEADERS)
return resp, model
def _get_cluster_by_id(self, cluster_id):
resp, model = self.cluster_client.get_cluster(cluster_id)
return resp, model
# (dimtruck) Combining all these tests in one because
# they time out on the gate (2 hours not enough)
@testtools.testcase.attr('positive')
@testtools.testcase.attr('slow')
@decorators.idempotent_id('44158a8c-a856-11e9-9382-00224d6b7bc1')
def test_create_list_sign_delete_clusters(self):
gen_model = datagen.valid_cluster_data(
cluster_template_id=self.cluster_template.uuid, node_count=1)
# test cluster create
_, cluster_model = self._create_cluster(gen_model)
self.assertNotIn('status', cluster_model)
# test cluster list
resp, cluster_list_model = self.cluster_client.list_clusters()
self.assertEqual(200, resp.status)
self.assertGreater(len(cluster_list_model.clusters), 0)
self.assertIn(
cluster_model.uuid, list([x['uuid']
for x in cluster_list_model.clusters]))
# test ca show
resp, cert_model = self.cert_client.get_cert(
cluster_model.uuid, headers=HEADERS)
self.LOG.debug("cert resp: %s", resp)
self.assertEqual(200, resp.status)
self.assertEqual(cert_model.cluster_uuid, cluster_model.uuid)
self.assertIsNotNone(cert_model.pem)
self.assertIn('-----BEGIN CERTIFICATE-----', cert_model.pem)
self.assertIn('-----END CERTIFICATE-----', cert_model.pem)
# test ca sign
csr_sample = """-----BEGIN CERTIFICATE REQUEST-----
MIIByjCCATMCAQAwgYkxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlh
MRYwFAYDVQQHEw1Nb3VudGFpbiBWaWV3MRMwEQYDVQQKEwpHb29nbGUgSW5jMR8w
HQYDVQQLExZJbmZvcm1hdGlvbiBUZWNobm9sb2d5MRcwFQYDVQQDEw53d3cuZ29v
Z2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEApZtYJCHJ4VpVXHfV
IlstQTlO4qC03hjX+ZkPyvdYd1Q4+qbAeTwXmCUKYHThVRd5aXSqlPzyIBwieMZr
WFlRQddZ1IzXAlVRDWwAo60KecqeAXnnUK+5fXoTI/UgWshre8tJ+x/TMHaQKR/J
cIWPhqaQhsJuzZbvAdGA80BLxdMCAwEAAaAAMA0GCSqGSIb3DQEBBQUAA4GBAIhl
4PvFq+e7ipARgI5ZM+GZx6mpCz44DTo0JkwfRDf+BtrsaC0q68eTf2XhYOsq4fkH
Q0uA0aVog3f5iJxCa3Hp5gxbJQ6zV6kJ0TEsuaaOhEko9sdpCoPOnRBm2i/XRD2D
6iNh8f8z0ShGsFqjDgFHyF3o+lUyj+UC6H1QW7bn
-----END CERTIFICATE REQUEST-----
"""
cert_data_model = datagen.cert_data(cluster_model.uuid,
csr_data=csr_sample)
resp, cert_model = self.cert_client.post_cert(cert_data_model,
headers=HEADERS)
self.LOG.debug("cert resp: %s", resp)
self.assertEqual(201, resp.status)
self.assertEqual(cert_model.cluster_uuid, cluster_model.uuid)
self.assertIsNotNone(cert_model.pem)
self.assertIn('-----BEGIN CERTIFICATE-----', cert_model.pem)
self.assertIn('-----END CERTIFICATE-----', cert_model.pem)
# test cluster delete
self._delete_cluster(cluster_model.uuid)
self.clusters.remove(cluster_model.uuid)
@testtools.testcase.attr('negative')
@decorators.idempotent_id('11c293da-a857-11e9-9382-00224d6b7bc1')
def test_create_cluster_for_nonexisting_cluster_template(self):
cm_id = 'this-does-not-exist'
gen_model = datagen.valid_cluster_data(cluster_template_id=cm_id)
self.assertRaises(
exceptions.BadRequest,
self.cluster_client.post_cluster, gen_model)
@testtools.testcase.attr('positive')
@testtools.testcase.attr('slow')
@decorators.idempotent_id('262eb132-a857-11e9-9382-00224d6b7bc1')
def test_create_cluster_with_zero_nodes(self):
gen_model = datagen.valid_cluster_data(
cluster_template_id=self.cluster_template.uuid, node_count=0)
# test cluster create
_, cluster_model = self._create_cluster(gen_model)
self.assertNotIn('status', cluster_model)
# test cluster delete
self._delete_cluster(cluster_model.uuid)
self.clusters.remove(cluster_model.uuid)
@testtools.testcase.attr('negative')
@decorators.idempotent_id('29c6c5f0-a857-11e9-9382-00224d6b7bc1')
def test_create_cluster_with_zero_masters(self):
uuid = self.cluster_template.uuid
gen_model = datagen.valid_cluster_data(cluster_template_id=uuid,
master_count=0)
self.assertRaises(
exceptions.BadRequest,
self.cluster_client.post_cluster, gen_model)
@testtools.testcase.attr('negative')
@decorators.idempotent_id('2cf16528-a857-11e9-9382-00224d6b7bc1')
def test_create_cluster_with_nonexisting_flavor(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, cluster_template = self._create_cluster_template(gen_model)
self.assertEqual(201, resp.status)
self.assertIsNotNone(cluster_template.uuid)
uuid = cluster_template.uuid
gen_model = datagen.valid_cluster_data(cluster_template_id=uuid)
gen_model.flavor_id = 'aaa'
self.assertRaises(exceptions.BadRequest,
self.cluster_client.post_cluster, gen_model)
resp, _ = self._delete_cluster_template(cluster_template.uuid)
self.assertEqual(204, resp.status)
@testtools.testcase.attr('negative')
@decorators.idempotent_id('33bd0416-a857-11e9-9382-00224d6b7bc1')
def test_update_cluster_for_nonexisting_cluster(self):
patch_model = datagen.cluster_name_patch_data()
self.assertRaises(
exceptions.NotFound,
self.cluster_client.patch_cluster, 'fooo', patch_model)
@testtools.testcase.attr('negative')
@decorators.idempotent_id('376c45ea-a857-11e9-9382-00224d6b7bc1')
def test_delete_cluster_for_nonexisting_cluster(self):
self.assertRaises(
exceptions.NotFound,
self.cluster_client.delete_cluster, data_utils.rand_uuid())

View File

@ -11,21 +11,15 @@
# under the License.
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
import testtools
from magnum_tempest_plugin.common import datagen
from magnum_tempest_plugin.tests.api import base
from magnum_tempest_plugin.common.templates import cluster_template
class ClusterTemplateTest(base.BaseTempestTest):
class ClusterTemplateTest(cluster_template.ClusterTemplateTestTemplate):
"""Tests for clustertemplate CRUD."""
def __init__(self, *args, **kwargs):
super(ClusterTemplateTest, self).__init__(*args, **kwargs)
self.cluster_templates = []
self.cluster_template_client = None
self.keypairs_client = None
@ -36,195 +30,8 @@ class ClusterTemplateTest(base.BaseTempestTest):
self.keypairs_client) = self.get_clients_with_new_creds(
type_of_creds='default',
request_type='cluster_template')
self.ct_reader_client = self.cluster_template_client
self.ct_member_client = self.cluster_template_client
except Exception:
self.tearDown()
raise
def tearDown(self):
for cluster_template_id in self.cluster_templates:
self._delete_cluster_template(cluster_template_id)
self.cluster_templates.remove(cluster_template_id)
super(ClusterTemplateTest, self).tearDown()
def _create_cluster_template(self, cmodel_model):
resp, model = \
self.cluster_template_client.post_cluster_template(cmodel_model)
self.assertEqual(201, resp.status)
self.cluster_templates.append(model.uuid)
return resp, model
def _delete_cluster_template(self, model_id):
resp, model = \
self.cluster_template_client.delete_cluster_template(model_id)
self.assertEqual(204, resp.status)
return resp, model
@testtools.testcase.attr('positive')
def test_list_cluster_templates(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
_, temp_model = self._create_cluster_template(gen_model)
resp, model = self.cluster_template_client.list_cluster_templates()
self.assertEqual(200, resp.status)
self.assertGreater(len(model.clustertemplates), 0)
self.assertIn(
temp_model.uuid,
list([x['uuid'] for x in model.clustertemplates]))
@testtools.testcase.attr('positive')
def test_create_cluster_template(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, model = self._create_cluster_template(gen_model)
@testtools.testcase.attr('positive')
def test_create_get_public_cluster_template(self):
gen_model = datagen.valid_cluster_template(is_public=True)
self.assertRaises(
exceptions.Forbidden,
self.cluster_template_client.post_cluster_template, gen_model)
@testtools.testcase.attr('positive')
def test_update_cluster_template_public_by_uuid(self):
path = "/public"
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, old_model = self._create_cluster_template(gen_model)
patch_model = datagen.cluster_template_replace_patch_data(path,
value=True)
self.assertRaises(
exceptions.Forbidden,
self.cluster_template_client.patch_cluster_template,
old_model.uuid, patch_model)
@testtools.testcase.attr('positive')
def test_update_cluster_template_by_uuid(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, old_model = self._create_cluster_template(gen_model)
patch_model = datagen.cluster_template_name_patch_data()
resp, new_model = self.cluster_template_client.patch_cluster_template(
old_model.uuid, patch_model)
self.assertEqual(200, resp.status)
resp, model = \
self.cluster_template_client.get_cluster_template(new_model.uuid)
self.assertEqual(200, resp.status)
self.assertEqual(old_model.uuid, new_model.uuid)
self.assertEqual(model.name, new_model.name)
@testtools.testcase.attr('positive')
def test_delete_cluster_template_by_uuid(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, model = self._create_cluster_template(gen_model)
resp, _ = self.cluster_template_client.delete_cluster_template(
model.uuid)
self.assertEqual(204, resp.status)
self.cluster_templates.remove(model.uuid)
@testtools.testcase.attr('positive')
def test_delete_cluster_template_by_name(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, model = self._create_cluster_template(gen_model)
resp, _ = self.cluster_template_client.delete_cluster_template(
model.name)
self.assertEqual(204, resp.status)
self.cluster_templates.remove(model.uuid)
@testtools.testcase.attr('negative')
def test_get_cluster_template_by_uuid_404(self):
self.assertRaises(
exceptions.NotFound,
self.cluster_template_client.get_cluster_template,
data_utils.rand_uuid())
@testtools.testcase.attr('negative')
def test_update_cluster_template_404(self):
patch_model = datagen.cluster_template_name_patch_data()
self.assertRaises(
exceptions.NotFound,
self.cluster_template_client.patch_cluster_template,
data_utils.rand_uuid(), patch_model)
@testtools.testcase.attr('negative')
def test_delete_cluster_template_404(self):
self.assertRaises(
exceptions.NotFound,
self.cluster_template_client.delete_cluster_template,
data_utils.rand_uuid())
@testtools.testcase.attr('negative')
def test_get_cluster_template_by_name_404(self):
self.assertRaises(
exceptions.NotFound,
self.cluster_template_client.get_cluster_template, 'fooo')
@testtools.testcase.attr('negative')
def test_update_cluster_template_name_not_found(self):
patch_model = datagen.cluster_template_name_patch_data()
self.assertRaises(
exceptions.NotFound,
self.cluster_template_client.patch_cluster_template,
'fooo', patch_model)
@testtools.testcase.attr('negative')
def test_delete_cluster_template_by_name_404(self):
self.assertRaises(
exceptions.NotFound,
self.cluster_template_client.get_cluster_template, 'fooo')
@testtools.testcase.attr('negative')
def test_create_cluster_template_missing_image(self):
gen_model = datagen.cluster_template_data_with_missing_image()
self.assertRaises(
exceptions.BadRequest,
self.cluster_template_client.post_cluster_template, gen_model)
@testtools.testcase.attr('negative')
def test_create_cluster_template_missing_flavor(self):
gen_model = datagen.cluster_template_data_with_missing_flavor()
self.assertRaises(
exceptions.BadRequest,
self.cluster_template_client.post_cluster_template, gen_model)
@testtools.testcase.attr('positive')
def test_create_cluster_template_missing_keypair(self):
gen_model = \
datagen.cluster_template_data_with_missing_keypair()
resp, model = self._create_cluster_template(gen_model)
@testtools.testcase.attr('negative')
def test_update_cluster_template_invalid_patch(self):
# get json object
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, old_model = self._create_cluster_template(gen_model)
self.assertRaises(
exceptions.BadRequest,
self.cluster_template_client.patch_cluster_template,
data_utils.rand_uuid(), gen_model)
@testtools.testcase.attr('negative')
def test_create_cluster_template_invalid_network_driver(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
gen_model.network_driver = 'invalid_network_driver'
self.assertRaises(
exceptions.BadRequest,
self.cluster_template_client.post_cluster_template, gen_model)
@testtools.testcase.attr('negative')
def test_create_cluster_template_invalid_volume_driver(self):
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
gen_model.volume_driver = 'invalid_volume_driver'
self.assertRaises(
exceptions.BadRequest,
self.cluster_template_client.post_cluster_template, gen_model)

View File

@ -11,19 +11,17 @@
# under the License.
import testtools
from magnum_tempest_plugin.common import datagen
from magnum_tempest_plugin.tests.api import base
from magnum_tempest_plugin.common.templates import cluster_template_admin
class ClusterTemplateAdminTest(base.BaseTempestTest):
class ClusterTemplateAdminTest(
cluster_template_admin.ClusterTemplateAdminTestTemplate
):
"""Tests for clustertemplate admin operations."""
def __init__(self, *args, **kwargs):
super(ClusterTemplateAdminTest, self).__init__(*args, **kwargs)
self.cluster_templates = []
self.cluster_template_client = None
self.keypairs_client = None
@ -34,53 +32,9 @@ class ClusterTemplateAdminTest(base.BaseTempestTest):
self.keypairs_client) = self.get_clients_with_new_creds(
type_of_creds='admin',
request_type='cluster_template')
self.ct_reader_client = self.cluster_template_client
self.ct_member_client = self.cluster_template_client
self.ct_admin_client = self.cluster_template_client
except Exception:
self.tearDown()
raise
def tearDown(self):
for cluster_template_id in self.cluster_templates:
self._delete_cluster_template(cluster_template_id)
self.cluster_templates.remove(cluster_template_id)
super(ClusterTemplateAdminTest, self).tearDown()
def _create_cluster_template(self, cmodel_model):
resp, model = \
self.cluster_template_client.post_cluster_template(cmodel_model)
self.assertEqual(201, resp.status)
self.cluster_templates.append(model.uuid)
return resp, model
def _delete_cluster_template(self, model_id):
resp, model = \
self.cluster_template_client.delete_cluster_template(model_id)
self.assertEqual(204, resp.status)
return resp, model
@testtools.testcase.attr('positive')
def test_create_get_public_cluster_template(self):
gen_model = datagen.valid_cluster_template(is_public=True)
resp, model = self._create_cluster_template(gen_model)
resp, model = \
self.cluster_template_client.get_cluster_template(model.uuid)
self.assertEqual(200, resp.status)
self.assertTrue(model.public)
@testtools.testcase.attr('positive')
def test_update_cluster_template_public_by_uuid(self):
path = "/public"
gen_model = \
datagen.cluster_template_data_with_valid_keypair_image_flavor()
resp, old_model = self._create_cluster_template(gen_model)
patch_model = datagen.cluster_template_replace_patch_data(path,
value=True)
resp, new_model = self.cluster_template_client.patch_cluster_template(
old_model.uuid, patch_model)
self.assertEqual(200, resp.status)
resp, model = self.cluster_template_client.get_cluster_template(
new_model.uuid)
self.assertEqual(200, resp.status)
self.assertTrue(model.public)