Implements v3 auth client.

Added support for domain scoping.

Enhancement on AccessInfo to support reading v2/v3 token information.

Enhancement on ServiceCatalog for reading/filtering v2/v3 service
catalog information.

Change-Id: Ibb678b9933d3673e37d0fba857a152a3c5d2b4f4
This commit is contained in:
lin-hua-cheng
2013-02-13 22:52:05 -06:00
committed by Lin Hua Cheng
parent abe6781913
commit 2239c3b27c
22 changed files with 2391 additions and 510 deletions

View File

@@ -20,13 +20,41 @@ import datetime
from keystoneclient.openstack.common import timeutils
from keystoneclient import service_catalog
# gap, in seconds, to determine whether the given token is about to expire
STALE_TOKEN_DURATION = 30
class AccessInfo(dict):
"""An object for encapsulating a raw authentication token from keystone
and helper methods for extracting useful values from that token."""
"""Encapsulates a raw authentication token from keystone.
Provides helper methods for extracting useful values from that token.
"""
@classmethod
def factory(cls, resp=None, body=None, **kwargs):
"""Create AccessInfo object given a successful auth response & body
or a user-provided dict.
"""
if body is not None or len(kwargs):
if AccessInfoV3.is_valid(body, **kwargs):
token = None
if resp:
token = resp.headers['X-Subject-Token']
if body:
return AccessInfoV3(token, **body['token'])
else:
return AccessInfoV3(token, **kwargs)
elif AccessInfoV2.is_valid(body, **kwargs):
if body:
return AccessInfoV2(**body['access'])
else:
return AccessInfoV2(**kwargs)
else:
raise NotImplementedError('Unrecognized auth response')
else:
return AccessInfoV2(**kwargs)
def __init__(self, *args, **kwargs):
super(AccessInfo, self).__init__(*args, **kwargs)
@@ -37,7 +65,7 @@ class AccessInfo(dict):
return 'serviceCatalog' in self
def will_expire_soon(self, stale_duration=None):
""" Determines if expiration is about to occur.
"""Determines if expiration is about to occur.
:return: boolean : true if expiration is within the given duration
@@ -51,71 +79,243 @@ class AccessInfo(dict):
seconds=stale_duration))
return norm_expires < soon
@property
def expires(self):
""" Returns the token expiration (as datetime object)
:returns: datetime
@classmethod
def is_valid(cls, body, **kwargs):
"""Determines if processing v2 or v3 token given a successful
auth body or a user-provided dict.
:return: boolean : true if auth body matches implementing class
"""
return timeutils.parse_isotime(self['token']['expires'])
raise NotImplementedError()
def has_service_catalog(self):
"""Returns true if the authorization token has a service catalog.
:returns: boolean
"""
raise NotImplementedError()
@property
def auth_token(self):
""" Returns the token_id associated with the auth request, to be used
"""Returns the token_id associated with the auth request, to be used
in headers for authenticating OpenStack API requests.
:returns: str
"""
return self['token'].get('id', None)
raise NotImplementedError()
@property
def expires(self):
"""Returns the token expiration (as datetime object)
:returns: datetime
"""
raise NotImplementedError()
@property
def username(self):
""" Returns the username associated with the authentication request.
"""Returns the username associated with the authentication request.
Follows the pattern defined in the V2 API of first looking for 'name',
returning that if available, and falling back to 'username' if name
is unavailable.
:returns: str
"""
name = self['user'].get('name', None)
if name:
return name
else:
return self['user'].get('username', None)
raise NotImplementedError()
@property
def user_id(self):
""" Returns the user id associated with the authentication request.
"""Returns the user id associated with the authentication request.
:returns: str
"""
return self['user'].get('id', None)
raise NotImplementedError()
@property
def tenant_name(self):
""" Returns the tenant (project) name associated with the
authentication request.
def user_domain_id(self):
"""Returns the domain id of the user associated with the authentication
request.
For v2, it always returns 'default' which maybe different from the
Keystone configuration.
:returns: str
"""
tenant_dict = self['token'].get('tenant', None)
if tenant_dict:
return tenant_dict.get('name', None)
return None
raise NotImplementedError()
@property
def domain_name(self):
"""Returns the domain name associated with the authentication token.
:returns: str or None (if no domain associated with the token)
"""
raise NotImplementedError()
@property
def domain_id(self):
"""Returns the domain id associated with the authentication token.
:returns: str or None (if no domain associated with the token)
"""
raise NotImplementedError()
@property
def project_name(self):
""" Synonym for tenant_name """
return self.tenant_name
"""Returns the project name associated with the authentication request.
:returns: str or None (if no project associated with the token)
"""
raise NotImplementedError()
@property
def tenant_name(self):
"""Synonym for project_name"""
return self.project_name
@property
def scoped(self):
""" Returns true if the authorization token was scoped to a tenant
(project), and contains a populated service catalog.
(project), and contains a populated service catalog.
This is deprecated, use project_scoped instead.
:returns: bool
"""
raise NotImplementedError()
@property
def project_scoped(self):
""" Returns true if the authorization token was scoped to a tenant
(project).
:returns: bool
"""
raise NotImplementedError()
@property
def domain_scoped(self):
""" Returns true if the authorization token was scoped to a domain.
:returns: bool
"""
raise NotImplementedError()
@property
def project_id(self):
"""Returns the project ID associated with the authentication
request, or None if the authentication request wasn't scoped to a
project.
:returns: str or None ((if no project associated with the token)
"""
raise NotImplementedError()
@property
def tenant_id(self):
"""Synonym for project_id """
return self.project_id
@property
def project_domain_id(self):
"""Returns the domain id of the project associated with the
authentication request.
For v2, it always returns 'default' which maybe different from the
keystone configuration.
:returns: str
"""
raise NotImplementedError()
@property
def auth_url(self):
"""Returns a tuple of URLs from publicURL and adminURL for the service
'identity' from the service catalog associated with the authorization
request. If the authentication request wasn't scoped to a tenant
(project), this property will return None.
:returns: tuple of urls
"""
raise NotImplementedError()
@property
def management_url(self):
"""Returns the first adminURL for 'identity' from the service catalog
associated with the authorization request, or None if the
authentication request wasn't scoped to a tenant (project).
:returns: tuple of urls
"""
raise NotImplementedError()
@property
def version(self):
"""Returns the version of the auth token from identity service.
:returns: str
"""
return self.get('version')
class AccessInfoV2(AccessInfo):
"""An object for encapsulating a raw v2 auth token from identity
service.
"""
def __init__(self, *args, **kwargs):
super(AccessInfo, self).__init__(*args, **kwargs)
self.update(version='v2.0')
self.service_catalog = service_catalog.ServiceCatalog.factory(
resource_dict=self,
token=self['token']['id'],
region_name=self.get('region_name'))
@classmethod
def is_valid(cls, body, **kwargs):
if body:
return 'access' in body
elif kwargs:
return kwargs.get('version') == 'v2.0'
else:
return False
def has_service_catalog(self):
return 'serviceCatalog' in self
@property
def auth_token(self):
return self['token']['id']
@property
def expires(self):
return timeutils.parse_isotime(self['token']['expires'])
@property
def username(self):
return self['user'].get('name', self['user'].get('username'))
@property
def user_id(self):
return self['user']['id']
@property
def user_domain_id(self):
return 'default'
@property
def domain_name(self):
return None
@property
def domain_id(self):
return None
@property
def project_name(self):
tenant_dict = self['token'].get('tenant', None)
if tenant_dict:
return tenant_dict.get('name', None)
@property
def scoped(self):
if ('serviceCatalog' in self
and self['serviceCatalog']
and 'tenant' in self['token']):
@@ -123,51 +323,143 @@ class AccessInfo(dict):
return False
@property
def tenant_id(self):
""" Returns the tenant (project) id associated with the authentication
request, or None if the authentication request wasn't scoped to a
tenant (project).
def project_scoped(self):
return 'tenant' in self['token']
:returns: str
"""
@property
def domain_scoped(self):
return False
@property
def project_id(self):
tenant_dict = self['token'].get('tenant', None)
if tenant_dict:
return tenant_dict.get('id', None)
return None
@property
def project_id(self):
""" Synonym for project_id """
return self.tenant_id
def _get_identity_endpoint(self, endpoint_type):
if not self.get('serviceCatalog'):
return
identity_services = [x for x in self['serviceCatalog']
if x['type'] == 'identity']
return tuple(endpoint[endpoint_type]
for svc in identity_services
for endpoint in svc['endpoints']
if endpoint_type in endpoint)
def project_domain_id(self):
if self.project_id:
return 'default'
@property
def auth_url(self):
""" Returns a tuple of URLs from publicURL and adminURL for the service
'identity' from the service catalog associated with the authorization
request. If the authentication request wasn't scoped to a tenant
(project), this property will return None.
:returns: tuple of urls
"""
return self._get_identity_endpoint('publicURL')
if self.service_catalog:
return self.service_catalog.get_urls(service_type='identity',
endpoint_type='publicURL')
else:
return None
@property
def management_url(self):
""" Returns the first adminURL for 'identity' from the service catalog
associated with the authorization request, or None if the
authentication request wasn't scoped to a tenant (project).
if self.service_catalog:
return self.service_catalog.get_urls(service_type='identity',
endpoint_type='adminURL')
else:
return None
:returns: tuple of urls
"""
return self._get_identity_endpoint('adminURL')
class AccessInfoV3(AccessInfo):
"""An object for encapsulating a raw v3 auth token from identity
service.
"""
def __init__(self, token, *args, **kwargs):
super(AccessInfo, self).__init__(*args, **kwargs)
self.update(version='v3')
self.service_catalog = service_catalog.ServiceCatalog.factory(
resource_dict=self,
token=token,
region_name=self.get('region_name'))
if token:
self.update(auth_token=token)
@classmethod
def is_valid(cls, body, **kwargs):
if body:
return 'token' in body
elif kwargs:
return kwargs.get('version') == 'v3'
else:
return False
def has_service_catalog(self):
return 'catalog' in self
@property
def auth_token(self):
return self['auth_token']
@property
def expires(self):
return timeutils.parse_isotime(self['expires_at'])
@property
def user_id(self):
return self['user']['id']
@property
def user_domain_id(self):
return self['user']['domain']['id']
@property
def username(self):
return self['user']['name']
@property
def domain_name(self):
domain = self.get('domain')
if domain:
return domain['name']
@property
def domain_id(self):
domain = self.get('domain')
if domain:
return domain['id']
@property
def project_id(self):
project = self.get('project')
if project:
return project['id']
@property
def project_domain_id(self):
project = self.get('project')
if project:
return project['domain']['id']
@property
def project_name(self):
project = self.get('project')
if project:
return project['name']
@property
def scoped(self):
return ('catalog' in self and self['catalog'] and 'project' in self)
@property
def project_scoped(self):
return 'project' in self
@property
def domain_scoped(self):
return 'domain' in self
@property
def auth_url(self):
if self.service_catalog:
return self.service_catalog.get_urls(service_type='identity',
endpoint_type='public')
else:
return None
@property
def management_url(self):
if self.service_catalog:
return self.service_catalog.get_urls(service_type='identity',
endpoint_type='admin')
else:
return None

View File

@@ -48,40 +48,160 @@ class HTTPClient(object):
endpoint=None, token=None, cacert=None, key=None,
cert=None, insecure=False, original_ip=None, debug=False,
auth_ref=None, use_keyring=False, force_new_token=False,
stale_duration=None):
stale_duration=None, user_id=None, user_domain_id=None,
user_domain_name=None, domain_id=None, domain_name=None,
project_id=None, project_name=None, project_domain_id=None,
project_domain_name=None):
"""Construct a new http client
@param: timeout the request libary timeout in seconds (default None)
:param string user_id: User ID for authentication. (optional)
:param string username: Username for authentication. (optional)
:param string user_domain_id: User's domain ID for authentication.
(optional)
:param string user_domain_name: User's domain name for authentication.
(optional)
:param string password: Password for authentication. (optional)
:param string domain_id: Domain ID for domain scoping. (optional)
:param string domain_name: Domain name for domain scoping. (optional)
:param string project_id: Project ID for project scoping. (optional)
:param string project_name: Project name for project scoping.
(optional)
:param string project_domain_id: Project's domain ID for project
scoping. (optional)
:param string project_domain_name: Project's domain name for project
scoping. (optional)
:param string auth_url: Identity service endpoint for authorization.
:param string region_name: Name of a region to select when choosing an
endpoint from the service catalog.
:param integer timeout: Allows customization of the timeout for client
http requests. (optional)
:param string endpoint: A user-supplied endpoint URL for the identity
service. Lazy-authentication is possible for
API service calls if endpoint is set at
instantiation. (optional)
:param string token: Token for authentication. (optional)
:param string cacert: Path to the Privacy Enhanced Mail (PEM) file
which contains the trusted authority X.509
certificates needed to established SSL connection
with the identity service. (optional)
:param string key: Path to the Privacy Enhanced Mail (PEM) file which
contains the unencrypted client private key needed
to established two-way SSL connection with the
identity service. (optional)
:param string cert: Path to the Privacy Enhanced Mail (PEM) file which
contains the corresponding X.509 client certificate
needed to established two-way SSL connection with
the identity service. (optional)
:param boolean insecure: Does not perform X.509 certificate validation
when establishing SSL connection with identity
service. default: False (optional)
:param string original_ip: The original IP of the requesting user
which will be sent to identity service in a
'Forwarded' header. (optional)
:param boolean debug: Enables debug logging of all request and
responses to identity service.
default False (optional)
:param dict auth_ref: To allow for consumers of the client to manage
their own caching strategy, you may initialize a
client with a previously captured auth_reference
(token). If there are keyword arguments passed
that also exist in auth_ref, the value from the
argument will take precedence.
:param boolean use_keyring: Enables caching auth_ref into keyring.
default: False (optional)
:param boolean force_new_token: Keyring related parameter, forces
request for new token.
default: False (optional)
:param integer stale_duration: Gap in seconds to determine if token
from keyring is about to expire.
default: 30 (optional)
:param string tenant_name: Tenant name. (optional)
The tenant_name keyword argument is
deprecated, use project_name instead.
:param string tenant_id: Tenant id. (optional)
The tenant_id keyword argument is
deprecated, use project_id instead.
"""
self.version = 'v2.0'
# set baseline defaults
self.user_id = None
self.username = None
self.tenant_id = None
self.tenant_name = None
self.user_domain_id = None
self.user_domain_name = None
self.domain_id = None
self.domain_name = None
self.project_id = None
self.project_name = None
self.project_domain_id = None
self.project_domain_name = None
self.auth_url = None
self.management_url = None
if timeout is not None:
self.timeout = float(timeout)
else:
self.timeout = None
self.timeout = float(timeout) if timeout is not None else None
# if loading from a dictionary passed in via auth_ref,
# load values from AccessInfo parsing that dictionary
self.auth_ref = access.AccessInfo(**auth_ref) if auth_ref else None
if self.auth_ref:
if auth_ref:
self.auth_ref = access.AccessInfo.factory(**auth_ref)
self.version = self.auth_ref.version
self.user_id = self.auth_ref.user_id
self.username = self.auth_ref.username
self.tenant_id = self.auth_ref.tenant_id
self.tenant_name = self.auth_ref.tenant_name
self.user_domain_id = self.auth_ref.user_domain_id
self.domain_id = self.auth_ref.domain_id
self.domain_name = self.auth_ref.domain_name
self.project_id = self.auth_ref.project_id
self.project_name = self.auth_ref.project_name
self.project_domain_id = self.auth_ref.project_domain_id
self.auth_url = self.auth_ref.auth_url[0]
self.management_url = self.auth_ref.management_url[0]
self.auth_token = self.auth_ref.auth_token
else:
self.auth_ref = None
# allow override of the auth_ref defaults from explicit
# values provided to the client
# apply deprecated variables first, so modern variables override them
if tenant_id:
self.project_id = tenant_id
if tenant_name:
self.project_name = tenant_name
# user-related attributes
self.password = password
if user_id:
self.user_id = user_id
if username:
self.username = username
if tenant_id:
self.tenant_id = tenant_id
if tenant_name:
self.tenant_name = tenant_name
if user_domain_id:
self.user_domain_id = user_domain_id
elif not (user_id or user_domain_name):
self.user_domain_id = 'default'
if user_domain_name:
self.user_domain_name = user_domain_name
# domain-related attributes
if domain_id:
self.domain_id = domain_id
if domain_name:
self.domain_name = domain_name
# project-related attributes
if project_id:
self.project_id = project_id
if project_name:
self.project_name = project_name
if project_domain_id:
self.project_domain_id = project_domain_id
elif not (project_id or project_domain_name):
self.project_domain_id = 'default'
if project_domain_name:
self.project_domain_name = project_domain_name
# endpoint selection
if auth_url:
self.auth_url = auth_url.rstrip('/')
if token:
@@ -90,9 +210,9 @@ class HTTPClient(object):
self.auth_token_from_user = None
if endpoint:
self.management_url = endpoint.rstrip('/')
self.password = password
self.original_ip = original_ip
self.region_name = region_name
self.original_ip = original_ip
if cacert:
self.verify_cert = cacert
else:
@@ -138,20 +258,55 @@ class HTTPClient(object):
def auth_token(self):
del self.auth_token_from_user
@property
def service_catalog(self):
"""Returns this client's service catalog."""
return self.auth_ref.service_catalog
def has_service_catalog(self):
"""Returns True if this client provides a service catalog."""
return self.auth_ref.has_service_catalog()
@property
def tenant_id(self):
"""Provide read-only backwards compatibility for tenant_id.
This is deprecated, use project_id instead.
"""
return self.project_id
@property
def tenant_name(self):
"""Provide read-only backwards compatibility for tenant_name.
This is deprecated, use project_name instead.
"""
return self.project_name
def authenticate(self, username=None, password=None, tenant_name=None,
tenant_id=None, auth_url=None, token=None):
""" Authenticate user.
tenant_id=None, auth_url=None, token=None,
user_id=None, domain_name=None, domain_id=None,
project_name=None, project_id=None, user_domain_id=None,
user_domain_name=None, project_domain_id=None,
project_domain_name=None):
"""Authenticate user.
Uses the data provided at instantiation to authenticate against
the Keystone server. This may use either a username and password
the Identity server. This may use either a username and password
or token for authentication. If a tenant name or id was provided
then the resulting authenticated client will be scoped to that
tenant and contain a service catalog of available endpoints.
With the v2.0 API, if a tenant name or ID is not provided, the
authenication token returned will be 'unscoped' and limited in
authentication token returned will be 'unscoped' and limited in
capabilities until a fully-scoped token is acquired.
With the v3 API, if a domain name or id was provided then the resulting
authenticated client will be scoped to that domain. If a project name
or ID is not provided, and the authenticating user has a default
project configured, the authentication token returned will be 'scoped'
to the default project. Otherwise, the authentication token returned
will be 'unscoped' and limited in capabilities until a fully-scoped
token is acquired.
If successful, sets the self.auth_ref and self.auth_token with
the returned token. If not already set, will also set
self.management_url from the details provided in the token.
@@ -173,10 +328,18 @@ class HTTPClient(object):
"""
auth_url = auth_url or self.auth_url
user_id = user_id or self.user_id
username = username or self.username
password = password or self.password
tenant_name = tenant_name or self.tenant_name
tenant_id = tenant_id or self.tenant_id
user_domain_id = user_domain_id or self.user_domain_id
user_domain_name = user_domain_name or self.user_domain_name
domain_id = domain_id or self.domain_id
domain_name = domain_name or self.domain_name
project_id = project_id or tenant_id or self.project_id
project_name = project_name or tenant_name or self.project_name
project_domain_id = project_domain_id or self.project_domain_id
project_domain_name = project_domain_name or self.project_domain_name
if not token:
token = self.auth_token_from_user
@@ -184,21 +347,27 @@ class HTTPClient(object):
self.auth_ref.will_expire_soon(self.stale_duration)):
token = self.auth_ref.auth_token
(keyring_key, auth_ref) = self.get_auth_ref_from_keyring(auth_url,
username,
tenant_name,
tenant_id,
token)
kwargs = {
'auth_url': auth_url,
'user_id': user_id,
'username': username,
'user_domain_id': user_domain_id,
'user_domain_name': user_domain_name,
'domain_id': domain_id,
'domain_name': domain_name,
'project_id': project_id,
'project_name': project_name,
'project_domain_id': project_domain_id,
'project_domain_name': project_domain_name,
'token': token
}
(keyring_key, auth_ref) = self.get_auth_ref_from_keyring(**kwargs)
new_token_needed = False
if auth_ref is None or self.force_new_token:
new_token_needed = True
raw_token = self.get_raw_token_from_identity_service(auth_url,
username,
password,
tenant_name,
tenant_id,
token)
self.auth_ref = access.AccessInfo(**raw_token)
kwargs['password'] = password
resp, body = self.get_raw_token_from_identity_service(**kwargs)
self.auth_ref = access.AccessInfo.factory(resp, body)
else:
self.auth_ref = auth_ref
self.process_token()
@@ -206,23 +375,18 @@ class HTTPClient(object):
self.store_auth_ref_into_keyring(keyring_key)
return True
def _build_keyring_key(self, auth_url, username, tenant_name,
tenant_id, token):
""" Create a unique key for keyring.
def _build_keyring_key(self, **kwargs):
"""Create a unique key for keyring.
Used to store and retrieve auth_ref from keyring.
"""
keys = [auth_url, username, tenant_name, tenant_id, token]
for index, key in enumerate(keys):
if key is None:
keys[index] = '?'
keyring_key = '/'.join(keys)
return keyring_key
Returns a slash-separated string of values ordered by key name.
def get_auth_ref_from_keyring(self, auth_url, username, tenant_name,
tenant_id, token):
""" Retrieve auth_ref from keyring.
"""
return '/'.join([kwargs[k] or '?' for k in sorted(kwargs.keys())])
def get_auth_ref_from_keyring(self, **kwargs):
"""Retrieve auth_ref from keyring.
If auth_ref is found in keyring, (keyring_key, auth_ref) is returned.
Otherwise, (keyring_key, None) is returned.
@@ -234,9 +398,7 @@ class HTTPClient(object):
keyring_key = None
auth_ref = None
if self.use_keyring:
keyring_key = self._build_keyring_key(auth_url, username,
tenant_name, tenant_id,
token)
keyring_key = self._build_keyring_key(**kwargs)
try:
auth_ref = keyring.get_password("keystoneclient_auth",
keyring_key)
@@ -252,7 +414,7 @@ class HTTPClient(object):
return (keyring_key, auth_ref)
def store_auth_ref_into_keyring(self, keyring_key):
""" Store auth_ref into keyring.
"""Store auth_ref into keyring.
"""
if self.use_keyring:
@@ -264,30 +426,36 @@ class HTTPClient(object):
_logger.warning("Failed to store token into keyring %s" % (e))
def process_token(self):
""" Extract and process information from the new auth_ref.
"""Extract and process information from the new auth_ref.
"""
raise NotImplementedError
def get_raw_token_from_identity_service(self, auth_url, username=None,
password=None, tenant_name=None,
tenant_id=None, token=None):
""" Authenticate against the Identity API and get a token.
tenant_id=None, token=None,
user_id=None, user_domain_id=None,
user_domain_name=None,
domain_id=None, domain_name=None,
project_id=None, project_name=None,
project_domain_id=None,
project_domain_name=None):
"""Authenticate against the Identity API and get a token.
Not implemented here because auth protocols should be API
version-specific.
Expected to authenticate or validate an existing authentication
reference already associated with the client. Invoking this call
*always* makes a call to the Keystone.
*always* makes a call to the Identity service.
:returns: ``raw token``
:returns: (``resp``, ``body``)
"""
raise NotImplementedError
def _extract_service_catalog(self, url, body):
""" Set the client's service catalog from the response data.
"""Set the client's service catalog from the response data.
Not implemented here because data returned may be API
version-specific.
@@ -334,7 +502,7 @@ class HTTPClient(object):
return self.auth_ref.has_service_catalog()
def request(self, url, method, **kwargs):
""" Send an http request with the specified characteristics.
"""Send an http request with the specified characteristics.
Wrapper around requests.request to handle tasks such as
setting headers, JSON encoding/decoding, and error handling.
@@ -392,9 +560,10 @@ class HTTPClient(object):
return resp, body
def _cs_request(self, url, method, **kwargs):
""" Makes an authenticated request to keystone endpoint by
"""Makes an authenticated request to keystone endpoint by
concatenating self.management_url and url and passing in method and
any associated kwargs. """
any associated kwargs.
"""
is_management = kwargs.pop('management', True)

