Functional tests with Tempest - BayModel CRUD

This patch will add functional test coverage for
baymodels.  It sets up tempest-lib and core
framework architecture with clients, models,
and data generator abstractions.

Partially implements: blueprint magnum-tempest
Change-Id: I9f4bf4293544ef1fe2a6b8aa209df49ddaabd04b
This commit is contained in:
dimtruck 2015-09-16 10:59:00 -05:00
parent 4381c685b3
commit 87db63227e
20 changed files with 994 additions and 0 deletions

View File

@ -2,6 +2,10 @@
[auth]
auth_url = http://127.0.0.1:5000/v2.0
magnum_url = http://127.0.0.1:9511/v1
username = demo
tenant_name = demo
password = password
auth_version = v2
[admin]
user = admin
tenant = admin
@ -9,3 +13,4 @@ pass = secrete
[magnum]
image_id = fedora-21-atomic-3
nic_id = public
keypair_id = default

View File

@ -71,6 +71,10 @@ cat <<EOF > $CREDS_FILE
[auth]
auth_url = $OS_AUTH_URL
magnum_url = $BYPASS_URL
username = $OS_USERNAME
tenant_name = $OS_TENANT_NAME
password = $OS_PASSWORD
auth_version = v2
[admin]
user = $OS_USERNAME
tenant = $OS_TENANT_NAME
@ -79,6 +83,7 @@ region_name = $OS_REGION_NAME
[magnum]
image_id = $IMAGE_ID
nic_id = $NIC_ID
keypair_id = default
EOF
# Create a keypair for use in the functional tests.

View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
logging.basicConfig(
filename='functional-tests.log',
filemode='w',
level=logging.DEBUG,
)

View File

View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from magnum.tests.functional.api.v1.models import baymodel_model
from magnum.tests.functional.common import client
class BayModelClient(client.ClientMixin):
"""Encapsulates REST calls and maps JSON to/from models"""
@classmethod
def baymodels_uri(cls, filters=None):
"""Construct baymodels uri with optional filters
:param filters: Optional k:v dict that's converted to url query
:returns: url string
"""
url = "/baymodels"
if filters:
url = cls.add_filters(url, filters)
return url
@classmethod
def baymodel_uri(cls, baymodel_id):
"""Construct baymodel uri
:param baymodel_id: baymodel uuid or name
:returns: url string
"""
return "{0}/{1}".format(cls.baymodels_uri(), baymodel_id)
def list_baymodels(self, filters=None, **kwargs):
"""Makes GET /baymodels request and returns BayModelCollection
Abstracts REST call to return all baymodels
:param filters: Optional k:v dict that's converted to url query
:returns: response object and BayModelCollection object
"""
resp, body = self.client.get(self.baymodels_uri(filters), **kwargs)
return self.deserialize(resp, body, baymodel_model.BayModelCollection)
def get_baymodel(self, baymodel_id, **kwargs):
"""Makes GET /baymodel request and returns BayModelEntity
Abstracts REST call to return a single baymodel based on uuid or name
:param baymodel_id: baymodel uuid or name
:returns: response object and BayModelCollection object
"""
resp, body = self.client.get(self.baymodel_uri(baymodel_id))
return self.deserialize(resp, body, baymodel_model.BayModelEntity)
def post_baymodel(self, model, **kwargs):
"""Makes POST /baymodel request and returns BayModelEntity
Abstracts REST call to create new baymodel
:param model: BayModelEntity
:returns: response object and BayModelEntity object
"""
resp, body = self.client.post(
self.baymodels_uri(),
body=model.to_json(), **kwargs)
return self.deserialize(resp, body, baymodel_model.BayModelEntity)
def patch_baymodel(self, baymodel_id, baymodelpatch_listmodel, **kwargs):
"""Makes PATCH /baymodel request and returns BayModelEntity
Abstracts REST call to update baymodel attributes
:param baymodel_id: UUID of baymodel
:param baymodelpatch_listmodel: BayModelPatchCollection
:returns: response object and BayModelEntity object
"""
resp, body = self.client.patch(
self.baymodel_uri(baymodel_id),
body=baymodelpatch_listmodel.to_json(), **kwargs)
return self.deserialize(resp, body, baymodel_model.BayModelEntity)
def delete_baymodel(self, baymodel_id, **kwargs):
"""Makes DELETE /baymodel request and returns response object
Abstracts REST call to delete baymodel based on uuid or name
:param baymodel_id: UUID or name of baymodel
:returns: response object
"""
return self.client.delete(self.baymodel_uri(baymodel_id), **kwargs)

