Merge "map fixtures to keystoneauth"
This commit is contained in:
commit
bbac70ad3b
|
@ -20,14 +20,27 @@ upon to generate test tokens for other clients. However they should never be
|
|||
imported into the main client (keystoneclient or other). Because of this there
|
||||
may be dependencies from this module on libraries that are only available in
|
||||
testing.
|
||||
|
||||
.. warning::
|
||||
|
||||
The keystoneclient.fixture package is deprecated in favor of
|
||||
keystoneauth1.fixture and will not be supported.
|
||||
|
||||
"""
|
||||
|
||||
import warnings
|
||||
|
||||
from keystoneclient.fixture.discovery import * # noqa
|
||||
from keystoneclient.fixture import exception
|
||||
from keystoneclient.fixture import v2
|
||||
from keystoneclient.fixture import v3
|
||||
|
||||
|
||||
warnings.warn(
|
||||
"The keystoneclient.fixture package is deprecated in favor of "
|
||||
"keystoneauth1.fixture and will not be supported.", DeprecationWarning)
|
||||
|
||||
|
||||
FixtureValidationError = exception.FixtureValidationError
|
||||
V2Token = v2.Token
|
||||
V3Token = v3.Token
|
||||
|
|
|
@ -10,252 +10,29 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
from keystoneauth1.fixture import discovery
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from positional import positional
|
||||
|
||||
from keystoneclient import utils
|
||||
|
||||
__all__ = ('DiscoveryList',
|
||||
'V2Discovery',
|
||||
'V3Discovery',
|
||||
)
|
||||
|
||||
_DEFAULT_DAYS_AGO = 30
|
||||
|
||||
V2Discovery = discovery.V2Discovery
|
||||
"""A Version element for a V2 identity service endpoint.
|
||||
|
||||
class DiscoveryBase(dict):
|
||||
"""The basic version discovery structure.
|
||||
An alias of :py:exc:`keystoneauth1.fixture.discovery.V2Discovery`
|
||||
"""
|
||||
|
||||
All version discovery elements should have access to these values.
|
||||
V3Discovery = discovery.V3Discovery
|
||||
"""A Version element for a V3 identity service endpoint.
|
||||
|
||||
:param string id: The version id for this version entry.
|
||||
:param string status: The status of this entry.
|
||||
:param DateTime updated: When the API was last updated.
|
||||
"""
|
||||
An alias of :py:exc:`keystoneauth1.fixture.discovery.V3Discovery`
|
||||
"""
|
||||
|
||||
@positional()
|
||||
def __init__(self, id, status=None, updated=None):
|
||||
super(DiscoveryBase, self).__init__()
|
||||
DiscoveryList = discovery.DiscoveryList
|
||||
"""A List of version elements.
|
||||
|
||||
self.id = id
|
||||
self.status = status or 'stable'
|
||||
self.updated = updated or (timeutils.utcnow() -
|
||||
datetime.timedelta(days=_DEFAULT_DAYS_AGO))
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self.get('id')
|
||||
|
||||
@id.setter
|
||||
def id(self, value):
|
||||
self['id'] = value
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
return self.get('status')
|
||||
|
||||
@status.setter
|
||||
def status(self, value):
|
||||
self['status'] = value
|
||||
|
||||
@property
|
||||
def links(self):
|
||||
return self.setdefault('links', [])
|
||||
|
||||
@property
|
||||
def updated_str(self):
|
||||
return self.get('updated')
|
||||
|
||||
@updated_str.setter
|
||||
def updated_str(self, value):
|
||||
self['updated'] = value
|
||||
|
||||
@property
|
||||
def updated(self):
|
||||
return timeutils.parse_isotime(self.updated_str)
|
||||
|
||||
@updated.setter
|
||||
def updated(self, value):
|
||||
self.updated_str = utils.isotime(value)
|
||||
|
||||
@positional()
|
||||
def add_link(self, href, rel='self', type=None):
|
||||
link = {'href': href, 'rel': rel}
|
||||
if type:
|
||||
link['type'] = type
|
||||
self.links.append(link)
|
||||
return link
|
||||
|
||||
@property
|
||||
def media_types(self):
|
||||
return self.setdefault('media-types', [])
|
||||
|
||||
@positional(1)
|
||||
def add_media_type(self, base, type):
|
||||
mt = {'base': base, 'type': type}
|
||||
self.media_types.append(mt)
|
||||
return mt
|
||||
|
||||
|
||||
class V2Discovery(DiscoveryBase):
|
||||
"""A Version element for a V2 identity service endpoint.
|
||||
|
||||
Provides some default values and helper methods for creating a v2.0
|
||||
endpoint version structure. Clients should use this instead of creating
|
||||
their own structures.
|
||||
|
||||
:param string href: The url that this entry should point to.
|
||||
:param string id: The version id that should be reported. (optional)
|
||||
Defaults to 'v2.0'.
|
||||
:param bool html: Add HTML describedby links to the structure.
|
||||
:param bool pdf: Add PDF describedby links to the structure.
|
||||
|
||||
"""
|
||||
|
||||
_DESC_URL = 'http://docs.openstack.org/api/openstack-identity-service/2.0/'
|
||||
|
||||
@positional()
|
||||
def __init__(self, href, id=None, html=True, pdf=True, **kwargs):
|
||||
super(V2Discovery, self).__init__(id or 'v2.0', **kwargs)
|
||||
|
||||
self.add_link(href)
|
||||
|
||||
if html:
|
||||
self.add_html_description()
|
||||
if pdf:
|
||||
self.add_pdf_description()
|
||||
|
||||
def add_html_description(self):
|
||||
"""Add the HTML described by links.
|
||||
|
||||
The standard structure includes a link to a HTML document with the
|
||||
API specification. Add it to this entry.
|
||||
"""
|
||||
self.add_link(href=self._DESC_URL + 'content',
|
||||
rel='describedby',
|
||||
type='text/html')
|
||||
|
||||
def add_pdf_description(self):
|
||||
"""Add the PDF described by links.
|
||||
|
||||
The standard structure includes a link to a PDF document with the
|
||||
API specification. Add it to this entry.
|
||||
"""
|
||||
self.add_link(href=self._DESC_URL + 'identity-dev-guide-2.0.pdf',
|
||||
rel='describedby',
|
||||
type='application/pdf')
|
||||
|
||||
|
||||
class V3Discovery(DiscoveryBase):
|
||||
"""A Version element for a V3 identity service endpoint.
|
||||
|
||||
Provides some default values and helper methods for creating a v3
|
||||
endpoint version structure. Clients should use this instead of creating
|
||||
their own structures.
|
||||
|
||||
:param href: The url that this entry should point to.
|
||||
:param string id: The version id that should be reported. (optional)
|
||||
Defaults to 'v3.0'.
|
||||
:param bool json: Add JSON media-type elements to the structure.
|
||||
:param bool xml: Add XML media-type elements to the structure.
|
||||
"""
|
||||
|
||||
@positional()
|
||||
def __init__(self, href, id=None, json=True, xml=True, **kwargs):
|
||||
super(V3Discovery, self).__init__(id or 'v3.0', **kwargs)
|
||||
|
||||
self.add_link(href)
|
||||
|
||||
if json:
|
||||
self.add_json_media_type()
|
||||
if xml:
|
||||
self.add_xml_media_type()
|
||||
|
||||
def add_json_media_type(self):
|
||||
"""Add the JSON media-type links.
|
||||
|
||||
The standard structure includes a list of media-types that the endpoint
|
||||
supports. Add JSON to the list.
|
||||
"""
|
||||
self.add_media_type(base='application/json',
|
||||
type='application/vnd.openstack.identity-v3+json')
|
||||
|
||||
def add_xml_media_type(self):
|
||||
"""Add the XML media-type links.
|
||||
|
||||
The standard structure includes a list of media-types that the endpoint
|
||||
supports. Add XML to the list.
|
||||
"""
|
||||
self.add_media_type(base='application/xml',
|
||||
type='application/vnd.openstack.identity-v3+xml')
|
||||
|
||||
|
||||
class DiscoveryList(dict):
|
||||
"""A List of version elements.
|
||||
|
||||
Creates a correctly structured list of identity service endpoints for
|
||||
use in testing with discovery.
|
||||
|
||||
:param string href: The url that this should be based at.
|
||||
:param bool v2: Add a v2 element.
|
||||
:param bool v3: Add a v3 element.
|
||||
:param string v2_status: The status to use for the v2 element.
|
||||
:param DateTime v2_updated: The update time to use for the v2 element.
|
||||
:param bool v2_html: True to add a html link to the v2 element.
|
||||
:param bool v2_pdf: True to add a pdf link to the v2 element.
|
||||
:param string v3_status: The status to use for the v3 element.
|
||||
:param DateTime v3_updated: The update time to use for the v3 element.
|
||||
:param bool v3_json: True to add a html link to the v2 element.
|
||||
:param bool v3_xml: True to add a pdf link to the v2 element.
|
||||
"""
|
||||
|
||||
TEST_URL = 'http://keystone.host:5000/'
|
||||
|
||||
@positional(2)
|
||||
def __init__(self, href=None, v2=True, v3=True, v2_id=None, v3_id=None,
|
||||
v2_status=None, v2_updated=None, v2_html=True, v2_pdf=True,
|
||||
v3_status=None, v3_updated=None, v3_json=True, v3_xml=True):
|
||||
super(DiscoveryList, self).__init__(versions={'values': []})
|
||||
|
||||
href = href or self.TEST_URL
|
||||
|
||||
if v2:
|
||||
v2_href = href.rstrip('/') + '/v2.0'
|
||||
self.add_v2(v2_href, id=v2_id, status=v2_status,
|
||||
updated=v2_updated, html=v2_html, pdf=v2_pdf)
|
||||
|
||||
if v3:
|
||||
v3_href = href.rstrip('/') + '/v3'
|
||||
self.add_v3(v3_href, id=v3_id, status=v3_status,
|
||||
updated=v3_updated, json=v3_json, xml=v3_xml)
|
||||
|
||||
@property
|
||||
def versions(self):
|
||||
return self['versions']['values']
|
||||
|
||||
def add_version(self, version):
|
||||
"""Add a new version structure to the list.
|
||||
|
||||
:param dict version: A new version structure to add to the list.
|
||||
"""
|
||||
self.versions.append(version)
|
||||
|
||||
def add_v2(self, href, **kwargs):
|
||||
"""Add a v2 version to the list.
|
||||
|
||||
The parameters are the same as V2Discovery.
|
||||
"""
|
||||
obj = V2Discovery(href, **kwargs)
|
||||
self.add_version(obj)
|
||||
return obj
|
||||
|
||||
def add_v3(self, href, **kwargs):
|
||||
"""Add a v3 version to the list.
|
||||
|
||||
The parameters are the same as V3Discovery.
|
||||
"""
|
||||
obj = V3Discovery(href, **kwargs)
|
||||
self.add_version(obj)
|
||||
return obj
|
||||
An alias of :py:exc:`keystoneauth1.fixture.discovery.DiscoveryList`
|
||||
"""
|
||||
|
|
|
@ -10,11 +10,11 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystoneauth1.fixture import exception
|
||||
|
||||
class FixtureValidationError(Exception):
|
||||
"""The token you created is not legitimate.
|
||||
|
||||
The data contained in the token that was generated is not valid and would
|
||||
not have been returned from a keystone server. You should not do testing
|
||||
with this token.
|
||||
"""
|
||||
FixtureValidationError = exception.FixtureValidationError
|
||||
"""The token you created is not legitimate.
|
||||
|
||||
An alias of :py:exc:`keystoneauth1.fixture.exception.FixtureValidationError``
|
||||
"""
|
||||
|
|
|
@ -10,237 +10,11 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from keystoneclient.fixture import exception
|
||||
from keystoneclient import utils
|
||||
from keystoneauth1.fixture import v2
|
||||
|
||||
|
||||
class _Service(dict):
|
||||
Token = v2.Token
|
||||
"""A V2 Keystone token that can be used for testing.
|
||||
|
||||
def add_endpoint(self, public, admin=None, internal=None,
|
||||
tenant_id=None, region=None, id=None):
|
||||
data = {'tenantId': tenant_id or uuid.uuid4().hex,
|
||||
'publicURL': public,
|
||||
'adminURL': admin or public,
|
||||
'internalURL': internal or public,
|
||||
'region': region,
|
||||
'id': id or uuid.uuid4().hex}
|
||||
|
||||
self.setdefault('endpoints', []).append(data)
|
||||
return data
|
||||
|
||||
|
||||
class Token(dict):
|
||||
"""A V2 Keystone token that can be used for testing.
|
||||
|
||||
This object is designed to allow clients to generate a correct V2 token for
|
||||
use in there test code. It should prevent clients from having to know the
|
||||
correct token format and allow them to test the portions of token handling
|
||||
that matter to them and not copy and paste sample.
|
||||
"""
|
||||
|
||||
def __init__(self, token_id=None, expires=None, issued=None,
|
||||
tenant_id=None, tenant_name=None, user_id=None,
|
||||
user_name=None, trust_id=None, trustee_user_id=None,
|
||||
audit_id=None, audit_chain_id=None):
|
||||
super(Token, self).__init__()
|
||||
|
||||
self.token_id = token_id or uuid.uuid4().hex
|
||||
self.user_id = user_id or uuid.uuid4().hex
|
||||
self.user_name = user_name or uuid.uuid4().hex
|
||||
self.audit_id = audit_id or uuid.uuid4().hex
|
||||
|
||||
if not issued:
|
||||
issued = timeutils.utcnow() - datetime.timedelta(minutes=2)
|
||||
if not expires:
|
||||
expires = issued + datetime.timedelta(hours=1)
|
||||
|
||||
try:
|
||||
self.issued = issued
|
||||
except (TypeError, AttributeError):
|
||||
# issued should be able to be passed as a string so ignore
|
||||
self.issued_str = issued
|
||||
|
||||
try:
|
||||
self.expires = expires
|
||||
except (TypeError, AttributeError):
|
||||
# expires should be able to be passed as a string so ignore
|
||||
self.expires_str = expires
|
||||
|
||||
if tenant_id or tenant_name:
|
||||
self.set_scope(tenant_id, tenant_name)
|
||||
|
||||
if trust_id or trustee_user_id:
|
||||
# the trustee_user_id will generally be the same as the user_id as
|
||||
# the token is being issued to the trustee
|
||||
self.set_trust(id=trust_id,
|
||||
trustee_user_id=trustee_user_id or user_id)
|
||||
|
||||
if audit_chain_id:
|
||||
self.audit_chain_id = audit_chain_id
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
return self.setdefault('access', {})
|
||||
|
||||
@property
|
||||
def _token(self):
|
||||
return self.root.setdefault('token', {})
|
||||
|
||||
@property
|
||||
def token_id(self):
|
||||
return self._token['id']
|
||||
|
||||
@token_id.setter
|
||||
def token_id(self, value):
|
||||
self._token['id'] = value
|
||||
|
||||
@property
|
||||
def expires_str(self):
|
||||
return self._token['expires']
|
||||
|
||||
@expires_str.setter
|
||||
def expires_str(self, value):
|
||||
self._token['expires'] = value
|
||||
|
||||
@property
|
||||
def expires(self):
|
||||
return timeutils.parse_isotime(self.expires_str)
|
||||
|
||||
@expires.setter
|
||||
def expires(self, value):
|
||||
self.expires_str = utils.isotime(value)
|
||||
|
||||
@property
|
||||
def issued_str(self):
|
||||
return self._token['issued_at']
|
||||
|
||||
@issued_str.setter
|
||||
def issued_str(self, value):
|
||||
self._token['issued_at'] = value
|
||||
|
||||
@property
|
||||
def issued(self):
|
||||
return timeutils.parse_isotime(self.issued_str)
|
||||
|
||||
@issued.setter
|
||||
def issued(self, value):
|
||||
self.issued_str = utils.isotime(value)
|
||||
|
||||
@property
|
||||
def _user(self):
|
||||
return self.root.setdefault('user', {})
|
||||
|
||||
@property
|
||||
def user_id(self):
|
||||
return self._user['id']
|
||||
|
||||
@user_id.setter
|
||||
def user_id(self, value):
|
||||
self._user['id'] = value
|
||||
|
||||
@property
|
||||
def user_name(self):
|
||||
return self._user['name']
|
||||
|
||||
@user_name.setter
|
||||
def user_name(self, value):
|
||||
self._user['name'] = value
|
||||
|
||||
@property
|
||||
def tenant_id(self):
|
||||
return self._token.get('tenant', {}).get('id')
|
||||
|
||||
@tenant_id.setter
|
||||
def tenant_id(self, value):
|
||||
self._token.setdefault('tenant', {})['id'] = value
|
||||
|
||||
@property
|
||||
def tenant_name(self):
|
||||
return self._token.get('tenant', {}).get('name')
|
||||
|
||||
@tenant_name.setter
|
||||
def tenant_name(self, value):
|
||||
self._token.setdefault('tenant', {})['name'] = value
|
||||
|
||||
@property
|
||||
def _metadata(self):
|
||||
return self.root.setdefault('metadata', {})
|
||||
|
||||
@property
|
||||
def trust_id(self):
|
||||
return self.root.setdefault('trust', {}).get('id')
|
||||
|
||||
@trust_id.setter
|
||||
def trust_id(self, value):
|
||||
self.root.setdefault('trust', {})['id'] = value
|
||||
|
||||
@property
|
||||
def trustee_user_id(self):
|
||||
return self.root.setdefault('trust', {}).get('trustee_user_id')
|
||||
|
||||
@trustee_user_id.setter
|
||||
def trustee_user_id(self, value):
|
||||
self.root.setdefault('trust', {})['trustee_user_id'] = value
|
||||
|
||||
@property
|
||||
def audit_id(self):
|
||||
try:
|
||||
return self._token.get('audit_ids', [])[0]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
@audit_id.setter
|
||||
def audit_id(self, value):
|
||||
audit_chain_id = self.audit_chain_id
|
||||
lval = [value] if audit_chain_id else [value, audit_chain_id]
|
||||
self._token['audit_ids'] = lval
|
||||
|
||||
@property
|
||||
def audit_chain_id(self):
|
||||
try:
|
||||
return self._token.get('audit_ids', [])[1]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
@audit_chain_id.setter
|
||||
def audit_chain_id(self, value):
|
||||
self._token['audit_ids'] = [self.audit_id, value]
|
||||
|
||||
def validate(self):
|
||||
scoped = 'tenant' in self.token
|
||||
catalog = self.root.get('serviceCatalog')
|
||||
|
||||
if catalog and not scoped:
|
||||
msg = 'You cannot have a service catalog on an unscoped token'
|
||||
raise exception.FixtureValidationError(msg)
|
||||
|
||||
if scoped and not self.user.get('roles'):
|
||||
msg = 'You must have roles on a token to scope it'
|
||||
raise exception.FixtureValidationError(msg)
|
||||
|
||||
def add_role(self, name=None, id=None):
|
||||
id = id or uuid.uuid4().hex
|
||||
name = name or uuid.uuid4().hex
|
||||
roles = self._user.setdefault('roles', [])
|
||||
roles.append({'name': name})
|
||||
self._metadata.setdefault('roles', []).append(id)
|
||||
return {'id': id, 'name': name}
|
||||
|
||||
def add_service(self, type, name=None):
|
||||
name = name or uuid.uuid4().hex
|
||||
service = _Service(name=name, type=type)
|
||||
self.root.setdefault('serviceCatalog', []).append(service)
|
||||
return service
|
||||
|
||||
def set_scope(self, id=None, name=None):
|
||||
self.tenant_id = id or uuid.uuid4().hex
|
||||
self.tenant_name = name or uuid.uuid4().hex
|
||||
|
||||
def set_trust(self, id=None, trustee_user_id=None):
|
||||
self.trust_id = id or uuid.uuid4().hex
|
||||
self.trustee_user_id = trustee_user_id or uuid.uuid4().hex
|
||||
An alias of :py:exc:`keystoneauth1.fixture.v2.Token`
|
||||
"""
|
||||
|
|
|
@ -10,413 +10,17 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
from keystoneauth1.fixture import v3
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from keystoneclient.fixture import exception
|
||||
from keystoneclient import utils
|
||||
Token = v3.Token
|
||||
"""A V3 Keystone token that can be used for testing.
|
||||
|
||||
An alias of :py:exc:`keystoneauth1.fixture.v3.Token`
|
||||
"""
|
||||
|
||||
class _Service(dict):
|
||||
"""One of the services that exist in the catalog.
|
||||
V3FederationToken = v3.V3FederationToken
|
||||
"""A V3 Keystone Federation token that can be used for testing.
|
||||
|
||||
You use this by adding a service to a token which returns an instance of
|
||||
this object and then you can add_endpoints to the service.
|
||||
"""
|
||||
|
||||
def add_endpoint(self, interface, url, region=None, id=None):
|
||||
data = {'id': id or uuid.uuid4().hex,
|
||||
'interface': interface,
|
||||
'url': url,
|
||||
'region': region,
|
||||
'region_id': region}
|
||||
self.setdefault('endpoints', []).append(data)
|
||||
return data
|
||||
|
||||
def add_standard_endpoints(self, public=None, admin=None, internal=None,
|
||||
region=None):
|
||||
ret = []
|
||||
|
||||
if public:
|
||||
ret.append(self.add_endpoint('public', public, region=region))
|
||||
if admin:
|
||||
ret.append(self.add_endpoint('admin', admin, region=region))
|
||||
if internal:
|
||||
ret.append(self.add_endpoint('internal', internal, region=region))
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
class Token(dict):
|
||||
"""A V3 Keystone token that can be used for testing.
|
||||
|
||||
This object is designed to allow clients to generate a correct V3 token for
|
||||
use in there test code. It should prevent clients from having to know the
|
||||
correct token format and allow them to test the portions of token handling
|
||||
that matter to them and not copy and paste sample.
|
||||
"""
|
||||
|
||||
def __init__(self, expires=None, issued=None, user_id=None, user_name=None,
|
||||
user_domain_id=None, user_domain_name=None, methods=None,
|
||||
project_id=None, project_name=None, project_domain_id=None,
|
||||
project_domain_name=None, domain_id=None, domain_name=None,
|
||||
trust_id=None, trust_impersonation=None, trustee_user_id=None,
|
||||
trustor_user_id=None, oauth_access_token_id=None,
|
||||
oauth_consumer_id=None, audit_id=None, audit_chain_id=None):
|
||||
super(Token, self).__init__()
|
||||
|
||||
self.user_id = user_id or uuid.uuid4().hex
|
||||
self.user_name = user_name or uuid.uuid4().hex
|
||||
self.user_domain_id = user_domain_id or uuid.uuid4().hex
|
||||
self.user_domain_name = user_domain_name or uuid.uuid4().hex
|
||||
self.audit_id = audit_id or uuid.uuid4().hex
|
||||
|
||||
if not methods:
|
||||
methods = ['password']
|
||||
self.methods.extend(methods)
|
||||
|
||||
if not issued:
|
||||
issued = timeutils.utcnow() - datetime.timedelta(minutes=2)
|
||||
|
||||
try:
|
||||
self.issued = issued
|
||||
except (TypeError, AttributeError):
|
||||
# issued should be able to be passed as a string so ignore
|
||||
self.issued_str = issued
|
||||
|
||||
if not expires:
|
||||
expires = self.issued + datetime.timedelta(hours=1)
|
||||
|
||||
try:
|
||||
self.expires = expires
|
||||
except (TypeError, AttributeError):
|
||||
# expires should be able to be passed as a string so ignore
|
||||
self.expires_str = expires
|
||||
|
||||
if (project_id or project_name or
|
||||
project_domain_id or project_domain_name):
|
||||
self.set_project_scope(id=project_id,
|
||||
name=project_name,
|
||||
domain_id=project_domain_id,
|
||||
domain_name=project_domain_name)
|
||||
|
||||
if domain_id or domain_name:
|
||||
self.set_domain_scope(id=domain_id, name=domain_name)
|
||||
|
||||
if (trust_id or (trust_impersonation is not None) or
|
||||
trustee_user_id or trustor_user_id):
|
||||
self.set_trust_scope(id=trust_id,
|
||||
impersonation=trust_impersonation,
|
||||
trustee_user_id=trustee_user_id,
|
||||
trustor_user_id=trustor_user_id)
|
||||
|
||||
if oauth_access_token_id or oauth_consumer_id:
|
||||
self.set_oauth(access_token_id=oauth_access_token_id,
|
||||
consumer_id=oauth_consumer_id)
|
||||
|
||||
if audit_chain_id:
|
||||
self.audit_chain_id = audit_chain_id
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
return self.setdefault('token', {})
|
||||
|
||||
@property
|
||||
def expires_str(self):
|
||||
return self.root.get('expires_at')
|
||||
|
||||
@expires_str.setter
|
||||
def expires_str(self, value):
|
||||
self.root['expires_at'] = value
|
||||
|
||||
@property
|
||||
def expires(self):
|
||||
return timeutils.parse_isotime(self.expires_str)
|
||||
|
||||
@expires.setter
|
||||
def expires(self, value):
|
||||
self.expires_str = utils.isotime(value, subsecond=True)
|
||||
|
||||
@property
|
||||
def issued_str(self):
|
||||
return self.root.get('issued_at')
|
||||
|
||||
@issued_str.setter
|
||||
def issued_str(self, value):
|
||||
self.root['issued_at'] = value
|
||||
|
||||
@property
|
||||
def issued(self):
|
||||
return timeutils.parse_isotime(self.issued_str)
|
||||
|
||||
@issued.setter
|
||||
def issued(self, value):
|
||||
self.issued_str = utils.isotime(value, subsecond=True)
|
||||
|
||||
@property
|
||||
def _user(self):
|
||||
return self.root.setdefault('user', {})
|
||||
|
||||
@property
|
||||
def user_id(self):
|
||||
return self._user.get('id')
|
||||
|
||||
@user_id.setter
|
||||
def user_id(self, value):
|
||||
self._user['id'] = value
|
||||
|
||||
@property
|
||||
def user_name(self):
|
||||
return self._user.get('name')
|
||||
|
||||
@user_name.setter
|
||||
def user_name(self, value):
|
||||
self._user['name'] = value
|
||||
|
||||
@property
|
||||
def _user_domain(self):
|
||||
return self._user.setdefault('domain', {})
|
||||
|
||||
@property
|
||||
def user_domain_id(self):
|
||||
return self._user_domain.get('id')
|
||||
|
||||
@user_domain_id.setter
|
||||
def user_domain_id(self, value):
|
||||
self._user_domain['id'] = value
|
||||
|
||||
@property
|
||||
def user_domain_name(self):
|
||||
return self._user_domain.get('name')
|
||||
|
||||
@user_domain_name.setter
|
||||
def user_domain_name(self, value):
|
||||
self._user_domain['name'] = value
|
||||
|
||||
@property
|
||||
def methods(self):
|
||||
return self.root.setdefault('methods', [])
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
return self.root.get('project', {}).get('id')
|
||||
|
||||
@project_id.setter
|
||||
def project_id(self, value):
|
||||
self.root.setdefault('project', {})['id'] = value
|
||||
|
||||
@property
|
||||
def project_name(self):
|
||||
return self.root.get('project', {}).get('name')
|
||||
|
||||
@project_name.setter
|
||||
def project_name(self, value):
|
||||
self.root.setdefault('project', {})['name'] = value
|
||||
|
||||
@property
|
||||
def project_domain_id(self):
|
||||
return self.root.get('project', {}).get('domain', {}).get('id')
|
||||
|
||||
@project_domain_id.setter
|
||||
def project_domain_id(self, value):
|
||||
project = self.root.setdefault('project', {})
|
||||
project.setdefault('domain', {})['id'] = value
|
||||
|
||||
@property
|
||||
def project_domain_name(self):
|
||||
return self.root.get('project', {}).get('domain', {}).get('name')
|
||||
|
||||
@project_domain_name.setter
|
||||
def project_domain_name(self, value):
|
||||
project = self.root.setdefault('project', {})
|
||||
project.setdefault('domain', {})['name'] = value
|
||||
|
||||
@property
|
||||
def domain_id(self):
|
||||
return self.root.get('domain', {}).get('id')
|
||||
|
||||
@domain_id.setter
|
||||
def domain_id(self, value):
|
||||
self.root.setdefault('domain', {})['id'] = value
|
||||
|
||||
@property
|
||||
def domain_name(self):
|
||||
return self.root.get('domain', {}).get('name')
|
||||
|
||||
@domain_name.setter
|
||||
def domain_name(self, value):
|
||||
self.root.setdefault('domain', {})['name'] = value
|
||||
|
||||
@property
|
||||
def trust_id(self):
|
||||
return self.root.get('OS-TRUST:trust', {}).get('id')
|
||||
|
||||
@trust_id.setter
|
||||
def trust_id(self, value):
|
||||
self.root.setdefault('OS-TRUST:trust', {})['id'] = value
|
||||
|
||||
@property
|
||||
def trust_impersonation(self):
|
||||
return self.root.get('OS-TRUST:trust', {}).get('impersonation')
|
||||
|
||||
@trust_impersonation.setter
|
||||
def trust_impersonation(self, value):
|
||||
self.root.setdefault('OS-TRUST:trust', {})['impersonation'] = value
|
||||
|
||||
@property
|
||||
def trustee_user_id(self):
|
||||
trust = self.root.get('OS-TRUST:trust', {})
|
||||
return trust.get('trustee_user', {}).get('id')
|
||||
|
||||
@trustee_user_id.setter
|
||||
def trustee_user_id(self, value):
|
||||
trust = self.root.setdefault('OS-TRUST:trust', {})
|
||||
trust.setdefault('trustee_user', {})['id'] = value
|
||||
|
||||
@property
|
||||
def trustor_user_id(self):
|
||||
trust = self.root.get('OS-TRUST:trust', {})
|
||||
return trust.get('trustor_user', {}).get('id')
|
||||
|
||||
@trustor_user_id.setter
|
||||
def trustor_user_id(self, value):
|
||||
trust = self.root.setdefault('OS-TRUST:trust', {})
|
||||
trust.setdefault('trustor_user', {})['id'] = value
|
||||
|
||||
@property
|
||||
def oauth_access_token_id(self):
|
||||
return self.root.get('OS-OAUTH1', {}).get('access_token_id')
|
||||
|
||||
@oauth_access_token_id.setter
|
||||
def oauth_access_token_id(self, value):
|
||||
self.root.setdefault('OS-OAUTH1', {})['access_token_id'] = value
|
||||
|
||||
@property
|
||||
def oauth_consumer_id(self):
|
||||
return self.root.get('OS-OAUTH1', {}).get('consumer_id')
|
||||
|
||||
@oauth_consumer_id.setter
|
||||
def oauth_consumer_id(self, value):
|
||||
self.root.setdefault('OS-OAUTH1', {})['consumer_id'] = value
|
||||
|
||||
@property
|
||||
def audit_id(self):
|
||||
try:
|
||||
return self.root.get('audit_ids', [])[0]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
@audit_id.setter
|
||||
def audit_id(self, value):
|
||||
audit_chain_id = self.audit_chain_id
|
||||
lval = [value] if audit_chain_id else [value, audit_chain_id]
|
||||
self.root['audit_ids'] = lval
|
||||
|
||||
@property
|
||||
def audit_chain_id(self):
|
||||
try:
|
||||
return self.root.get('audit_ids', [])[1]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
@audit_chain_id.setter
|
||||
def audit_chain_id(self, value):
|
||||
self.root['audit_ids'] = [self.audit_id, value]
|
||||
|
||||
@property
|
||||
def role_ids(self):
|
||||
return [r['id'] for r in self.root.get('roles', [])]
|
||||
|
||||
@property
|
||||
def role_names(self):
|
||||
return [r['name'] for r in self.root.get('roles', [])]
|
||||
|
||||
def validate(self):
|
||||
project = self.root.get('project')
|
||||
domain = self.root.get('domain')
|
||||
trust = self.root.get('OS-TRUST:trust')
|
||||
catalog = self.root.get('catalog')
|
||||
roles = self.root.get('roles')
|
||||
scoped = project or domain or trust
|
||||
|
||||
if sum((bool(project), bool(domain), bool(trust))) > 1:
|
||||
msg = 'You cannot scope to multiple targets'
|
||||
raise exception.FixtureValidationError(msg)
|
||||
|
||||
if catalog and not scoped:
|
||||
msg = 'You cannot have a service catalog on an unscoped token'
|
||||
raise exception.FixtureValidationError(msg)
|
||||
|
||||
if scoped and not self.user.get('roles'):
|
||||
msg = 'You must have roles on a token to scope it'
|
||||
raise exception.FixtureValidationError(msg)
|
||||
|
||||
if bool(scoped) != bool(roles):
|
||||
msg = 'You must be scoped to have roles and vice-versa'
|
||||
raise exception.FixtureValidationError(msg)
|
||||
|
||||
def add_role(self, name=None, id=None):
|
||||
roles = self.root.setdefault('roles', [])
|
||||
data = {'id': id or uuid.uuid4().hex,
|
||||
'name': name or uuid.uuid4().hex}
|
||||
roles.append(data)
|
||||
return data
|
||||
|
||||
def add_service(self, type, name=None, id=None):
|
||||
service = _Service(type=type, id=id or uuid.uuid4().hex)
|
||||
if name:
|
||||
service['name'] = name
|
||||
self.root.setdefault('catalog', []).append(service)
|
||||
return service
|
||||
|
||||
def set_project_scope(self, id=None, name=None, domain_id=None,
|
||||
domain_name=None):
|
||||
self.project_id = id or uuid.uuid4().hex
|
||||
self.project_name = name or uuid.uuid4().hex
|
||||
self.project_domain_id = domain_id or uuid.uuid4().hex
|
||||
self.project_domain_name = domain_name or uuid.uuid4().hex
|
||||
|
||||
def set_domain_scope(self, id=None, name=None):
|
||||
self.domain_id = id or uuid.uuid4().hex
|
||||
self.domain_name = name or uuid.uuid4().hex
|
||||
|
||||
def set_trust_scope(self, id=None, impersonation=False,
|
||||
trustee_user_id=None, trustor_user_id=None):
|
||||
self.trust_id = id or uuid.uuid4().hex
|
||||
self.trust_impersonation = impersonation
|
||||
self.trustee_user_id = trustee_user_id or uuid.uuid4().hex
|
||||
self.trustor_user_id = trustor_user_id or uuid.uuid4().hex
|
||||
|
||||
def set_oauth(self, access_token_id=None, consumer_id=None):
|
||||
self.oauth_access_token_id = access_token_id or uuid.uuid4().hex
|
||||
self.oauth_consumer_id = consumer_id or uuid.uuid4().hex
|
||||
|
||||
|
||||
class V3FederationToken(Token):
|
||||
"""A V3 Keystone Federation token that can be used for testing.
|
||||
|
||||
Similar to V3Token, this object is designed to allow clients to generate
|
||||
a correct V3 federation token for use in test code.
|
||||
"""
|
||||
|
||||
def __init__(self, methods=None, identity_provider=None, protocol=None,
|
||||
groups=None):
|
||||
methods = methods or ['saml2']
|
||||
super(V3FederationToken, self).__init__(methods=methods)
|
||||
# NOTE(stevemar): Federated tokens do not have a domain for the user
|
||||
del self._user['domain']
|
||||
self.add_federation_info_to_user(identity_provider, protocol, groups)
|
||||
|
||||
def add_federation_info_to_user(self, identity_provider=None,
|
||||
protocol=None, groups=None):
|
||||
data = {
|
||||
"OS-FEDERATION": {
|
||||
"identity_provider": identity_provider or uuid.uuid4().hex,
|
||||
"protocol": protocol or uuid.uuid4().hex,
|
||||
"groups": groups or [{"id": uuid.uuid4().hex}]
|
||||
}
|
||||
}
|
||||
self._user.update(data)
|
||||
return data
|
||||
An alias of :py:exc:`keystoneauth1.fixture.v3.V3FederationToken`
|
||||
"""
|
||||
|
|
|
@ -410,7 +410,7 @@ class FederatedTokenTests(utils.ClientTestCase):
|
|||
|
||||
def test_get_user_domain_id(self):
|
||||
"""Ensure a federated user's domain ID does not exist."""
|
||||
self.assertIsNone(self.federated_token.user_domain_id)
|
||||
self.assertEqual('Federated', self.federated_token.user_domain_id)
|
||||
|
||||
|
||||
class ServiceProviderTests(utils.ClientTestCase, utils.CrudTests):
|
||||
|
|
Loading…
Reference in New Issue