View File

@@ -23,9 +23,15 @@ from keystoneclient import exceptions
class ServiceCatalog(object):
"""Helper methods for dealing with a Keystone Service Catalog."""
def __init__(self, resource_dict, region_name=None):
self.catalog = resource_dict
self.region_name = region_name
@classmethod
def factory(cls, resource_dict, token=None, region_name=None):
"""Create ServiceCatalog object given a auth token."""
if ServiceCatalogV3.is_valid(resource_dict):
return ServiceCatalogV3(token, resource_dict, region_name)
elif ServiceCatalogV2.is_valid(resource_dict):
return ServiceCatalogV2(resource_dict, region_name)
else:
raise NotImplementedError('Unrecognized auth response')
def get_token(self):
"""Fetch token details from service catalog.
@@ -36,8 +42,71 @@ class ServiceCatalog(object):
- `expires`: Token's expiration
- `user_id`: Authenticated user's ID
- `tenant_id`: Authorized project's ID
- `domain_id`: Authorized domain's ID
"""
raise NotImplementedError()
def get_endpoints(self, service_type=None, endpoint_type=None):
"""Fetch and filter endpoints for the specified service(s).
Returns endpoints for the specified service (or all) and
that contain the specified type (or all).
"""
raise NotImplementedError()
def get_urls(self, attr=None, filter_value=None,
service_type='identity', endpoint_type='publicURL'):
"""Fetch endpoint urls from the service catalog.
Fetch the endpoints from the service catalog for a particular
endpoint attribute. If no attribute is given, return the first
endpoint of the specified type.
:param string attr: Endpoint attribute name.
:param string filter_value: Endpoint attribute value.
:param string service_type: Service type of the endpoint.
:param string endpoint_type: Type of endpoint.
Possible values: public or publicURL,
internal or internalURL,
admin or adminURL
:param string region_name: Region of the endpoint.
:returns: tuple of urls or None (if no match found)
"""
raise NotImplementedError()
def url_for(self, attr=None, filter_value=None,
service_type='identity', endpoint_type='publicURL'):
"""Fetch an endpoint from the service catalog.
Fetch the specified endpoint from the service catalog for
a particular endpoint attribute. If no attribute is given, return
the first endpoint of the specified type.
Valid endpoint types: `public` or `publicURL`,
`internal` or `internalURL`,
`admin` or 'adminURL`
"""
raise NotImplementedError()
class ServiceCatalogV2(ServiceCatalog):
"""An object for encapsulating the service catalog using raw v2 auth token
from Keystone."""
def __init__(self, resource_dict, region_name=None):
self.catalog = resource_dict
self.region_name = region_name
@classmethod
def is_valid(cls, resource_dict):
# This class is also used for reading token info of an unscoped token.
# Unscoped token does not have 'serviceCatalog' in V2, checking this
# will not work. Use 'token' attribute instead.
return 'token' in resource_dict
def get_token(self):
token = {'id': self.catalog['token']['id'],
'expires': self.catalog['token']['expires']}
try:
@@ -48,23 +117,50 @@ class ServiceCatalog(object):
pass
return token
def get_endpoints(self, service_type=None, endpoint_type=None):
if endpoint_type and 'URL' not in endpoint_type:
endpoint_type = endpoint_type + 'URL'
sc = {}
for service in self.catalog.get('serviceCatalog', []):
if service_type and service_type != service['type']:
continue
sc[service['type']] = []
for endpoint in service['endpoints']:
if endpoint_type and endpoint_type not in endpoint.keys():
continue
sc[service['type']].append(endpoint)
return sc
def get_urls(self, attr=None, filter_value=None,
service_type='identity', endpoint_type='publicURL'):
sc_endpoints = self.get_endpoints(service_type=service_type,
endpoint_type=endpoint_type)
endpoints = sc_endpoints.get(service_type)
if not endpoints:
return
if endpoint_type and 'URL' not in endpoint_type:
endpoint_type = endpoint_type + 'URL'
return tuple(endpoint[endpoint_type]
for endpoint in endpoints
if (endpoint_type in endpoint
and (not self.region_name
or endpoint.get('region') == self.region_name)
and (not filter_value
or endpoint.get(attr) == filter_value)))
def url_for(self, attr=None, filter_value=None,
service_type='identity', endpoint_type='publicURL'):
"""Fetch an endpoint from the service catalog.
Fetch the specified endpoint from the service catalog for
a particular endpoint attribute. If no attribute is given, return
the first endpoint of the specified type.
Valid endpoint types: `publicURL`, `internalURL`, `adminURL`
See tests for a sample service catalog.
"""
catalog = self.catalog.get('serviceCatalog', [])
if not catalog:
raise exceptions.EmptyCatalog('The service catalog is empty.')
if 'URL' not in endpoint_type:
endpoint_type = endpoint_type + 'URL'
for service in catalog:
if service['type'] != service_type:
continue
@@ -80,19 +176,96 @@ class ServiceCatalog(object):
raise exceptions.EndpointNotFound('%s endpoint for %s not found.' %
(endpoint_type, service_type))
def get_endpoints(self, service_type=None, endpoint_type=None):
"""Fetch and filter endpoints for the specified service(s).
Returns endpoints for the specified service (or all) and
that contain the specified type (or all).
"""
class ServiceCatalogV3(ServiceCatalog):
"""An object for encapsulating the service catalog using raw v3 auth token
from Keystone."""
def __init__(self, token, resource_dict, region_name=None):
self._auth_token = token
self.catalog = resource_dict
self.region_name = region_name
@classmethod
def is_valid(cls, resource_dict):
# This class is also used for reading token info of an unscoped token.
# Unscoped token does not have 'catalog', checking this
# will not work. Use 'methods' attribute instead.
return 'methods' in resource_dict
def get_token(self):
token = {'id': self._auth_token,
'expires': self.catalog['expires_at']}
try:
token['user_id'] = self.catalog['user']['id']
domain = self.catalog.get('domain')
if domain:
token['domain_id'] = domain['id']
project = self.catalog.get('project')
if project:
token['tenant_id'] = project['id']
except Exception:
# just leave the domain, project and user out if it doesn't exist
pass
return token
def get_endpoints(self, service_type=None, endpoint_type=None):
if endpoint_type:
endpoint_type = endpoint_type.rstrip('URL')
sc = {}
for service in self.catalog.get('serviceCatalog', []):
for service in self.catalog.get('catalog', []):
if service_type and service_type != service['type']:
continue
sc[service['type']] = []
for endpoint in service['endpoints']:
if endpoint_type and endpoint_type not in endpoint.keys():
if endpoint_type and endpoint_type != endpoint['interface']:
continue
sc[service['type']].append(endpoint)
return sc
def get_urls(self, attr=None, filter_value=None,
service_type='identity', endpoint_type='public'):
if endpoint_type:
endpoint_type = endpoint_type.rstrip('URL')
sc_endpoints = self.get_endpoints(service_type=service_type,
endpoint_type=endpoint_type)
endpoints = sc_endpoints.get(service_type)
if not endpoints:
return None
urls = list()
for endpoint in endpoints:
if (endpoint['interface'] == endpoint_type
and (not self.region_name
or endpoint.get('region') == self.region_name)
and (not filter_value
or endpoint.get(attr) == filter_value)):
urls.append(endpoint['url'])
return tuple(urls)
def url_for(self, attr=None, filter_value=None,
service_type='identity', endpoint_type='public'):
catalog = self.catalog.get('catalog', [])
if not catalog:
raise exceptions.EmptyCatalog('The service catalog is empty.')
if endpoint_type:
endpoint_type = endpoint_type.rstrip('URL')
for service in catalog:
if service['type'] != service_type:
continue
endpoints = service['endpoints']
for endpoint in endpoints:
if endpoint.get('interface') != endpoint_type:
continue
if (self.region_name and
endpoint.get('region') != self.region_name):
continue
if not filter_value or endpoint.get(attr) == filter_value:
return endpoint['url']
raise exceptions.EndpointNotFound('%s endpoint for %s not found.' %
(endpoint_type, service_type))