View File

@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from magnum.tests.functional.common import models
class BayModelData(models.BaseModel):
"""Data that encapsulates baymodel attributes"""
pass
class BayModelEntity(models.EntityModel):
"""Entity Model that represents a single instance of BayModelData"""
ENTITY_NAME = 'baymodel'
MODEL_TYPE = BayModelData
class BayModelCollection(models.CollectionModel):
"""Collection Model that represents a list of BayModelData objects"""
COLLECTION_NAME = 'baymodellists'
MODEL_TYPE = BayModelData

View File

@ -0,0 +1,76 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from magnum.tests.functional.common import models
class BayModelPatchData(models.BaseModel):
"""Data that encapsulates baymodelpatch attributes"""
pass
class BayModelPatchEntity(models.EntityModel):
"""Entity Model that represents a single instance of BayModelPatchData"""
ENTITY_NAME = 'baymodelpatch'
MODEL_TYPE = BayModelPatchData
class BayModelPatchCollection(models.CollectionModel):
"""Collection Model that represents a list of BayModelPatchData objects"""
MODEL_TYPE = BayModelPatchData
COLLECTION_NAME = 'baymodelpatchlist'
def to_json(self):
"""Converts BayModelPatchCollection to json
Retrieves list from COLLECTION_NAME attribute and converts each object
to dict, appending it to a list. Then converts the entire list to json
This is required due to COLLECTION_NAME holding a list of objects that
needed to be converted to dict individually
:returns: json object
"""
data = getattr(self, BayModelPatchCollection.COLLECTION_NAME)
collection = []
for d in data:
collection.append(d.to_dict())
return json.dumps(collection)
@classmethod
def from_dict(cls, data):
"""Converts dict to BayModelPatchData
Converts data dict to list of BayModelPatchData objects and stores it
in COLLECTION_NAME
Example of dict data:
[{
"path": "/name",
"value": "myname",
"op": "replace"
}]
:param data: dict of patch data
:returns: json object
"""
model = cls()
collection = []
for d in data:
collection.append(cls.MODEL_TYPE.from_dict(d))
setattr(model, cls.COLLECTION_NAME, collection)
return model

View File

