openstacksdk/openstack/tests/unit/base.py

766 lines
31 KiB
Python

# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import os
import time
import uuid
import fixtures
from keystoneauth1 import loading as ks_loading
import openstack.config as occ
from oslo_config import cfg
from requests import structures
from requests_mock.contrib import fixture as rm_fixture
from six.moves import urllib
import tempfile
import openstack.cloud
import openstack.connection
from openstack.tests import fakes
from openstack.fixture import connection as os_fixture
from openstack.tests import base
_ProjectData = collections.namedtuple(
'ProjectData',
'project_id, project_name, enabled, domain_id, description, '
'json_response, json_request')
_UserData = collections.namedtuple(
'UserData',
'user_id, password, name, email, description, domain_id, enabled, '
'json_response, json_request')
_GroupData = collections.namedtuple(
'GroupData',
'group_id, group_name, domain_id, description, json_response, '
'json_request')
_DomainData = collections.namedtuple(
'DomainData',
'domain_id, domain_name, description, json_response, '
'json_request')
_ServiceData = collections.namedtuple(
'Servicedata',
'service_id, service_name, service_type, description, enabled, '
'json_response_v3, json_response_v2, json_request')
_EndpointDataV3 = collections.namedtuple(
'EndpointData',
'endpoint_id, service_id, interface, region, url, enabled, '
'json_response, json_request')
_EndpointDataV2 = collections.namedtuple(
'EndpointData',
'endpoint_id, service_id, region, public_url, internal_url, '
'admin_url, v3_endpoint_list, json_response, '
'json_request')
# NOTE(notmorgan): Shade does not support domain-specific roles
# This should eventually be fixed if it becomes a main-stream feature.
_RoleData = collections.namedtuple(
'RoleData',
'role_id, role_name, json_response, json_request')
class TestCase(base.TestCase):
strict_cloud = False
def setUp(self, cloud_config_fixture='clouds.yaml'):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
# Sleeps are for real testing, but unit tests shouldn't need them
realsleep = time.sleep
def _nosleep(seconds):
return realsleep(seconds * 0.0001)
self.sleep_fixture = self.useFixture(fixtures.MonkeyPatch(
'time.sleep',
_nosleep))
self.fixtures_directory = 'openstack/tests/unit/fixtures'
self.os_fixture = self.useFixture(
os_fixture.ConnectionFixture(project_id=fakes.PROJECT_ID))
# Isolate openstack.config from test environment
config = tempfile.NamedTemporaryFile(delete=False)
cloud_path = '%s/clouds/%s' % (self.fixtures_directory,
cloud_config_fixture)
with open(cloud_path, 'rb') as f:
content = f.read()
config.write(content)
config.close()
vendor = tempfile.NamedTemporaryFile(delete=False)
vendor.write(b'{}')
vendor.close()
self.config = occ.OpenStackConfig(
config_files=[config.name],
vendor_files=[vendor.name],
secure_files=['non-existant'])
self.oslo_config_dict = {
# All defaults for nova
'nova': {},
# monasca-api not in the service catalog
'monasca-api': {},
# Overrides for heat
'heat': {
'region_name': 'SpecialRegion',
'interface': 'internal',
'endpoint_override': 'https://example.org:8888/heat/v2'
},
# test a service with dashes
'ironic_inspector': {
'endpoint_override': 'https://example.org:5050',
},
}
# FIXME(notmorgan): Convert the uri_registry, discovery.json, and
# use of keystone_v3/v2 to a proper fixtures.Fixture. For now this
# is acceptable, but eventually this should become it's own fixture
# that encapsulates the registry, registering the URIs, and
# assert_calls (and calling assert_calls every test case that uses
# it on cleanup). Subclassing here could be 100% eliminated in the
# future allowing any class to simply
# self.useFixture(openstack.cloud.RequestsMockFixture) and get all
# the benefits.
# NOTE(notmorgan): use an ordered dict here to ensure we preserve the
# order in which items are added to the uri_registry. This makes
# the behavior more consistent when dealing with ensuring the
# requests_mock uri/query_string matchers are ordered and parse the
# request in the correct orders.
self._uri_registry = collections.OrderedDict()
self.discovery_json = os.path.join(
self.fixtures_directory, 'discovery.json')
self.use_keystone_v3()
self.__register_uris_called = False
def _load_ks_cfg_opts(self):
conf = cfg.ConfigOpts()
for group, opts in self.oslo_config_dict.items():
conf.register_group(cfg.OptGroup(group))
if opts is not None:
ks_loading.register_adapter_conf_options(conf, group)
for name, val in opts.items():
conf.set_override(name, val, group=group)
return conf
# TODO(shade) Update this to handle service type aliases
def get_mock_url(self, service_type, interface='public', resource=None,
append=None, base_url_append=None,
qs_elements=None):
endpoint_url = self.cloud.endpoint_for(
service_type=service_type, interface=interface)
# Strip trailing slashes, so as not to produce double-slashes below
if endpoint_url.endswith('/'):
endpoint_url = endpoint_url[:-1]
to_join = [endpoint_url]
qs = ''
if base_url_append:
to_join.append(base_url_append)
if resource:
to_join.append(resource)
to_join.extend(append or [])
if qs_elements is not None:
qs = '?%s' % '&'.join(qs_elements)
return '%(uri)s%(qs)s' % {'uri': '/'.join(to_join), 'qs': qs}
def mock_for_keystone_projects(self, project=None, v3=True,
list_get=False, id_get=False,
project_list=None, project_count=None):
if project:
assert not (project_list or project_count)
elif project_list:
assert not (project or project_count)
elif project_count:
assert not (project or project_list)
else:
raise Exception('Must specify a project, project_list, '
'or project_count')
assert list_get or id_get
base_url_append = 'v3' if v3 else None
if project:
project_list = [project]
elif project_count:
# Generate multiple projects
project_list = [self._get_project_data(v3=v3)
for c in range(0, project_count)]
uri_mock_list = []
if list_get:
uri_mock_list.append(
dict(method='GET',
uri=self.get_mock_url(
service_type='identity',
interface='admin',
resource='projects',
base_url_append=base_url_append),
status_code=200,
json={'projects': [p.json_response['project']
for p in project_list]})
)
if id_get:
for p in project_list:
uri_mock_list.append(
dict(method='GET',
uri=self.get_mock_url(
service_type='identity',
interface='admin',
resource='projects',
append=[p.project_id],
base_url_append=base_url_append),
status_code=200,
json=p.json_response)
)
self.__do_register_uris(uri_mock_list)
return project_list
def _get_project_data(self, project_name=None, enabled=None,
domain_id=None, description=None, v3=True,
project_id=None):
project_name = project_name or self.getUniqueString('projectName')
project_id = uuid.UUID(project_id or uuid.uuid4().hex).hex
response = {'id': project_id, 'name': project_name}
request = {'name': project_name}
domain_id = (domain_id or uuid.uuid4().hex) if v3 else None
if domain_id:
request['domain_id'] = domain_id
response['domain_id'] = domain_id
if enabled is not None:
enabled = bool(enabled)
response['enabled'] = enabled
request['enabled'] = enabled
response.setdefault('enabled', True)
request.setdefault('enabled', True)
if description:
response['description'] = description
request['description'] = description
request.setdefault('description', None)
if v3:
project_key = 'project'
else:
project_key = 'tenant'
return _ProjectData(project_id, project_name, enabled, domain_id,
description, {project_key: response},
{project_key: request})
def _get_group_data(self, name=None, domain_id=None, description=None):
group_id = uuid.uuid4().hex
name = name or self.getUniqueString('groupname')
domain_id = uuid.UUID(domain_id or uuid.uuid4().hex).hex
response = {'id': group_id, 'name': name, 'domain_id': domain_id}
request = {'name': name, 'domain_id': domain_id}
if description is not None:
response['description'] = description
request['description'] = description
return _GroupData(group_id, name, domain_id, description,
{'group': response}, {'group': request})
def _get_user_data(self, name=None, password=None, **kwargs):
name = name or self.getUniqueString('username')
password = password or self.getUniqueString('user_password')
user_id = uuid.uuid4().hex
response = {'name': name, 'id': user_id}
request = {'name': name, 'password': password}
if kwargs.get('domain_id'):
kwargs['domain_id'] = uuid.UUID(kwargs['domain_id']).hex
response['domain_id'] = kwargs.pop('domain_id')
request['domain_id'] = response['domain_id']
response['email'] = kwargs.pop('email', None)
request['email'] = response['email']
response['enabled'] = kwargs.pop('enabled', True)
request['enabled'] = response['enabled']
response['description'] = kwargs.pop('description', None)
if response['description']:
request['description'] = response['description']
self.assertIs(0, len(kwargs), message='extra key-word args received '
'on _get_user_data')
return _UserData(user_id, password, name, response['email'],
response['description'], response.get('domain_id'),
response.get('enabled'), {'user': response},
{'user': request})
def _get_domain_data(self, domain_name=None, description=None,
enabled=None):
domain_id = uuid.uuid4().hex
domain_name = domain_name or self.getUniqueString('domainName')
response = {'id': domain_id, 'name': domain_name}
request = {'name': domain_name}
if enabled is not None:
request['enabled'] = bool(enabled)
response['enabled'] = bool(enabled)
if description:
response['description'] = description
request['description'] = description
response.setdefault('enabled', True)
return _DomainData(domain_id, domain_name, description,
{'domain': response}, {'domain': request})
def _get_service_data(self, type=None, name=None, description=None,
enabled=True):
service_id = uuid.uuid4().hex
name = name or uuid.uuid4().hex
type = type or uuid.uuid4().hex
response = {'id': service_id, 'name': name, 'type': type,
'enabled': enabled}
if description is not None:
response['description'] = description
request = response.copy()
request.pop('id')
return _ServiceData(service_id, name, type, description, enabled,
{'service': response},
{'OS-KSADM:service': response}, request)
def _get_endpoint_v3_data(self, service_id=None, region=None,
url=None, interface=None, enabled=True):
endpoint_id = uuid.uuid4().hex
service_id = service_id or uuid.uuid4().hex
region = region or uuid.uuid4().hex
url = url or 'https://example.com/'
interface = interface or uuid.uuid4().hex
response = {'id': endpoint_id, 'service_id': service_id,
'region': region, 'interface': interface,
'url': url, 'enabled': enabled}
request = response.copy()
request.pop('id')
response['region_id'] = response['region']
return _EndpointDataV3(endpoint_id, service_id, interface, region,
url, enabled, {'endpoint': response},
{'endpoint': request})
def _get_endpoint_v2_data(self, service_id=None, region=None,
public_url=None, admin_url=None,
internal_url=None):
endpoint_id = uuid.uuid4().hex
service_id = service_id or uuid.uuid4().hex
region = region or uuid.uuid4().hex
response = {'id': endpoint_id, 'service_id': service_id,
'region': region}
v3_endpoints = {}
request = response.copy()
request.pop('id')
if admin_url:
response['adminURL'] = admin_url
v3_endpoints['admin'] = self._get_endpoint_v3_data(
service_id, region, public_url, interface='admin')
if internal_url:
response['internalURL'] = internal_url
v3_endpoints['internal'] = self._get_endpoint_v3_data(
service_id, region, internal_url, interface='internal')
if public_url:
response['publicURL'] = public_url
v3_endpoints['public'] = self._get_endpoint_v3_data(
service_id, region, public_url, interface='public')
request = response.copy()
request.pop('id')
for u in ('publicURL', 'internalURL', 'adminURL'):
if request.get(u):
request[u.lower()] = request.pop(u)
return _EndpointDataV2(endpoint_id, service_id, region, public_url,
internal_url, admin_url, v3_endpoints,
{'endpoint': response}, {'endpoint': request})
def _get_role_data(self, role_name=None):
role_id = uuid.uuid4().hex
role_name = role_name or uuid.uuid4().hex
request = {'name': role_name}
response = request.copy()
response['id'] = role_id
return _RoleData(role_id, role_name, {'role': response},
{'role': request})
def use_broken_keystone(self):
self.adapter = self.useFixture(rm_fixture.Fixture())
self.calls = []
self._uri_registry.clear()
self.__do_register_uris([
dict(method='GET', uri='https://identity.example.com/',
text=open(self.discovery_json, 'r').read()),
dict(method='POST',
uri='https://identity.example.com/v3/auth/tokens',
status_code=400),
])
self._make_test_cloud(identity_api_version='3')
def use_nothing(self):
self.calls = []
self._uri_registry.clear()
def get_keystone_v3_token(
self,
project_name='admin',
):
return dict(
method='POST',
uri='https://identity.example.com/v3/auth/tokens',
headers={
'X-Subject-Token': self.getUniqueString('KeystoneToken')
},
json=self.os_fixture.v3_token,
validate=dict(json={
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'domain': {
'name': 'default',
},
'name': 'admin',
'password': 'password'
}
}
},
'scope': {
'project': {
'domain': {
'name': 'default'
},
'name': project_name
}
}
}
}),
)
def get_keystone_discovery(self):
with open(self.discovery_json, 'r') as discovery_file:
return dict(
method='GET',
uri='https://identity.example.com/',
text=discovery_file.read(),
)
def use_keystone_v3(self):
self.adapter = self.useFixture(rm_fixture.Fixture())
self.calls = []
self._uri_registry.clear()
self.__do_register_uris([
self.get_keystone_discovery(),
self.get_keystone_v3_token(),
])
self._make_test_cloud(identity_api_version='3')
def use_keystone_v2(self):
self.adapter = self.useFixture(rm_fixture.Fixture())
self.calls = []
self._uri_registry.clear()
self.__do_register_uris([
self.get_keystone_discovery(),
dict(method='POST',
uri='https://identity.example.com/v2.0/tokens',
json=self.os_fixture.v2_token,
),
])
self._make_test_cloud(cloud_name='_test_cloud_v2_',
identity_api_version='2.0')
def _make_test_cloud(self, cloud_name='_test_cloud_', **kwargs):
test_cloud = os.environ.get('OPENSTACKSDK_OS_CLOUD', cloud_name)
self.cloud_config = self.config.get_one(
cloud=test_cloud, validate=True, **kwargs)
self.cloud = openstack.connection.Connection(
config=self.cloud_config, strict=self.strict_cloud)
def get_cinder_discovery_mock_dict(
self,
block_storage_version_json='block-storage-version.json',
block_storage_discovery_url='https://block-storage.example.com/'):
discovery_fixture = os.path.join(
self.fixtures_directory, block_storage_version_json)
return dict(method='GET', uri=block_storage_discovery_url,
text=open(discovery_fixture, 'r').read())
def get_glance_discovery_mock_dict(
self,
image_version_json='image-version.json',
image_discovery_url='https://image.example.com/'):
discovery_fixture = os.path.join(
self.fixtures_directory, image_version_json)
return dict(method='GET', uri=image_discovery_url,
status_code=300,
text=open(discovery_fixture, 'r').read())
def get_nova_discovery_mock_dict(
self,
compute_version_json='compute-version.json',
compute_discovery_url='https://compute.example.com/v2.1/'):
discovery_fixture = os.path.join(
self.fixtures_directory, compute_version_json)
return dict(
method='GET',
uri=compute_discovery_url,
text=open(discovery_fixture, 'r').read())
def get_placement_discovery_mock_dict(self):
discovery_fixture = os.path.join(
self.fixtures_directory, "placement.json")
return dict(method='GET', uri="https://placement.example.com/",
text=open(discovery_fixture, 'r').read())
def get_designate_discovery_mock_dict(self):
discovery_fixture = os.path.join(
self.fixtures_directory, "dns.json")
return dict(method='GET', uri="https://dns.example.com/",
text=open(discovery_fixture, 'r').read())
def get_ironic_discovery_mock_dict(self):
discovery_fixture = os.path.join(
self.fixtures_directory, "baremetal.json")
return dict(method='GET', uri="https://baremetal.example.com/",
text=open(discovery_fixture, 'r').read())
def get_senlin_discovery_mock_dict(self):
discovery_fixture = os.path.join(
self.fixtures_directory, "clustering.json")
return dict(method='GET', uri="https://clustering.example.com/",
text=open(discovery_fixture, 'r').read())
def use_compute_discovery(
self, compute_version_json='compute-version.json',
compute_discovery_url='https://compute.example.com/v2.1/'):
self.__do_register_uris([
self.get_nova_discovery_mock_dict(
compute_version_json, compute_discovery_url),
])
def use_glance(
self, image_version_json='image-version.json',
image_discovery_url='https://image.example.com/'):
# NOTE(notmorgan): This method is only meant to be used in "setUp"
# where the ordering of the url being registered is tightly controlled
# if the functionality of .use_glance is meant to be used during an
# actual test case, use .get_glance_discovery_mock and apply to the
# right location in the mock_uris when calling .register_uris
self.__do_register_uris([
self.get_glance_discovery_mock_dict(
image_version_json, image_discovery_url)])
def use_cinder(self):
self.__do_register_uris([
self.get_cinder_discovery_mock_dict()])
def use_placement(self):
self.__do_register_uris([
self.get_placement_discovery_mock_dict()])
def use_designate(self):
# NOTE(slaweq): This method is only meant to be used in "setUp"
# where the ordering of the url being registered is tightly controlled
# if the functionality of .use_designate is meant to be used during an
# actual test case, use .get_designate_discovery_mock and apply to the
# right location in the mock_uris when calling .register_uris
self.__do_register_uris([
self.get_designate_discovery_mock_dict()])
def use_ironic(self):
# NOTE(TheJulia): This method is only meant to be used in "setUp"
# where the ordering of the url being registered is tightly controlled
# if the functionality of .use_ironic is meant to be used during an
# actual test case, use .get_ironic_discovery_mock and apply to the
# right location in the mock_uris when calling .register_uris
self.__do_register_uris([
self.get_ironic_discovery_mock_dict()])
def use_senlin(self):
# NOTE(elachance): This method is only meant to be used in "setUp"
# where the ordering of the url being registered is tightly controlled
# if the functionality of .use_senlin is meant to be used during an
# actual test case, use .get_senlin_discovery_mock and apply to the
# right location in the mock_uris when calling .register_uris
self.__do_register_uris([
self.get_senlin_discovery_mock_dict()])
def register_uris(self, uri_mock_list=None):
"""Mock a list of URIs and responses via requests mock.
This method may be called only once per test-case to avoid odd
and difficult to debug interactions. Discovery and Auth request mocking
happens separately from this method.
:param uri_mock_list: List of dictionaries that template out what is
passed to requests_mock fixture's `register_uri`.
Format is:
{'method': <HTTP_METHOD>,
'uri': <URI to be mocked>,
...
}
Common keys to pass in the dictionary:
* json: the json response (dict)
* status_code: the HTTP status (int)
* validate: The request body (dict) to
validate with assert_calls
all key-word arguments that are valid to send to
requests_mock are supported.
This list should be in the order in which calls
are made. When `assert_calls` is executed, order
here will be validated. Duplicate URIs and
Methods are allowed and will be collapsed into a
single matcher. Each response will be returned
in order as the URI+Method is hit.
:type uri_mock_list: list
:return: None
"""
assert not self.__register_uris_called
self.__do_register_uris(uri_mock_list or [])
self.__register_uris_called = True
def __do_register_uris(self, uri_mock_list=None):
for to_mock in uri_mock_list:
kw_params = {k: to_mock.pop(k)
for k in ('request_headers', 'complete_qs',
'_real_http')
if k in to_mock}
method = to_mock.pop('method')
uri = to_mock.pop('uri')
# NOTE(notmorgan): make sure the delimiter is non-url-safe, in this
# case "|" is used so that the split can be a bit easier on
# maintainers of this code.
key = '{method}|{uri}|{params}'.format(
method=method, uri=uri, params=kw_params)
validate = to_mock.pop('validate', {})
valid_keys = set(['json', 'headers', 'params', 'data'])
invalid_keys = set(validate.keys()) - valid_keys
if invalid_keys:
raise TypeError(
"Invalid values passed to validate: {keys}".format(
keys=invalid_keys))
headers = structures.CaseInsensitiveDict(to_mock.pop('headers',
{}))
if 'content-type' not in headers:
headers[u'content-type'] = 'application/json'
if 'exc' not in to_mock:
to_mock['headers'] = headers
self.calls += [
dict(
method=method,
url=uri, **validate)
]
self._uri_registry.setdefault(
key, {'response_list': [], 'kw_params': kw_params})
if self._uri_registry[key]['kw_params'] != kw_params:
raise AssertionError(
'PROGRAMMING ERROR: key-word-params '
'should be part of the uri_key and cannot change, '
'it will affect the matcher in requests_mock. '
'%(old)r != %(new)r' %
{'old': self._uri_registry[key]['kw_params'],
'new': kw_params})
self._uri_registry[key]['response_list'].append(to_mock)
for mocked, params in self._uri_registry.items():
mock_method, mock_uri, _ignored = mocked.split('|', 2)
self.adapter.register_uri(
mock_method, mock_uri, params['response_list'],
**params['kw_params'])
def assert_no_calls(self):
# TODO(mordred) For now, creating the adapter for self.conn is
# triggering catalog lookups. Make sure no_calls is only 2.
# When we can make that on-demand through a descriptor object,
# drop this to 0.
self.assertEqual(2, len(self.adapter.request_history))
def assert_calls(self, stop_after=None, do_count=True):
for (x, (call, history)) in enumerate(
zip(self.calls, self.adapter.request_history)):
if stop_after and x > stop_after:
break
call_uri_parts = urllib.parse.urlparse(call['url'])
history_uri_parts = urllib.parse.urlparse(history.url)
self.assertEqual(
(call['method'], call_uri_parts.scheme, call_uri_parts.netloc,
call_uri_parts.path, call_uri_parts.params,
urllib.parse.parse_qs(call_uri_parts.query)),
(history.method, history_uri_parts.scheme,
history_uri_parts.netloc, history_uri_parts.path,
history_uri_parts.params,
urllib.parse.parse_qs(history_uri_parts.query)),
('REST mismatch on call %(index)d. Expected %(call)r. '
'Got %(history)r). '
'NOTE: query string order differences wont cause mismatch' %
{
'index': x,
'call': '{method} {url}'.format(method=call['method'],
url=call['url']),
'history': '{method} {url}'.format(
method=history.method,
url=history.url)})
)
if 'json' in call:
self.assertEqual(
call['json'], history.json(),
'json content mismatch in call {index}'.format(index=x))
# headers in a call isn't exhaustive - it's checking to make sure
# a specific header or headers are there, not that they are the
# only headers
if 'headers' in call:
for key, value in call['headers'].items():
self.assertEqual(
value, history.headers[key],
'header mismatch in call {index}'.format(index=x))
if do_count:
self.assertEqual(
len(self.calls), len(self.adapter.request_history))
class IronicTestCase(TestCase):
def setUp(self):
super(IronicTestCase, self).setUp()
self.use_ironic()
self.uuid = str(uuid.uuid4())
self.name = self.getUniqueString('name')
def get_mock_url(self, **kwargs):
kwargs.setdefault('service_type', 'baremetal')
kwargs.setdefault('interface', 'public')
kwargs.setdefault('base_url_append', 'v1')
return super(IronicTestCase, self).get_mock_url(**kwargs)