View File

@@ -48,16 +48,21 @@ class Client(client.HTTPClient):
:param string original_ip: The original IP of the requesting user
which will be sent to Keystone in a
'Forwarded' header. (optional)
:param string cert: If provided, used as a local certificate to communicate
with the keystone endpoint. If provided, requires the
additional parameter key. (optional)
:param string key: The key associated with the certificate for secure
keystone communication. (optional)
:param string cacert: the ca-certs to verify the secure communications
with keystone. (optional)
:param boolean insecure: If using an SSL endpoint, allows for the certicate
to be unsigned - does not verify the certificate
chain. default: False (optional)
:param string cert: Path to the Privacy Enhanced Mail (PEM) file which
contains the corresponding X.509 client certificate
needed to established two-way SSL connection with
the identity service. (optional)
:param string key: Path to the Privacy Enhanced Mail (PEM) file which
contains the unencrypted client private key needed
to established two-way SSL connection with the
identity service. (optional)
:param string cacert: Path to the Privacy Enhanced Mail (PEM) file which
contains the trusted authority X.509 certificates
needed to established SSL connection with the
identity service. (optional)
:param boolean insecure: Does not perform X.509 certificate validation
when establishing SSL connection with identity
service. default: False (optional)
:param dict auth_ref: To allow for consumers of the client to manage their
own caching strategy, you may initialize a client
with a previously captured auth_reference (token)
@@ -119,6 +124,7 @@ class Client(client.HTTPClient):
def __init__(self, **kwargs):
""" Initialize a new client for the Keystone v2.0 API. """
super(Client, self).__init__(**kwargs)
self.version = 'v2.0'
self.endpoints = endpoints.EndpointManager(self)
self.roles = roles.RoleManager(self)
self.services = services.ServiceManager(self)
@@ -140,28 +146,33 @@ class Client(client.HTTPClient):
# if we got a response without a service catalog, set the local
# list of tenants for introspection, and leave to client user
# to determine what to do. Otherwise, load up the service catalog
if self.auth_ref.scoped:
if self.auth_ref.project_scoped:
if not self.auth_ref.tenant_id:
raise exceptions.AuthorizationFailure(
"Token didn't provide tenant_id")
if not self.auth_ref.user_id:
raise exceptions.AuthorizationFailure(
"Token didn't provide user_id")
if self.management_url is None and self.auth_ref.management_url:
self.management_url = self.auth_ref.management_url[0]
self.tenant_name = self.auth_ref.tenant_name
self.tenant_id = self.auth_ref.tenant_id
self.user_id = self.auth_ref.user_id
self.project_name = self.auth_ref.tenant_name
self.project_id = self.auth_ref.tenant_id
self.auth_user_id = self.auth_ref.user_id
if not self.auth_ref.user_id:
raise exceptions.AuthorizationFailure(
"Token didn't provide user_id")
self.user_id = self.auth_ref.user_id
self.auth_domain_id = self.auth_ref.domain_id
self.auth_tenant_id = self.auth_ref.tenant_id
self.auth_user_id = self.auth_ref.user_id
def get_raw_token_from_identity_service(self, auth_url, username=None,
password=None, tenant_name=None,
tenant_id=None, token=None):
""" Authenticate against the Keystone API.
tenant_id=None, token=None,
project_name=None, project_id=None,
**kwargs):
""" Authenticate against the v2 Identity API.
:returns: ``raw token`` if authentication was successful.
:returns: (``resp``, ``body``) if authentication was successful.
:raises: AuthorizationFailure if unable to authenticate or validate
the existing authorization token
:raises: ValueError if insufficient parameters are used.
@@ -170,8 +181,8 @@ class Client(client.HTTPClient):
try:
return self._base_authN(auth_url,
username=username,
tenant_id=tenant_id,
tenant_name=tenant_name,
tenant_id=project_id or tenant_id,
tenant_name=project_name or tenant_name,
password=password,
token=token)
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
@@ -202,4 +213,4 @@ class Client(client.HTTPClient):
elif tenant_name:
params['auth']['tenantName'] = tenant_name
resp, body = self.request(url, 'POST', body=params, headers=headers)
return body['access']
return resp, body

