Merge "Provide new_xyz_ref functions in tests.core"

This commit is contained in:
Jenkins 2015-09-02 22:59:27 +00:00 committed by Gerrit Code Review
commit 973cd22396
3 changed files with 159 additions and 107 deletions

View File

@ -14,6 +14,7 @@
from __future__ import absolute_import
import atexit
import datetime
import functools
import logging
import os
@ -21,12 +22,14 @@ import re
import shutil
import socket
import sys
import uuid
import warnings
import fixtures
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_log import log
from oslo_utils import timeutils
import oslotest.base as oslotest
from oslotest import mockpatch
from paste.deploy import loadwsgi
@ -83,6 +86,8 @@ rules.init()
IN_MEM_DB_CONN_STRING = 'sqlite://'
TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
exception._FATAL_EXCEPTION_FORMAT_ERRORS = True
os.makedirs(TMPDIR)
atexit.register(shutil.rmtree, TMPDIR)
@ -253,6 +258,137 @@ class TestClient(object):
return self.request('PUT', path=path, headers=headers, body=body)
def new_ref():
"""Populates a ref with attributes common to some API entities."""
return {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': uuid.uuid4().hex,
'enabled': True}
def new_region_ref():
ref = new_ref()
# Region doesn't have name or enabled.
del ref['name']
del ref['enabled']
ref['parent_region_id'] = None
return ref
def new_service_ref():
ref = new_ref()
ref['type'] = uuid.uuid4().hex
return ref
def new_endpoint_ref(service_id, interface='public', default_region_id=None,
**kwargs):
ref = new_ref()
del ref['enabled'] # enabled is optional
ref['interface'] = interface
ref['service_id'] = service_id
ref['url'] = 'https://' + uuid.uuid4().hex + '.com'
ref['region_id'] = default_region_id
ref.update(kwargs)
return ref
def new_domain_ref():
ref = new_ref()
return ref
def new_project_ref(domain_id=None, parent_id=None, is_domain=False):
ref = new_ref()
ref['domain_id'] = domain_id
ref['parent_id'] = parent_id
ref['is_domain'] = is_domain
return ref
def new_user_ref(domain_id, project_id=None):
ref = new_ref()
ref['domain_id'] = domain_id
ref['email'] = uuid.uuid4().hex
ref['password'] = uuid.uuid4().hex
if project_id:
ref['default_project_id'] = project_id
return ref
def new_group_ref(domain_id):
ref = new_ref()
ref['domain_id'] = domain_id
return ref
def new_credential_ref(user_id, project_id=None, cred_type=None):
ref = dict()
ref['id'] = uuid.uuid4().hex
ref['user_id'] = user_id
if cred_type == 'ec2':
ref['type'] = 'ec2'
ref['blob'] = {'blah': 'test'}
else:
ref['type'] = 'cert'
ref['blob'] = uuid.uuid4().hex
if project_id:
ref['project_id'] = project_id
return ref
def new_role_ref():
ref = new_ref()
# Roles don't have a description or the enabled flag
del ref['description']
del ref['enabled']
return ref
def new_policy_ref():
ref = new_ref()
ref['blob'] = uuid.uuid4().hex
ref['type'] = uuid.uuid4().hex
return ref
def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
impersonation=None, expires=None, role_ids=None,
role_names=None, remaining_uses=None,
allow_redelegation=False):
ref = dict()
ref['id'] = uuid.uuid4().hex
ref['trustor_user_id'] = trustor_user_id
ref['trustee_user_id'] = trustee_user_id
ref['impersonation'] = impersonation or False
ref['project_id'] = project_id
ref['remaining_uses'] = remaining_uses
ref['allow_redelegation'] = allow_redelegation
if isinstance(expires, six.string_types):
ref['expires_at'] = expires
elif isinstance(expires, dict):
ref['expires_at'] = (
timeutils.utcnow() + datetime.timedelta(**expires)
).strftime(TIME_FORMAT)
elif expires is None:
pass
else:
raise NotImplementedError('Unexpected value for "expires"')
role_ids = role_ids or []
role_names = role_names or []
if role_ids or role_names:
ref['roles'] = []
for role_id in role_ids:
ref['roles'].append({'id': role_id})
for role_name in role_names:
ref['roles'].append({'name': role_name})
return ref
class BaseTestCase(oslotest.BaseTestCase):
"""Light weight base test class.

View File

@ -30,7 +30,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
self.useFixture(database.Database())
self.service_id = uuid.uuid4().hex
self.service = self.new_service_ref()
self.service = tests.new_service_ref()
self.service['id'] = self.service_id
self.catalog_api.create_service(
self.service_id,
@ -47,19 +47,6 @@ class V2CatalogTestCase(rest.RestfulTestCase):
super(V2CatalogTestCase, self).config_overrides()
self.config_fixture.config(group='catalog', driver='sql')
def new_ref(self):
"""Populates a ref with attributes common to all API entities."""
return {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': uuid.uuid4().hex,
'enabled': True}
def new_service_ref(self):
ref = self.new_ref()
ref['type'] = uuid.uuid4().hex
return ref
def _get_token_id(self, r):
"""Applicable only to JSON."""
return r.result['access']['token']['id']

View File

@ -18,7 +18,6 @@ import uuid
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import six
from testtools import matchers
from keystone import auth
@ -34,7 +33,7 @@ from keystone.tests.unit import rest
CONF = cfg.CONF
DEFAULT_DOMAIN_ID = 'default'
TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
TIME_FORMAT = tests.TIME_FORMAT
class AuthTestMixin(object):
@ -265,122 +264,52 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
self.endpoint['enabled'] = True
def new_ref(self):
"""Populates a ref with attributes common to all API entities."""
return {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'description': uuid.uuid4().hex,
'enabled': True}
"""Populates a ref with attributes common to some API entities."""
return tests.new_ref()
def new_region_ref(self):
ref = self.new_ref()
# Region doesn't have name or enabled.
del ref['name']
del ref['enabled']
ref['parent_region_id'] = None
return ref
return tests.new_region_ref()
def new_service_ref(self):
ref = self.new_ref()
ref['type'] = uuid.uuid4().hex
return ref
return tests.new_service_ref()
def new_endpoint_ref(self, service_id, interface='public', **kwargs):
ref = self.new_ref()
del ref['enabled'] # enabled is optional
ref['interface'] = interface
ref['service_id'] = service_id
ref['url'] = 'https://' + uuid.uuid4().hex + '.com'
ref['region_id'] = self.region_id
ref.update(kwargs)
return ref
return tests.new_endpoint_ref(
service_id, interface=interface, default_region_id=self.region_id,
**kwargs)
def new_domain_ref(self):
ref = self.new_ref()
return ref
return tests.new_domain_ref()
def new_project_ref(self, domain_id=None, parent_id=None, is_domain=False):
ref = self.new_ref()
ref['domain_id'] = domain_id
ref['parent_id'] = parent_id
ref['is_domain'] = is_domain
return ref
return tests.new_project_ref(domain_id=domain_id, parent_id=parent_id,
is_domain=is_domain)
def new_user_ref(self, domain_id, project_id=None):
ref = self.new_ref()
ref['domain_id'] = domain_id
ref['email'] = uuid.uuid4().hex
ref['password'] = uuid.uuid4().hex
if project_id:
ref['default_project_id'] = project_id
return ref
return tests.new_user_ref(domain_id, project_id=project_id)
def new_group_ref(self, domain_id):
ref = self.new_ref()
ref['domain_id'] = domain_id
return ref
return tests.new_group_ref(domain_id)
def new_credential_ref(self, user_id, project_id=None, cred_type=None):
ref = dict()
ref['id'] = uuid.uuid4().hex
ref['user_id'] = user_id
if cred_type == 'ec2':
ref['type'] = 'ec2'
ref['blob'] = {'blah': 'test'}
else:
ref['type'] = 'cert'
ref['blob'] = uuid.uuid4().hex
if project_id:
ref['project_id'] = project_id
return ref
return tests.new_credential_ref(user_id, project_id=project_id,
cred_type=cred_type)
def new_role_ref(self):
ref = self.new_ref()
# Roles don't have a description or the enabled flag
del ref['description']
del ref['enabled']
return ref
return tests.new_role_ref()
def new_policy_ref(self):
ref = self.new_ref()
ref['blob'] = uuid.uuid4().hex
ref['type'] = uuid.uuid4().hex
return ref
return tests.new_policy_ref()
def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None,
impersonation=None, expires=None, role_ids=None,
role_names=None, remaining_uses=None,
allow_redelegation=False):
ref = dict()
ref['id'] = uuid.uuid4().hex
ref['trustor_user_id'] = trustor_user_id
ref['trustee_user_id'] = trustee_user_id
ref['impersonation'] = impersonation or False
ref['project_id'] = project_id
ref['remaining_uses'] = remaining_uses
ref['allow_redelegation'] = allow_redelegation
if isinstance(expires, six.string_types):
ref['expires_at'] = expires
elif isinstance(expires, dict):
ref['expires_at'] = (
timeutils.utcnow() + datetime.timedelta(**expires)
).strftime(TIME_FORMAT)
elif expires is None:
pass
else:
raise NotImplementedError('Unexpected value for "expires"')
role_ids = role_ids or []
role_names = role_names or []
if role_ids or role_names:
ref['roles'] = []
for role_id in role_ids:
ref['roles'].append({'id': role_id})
for role_name in role_names:
ref['roles'].append({'name': role_name})
return ref
return tests.new_trust_ref(
trustor_user_id, trustee_user_id, project_id=project_id,
impersonation=impersonation, expires=expires, role_ids=role_ids,
role_names=role_names, remaining_uses=remaining_uses,
allow_redelegation=allow_redelegation)
def create_new_default_project_for_user(self, user_id, domain_id,
enable_project=True):