@ -0,0 +1,174 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib import exceptions
import testtools
from magnum.tests.functional.api.v1.clients import baymodel_client as cli
from magnum.tests.functional.common import base
from magnum.tests.functional.common import datagen
class BayModelTest(base.BaseMagnumTest):
"""Tests for baymodel CRUD."""
def __init__(self, *args, **kwargs):
super(BayModelTest, self).__init__(*args, **kwargs)
self.baymodels = []
def setUp(self):
super(BayModelTest, self).setUp()
def tearDown(self):
super(BayModelTest, self).tearDown()
for (baymodel_id, user) in self.baymodels:
self._delete_baymodel(baymodel_id, user)
self.baymodels.remove((baymodel_id, user))
def _create_baymodel(self, baymodel_model, user='default'):
resp, model = cli.BayModelClient.as_user(user).post_baymodel(
baymodel_model)
self.assertEqual(resp.status, 201)
self.baymodels.append((model.uuid, user))
return resp, model
def _delete_baymodel(self, baymodel_id, user='default'):
resp, model = cli.BayModelClient.as_user(user).delete_baymodel(
baymodel_id)
self.assertEqual(resp.status, 204)
return resp, model
@testtools.testcase.attr('positive')
def test_list_baymodels(self):
gen_model = datagen.random_baymodel_data_w_valid_keypair_and_image_id()
_, temp_model = self._create_baymodel(gen_model, user='default')
resp, model = cli.BayModelClient.as_user('default').list_baymodels()
self.assertEqual(resp.status, 200)
self.assertGreater(len(model.baymodels), 0)
self.assertIn(
temp_model.uuid, list([x['uuid'] for x in model.baymodels]))
@testtools.testcase.attr('positive')
def test_create_baymodel(self):
gen_model = datagen.random_baymodel_data_w_valid_keypair_and_image_id()
resp, model = self._create_baymodel(gen_model, user='default')
@testtools.testcase.attr('positive')
def test_update_baymodel_by_uuid(self):
gen_model = datagen.random_baymodel_data_w_valid_keypair_and_image_id()
resp, old_model = self._create_baymodel(gen_model, user='default')
patch_model = datagen.random_baymodel_name_patch_data()
bay_model_client = cli.BayModelClient.as_user('default')
resp, new_model = bay_model_client.patch_baymodel(
old_model.uuid, patch_model)
self.assertEqual(resp.status, 200)
resp, model = cli.BayModelClient.as_user('default').get_baymodel(
new_model.uuid)
self.assertEqual(resp.status, 200)
self.assertEqual(new_model.uuid, old_model.uuid)
self.assertEqual(new_model.name, model.name)
@testtools.testcase.attr('positive')
def test_delete_baymodel_by_uuid(self):
gen_model = datagen.random_baymodel_data_w_valid_keypair_and_image_id()
resp, model = self._create_baymodel(gen_model, user='default')
resp, _ = cli.BayModelClient.as_user('default').delete_baymodel(
model.uuid)
self.assertEqual(resp.status, 204)
self.baymodels.remove((model.uuid, 'default'))
@testtools.testcase.attr('positive')
def test_delete_baymodel_by_name(self):
gen_model = datagen.random_baymodel_data_w_valid_keypair_and_image_id()
resp, model = self._create_baymodel(gen_model, user='default')
resp, _ = cli.BayModelClient.as_user('default').delete_baymodel(
model.name)
self.assertEqual(resp.status, 204)
self.baymodels.remove((model.uuid, 'default'))
@testtools.testcase.attr('negative')
def test_get_baymodel_by_uuid_404(self):
bay_model_client = cli.BayModelClient.as_user('default')
self.assertRaises(
exceptions.NotFound,
bay_model_client.get_baymodel, datagen.random_uuid())
@testtools.testcase.attr('negative')
def test_update_baymodel_404(self):
patch_model = datagen.random_baymodel_name_patch_data()
bay_model_client = cli.BayModelClient.as_user('default')
self.assertRaises(
exceptions.NotFound,
bay_model_client.patch_baymodel,
datagen.random_uuid(), patch_model)
@testtools.testcase.attr('negative')
def test_delete_baymodel_404(self):
bay_model_client = cli.BayModelClient.as_user('default')
self.assertRaises(
exceptions.NotFound,
bay_model_client.delete_baymodel, datagen.random_uuid())
@testtools.testcase.attr('negative')
def test_get_baymodel_by_name_404(self):
bay_model_client = cli.BayModelClient.as_user('default')
self.assertRaises(
exceptions.NotFound,
bay_model_client.get_baymodel, 'fooo')
@testtools.testcase.attr('negative')
def test_update_baymodel_invalid_uuid(self):
patch_model = datagen.random_baymodel_name_patch_data()
bay_model_client = cli.BayModelClient.as_user('default')
self.assertRaises(
exceptions.BadRequest,
bay_model_client.patch_baymodel, 'fooo', patch_model)
@testtools.testcase.attr('negative')
def test_delete_baymodel_by_name_404(self):
bay_model_client = cli.BayModelClient.as_user('default')
self.assertRaises(
exceptions.NotFound,
bay_model_client.get_baymodel, 'fooo')
@testtools.testcase.attr('negative')
def test_create_baymodel_missing_image(self):
bay_model_client = cli.BayModelClient.as_user('default')
gen_model = datagen.random_baymodel_data_w_valid_keypair()
self.assertRaises(
exceptions.NotFound,
bay_model_client.post_baymodel, gen_model)
@testtools.testcase.attr('negative')
def test_create_baymodel_missing_keypair(self):
bay_model_client = cli.BayModelClient.as_user('default')
gen_model = datagen.random_baymodel_data_w_valid_image_id()
self.assertRaises(
exceptions.NotFound,
bay_model_client.post_baymodel, gen_model)
@testtools.testcase.attr('negative')
def test_update_baymodel_invalid_patch(self):
# get json object
gen_model = datagen.random_baymodel_data_w_valid_keypair_and_image_id()
resp, old_model = self._create_baymodel(gen_model)
bay_model_client = cli.BayModelClient.as_user('default')
self.assertRaises(
exceptions.BadRequest,
bay_model_client.patch_baymodel, datagen.random_uuid(), gen_model)