View File

@@ -15,10 +15,11 @@
import json
import logging
from keystoneclient import exceptions
from keystoneclient.v2_0 import client
from keystoneclient.v3 import credentials
from keystoneclient.v3 import endpoints
from keystoneclient.v3 import domains
from keystoneclient.v3 import endpoints
from keystoneclient.v3 import groups
from keystoneclient.v3 import policies
from keystoneclient.v3 import projects
@@ -33,40 +34,60 @@ _logger = logging.getLogger(__name__)
class Client(client.Client):
"""Client for the OpenStack Identity API v3.
:param string user_id: User ID for authentication. (optional)
:param string username: Username for authentication. (optional)
:param string user_domain_id: User's domain ID for authentication.
(optional)
:param string user_domain_name: User's domain name for authentication.
(optional)
:param string password: Password for authentication. (optional)
:param string token: Token for authentication. (optional)
:param string tenant_name: Tenant id. (optional)
:param string tenant_id: Tenant name. (optional)
:param string auth_url: Keystone service endpoint for authorization.
:param string domain_id: Domain ID for domain scoping. (optional)
:param string domain_name: Domain name for domain scoping. (optional)
:param string project_id: Project ID for project scoping. (optional)
:param string project_name: Project name for project scoping. (optional)
:param string project_domain_id: Project's domain ID for project
scoping. (optional)
:param string project_domain_name: Project's domain name for project
scoping. (optional)
:param string tenant_name: Tenant name. (optional)
The tenant_name keyword argument is deprecated,
use project_name instead.
:param string tenant_id: Tenant id. (optional)
The tenant_id keyword argument is deprecated,
use project_id instead.
:param string auth_url: Identity service endpoint for authorization.
:param string region_name: Name of a region to select when choosing an
endpoint from the service catalog.
:param string endpoint: A user-supplied endpoint URL for the keystone
:param string endpoint: A user-supplied endpoint URL for the identity
service. Lazy-authentication is possible for API
service calls if endpoint is set at
instantiation.(optional)
instantiation. (optional)
:param integer timeout: Allows customization of the timeout for client
http requests. (optional)
Example::
>>> from keystoneclient.v3 import client
>>> keystone = client.Client(username=USER,
>>> keystone = client.Client(user_domain_name=DOMAIN_NAME,
... username=USER,
... password=PASS,
... tenant_name=TENANT_NAME,
... project_domain_name=PROJECT_DOMAIN_NAME,
... project_name=PROJECT_NAME,
... auth_url=KEYSTONE_URL)
...
>>> keystone.tenants.list()
>>> keystone.projects.list()
...
>>> user = keystone.users.get(USER_ID)
>>> user.delete()
"""
def __init__(self, endpoint=None, **kwargs):
""" Initialize a new client for the Keystone v3.0 API. """
super(Client, self).__init__(endpoint=endpoint, **kwargs)
def __init__(self, **kwargs):
""" Initialize a new client for the Keystone v3 API. """
super(Client, self).__init__(**kwargs)
self.version = 'v3'
self.credentials = credentials.CredentialManager(self)
self.endpoints = endpoints.EndpointManager(self)
self.domains = domains.DomainManager(self)
@@ -77,12 +98,133 @@ class Client(client.Client):
self.services = services.ServiceManager(self)
self.users = users.UserManager(self)
# NOTE(gabriel): If we have a pre-defined endpoint then we can
# get away with lazy auth. Otherwise auth immediately.
if endpoint:
self.management_url = endpoint
else:
self.authenticate()
def serialize(self, entity):
return json.dumps(entity, sort_keys=True)
def process_token(self):
""" Extract and process information from the new auth_ref.
And set the relevant authentication information.
"""
super(Client, self).process_token()
if self.auth_ref.domain_scoped:
if not self.auth_ref.domain_id:
raise exceptions.AuthorizationFailure(
"Token didn't provide domain_id")
if self.management_url is None and self.auth_ref.management_url:
self.management_url = self.auth_ref.management_url[0]
self.domain_name = self.auth_ref.domain_name
self.domain_id = self.auth_ref.domain_id
def get_raw_token_from_identity_service(self, auth_url, user_id=None,
username=None,
user_domain_id=None,
user_domain_name=None,
password=None,
domain_id=None, domain_name=None,
project_id=None, project_name=None,
project_domain_id=None,
project_domain_name=None,
token=None, **kwargs):
""" Authenticate against the v3 Identity API.
:returns: (``resp``, ``body``) if authentication was successful.
:raises: AuthorizationFailure if unable to authenticate or validate
the existing authorization token
:raises: Unauthorized if authentication fails due to invalid token
"""
try:
return self._do_auth(
auth_url,
user_id=user_id,
username=username,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
password=password,
domain_id=domain_id,
domain_name=domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name,
token=token)
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
_logger.debug('Authorization failed.')
raise
except Exception as e:
raise exceptions.AuthorizationFailure('Authorization failed: '
'%s' % e)
def _do_auth(self, auth_url, user_id=None, username=None,
user_domain_id=None, user_domain_name=None, password=None,
domain_id=None, domain_name=None,
project_id=None, project_name=None, project_domain_id=None,
project_domain_name=None, token=None):
headers = {}
url = auth_url + "/auth/tokens"
body = {'auth': {'identity': {}}}
ident = body['auth']['identity']
if token:
headers['X-Auth-Token'] = token
ident['methods'] = ['token']
ident['token'] = {}
ident['token']['id'] = token
if password:
ident['methods'] = ['password']
ident['password'] = {}
ident['password']['user'] = {}
user = ident['password']['user']
user['password'] = password
if user_id:
user['id'] = user_id
elif username:
user['name'] = username
if user_domain_id or user_domain_name:
user['domain'] = {}
if user_domain_id:
user['domain']['id'] = user_domain_id
elif user_domain_name:
user['domain']['name'] = user_domain_name
if (domain_id or domain_name) and (project_id or project_name):
raise ValueError('Authentication cannot be scoped to both domain'
' and project.')
if domain_id or domain_name:
body['auth']['scope'] = {}
scope = body['auth']['scope']
scope['domain'] = {}
if domain_id:
scope['domain']['id'] = domain_id
elif domain_name:
scope['domain']['name'] = domain_name
if project_id or project_name:
body['auth']['scope'] = {}
scope = body['auth']['scope']
scope['project'] = {}
if project_id:
scope['project']['id'] = project_id
elif project_name:
scope['project']['name'] = project_name
if project_domain_id or project_domain_name:
scope['project']['domain'] = {}
if project_domain_id:
scope['project']['domain']['id'] = project_domain_id
elif project_domain_name:
scope['project']['domain']['name'] = project_domain_name
if not (ident or token):
raise ValueError('Authentication method required (e.g. password)')
resp, body = self.request(url, 'POST', body=body, headers=headers)
return resp, body

View File

@@ -1,72 +0,0 @@
UNSCOPED_TOKEN = {
u'access': {u'serviceCatalog': {},
u'token': {u'expires': u'2012-10-03T16:58:01Z',
u'id': u'3e2813b7ba0b4006840c3825860b86ed'},
u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
u'roles': [],
u'roles_links': [],
u'username': u'exampleuser'}
}
}
PROJECT_SCOPED_TOKEN = {
u'access': {
u'serviceCatalog': [{
u'endpoints': [{
u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'internalURL':
u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'publicURL':
u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne'
}],
u'endpoints_links': [],
u'name': u'Volume Service',
u'type': u'volume'},
{u'endpoints': [{
u'adminURL': u'http://admin:9292/v1',
u'internalURL': u'http://internal:9292/v1',
u'publicURL': u'http://public.com:9292/v1',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Image Service',
u'type': u'image'},
{u'endpoints': [{
u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Compute Service',
u'type': u'compute'},
{u'endpoints': [{
u'adminURL': u'http://admin:8773/services/Admin',
u'internalURL': u'http://internal:8773/services/Cloud',
u'publicURL': u'http://public.com:8773/services/Cloud',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'EC2 Service',
u'type': u'ec2'},
{u'endpoints': [{
u'adminURL': u'http://admin:35357/v2.0',
u'internalURL': u'http://internal:5000/v2.0',
u'publicURL': u'http://public.com:5000/v2.0',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Identity Service',
u'type': u'identity'}],
u'token': {u'expires': u'2012-10-03T16:53:36Z',
u'id': u'04c7d5ffaeef485f9dc69c06db285bdb',
u'tenant': {u'description': u'',
u'enabled': True,
u'id': u'225da22d3ce34b15877ea70b2a575f58',
u'name': u'exampleproject'}},
u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74',
u'name': u'Member'}],
u'roles_links': [],
u'username': u'exampleuser'}
}
}

View File

@@ -65,37 +65,46 @@ class FakeClient(object):
def authenticate(self, cl_obj):
cl_obj.user_id = '1'
cl_obj.auth_user_id = '1'
cl_obj.tenant_id = '1'
cl_obj.project_id = '1'
cl_obj.auth_tenant_id = '1'
cl_obj.auth_ref = access.AccessInfo({
"token": {
"expires": "2012-02-05T00:00:00",
"id": "887665443383838",
"tenant": {
cl_obj.auth_ref = access.AccessInfo.factory(None, {
"access": {
"token": {
"expires": "2012-02-05T00:00:00",
"id": "887665443383838",
"tenant": {
"id": "1",
"name": "customer-x"
}
},
"serviceCatalog": [{
"endpoints": [{
"adminURL": "http://swift.admin-nets.local:8080/",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
"publicURL":
"http://swift.publicinternets.com/v1/AUTH_1"
}],
"type": "object-store",
"name": "swift"
}, {
"endpoints": [{
"adminURL": "http://cdn.admin-nets.local/v1.1/1",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:7777/v1.1/1",
"publicURL": "http://cdn.publicinternets.com/v1.1/1"
}],
"type": "object-store",
"name": "cdn"
}],
"user": {
"id": "1",
"name": "customer-x"}},
"serviceCatalog": [
{"endpoints": [
{"adminURL": "http://swift.admin-nets.local:8080/",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
"publicURL":
"http://swift.publicinternets.com/v1/AUTH_1"}],
"type": "object-store",
"name": "swift"},
{"endpoints": [
{"adminURL": "http://cdn.admin-nets.local/v1.1/1",
"region": "RegionOne",
"internalURL": "http://127.0.0.1:7777/v1.1/1",
"publicURL": "http://cdn.publicinternets.com/v1.1/1"}],
"type": "object-store",
"name": "cdn"}],
"user": {
"id": "1",
"roles": [
{"tenantId": "1",
"id": "3",
"name": "Member"}],
"name": "joeuser"}
}
)
"roles": [{
"tenantId": "1",
"id": "3",
"name": "Member"
}],
"name": "joeuser"
}
}
})

View File

@@ -1,40 +0,0 @@
import json
import mock
import requests
from keystoneclient.v2_0 import client
from tests import client_fixtures
from tests import utils
fake_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN)
})
mock_request = mock.Mock(return_value=(fake_response))
class KeystoneclientTest(utils.TestCase):
def test_scoped_init(self):
with mock.patch.object(requests, "request", mock_request):
cl = client.Client(username='exampleuser',
password='password',
auth_url='http://somewhere/')
self.assertIsNotNone(cl.auth_ref)
self.assertTrue(cl.auth_ref.scoped)
def test_auth_ref_load(self):
with mock.patch.object(requests, "request", mock_request):
cl = client.Client(username='exampleuser',
password='password',
auth_url='http://somewhere/')
cache = json.dumps(cl.auth_ref)
new_client = client.Client(auth_ref=json.loads(cache))
self.assertIsNotNone(new_client.auth_ref)
self.assertTrue(new_client.auth_ref.scoped)
self.assertEquals(new_client.username, 'exampleuser')
self.assertIsNone(new_client.password)
self.assertEqual(new_client.management_url,
'http://admin:35357/v2.0')

View File

@@ -68,3 +68,27 @@ class ClientTest(utils.TestCase):
headers=headers,
data='[1, 2, 3]',
**kwargs)
def test_post_auth(self):
with mock.patch.object(requests, "request", MOCK_REQUEST):
cl = client.HTTPClient(
username="username", password="password", tenant_id="tenant",
auth_url="auth_test", cacert="ca.pem", key="key.pem",
cert="cert.pem")
cl.management_url = "https://127.0.0.1:5000"
cl.auth_token = "token"
cl.post("/hi", body=[1, 2, 3])
headers = {
"X-Auth-Token": "token",
"Content-Type": "application/json",
"User-Agent": cl.USER_AGENT
}
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['cert'] = ('cert.pem', 'key.pem')
kwargs['verify'] = 'ca.pem'
MOCK_REQUEST.assert_called_with(
"POST",
"https://127.0.0.1:5000/hi",
headers=headers,
data='[1, 2, 3]',
**kwargs)

View File

@@ -4,7 +4,7 @@ import unittest
from keystoneclient import access
from keystoneclient import client
from keystoneclient.openstack.common import timeutils
from tests import client_fixtures
from tests.v2_0 import client_fixtures
from tests import utils
try:
@@ -65,11 +65,12 @@ class KeyringTest(utils.TestCase):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL)
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
USERNAME,
TENANT,
TENANT_ID,
TOKEN)
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
auth_url=AUTH_URL,
username=USERNAME,
tenant_name=TENANT,
tenant_id=TENANT_ID,
token=TOKEN)
self.assertIsNone(keyring_key)
self.assertIsNone(auth_ref)
@@ -78,49 +79,57 @@ class KeyringTest(utils.TestCase):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL)
keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
TENANT, TENANT_ID,
TOKEN)
keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
username=USERNAME,
tenant_name=TENANT,
tenant_id=TENANT_ID,
token=TOKEN)
self.assertEqual(keyring_key,
'%s/%s/%s/%s/%s' %
(AUTH_URL, USERNAME, TENANT, TENANT_ID, TOKEN))
(AUTH_URL, TENANT_ID, TENANT, TOKEN, USERNAME))
def test_set_and_get_keyring_expired(self):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL,
use_keyring=True)
keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
TENANT, TENANT_ID,
TOKEN)
keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
username=USERNAME,
tenant_name=TENANT,
tenant_id=TENANT_ID,
token=TOKEN)
cl.auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
expired = timeutils.utcnow() - datetime.timedelta(minutes=30)
cl.auth_ref['token']['expires'] = timeutils.isotime(expired)
cl.store_auth_ref_into_keyring(keyring_key)
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
USERNAME,
TENANT,
TENANT_ID,
TOKEN)
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
auth_url=AUTH_URL,
username=USERNAME,
tenant_name=TENANT,
tenant_id=TENANT_ID,
token=TOKEN)
self.assertIsNone(auth_ref)
def test_set_and_get_keyring(self):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL,
use_keyring=True)
keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
TENANT, TENANT_ID,
TOKEN)
keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
username=USERNAME,
tenant_name=TENANT,
tenant_id=TENANT_ID,
token=TOKEN)
cl.auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
expires = timeutils.utcnow() + datetime.timedelta(minutes=30)
cl.auth_ref['token']['expires'] = timeutils.isotime(expires)
cl.store_auth_ref_into_keyring(keyring_key)
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
USERNAME,
TENANT,
TENANT_ID,
TOKEN)
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
auth_url=AUTH_URL,
username=USERNAME,
tenant_name=TENANT,
tenant_id=TENANT_ID,
token=TOKEN)
self.assertEqual(auth_ref.auth_token, TOKEN)
self.assertEqual(auth_ref.username, USERNAME)

View File

@@ -1,144 +0,0 @@
from keystoneclient import exceptions
from keystoneclient import service_catalog
from tests import utils
# Taken directly from keystone/content/common/samples/auth.json
# Do not edit this structure. Instead, grab the latest from there.
SERVICE_CATALOG = {
"access": {
"token": {
"id": "ab48a9efdfedb23ty3494",
"expires": "2010-11-01T03:32:15-05:00",
"tenant": {
"id": "345",
"name": "My Project"
}
},
"user": {
"id": "123",
"name": "jqsmith",
"roles": [
{"id": "234",
"name": "compute:admin"},
{"id": "235",
"name": "object-store:admin",
"tenantId": "1"}
],
"roles_links": []
},
"serviceCatalog": [{
"name": "Cloud Servers",
"type": "compute",
"endpoints": [
{"tenantId": "1",
"publicURL": "https://compute.north.host/v1/1234",
"internalURL": "https://compute.north.host/v1/1234",
"region": "North",
"versionId": "1.0",
"versionInfo": "https://compute.north.host/v1.0/",
"versionList": "https://compute.north.host/"},
{"tenantId": "2",
"publicURL": "https://compute.north.host/v1.1/3456",
"internalURL": "https://compute.north.host/v1.1/3456",
"region": "North",
"versionId": "1.1",
"versionInfo": "https://compute.north.host/v1.1/",
"versionList": "https://compute.north.host/"}
],
"endpoints_links": []
},
{
"name": "Cloud Files",
"type": "object-store",
"endpoints": [
{"tenantId": "11",
"publicURL": "https://compute.north.host/v1/blah-blah",
"internalURL": "https://compute.north.host/v1/blah-blah",
"region": "South",
"versionId": "1.0",
"versionInfo": "uri",
"versionList": "uri"},
{"tenantId": "2",
"publicURL": "https://compute.north.host/v1.1/blah-blah",
"internalURL":
"https://compute.north.host/v1.1/blah-blah",
"region": "South",
"versionId": "1.1",
"versionInfo": "https://compute.north.host/v1.1/",
"versionList": "https://compute.north.host/"}
],
"endpoints_links": [{
"rel": "next",
"href": "https://identity.north.host/v2.0/"
"endpoints?marker=2"
}]
},
{
"name": "Image Servers",
"type": "image",
"endpoints": [
{"publicURL": "https://image.north.host/v1/",
"internalURL": "https://image-internal.north.host/v1/",
"region": "North"},
{"publicURL": "https://image.south.host/v1/",
"internalURL": "https://image-internal.south.host/v1/",
"region": "South"}
],
"endpoints_links": []
},
],
"serviceCatalog_links": [{
"rel": "next",
"href": ("https://identity.host/v2.0/endpoints?"
"session=2hfh8Ar&marker=2")
}]
}
}
class ServiceCatalogTest(utils.TestCase):
def test_building_a_service_catalog(self):
sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
self.assertEquals(sc.url_for(service_type='compute'),
"https://compute.north.host/v1/1234")
self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'),
"https://compute.north.host/v1/1234")
self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'),
"https://compute.north.host/v1.1/3456")
self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
"South", service_type='compute')
def test_service_catalog_endpoints(self):
sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
public_ep = sc.get_endpoints(service_type='compute',
endpoint_type='publicURL')
self.assertEquals(public_ep['compute'][1]['tenantId'], '2')
self.assertEquals(public_ep['compute'][1]['versionId'], '1.1')
self.assertEquals(public_ep['compute'][1]['internalURL'],
"https://compute.north.host/v1.1/3456")
def test_service_catalog_regions(self):
sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'],
region_name="North")
url = sc.url_for(service_type='image', endpoint_type='publicURL')
self.assertEquals(url, "https://image.north.host/v1/")
sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'],
region_name="South")
url = sc.url_for(service_type='image', endpoint_type='internalURL')
self.assertEquals(url, "https://image-internal.south.host/v1/")
def test_token(self):
sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
self.assertEquals(sc.get_token(), {
'id': 'ab48a9efdfedb23ty3494',
'tenant_id': '345',
'user_id': '123',
'expires': '2010-11-01T03:32:15-05:00'})
self.assertEquals(sc.catalog['token']['expires'],
"2010-11-01T03:32:15-05:00")
self.assertEquals(sc.catalog['token']['tenant']['id'], '345')

View File

@@ -8,6 +8,8 @@ from keystoneclient.v2_0 import client
class TestCase(testtools.TestCase):
TEST_DOMAIN_ID = '1'
TEST_DOMAIN_NAME = 'aDomain'
TEST_TENANT_ID = '1'
TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken'

View File