View File

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib import base
from magnum.tests.functional.common import config
class BaseMagnumTest(base.BaseTestCase):
"""Sets up configuration required for functional tests"""
def __init__(self, *args, **kwargs):
super(BaseMagnumTest, self).__init__(*args, **kwargs)
@classmethod
def setUpClass(cls):
super(BaseMagnumTest, cls).setUpClass()
config.Config.setUp()

View File

@ -0,0 +1,96 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from six.moves.urllib import parse
from tempest_lib.common import rest_client
from magnum.tests.functional.common import config
from magnum.tests.functional.common import manager
from magnum.tests.functional.common.utils import memoized
@six.add_metaclass(abc.ABCMeta)
class BaseMagnumClient(rest_client.RestClient):
"""Abstract class responsible for setting up auth provider"""
def __init__(self):
super(BaseMagnumClient, self).__init__(
auth_provider=self.get_auth_provider(),
service='container',
region=config.Config.region
)
@abc.abstractmethod
def get_auth_provider(self):
pass
class MagnumClient(BaseMagnumClient):
"""Responsible for setting up auth provider for default user
"""
def get_auth_provider(self):
mgr = manager.Manager()
return mgr.get_auth_provider(
username=config.Config.user,
password=config.Config.passwd,
tenant_name=config.Config.tenant
)
class ClientMixin(object):
"""Responsible for mapping setting up common client use cases:
- deserializing response data to a model
- mapping user requests to a specific client for authentication
- generating request URLs
"""
@classmethod
@memoized
def get_clients(cls):
return {
'default': MagnumClient(),
}
def __init__(self, client):
self.client = client
@classmethod
def deserialize(cls, resp, body, model_type):
return resp, model_type.from_json(body)
@classmethod
def as_user(cls, user):
"""Retrieves Magnum client based on user parameter
:param user: type of user ('default')
:returns: a class that maps to user type in get_clients dict
"""
return cls(cls.get_clients()[user])
@property
def tenant_id(self):
return self.client.tenant_id
@classmethod
def add_filters(cls, url, filters):
"""add_filters adds dict values (filters) to url as query parameters
:param url: base URL for the request
:param filters: dict with var:val pairs to add as parameters to URL
:returns: url string
"""
return url + "?" + parse(filters)

View File

@ -0,0 +1,71 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
class Config(object):
"""Parses configuration to attributes required for auth and test data"""
@classmethod
def setUp(cls):
config = ConfigParser.RawConfigParser()
if config.read('functional_creds.conf'):
# admin creds
cls.admin_user = config.get('admin', 'user')
cls.admin_passwd = config.get('admin', 'pass')
cls.admin_tenant = config.get('admin', 'tenant')
# normal user creds
cls.user = config.get('auth', 'username')
cls.passwd = config.get('auth', 'password')
cls.tenant = config.get('auth', 'tenant_name')
# auth version for client authentication
if config.has_option('auth', 'auth_version'):
cls.auth_version = config.get('auth', 'auth_version')
else:
cls.auth_version = 'v3'
# auth_url for client authentication
if cls.auth_version == 'v3':
if not config.has_option('auth', 'auth_v3_url'):
raise Exception('config missing auth_v3_url key')
cls.auth_v3_url = config.get('auth', 'auth_v3_url')
else:
if not config.has_option('auth', 'auth_url'):
raise Exception('config missing auth_url key')
cls.auth_url = config.get('auth', 'auth_url')
# optional magnum bypass url
cls.magnum_url = config.get('auth', 'magnum_url')
if config.has_option('auth', 'region'):
cls.region = config.get('auth', 'region')
else:
cls.region = 'RegionOne'
# magnum functional test variables
cls.image_id = config.get('magnum', 'image_id')
if not config.has_option('magnum', 'image_id'):
raise Exception('config missing image_id key')
cls.nic_id = config.get('magnum', 'nic_id')
if not config.has_option('magnum', 'nic_id'):
raise Exception('config missing nic_id key')
cls.keypair_id = config.get('magnum', 'keypair_id')
if not config.has_option('magnum', 'keypair_id'):
raise Exception('config missing keypair_id key')
else:
raise Exception('missing functional_creds.conf file')

View File