@@ -0,0 +1,163 @@
UNSCOPED_TOKEN = {
u'access': {u'serviceCatalog': {},
u'token': {u'expires': u'2012-10-03T16:58:01Z',
u'id': u'3e2813b7ba0b4006840c3825860b86ed'},
u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
u'roles': [],
u'roles_links': [],
u'username': u'exampleuser'}
}
}
PROJECT_SCOPED_TOKEN = {
u'access': {
u'serviceCatalog': [{
u'endpoints': [{
u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'internalURL':
u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'publicURL':
u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne'
}],
u'endpoints_links': [],
u'name': u'Volume Service',
u'type': u'volume'},
{u'endpoints': [{
u'adminURL': u'http://admin:9292/v1',
u'internalURL': u'http://internal:9292/v1',
u'publicURL': u'http://public.com:9292/v1',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Image Service',
u'type': u'image'},
{u'endpoints': [{
u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Compute Service',
u'type': u'compute'},
{u'endpoints': [{
u'adminURL': u'http://admin:8773/services/Admin',
u'internalURL': u'http://internal:8773/services/Cloud',
u'publicURL': u'http://public.com:8773/services/Cloud',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'EC2 Service',
u'type': u'ec2'},
{u'endpoints': [{
u'adminURL': u'http://admin:35357/v2.0',
u'internalURL': u'http://internal:5000/v2.0',
u'publicURL': u'http://public.com:5000/v2.0',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Identity Service',
u'type': u'identity'}],
u'token': {u'expires': u'2012-10-03T16:53:36Z',
u'id': u'04c7d5ffaeef485f9dc69c06db285bdb',
u'tenant': {u'description': u'',
u'enabled': True,
u'id': u'225da22d3ce34b15877ea70b2a575f58',
u'name': u'exampleproject'}},
u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74',
u'name': u'Member'}],
u'roles_links': [],
u'username': u'exampleuser'}
}
}
AUTH_RESPONSE_BODY = {
u'access': {
u'token': {
u'id': u'ab48a9efdfedb23ty3494',
u'expires': u'2010-11-01T03:32:15-05:00',
u'tenant': {
u'id': u'345',
u'name': u'My Project'
}
},
u'user': {
u'id': u'123',
u'name': u'jqsmith',
u'roles': [{
u'id': u'234',
u'name': u'compute:admin'
}, {
u'id': u'235',
u'name': u'object-store:admin',
u'tenantId': u'1'
}],
u'roles_links': []
},
u'serviceCatalog': [{
u'name': u'Cloud Servers',
u'type': u'compute',
u'endpoints': [{
u'tenantId': u'1',
u'publicURL': u'https://compute.north.host/v1/1234',
u'internalURL': u'https://compute.north.host/v1/1234',
u'region': u'North',
u'versionId': u'1.0',
u'versionInfo': u'https://compute.north.host/v1.0/',
u'versionList': u'https://compute.north.host/u'
}, {
u'tenantId': u'2',
u'publicURL': u'https://compute.north.host/v1.1/3456',
u'internalURL': u'https://compute.north.host/v1.1/3456',
u'region': u'North',
u'versionId': u'1.1',
u'versionInfo': u'https://compute.north.host/v1.1/',
u'versionList': u'https://compute.north.host/u'
}],
u'endpoints_links': []
}, {
u'name': u'Cloud Files',
u'type': u'object-store',
u'endpoints': [{
u'tenantId': u'11',
u'publicURL': u'https://swift.north.host/v1/blah',
u'internalURL': u'https://swift.north.host/v1/blah',
u'region': u'South',
u'versionId': u'1.0',
u'versionInfo': u'uri',
u'versionList': u'uri'
}, {
u'tenantId': u'2',
u'publicURL': u'https://swift.north.host/v1.1/blah',
u'internalURL': u'https://compute.north.host/v1.1/blah',
u'region': u'South',
u'versionId': u'1.1',
u'versionInfo': u'https://swift.north.host/v1.1/',
u'versionList': u'https://swift.north.host/'
}],
u'endpoints_links': [{
u'rel': u'next',
u'href': u'https://identity.north.host/v2.0/'
u'endpoints?marker=2'
}]
}, {
u'name': u'Image Servers',
u'type': u'image',
u'endpoints': [{
u'publicURL': u'https://image.north.host/v1/',
u'internalURL': u'https://image-internal.north.host/v1/',
u'region': u'North'
}, {
u'publicURL': u'https://image.south.host/v1/',
u'internalURL': u'https://image-internal.south.host/v1/',
u'region': u'South'
}],
u'endpoints_links': []
}],
u'serviceCatalog_links': [{
u'rel': u'next',
u'href': (u'https://identity.host/v2.0/endpoints?'
u'session=2hfh8Ar&marker=2')
}]
}
}

View File

@@ -3,7 +3,7 @@ import datetime
from keystoneclient import access
from keystoneclient.openstack.common import timeutils
from tests import utils
from tests import client_fixtures
from tests.v2_0 import client_fixtures
UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
@@ -11,7 +11,7 @@ PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
class AccessInfoTest(utils.TestCase):
def test_building_unscoped_accessinfo(self):
auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access'])
auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
@@ -30,6 +30,8 @@ class AccessInfoTest(utils.TestCase):
self.assertEquals(auth_ref.management_url, None)
self.assertFalse(auth_ref.scoped)
self.assertFalse(auth_ref.domain_scoped)
self.assertFalse(auth_ref.project_scoped)
self.assertEquals(auth_ref.expires, timeutils.parse_isotime(
UNSCOPED_TOKEN['access']['token']['expires']))
@@ -37,13 +39,13 @@ class AccessInfoTest(utils.TestCase):
def test_will_expire_soon(self):
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
UNSCOPED_TOKEN['access']['token']['expires'] = expires.isoformat()
auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access'])
auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
self.assertTrue(auth_ref.will_expire_soon(stale_duration=300))
self.assertFalse(auth_ref.will_expire_soon())
def test_building_scoped_accessinfo(self):
auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
@@ -68,3 +70,5 @@ class AccessInfoTest(utils.TestCase):
('http://admin:35357/v2.0',))
self.assertTrue(auth_ref.scoped)
self.assertTrue(auth_ref.project_scoped)
self.assertFalse(auth_ref.domain_scoped)

84
tests/v2_0/test_client.py Normal file
View File

@@ -0,0 +1,84 @@
import json
import mock
import requests
from keystoneclient.v2_0 import client
from tests import utils
from tests.v2_0 import client_fixtures
class KeystoneClientTest(utils.TestCase):
def setUp(self):
super(KeystoneClientTest, self).setUp()
scoped_fake_resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN)
})
self.scoped_mock_req = mock.Mock(return_value=scoped_fake_resp)
unscoped_fake_resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(client_fixtures.UNSCOPED_TOKEN)
})
self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp)
def test_unscoped_init(self):
with mock.patch.object(requests, "request", self.unscoped_mock_req):
c = client.Client(username='exampleuser',
password='password',
auth_url='http://somewhere/')
self.assertIsNotNone(c.auth_ref)
self.assertFalse(c.auth_ref.scoped)
self.assertFalse(c.auth_ref.domain_scoped)
self.assertFalse(c.auth_ref.project_scoped)
def test_scoped_init(self):
with mock.patch.object(requests, "request", self.scoped_mock_req):
c = client.Client(username='exampleuser',
password='password',
tenant_name='exampleproject',
auth_url='http://somewhere/')
self.assertIsNotNone(c.auth_ref)
self.assertTrue(c.auth_ref.scoped)
self.assertTrue(c.auth_ref.project_scoped)
self.assertFalse(c.auth_ref.domain_scoped)
def test_auth_ref_load(self):
with mock.patch.object(requests, "request", self.scoped_mock_req):
cl = client.Client(username='exampleuser',
password='password',
tenant_name='exampleproject',
auth_url='http://somewhere/')
cache = json.dumps(cl.auth_ref)
new_client = client.Client(auth_ref=json.loads(cache))
self.assertIsNotNone(new_client.auth_ref)
self.assertTrue(new_client.auth_ref.scoped)
self.assertTrue(new_client.auth_ref.project_scoped)
self.assertFalse(new_client.auth_ref.domain_scoped)
self.assertEquals(new_client.username, 'exampleuser')
self.assertIsNone(new_client.password)
self.assertEqual(new_client.management_url,
'http://admin:35357/v2.0')
def test_auth_ref_load_with_overridden_arguments(self):
with mock.patch.object(requests, "request", self.scoped_mock_req):
cl = client.Client(username='exampleuser',
password='password',
tenant_name='exampleproject',
auth_url='http://somewhere/')
cache = json.dumps(cl.auth_ref)
new_auth_url = "http://new-public:5000/v2.0"
new_client = client.Client(auth_ref=json.loads(cache),
auth_url=new_auth_url)
self.assertIsNotNone(new_client.auth_ref)
self.assertTrue(new_client.auth_ref.scoped)
self.assertTrue(new_client.auth_ref.scoped)
self.assertTrue(new_client.auth_ref.project_scoped)
self.assertFalse(new_client.auth_ref.domain_scoped)
self.assertEquals(new_client.auth_url, new_auth_url)
self.assertEquals(new_client.username, 'exampleuser')
self.assertIsNone(new_client.password)
self.assertEqual(new_client.management_url,
'http://admin:35357/v2.0')

View File

@@ -0,0 +1,50 @@
from keystoneclient import access
from keystoneclient import exceptions
from tests import utils
from tests.v2_0 import client_fixtures
class ServiceCatalogTest(utils.TestCase):
def setUp(self):
super(ServiceCatalogTest, self).setUp()
self.AUTH_RESPONSE_BODY = client_fixtures.AUTH_RESPONSE_BODY
def test_building_a_service_catalog(self):
auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
sc = auth_ref.service_catalog
self.assertEquals(sc.url_for(service_type='compute'),
"https://compute.north.host/v1/1234")
self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'),
"https://compute.north.host/v1/1234")
self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'),
"https://compute.north.host/v1.1/3456")
self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
"South", service_type='compute')
def test_service_catalog_endpoints(self):
auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
sc = auth_ref.service_catalog
public_ep = sc.get_endpoints(service_type='compute',
endpoint_type='publicURL')
self.assertEquals(public_ep['compute'][1]['tenantId'], '2')
self.assertEquals(public_ep['compute'][1]['versionId'], '1.1')
self.assertEquals(public_ep['compute'][1]['internalURL'],
"https://compute.north.host/v1.1/3456")
def test_service_catalog_regions(self):
self.AUTH_RESPONSE_BODY['access']['region_name'] = "North"
auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
sc = auth_ref.service_catalog
url = sc.url_for(service_type='image', endpoint_type='publicURL')
self.assertEquals(url, "https://image.north.host/v1/")
self.AUTH_RESPONSE_BODY['access']['region_name'] = "South"
auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
sc = auth_ref.service_catalog
url = sc.url_for(service_type='image', endpoint_type='internalURL')
self.assertEquals(url, "https://image-internal.south.host/v1/")

234
tests/v3/client_fixtures.py Normal file
View File

@@ -0,0 +1,234 @@
UNSCOPED_TOKEN = {
u'token': {
u'methods': [
u'password'
],
u'catalog': {},
u'expires_at': u'2010-11-01T03:32:15-05:00',
u'user': {
u'domain': {
u'id': u'4e6893b7ba0b4006840c3845660b86ed',
u'name': u'exampledomain'
},
u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
}
}
}
DOMAIN_SCOPED_TOKEN = {
u'token': {
u'methods': [
u'password'
],
u'catalog': {},
u'expires_at': u'2010-11-01T03:32:15-05:00',
u'user': {
u'domain': {
u'id': u'4e6893b7ba0b4006840c3845660b86ed',
u'name': u'exampledomain'
},
u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
},
u'domain': {
u'id': u'8e9283b7ba0b1038840c3842058b86ab',
u'name': u'anotherdomain'
},
}
}
PROJECT_SCOPED_TOKEN = {
u'token': {
u'methods': [
u'password'
],
u'catalog': [{
u'endpoints': [{
u'url':
u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne',
u'interface': u'public'
}, {
u'url':
u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne',
u'interface': u'internal'
}, {
u'url':
u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne',
u'interface': u'admin'
}],
u'type': u'volume'
}, {
u'endpoints': [{
u'url': u'http://public.com:9292/v1',
u'region': u'RegionOne',
u'interface': u'public'
}, {
u'url': u'http://internal:9292/v1',
u'region': u'RegionOne',
u'interface': u'internal'
}, {
u'url': u'http://admin:9292/v1',
u'region': u'RegionOne',
u'interface': u'admin'
}],
u'type': u'image'
}, {
u'endpoints': [{
u'url':
u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne',
u'interface': u'public'
}, {
u'url':
u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne',
u'interface': u'internal'
}, {
u'url':
u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne',
u'interface': u'admin'
}],
u'type': u'compute'
}, {
u'endpoints': [{
u'url': u'http://public.com:8773/services/Cloud',
u'region': u'RegionOne',
u'interface': u'public'
}, {
u'url': u'http://internal:8773/services/Cloud',
u'region': u'RegionOne',
u'interface': u'internal'
}, {
u'url': u'http://admin:8773/services/Admin',
u'region': u'RegionOne',
u'interface': u'admin'
}],
u'type': u'ec2'
}, {
u'endpoints': [{
u'url': u'http://public.com:5000/v3',
u'region': u'RegionOne',
u'interface': u'public'
}, {
u'url': u'http://internal:5000/v3',
u'region': u'RegionOne',
u'interface': u'internal'
}, {
u'url': u'http://admin:35357/v3',
u'region': u'RegionOne',
u'interface': u'admin'
}],
u'type': u'identity'
}],
u'expires_at': u'2010-11-01T03:32:15-05:00',
u'user': {
u'domain': {
u'id': u'4e6893b7ba0b4006840c3845660b86ed',
u'name': u'exampledomain'
},
u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
},
u'project': {
u'domain': {
u'id': u'4e6893b7ba0b4006840c3845660b86ed',
u'name': u'exampledomain'
},
u'id': u'225da22d3ce34b15877ea70b2a575f58',
u'name': u'exampleproject',
},
}
}
AUTH_RESPONSE_HEADERS = {
u'X-Subject-Token': u'3e2813b7ba0b4006840c3825860b86ed'
}
AUTH_RESPONSE_BODY = {
u'token': {
u'methods': [
u'password'
],
u'expires_at': u'2010-11-01T03:32:15-05:00',
u'project': {
u'domain': {
u'id': u'123',
u'name': u'aDomain'
},
u'id': u'345',
u'name': u'aTenant'
},
u'user': {
u'domain': {
u'id': u'1',
u'name': u'aDomain'
},
u'id': u'567',
u'name': u'test'
},
u'issued_at': u'2010-10-31T03:32:15-05:00',
u'catalog': [{
u'endpoints': [{
u'url': u'https://compute.north.host/novapi/public',
u'region': u'North',
u'interface': u'public'
}, {
u'url': u'https://compute.north.host/novapi/internal',
u'region': u'North',
u'interface': u'internal'
}, {
u'url': u'https://compute.north.host/novapi/admin',
u'region': u'North',
u'interface': u'admin'
}],
u'type': u'compute'
}, {
u'endpoints': [{
u'url': u'http://swift.north.host/swiftapi/public',
u'region': u'South',
u'interface': u'public'
}, {
u'url': u'http://swift.north.host/swiftapi/internal',
u'region': u'South',
u'interface': u'internal'
}, {
u'url': u'http://swift.north.host/swiftapi/admin',
u'region': u'South',
u'interface': u'admin'
}],
u'type': u'object-store'
}, {
u'endpoints': [{
u'url': u'http://glance.north.host/glanceapi/public',
u'region': u'North',
u'interface': u'public'
}, {
u'url': u'http://glance.north.host/glanceapi/internal',
u'region': u'North',
u'interface': u'internal'
}, {
u'url': u'http://glance.north.host/glanceapi/admin',
u'region': u'North',
u'interface': u'admin'
}, {
u'url': u'http://glance.south.host/glanceapi/public',
u'region': u'South',
u'interface': u'public'
}, {
u'url': u'http://glance.south.host/glanceapi/internal',
u'region': u'South',
u'interface': u'internal'
}, {
u'url': u'http://glance.south.host/glanceapi/admin',
u'region': u'South',
u'interface': u'admin'
}],
u'type': u'image'
}]
}
}

106
tests/v3/test_access.py Normal file
View File

@@ -0,0 +1,106 @@
import datetime
from keystoneclient import access
from keystoneclient.openstack.common import timeutils
from tests import utils
from tests.v3 import client_fixtures
TOKEN_RESPONSE = utils.TestResponse({
"headers": client_fixtures.AUTH_RESPONSE_HEADERS
})
UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
DOMAIN_SCOPED_TOKEN = client_fixtures.DOMAIN_SCOPED_TOKEN
PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
class AccessInfoTest(utils.TestCase):
def test_building_unscoped_accessinfo(self):
auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
body=UNSCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('methods', auth_ref)
self.assertIn('catalog', auth_ref)
self.assertFalse(auth_ref['catalog'])
self.assertEquals(auth_ref.auth_token,
'3e2813b7ba0b4006840c3825860b86ed')
self.assertEquals(auth_ref.username, 'exampleuser')
self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(auth_ref.project_name, None)
self.assertEquals(auth_ref.project_id, None)
self.assertEquals(auth_ref.auth_url, None)
self.assertEquals(auth_ref.management_url, None)
self.assertFalse(auth_ref.domain_scoped)
self.assertFalse(auth_ref.project_scoped)
self.assertEquals(auth_ref.expires, timeutils.parse_isotime(
UNSCOPED_TOKEN['token']['expires_at']))
def test_will_expire_soon(self):
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
UNSCOPED_TOKEN['token']['expires_at'] = expires.isoformat()
auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
body=UNSCOPED_TOKEN)
self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
self.assertTrue(auth_ref.will_expire_soon(stale_duration=300))
self.assertFalse(auth_ref.will_expire_soon())
def test_building_domain_scoped_accessinfo(self):
auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
body=DOMAIN_SCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('methods', auth_ref)
self.assertIn('catalog', auth_ref)
self.assertFalse(auth_ref['catalog'])
self.assertEquals(auth_ref.auth_token,
'3e2813b7ba0b4006840c3825860b86ed')
self.assertEquals(auth_ref.username, 'exampleuser')
self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(auth_ref.domain_name, 'anotherdomain')
self.assertEquals(auth_ref.domain_id,
'8e9283b7ba0b1038840c3842058b86ab')
self.assertEquals(auth_ref.project_name, None)
self.assertEquals(auth_ref.project_id, None)
self.assertTrue(auth_ref.domain_scoped)
self.assertFalse(auth_ref.project_scoped)
def test_building_project_scoped_accessinfo(self):
auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
body=PROJECT_SCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('methods', auth_ref)
self.assertIn('catalog', auth_ref)
self.assertTrue(auth_ref['catalog'])
self.assertEquals(auth_ref.auth_token,
'3e2813b7ba0b4006840c3825860b86ed')
self.assertEquals(auth_ref.username, 'exampleuser')
self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(auth_ref.domain_name, None)
self.assertEquals(auth_ref.domain_id, None)
self.assertEquals(auth_ref.project_name, 'exampleproject')
self.assertEquals(auth_ref.project_id,
'225da22d3ce34b15877ea70b2a575f58')
self.assertEquals(auth_ref.tenant_name, auth_ref.project_name)
self.assertEquals(auth_ref.tenant_id, auth_ref.project_id)
self.assertEquals(auth_ref.auth_url,
('http://public.com:5000/v3',))
self.assertEquals(auth_ref.management_url,
('http://admin:35357/v3',))
self.assertFalse(auth_ref.domain_scoped)
self.assertTrue(auth_ref.project_scoped)

408
tests/v3/test_auth.py Normal file
View File