@ -0,0 +1,144 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import socket
import string
import struct
import uuid
from magnum.tests.functional.api.v1.models import baymodel_model
from magnum.tests.functional.api.v1.models import baymodelpatch_model
from magnum.tests.functional.common import config
def random_uuid():
return uuid.uuid4()
def random_string(prefix='rand', n=8, suffix=''):
"""Return a string containing random digits
:param prefix: the exact text to start the string. Defaults to "rand"
:param n: the number of random digits to generate
:param suffix: the exact text to end the string
"""
digits = "".join(str(random.randrange(0, 10)) for _ in range(n))
return prefix + digits + suffix
def generate_random_network():
network_list = ["public", "private"]
return network_list[random.randrange(0, len(network_list))]
def generate_random_coe():
coe_list = ["swarm", "kubernetes", "mesos"]
return coe_list[random.randrange(0, len(coe_list))]
def generate_random_port():
return random.randrange(49152, 65535)
def generate_random_docker_volume_size():
return random.randrange(1, 100)
def generate_fake_ssh_pubkey():
chars = "".join(
random.choice(string.ascii_uppercase +
string.ascii_letters + string.digits + '/+=')
for _ in range(372))
return "ssh-rsa " + chars
def generate_random_ip():
return socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))
def random_baymodel_data(keypair_id=random_string(), image_id=random_string()):
"""Generates random baymodel data
Keypair and image id cannot be random for the baymodel to be valid due to
validations for the presence of keypair and image id prior to baymodel
creation.
:param keypair_id: keypair name
:param image_id: image id or name
:returns: BayModelEntity with generated data
"""
data = {
"name": random_string(),
"image_id": image_id,
"flavor_id": random_string(),
"master_flavor_id": random_string(),
"dns_nameserver": generate_random_ip(),
"keypair_id": keypair_id,
"external_network_id": str(random_uuid()),
"fixed_network": generate_random_network(),
"apiserver_port": generate_random_port(),
"docker_volume_size": generate_random_docker_volume_size(),
"cluster_distro": random_string(),
"ssh_authorized_key": generate_fake_ssh_pubkey(),
"coe": generate_random_coe(),
"http_proxy": "http://proxy.com:%s" % generate_random_port(),
"https_proxy": "https://proxy.com:%s" % generate_random_port(),
"no_proxy": ",".join(generate_random_ip() for x in range(3))
}
model = baymodel_model.BayModelEntity.from_dict(data)
return model
def random_baymodel_name_patch_data(name=random_string()):
"""Generates random baymodel patch data
:param name: name to replace in patch
:returns: BayModelPatchCollection with generated data
"""
data = [{
"path": "/name",
"value": name,
"op": "replace"
}]
return baymodelpatch_model.BayModelPatchCollection.from_dict(data)
def random_baymodel_data_w_valid_keypair_and_image_id():
"""Generates random baymodel data with valid keypair and image
:returns: BayModelEntity with generated data
"""
return random_baymodel_data(keypair_id=config.Config.keypair_id,
image_id=config.Config.image_id)
def random_baymodel_data_w_valid_keypair():
"""Generates random baymodel data with valid keypair
:returns: BayModelEntity with generated data
"""
return random_baymodel_data(keypair_id=config.Config.keypair_id)
def random_baymodel_data_w_valid_image_id():
"""Generates random baymodel data with valid image
:returns: BayModelEntity with generated data
"""
return random_baymodel_data(image_id=config.Config.image_id)

View File

@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib import auth
from tempest_lib import exceptions
import magnum.tests.functional.common.config as config
class Manager(object):
"""Responsible for providing an auth provider object"""
def _get_auth_credentials(self, auth_version, **credentials):
"""Retrieves auth credentials based on passed in creds and version
:param auth_version: auth version ('v2' or 'v3')
:param credentials: credentials dict to validate against
:returns: credentials object
"""
if credentials is None:
raise exceptions.InvalidCredentials(
'Credentials must be specified')
if auth_version == 'v3':
return auth.KeystoneV3Credentials(**credentials)
elif auth_version == 'v2':
return auth.KeystoneV2Credentials(**credentials)
else:
raise exceptions.InvalidCredentials('Specify identity version')
def get_auth_provider(self, **credentials):
"""Validates credentials and returns auth provider
Auth provider will contain required security context to pass to magnum
:param credentials: credentials dict to validate against
:returns: auth provider object
"""
auth_version = config.Config.auth_version
creds = self._get_auth_credentials(auth_version, **credentials)
if auth_version == 'v3':
auth_provider = auth.KeystoneV3AuthProvider(
creds, config.Config.auth_url)
elif auth_version == 'v2':
auth_provider = auth.KeystoneV2AuthProvider(
creds, config.Config.auth_url)
else:
raise exceptions.InvalidCredentials('Specify identity version')
auth_provider.fill_credentials()
return auth_provider