@@ -0,0 +1,408 @@
import copy
import json
import requests
from keystoneclient import exceptions
from keystoneclient.v3 import client
from tests.v3 import utils
class AuthenticateAgainstKeystoneTests(utils.TestCase):
def setUp(self):
super(AuthenticateAgainstKeystoneTests, self).setUp()
self.TEST_RESPONSE_DICT = {
"token": {
"methods": [
"token",
"password"
],
"expires_at": "2020-01-01T00:00:10.000123Z",
"project": {
"domain": {
"id": self.TEST_DOMAIN_ID,
"name": self.TEST_DOMAIN_NAME
},
"id": self.TEST_TENANT_ID,
"name": self.TEST_TENANT_NAME
},
"user": {
"domain": {
"id": self.TEST_DOMAIN_ID,
"name": self.TEST_DOMAIN_NAME
},
"id": self.TEST_USER,
"name": self.TEST_USER
},
"issued_at": "2013-05-29T16:55:21.468960Z",
"catalog": self.TEST_SERVICE_CATALOG
},
}
self.TEST_REQUEST_BODY = {
"auth": {
"identity": {
"methods": ["password"],
"password": {
"user": {
"domain": {
"name": self.TEST_DOMAIN_NAME
},
"name": self.TEST_USER,
"password": self.TEST_TOKEN
}
}
},
"scope": {
"project": {
"id": self.TEST_TENANT_ID
},
}
}
}
self.TEST_REQUEST_HEADERS = {
'Content-Type': 'application/json',
'User-Agent': 'python-keystoneclient'
}
self.TEST_RESPONSE_HEADERS = {
'X-Subject-Token': self.TEST_TOKEN
}
def test_authenticate_success(self):
TEST_TOKEN = "abcdef"
self.TEST_RESPONSE_HEADERS['X-Subject-Token'] = TEST_TOKEN
ident = self.TEST_REQUEST_BODY['auth']['identity']
del ident['password']['user']['domain']
del ident['password']['user']['name']
ident['password']['user']['id'] = self.TEST_USER
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_RESPONSE_DICT),
"headers": self.TEST_RESPONSE_HEADERS,
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
cs = client.Client(user_id=self.TEST_USER,
password=self.TEST_TOKEN,
project_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_token, TEST_TOKEN)
def test_authenticate_failure(self):
ident = self.TEST_REQUEST_BODY['auth']['identity']
ident['password']['user']['password'] = 'bad_key'
resp = utils.TestResponse({
"status_code": 401,
"text": json.dumps({
"unauthorized": {
"message": "Unauthorized",
"code": "401",
},
}),
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
# Workaround for issue with assertRaises on python2.6
# where with assertRaises(exceptions.Unauthorized): doesn't work
# right
def client_create_wrapper():
client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
username=self.TEST_USER,
password="bad_key",
project_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertRaises(exceptions.Unauthorized, client_create_wrapper)
def test_auth_redirect(self):
correct_response = json.dumps(self.TEST_RESPONSE_DICT, sort_keys=True)
dict_responses = [
{
"headers": {
'location': self.TEST_ADMIN_URL + "/auth/tokens",
'X-Subject-Token': self.TEST_TOKEN,
},
"status_code": 305,
"text": "Use proxy",
},
{
"headers": {'X-Subject-Token': self.TEST_TOKEN},
"status_code": 200,
"text": correct_response,
},
]
responses = [(utils.TestResponse(resp))
for resp in dict_responses]
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn(responses[0])
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_ADMIN_URL + "/auth/tokens",
**kwargs).AndReturn(responses[1])
self.mox.ReplayAll()
cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
username=self.TEST_USER,
password=self.TEST_TOKEN,
project_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["token"]["catalog"][3]
['endpoints'][2]["url"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
def test_authenticate_success_domain_username_password_scoped(self):
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_RESPONSE_DICT),
"headers": self.TEST_RESPONSE_HEADERS,
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
username=self.TEST_USER,
password=self.TEST_TOKEN,
project_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["token"]["catalog"][3]
['endpoints'][2]["url"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
def test_authenticate_success_userid_password_domain_scoped(self):
ident = self.TEST_REQUEST_BODY['auth']['identity']
del ident['password']['user']['domain']
del ident['password']['user']['name']
ident['password']['user']['id'] = self.TEST_USER
scope = self.TEST_REQUEST_BODY['auth']['scope']
del scope['project']
scope['domain'] = {}
scope['domain']['id'] = self.TEST_DOMAIN_ID
token = self.TEST_RESPONSE_DICT['token']
del token['project']
token['domain'] = {}
token['domain']['id'] = self.TEST_DOMAIN_ID
token['domain']['name'] = self.TEST_DOMAIN_NAME
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_RESPONSE_DICT),
"headers": self.TEST_RESPONSE_HEADERS,
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
cs = client.Client(user_id=self.TEST_USER,
password=self.TEST_TOKEN,
domain_id=self.TEST_DOMAIN_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_domain_id,
self.TEST_DOMAIN_ID)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["token"]["catalog"][3]
['endpoints'][2]["url"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
def test_authenticate_success_userid_password_project_scoped(self):
ident = self.TEST_REQUEST_BODY['auth']['identity']
del ident['password']['user']['domain']
del ident['password']['user']['name']
ident['password']['user']['id'] = self.TEST_USER
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_RESPONSE_DICT),
"headers": self.TEST_RESPONSE_HEADERS,
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
cs = client.Client(user_id=self.TEST_USER,
password=self.TEST_TOKEN,
project_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_tenant_id,
self.TEST_TENANT_ID)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["token"]["catalog"][3]
['endpoints'][2]["url"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
def test_authenticate_success_password_unscoped(self):
del self.TEST_RESPONSE_DICT['token']['catalog']
del self.TEST_REQUEST_BODY['auth']['scope']
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_RESPONSE_DICT),
"headers": self.TEST_RESPONSE_HEADERS,
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
username=self.TEST_USER,
password=self.TEST_TOKEN,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
self.assertFalse('catalog' in cs.service_catalog.catalog)
def test_authenticate_success_token_domain_scoped(self):
ident = self.TEST_REQUEST_BODY['auth']['identity']
del ident['password']
ident['methods'] = ['token']
ident['token'] = {}
ident['token']['id'] = self.TEST_TOKEN
scope = self.TEST_REQUEST_BODY['auth']['scope']
del scope['project']
scope['domain'] = {}
scope['domain']['id'] = self.TEST_DOMAIN_ID
token = self.TEST_RESPONSE_DICT['token']
del token['project']
token['domain'] = {}
token['domain']['id'] = self.TEST_DOMAIN_ID
token['domain']['name'] = self.TEST_DOMAIN_NAME
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_RESPONSE_DICT),
"headers": self.TEST_RESPONSE_HEADERS,
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
cs = client.Client(token=self.TEST_TOKEN,
domain_id=self.TEST_DOMAIN_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_domain_id,
self.TEST_DOMAIN_ID)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["token"]["catalog"][3]
['endpoints'][2]["url"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
def test_authenticate_success_token_project_scoped(self):
ident = self.TEST_REQUEST_BODY['auth']['identity']
del ident['password']
ident['methods'] = ['token']
ident['token'] = {}
ident['token']['id'] = self.TEST_TOKEN
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_RESPONSE_DICT),
"headers": self.TEST_RESPONSE_HEADERS,
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
cs = client.Client(token=self.TEST_TOKEN,
project_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_tenant_id,
self.TEST_TENANT_ID)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["token"]["catalog"][3]
['endpoints'][2]["url"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
def test_authenticate_success_token_unscoped(self):
ident = self.TEST_REQUEST_BODY['auth']['identity']
del ident['password']
ident['methods'] = ['token']
ident['token'] = {}
ident['token']['id'] = self.TEST_TOKEN
del self.TEST_REQUEST_BODY['auth']['scope']
del self.TEST_RESPONSE_DICT['token']['catalog']
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(self.TEST_RESPONSE_DICT),
"headers": self.TEST_RESPONSE_HEADERS,
})
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_REQUEST_HEADERS
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
requests.request('POST',
self.TEST_URL + "/auth/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll()
cs = client.Client(token=self.TEST_TOKEN,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
self.assertFalse('catalog' in cs.service_catalog.catalog)

121
tests/v3/test_client.py Normal file
View File

@@ -0,0 +1,121 @@
import json
import mock
import requests
from keystoneclient.v3 import client
from tests import utils
from tests.v3 import client_fixtures
class KeystoneClientTest(utils.TestCase):
def setUp(self):
super(KeystoneClientTest, self).setUp()
domain_scoped_fake_resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(client_fixtures.DOMAIN_SCOPED_TOKEN),
"headers": client_fixtures.AUTH_RESPONSE_HEADERS
})
self.domain_scoped_mock_req = mock.Mock(
return_value=domain_scoped_fake_resp)
project_scoped_fake_resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN),
"headers": client_fixtures.AUTH_RESPONSE_HEADERS
})
self.project_scoped_mock_req = mock.Mock(
return_value=project_scoped_fake_resp)
unscoped_fake_resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(client_fixtures.UNSCOPED_TOKEN),
"headers": client_fixtures.AUTH_RESPONSE_HEADERS
})
self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp)
def test_unscoped_init(self):
with mock.patch.object(requests, "request", self.unscoped_mock_req):
c = client.Client(user_domain_name='exampledomain',
username='exampleuser',
password='password',
auth_url='http://somewhere/')
self.assertIsNotNone(c.auth_ref)
self.assertFalse(c.auth_ref.domain_scoped)
self.assertFalse(c.auth_ref.project_scoped)
self.assertEquals(c.auth_user_id,
'c4da488862bd435c9e6c0275a0d0e49a')
def test_domain_scoped_init(self):
with mock.patch.object(requests,
"request",
self.domain_scoped_mock_req):
c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
password='password',
domain_name='exampledomain',
auth_url='http://somewhere/')
self.assertIsNotNone(c.auth_ref)
self.assertTrue(c.auth_ref.domain_scoped)
self.assertFalse(c.auth_ref.project_scoped)
self.assertEquals(c.auth_user_id,
'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(c.auth_domain_id,
'8e9283b7ba0b1038840c3842058b86ab')
def test_project_scoped_init(self):
with mock.patch.object(requests,
"request",
self.project_scoped_mock_req):
c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
password='password',
user_domain_name='exampledomain',
project_name='exampleproject',
auth_url='http://somewhere/')
self.assertIsNotNone(c.auth_ref)
self.assertFalse(c.auth_ref.domain_scoped)
self.assertTrue(c.auth_ref.project_scoped)
self.assertEquals(c.auth_user_id,
'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(c.auth_tenant_id,
'225da22d3ce34b15877ea70b2a575f58')
def test_auth_ref_load(self):
with mock.patch.object(requests,
"request",
self.project_scoped_mock_req):
c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
password='password',
project_id='225da22d3ce34b15877ea70b2a575f58',
auth_url='http://somewhere/')
cache = json.dumps(c.auth_ref)
new_client = client.Client(auth_ref=json.loads(cache))
self.assertIsNotNone(new_client.auth_ref)
self.assertFalse(new_client.auth_ref.domain_scoped)
self.assertTrue(new_client.auth_ref.project_scoped)
self.assertEquals(new_client.username, 'exampleuser')
self.assertIsNone(new_client.password)
self.assertEqual(new_client.management_url,
'http://admin:35357/v3')
def test_auth_ref_load_with_overridden_arguments(self):
with mock.patch.object(requests,
"request",
self.project_scoped_mock_req):
c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
password='password',
project_id='225da22d3ce34b15877ea70b2a575f58',
auth_url='http://somewhere/')
cache = json.dumps(c.auth_ref)
new_auth_url = "http://new-public:5000/v3"
new_client = client.Client(auth_ref=json.loads(cache),
auth_url=new_auth_url)
self.assertIsNotNone(new_client.auth_ref)
self.assertFalse(new_client.auth_ref.domain_scoped)
self.assertTrue(new_client.auth_ref.project_scoped)
self.assertEquals(new_client.auth_url, new_auth_url)
self.assertEquals(new_client.username, 'exampleuser')
self.assertIsNone(new_client.password)
self.assertEqual(new_client.management_url,
'http://admin:35357/v3')

View File

@@ -0,0 +1,55 @@
from keystoneclient import access
from keystoneclient import exceptions
from tests.v3 import client_fixtures
from tests.v3 import utils
class ServiceCatalogTest(utils.TestCase):
def setUp(self):
super(ServiceCatalogTest, self).setUp()
self.AUTH_RESPONSE_BODY = client_fixtures.AUTH_RESPONSE_BODY
self.RESPONSE = utils.TestResponse({
"headers": client_fixtures.AUTH_RESPONSE_HEADERS
})
def test_building_a_service_catalog(self):
auth_ref = access.AccessInfo.factory(self.RESPONSE,
self.AUTH_RESPONSE_BODY)
sc = auth_ref.service_catalog
self.assertEquals(sc.url_for(service_type='compute'),
"https://compute.north.host/novapi/public")
self.assertEquals(sc.url_for(service_type='compute',
endpoint_type='internal'),
"https://compute.north.host/novapi/internal")
self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
"South", service_type='compute')
def test_service_catalog_endpoints(self):
auth_ref = access.AccessInfo.factory(self.RESPONSE,
self.AUTH_RESPONSE_BODY)
sc = auth_ref.service_catalog
public_ep = sc.get_endpoints(service_type='compute',
endpoint_type='public')
self.assertEquals(public_ep['compute'][0]['region'], 'North')
self.assertEquals(public_ep['compute'][0]['url'],
"https://compute.north.host/novapi/public")
def test_service_catalog_regions(self):
self.AUTH_RESPONSE_BODY['token']['region_name'] = "North"
auth_ref = access.AccessInfo.factory(self.RESPONSE,
self.AUTH_RESPONSE_BODY)
sc = auth_ref.service_catalog
url = sc.url_for(service_type='image', endpoint_type='public')
self.assertEquals(url, "http://glance.north.host/glanceapi/public")
self.AUTH_RESPONSE_BODY['token']['region_name'] = "South"
auth_ref = access.AccessInfo.factory(self.RESPONSE,
self.AUTH_RESPONSE_BODY)
sc = auth_ref.service_catalog
url = sc.url_for(service_type='image', endpoint_type='internal')
self.assertEquals(url, "http://glance.south.host/glanceapi/internal")

View File

@@ -1,8 +1,8 @@
import copy
import json
import uuid
import time
import urlparse
import uuid
import mox
import requests
@@ -32,6 +32,9 @@ class TestClient(client.Client):
class TestCase(testtools.TestCase):
TEST_DOMAIN_ID = '1'
TEST_DOMAIN_NAME = 'aDomain'
TEST_TENANT_ID = '1'
TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken'
TEST_USER = 'test'
@@ -43,6 +46,84 @@ class TestCase(testtools.TestCase):
'verify': True,
}
TEST_SERVICE_CATALOG = [{
"endpoints": [{
"url": "http://cdn.admin-nets.local:8774/v1.0/",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://127.0.0.1:8774/v1.0",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://cdn.admin-nets.local:8774/v1.0",
"region": "RegionOne",
"interface": "admin"
}],
"type": "nova_compat"
}, {
"endpoints": [{
"url": "http://nova/novapi/public",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://nova/novapi/internal",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://nova/novapi/admin",
"region": "RegionOne",
"interface": "admin"
}],
"type": "compute"
}, {
"endpoints": [{
"url": "http://glance/glanceapi/public",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://glance/glanceapi/internal",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://glance/glanceapi/admin",
"region": "RegionOne",
"interface": "admin"
}],
"type": "image",
"name": "glance"
}, {
"endpoints": [{
"url": "http://127.0.0.1:5000/v3",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://127.0.0.1:5000/v3",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://127.0.0.1:35357/v3",
"region": "RegionOne",
"interface": "admin"
}],
"type": "identity"
}, {
"endpoints": [{
"url": "http://swift/swiftapi/public",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://swift/swiftapi/internal",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://swift/swiftapi/admin",
"region": "RegionOne",
"interface": "admin"
}],
"type": "object-store"
}]
def setUp(self):
super(TestCase, self).setUp()
self.mox = mox.Mox()