View File

@ -0,0 +1,70 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
class BaseModel(object):
"""Superclass Responsible for converting json data to/from model"""
@classmethod
def from_json(cls, json_str):
return cls.from_dict(json.loads(json_str))
def to_json(self):
return json.dumps(self.to_dict())
@classmethod
def from_dict(cls, data):
model = cls()
for key in data:
setattr(model, key, data.get(key))
return model
def to_dict(self):
result = {}
for key in self.__dict__:
result[key] = getattr(self, key)
if isinstance(result[key], BaseModel):
result[key] = result[key].to_dict()
return result
def __str__(self):
return "%s" % self.to_dict()
class EntityModel(BaseModel):
"""Superclass resposible from converting dict to instance of model"""
@classmethod
def from_dict(cls, data):
model = super(EntityModel, cls).from_dict(data)
if hasattr(model, cls.ENTITY_NAME):
val = getattr(model, cls.ENTITY_NAME)
setattr(model, cls.ENTITY_NAME, cls.MODEL_TYPE.from_dict(val))
return model
class CollectionModel(BaseModel):
"""Superclass resposible from converting dict to list of models"""
@classmethod
def from_dict(cls, data):
model = super(CollectionModel, cls).from_dict(data)
collection = []
if hasattr(model, cls.COLLECTION_NAME):
for d in getattr(model, cls.COLLECTION_NAME):
collection.append(cls.MODEL_TYPE.from_dict(d))
setattr(model, cls.COLLECTION_NAME, collection)
return model

View File

@ -0,0 +1,109 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import inspect
import time
import types
def def_method(f, *args, **kwargs):
@functools.wraps(f)
def new_method(self):
return f(self, *args, **kwargs)
return new_method
def parameterized_class(cls):
"""A class decorator for running parameterized test cases.
Mark your class with @parameterized_class.
Mark your test cases with @parameterized.
"""
test_functions = inspect.getmembers(cls, predicate=inspect.ismethod)
for (name, f) in test_functions:
if name.startswith('test_') and not hasattr(f, '_test_data'):
continue
# remove the original test function from the class
delattr(cls, name)
# add a new test function to the class for each entry in f._test_data
for tag, args in f._test_data.items():
new_name = "{0}_{1}".format(f.__name__, tag)
if hasattr(cls, new_name):
raise Exception(
"Parameterized test case '{0}.{1}' created from '{0}.{2}' "
"already exists".format(cls.__name__, new_name, name))
# Using `def new_method(self): f(self, **args)` is not sufficient
# (all new_methods use the same args value due to late binding).
# Instead, use this factory function.
new_method = def_method(f, **args)
# To add a method to a class, available for all instances:
# MyClass.method = types.MethodType(f, None, MyClass)
setattr(cls, new_name, types.MethodType(new_method, None, cls))
return cls
def parameterized(data):
"""A function decorator for parameterized test cases.
Example:
@parameterized({
'zero': dict(val=0),
'one': dict(val=1),
})
def test_val(self, val):
self.assertEqual(self.get_val(), val)
The above will generate two test cases:
`test_val_zero` which runs with val=0
`test_val_one` which runs with val=1
:param data: A dictionary that looks like {tag: {arg1: val1, ...}}
"""
def wrapped(f):
f._test_data = data
return f
return wrapped
def wait_for_condition(condition, interval=1, timeout=40):
end_time = time.time() + timeout
while time.time() < end_time:
result = condition()
if result:
return result
time.sleep(interval)
raise Exception("Timed out after {0} seconds".format(timeout))
def memoized(func):
"""A decorator to cache function's return value"""
cache = {}
@functools.wraps(func)
def wrapper(*args):
if not isinstance(args, collections.Hashable):
# args is not cacheable. just call the function.
return func(*args)
if args in cache:
return cache[args]
else:
value = func(*args)
cache[args] = value
return value
return wrapper

View File

@ -16,3 +16,4 @@ sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
testrepository>=0.0.18
testscenarios>=0.4
testtools>=1.4.0
tempest-lib>=0.8.0