removing repeat attempt at authorization in client

blueprint solidify-python-api

* extended and updated documentation strings
* updated README.rst with latest options
* made debug a pass-through value, optionally set on client (instead of
  just being pulled from environment variable)
* adding AccessInfo object and associated tests
  (access.AccessInfo meant to be a cacheable object external to client
  and ultimately to replace service_catalog and it's existing functionality)
* extending authtoken to support lists of endpoints
* maintaining a single entity for client.management_url with first from
  list of possible endpoints
* create project_name and project_id synonyms to match tenant_name and
  tenant_id
* replacing authenticate call to a pure method, not overloading the
  resource/manager path that confuses base URL concepts.
* throw AuthorizationFailure if client attempts to access keystone
  resources before it has a management url
* special case listing tenant using auth_url for unscoped tokens authorized
  through client
* special case listing tokens.authenticate for Dashboard to allow unscoped
  tokens to hand back parity information to dashboard

Change-Id: I4bb3a1b6a5ce2c4b3fbcebeb59116286cac8b2e3
This commit is contained in:
Joe Heck
2012-10-13 00:15:39 +00:00
parent d471f65231
commit f1cc3cfc42
10 changed files with 487 additions and 92 deletions

View File

@@ -78,14 +78,18 @@ You'll find complete documentation on the shell by running
[--os-tenant-id <tenant-id>] [--os-auth-url <auth-url>]
[--os-region-name <region-name>]
[--os-identity-api-version <identity-api-version>]
[--token <service-token>] [--endpoint <service-endpoint>]
[--os-token <service-token>]
[--os-endpoint <service-endpoint>]
[--os-cacert <ca-certificate>] [--os-cert <certificate>]
[--os-key <key>] [--insecure] [--token <service-token>]
[--endpoint <service-endpoint>]
<subcommand> ...
Command-line interface to the OpenStack Identity API.
Positional arguments:
<subcommand>
catalog List service catalog, possibly filtered by service.
catalog
ec2-credentials-create
Create EC2-compatibile credentials for user per tenant
ec2-credentials-delete
@@ -96,13 +100,12 @@ You'll find complete documentation on the shell by running
List EC2-compatibile credentials for a user
endpoint-create Create a new endpoint associated with a service
endpoint-delete Delete a service endpoint
endpoint-get Find endpoint filtered by a specific attribute or
service type
endpoint-get
endpoint-list List configured service endpoints
role-create Create new role
role-delete Delete role
role-get Display role details
role-list List all available roles
role-list List all roles
service-create Add service to Service Catalog
service-delete Delete service from Service Catalog
service-get Display service from Service Catalog
@@ -112,18 +115,22 @@ You'll find complete documentation on the shell by running
tenant-get Display tenant details
tenant-list List all tenants
tenant-update Update tenant name, description, enabled status
token-get Display the current user token
token-get
user-create Create new user
user-delete Delete user
user-get Display user details.
user-list List users
user-password-update
Update user password
user-role-add Add role to user
user-role-list List roles granted to a user
user-role-remove Remove role from user
user-role-list List roles for user
user-update Update user's name, email, and enabled status
discover Discover Keystone servers and show authentication
protocols and
bootstrap Grants a new role to a new user on a new tenant, after
creating each.
bash-completion Prints all of the commands and options to stdout.
help Display help about this program or one of its
subcommands.
@@ -142,9 +149,22 @@ You'll find complete documentation on the shell by running
Defaults to env[OS_REGION_NAME]
--os-identity-api-version <identity-api-version>
Defaults to env[OS_IDENTITY_API_VERSION] or 2.0
--os-token <service-token>
Defaults to env[OS_SERVICE_TOKEN]
--os-endpoint <service-endpoint>
Defaults to env[OS_SERVICE_ENDPOINT]
--os-cacert <ca-certificate>
Defaults to env[OS_CACERT]
--os-cert <certificate>
Defaults to env[OS_CERT]
--os-key <key> Defaults to env[OS_KEY]
--insecure Explicitly allow keystoneclient to perform "insecure"
SSL (https) requests. The server's certificate will
not be verified against any certificate authorities.
This option should be used with caution.
--token <service-token>
Defaults to env[SERVICE_TOKEN]
Deprecated. use --os-token
--endpoint <service-endpoint>
Defaults to env[SERVICE_ENDPOINT]
Deprecated. use --os-endpoint
See "keystone help COMMAND" for help on a specific command.
See "keystone help COMMAND" for help on a specific command.

144
keystoneclient/access.py Normal file
View File

@@ -0,0 +1,144 @@
# Copyright 2012 Nebula, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class AccessInfo(dict):
"""An object for encapsulating a raw authentication token from keystone
and helper methods for extracting useful values from that token."""
def __init__(self, *args, **kwargs):
dict.__init__(self, *args, **kwargs)
@property
def auth_token(self):
""" Returns the token_id associated with the auth request, to be used
in headers for authenticating OpenStack API requests.
:returns: str
"""
return self['token'].get('id', None)
@property
def username(self):
""" Returns the username associated with the authentication request.
Follows the pattern defined in the V2 API of first looking for 'name',
returning that if available, and falling back to 'username' if name
is unavailable.
:returns: str
"""
name = self['user'].get('name', None)
if name:
return name
else:
return self['user'].get('username', None)
@property
def user_id(self):
""" Returns the user id associated with the authentication request.
:returns: str
"""
return self['user'].get('id', None)
@property
def tenant_name(self):
""" Returns the tenant (project) name associated with the
authentication request.
:returns: str
"""
tenant_dict = self['token'].get('tenant', None)
if tenant_dict:
return tenant_dict.get('name', None)
return None
@property
def project_name(self):
""" Synonym for tenant_name """
return self.tenant_name
@property
def scoped(self):
""" Returns true if the authorization token was scoped to a tenant
(project), and contains a populated service catalog.
:returns: bool
"""
if ('serviceCatalog' in self
and self['serviceCatalog']
and 'tenant' in self['token']):
return True
return False
@property
def tenant_id(self):
""" Returns the tenant (project) id associated with the authentication
request, or None if the authentication request wasn't scoped to a
tenant (project).
:returns: str
"""
tenant_dict = self['token'].get('tenant', None)
if tenant_dict:
return tenant_dict.get('id', None)
return None
@property
def project_id(self):
""" Synonym for project_id """
return self.tenant_id
@property
def auth_url(self):
""" Returns a tuple of URLs from publicURL and adminURL for the service
'identity' from the service catalog associated with the authorization
request. If the authentication request wasn't scoped to a tenant
(project), this property will return None.
:returns: tuple of urls
"""
return_list = []
if 'serviceCatalog' in self and self['serviceCatalog']:
identity_services = [x for x in self['serviceCatalog']
if x['type'] == 'identity']
for svc in identity_services:
for endpoint in svc['endpoints']:
if 'publicURL' in endpoint:
return_list.append(endpoint['publicURL'])
if len(return_list) > 0:
return tuple(return_list)
return None
@property
def management_url(self):
""" Returns the first adminURL for 'identity' from the service catalog
associated with the authorization request, or None if the
authentication request wasn't scoped to a tenant (project).
:returns: tuple of urls
"""
return_list = []
if 'serviceCatalog' in self and self['serviceCatalog']:
identity_services = [x for x in self['serviceCatalog']
if x['type'] == 'identity']
for svc in identity_services:
for endpoint in svc['endpoints']:
if 'adminURL' in endpoint:
return_list.append(endpoint['adminURL'])
if len(return_list) > 0:
return tuple(return_list)
return None

View File

@@ -10,7 +10,6 @@ OpenStack Client interface. Handles the REST calls and responses.
import copy
import logging
import os
import urlparse
import httplib2
@@ -26,6 +25,7 @@ if not hasattr(urlparse, 'parse_qsl'):
urlparse.parse_qsl = cgi.parse_qsl
from keystoneclient import access
from keystoneclient import exceptions
@@ -39,31 +39,42 @@ class HTTPClient(httplib2.Http):
def __init__(self, username=None, tenant_id=None, tenant_name=None,
password=None, auth_url=None, region_name=None, timeout=None,
endpoint=None, token=None, cacert=None, key=None,
cert=None, insecure=False, original_ip=None):
cert=None, insecure=False, original_ip=None, debug=False,
auth_ref=None):
super(HTTPClient, self).__init__(timeout=timeout, ca_certs=cacert)
if cert:
if key:
self.add_certificate(key=key, cert=cert, domain='')
else:
self.add_certificate(key=cert, cert=cert, domain='')
self.version = 'v2.0'
self.auth_ref = access.AccessInfo(**auth_ref) if auth_ref else None
if self.auth_ref:
self.username = self.auth_ref.username
self.tenant_id = self.auth_ref.tenant_id
self.tenant_name = self.auth_ref.tenant_name
self.auth_url = self.auth_ref.auth_url
self.management_url = self.auth_ref.management_url
self.auth_token = self.auth_ref.auth_token
#NOTE(heckj): allow override of the auth_ref defaults from explicit
# values provided to the client
self.username = username
self.tenant_id = tenant_id
self.tenant_name = tenant_name
self.password = password
self.auth_url = auth_url.rstrip('/') if auth_url else None
self.version = 'v2.0'
self.region_name = region_name
self.auth_token = token
self.original_ip = original_ip
self.management_url = endpoint
self.management_url = endpoint.rstrip('/') if endpoint else None
self.region_name = region_name
# httplib2 overrides
self.force_exception_to_status_code = True
self.disable_ssl_certificate_validation = insecure
# logging setup
self.debug_log = os.environ.get('KEYSTONECLIENT_DEBUG', False)
self.debug_log = debug
if self.debug_log:
ch = logging.StreamHandler()
_logger.setLevel(logging.DEBUG)
@@ -74,6 +85,10 @@ class HTTPClient(httplib2.Http):
Not implemented here because auth protocols should be API
version-specific.
Expected to authenticate or validate an existing authentication
reference already associated with the client. Invoking this call
*always* makes a call to the Keystone.
"""
raise NotImplementedError
@@ -135,7 +150,7 @@ class HTTPClient(httplib2.Http):
self.http_log_resp(resp, body)
if resp.status in (400, 401, 403, 404, 408, 409, 413, 500, 501):
_logger.debug("Request returned failure status.")
_logger.debug("Request returned failure status: %s", resp.status)
raise exceptions.from_response(resp, body)
elif resp.status in (301, 302, 305):
# Redirected. Reissue the request to the new location.
@@ -153,32 +168,20 @@ class HTTPClient(httplib2.Http):
return resp, body
def _cs_request(self, url, method, **kwargs):
if not self.management_url:
self.authenticate()
""" Makes an authenticated request to keystone endpoint by
concatenating self.management_url and url and passing in method and
any associated kwargs. """
if self.management_url is None:
raise exceptions.AuthorizationFailure(
'Current authorization does not have a known management url')
kwargs.setdefault('headers', {})
if self.auth_token:
kwargs['headers']['X-Auth-Token'] = self.auth_token
# Perform the request once. If we get a 401 back then it
# might be because the auth token expired, so try to
# re-authenticate and try again. If it still fails, bail.
try:
resp, body = self.request(self.management_url + url, method,
**kwargs)
return resp, body
except exceptions.Unauthorized:
try:
if getattr(self, '_failures', 0) < 1:
self._failures = getattr(self, '_failures', 0) + 1
self.authenticate()
resp, body = self.request(self.management_url + url,
method, **kwargs)
return resp, body
else:
raise
except exceptions.Unauthorized:
raise
resp, body = self.request(self.management_url + url, method,
**kwargs)
return resp, body
def get(self, url, **kwargs):
return self._cs_request(url, 'GET', **kwargs)

View File

@@ -259,9 +259,6 @@ class OpenStackIdentityShell(object):
self.do_bash_completion(args)
return 0
#FIXME(usrleon): Here should be restrict for project id same as
# for username or apikey but for compatibility it is not.
# TODO(heckj): supporting backwards compatibility with environment
# variables. To be removed after DEVSTACK is updated, ideally in
# the Grizzly release cycle.
@@ -328,27 +325,27 @@ class OpenStackIdentityShell(object):
cacert=args.os_cacert,
key=args.os_key,
cert=args.os_cert,
insecure=args.insecure)
insecure=args.insecure,
debug=args.debug)
else:
token = None
endpoint = None
if args.os_token and args.os_endpoint:
token = args.os_token
endpoint = args.os_endpoint
api_version = options.os_identity_api_version
self.cs = self.get_api_class(api_version)(
username=args.os_username,
tenant_name=args.os_tenant_name,
tenant_id=args.os_tenant_id,
token=token,
endpoint=endpoint,
endpoint=args.os_endpoint,
password=args.os_password,
auth_url=args.os_auth_url,
region_name=args.os_region_name,
cacert=args.os_cacert,
key=args.os_key,
cert=args.os_cert,
insecure=args.insecure)
insecure=args.insecure,
debug=args.debug)
try:
args.func(self.cs, args)

View File

@@ -14,6 +14,7 @@
# under the License.
import logging
from keystoneclient import access
from keystoneclient import client
from keystoneclient import exceptions
from keystoneclient import service_catalog
@@ -35,8 +36,8 @@ class Client(client.HTTPClient):
:param string username: Username for authentication. (optional)
:param string password: Password for authentication. (optional)
:param string token: Token for authentication. (optional)
:param string tenant_name: Tenant id. (optional)
:param string tenant_id: Tenant name. (optional)
:param string tenant_id: Tenant id. (optional)
:param string tenant_name: Tenant name. (optional)
:param string auth_url: Keystone service endpoint for authorization.
:param string region_name: Name of a region to select when choosing an
endpoint from the service catalog.
@@ -49,6 +50,32 @@ class Client(client.HTTPClient):
:param string original_ip: The original IP of the requesting user
which will be sent to Keystone in a
'Forwarded' header. (optional)
:param string cert: If provided, used as a local certificate to communicate
with the keystone endpoint. If provided, requires the
additional parameter key. (optional)
:param string key: The key associated with the certificate for secure
keystone communication. (optional)
:param string cacert: the ca-certs to verify the secure communications
with keystone. (optional)
:param boolean insecure: If using an SSL endpoint, allows for the certicate
to be unsigned - does not verify the certificate
chain. default: False (optional)
:param dict auth_ref: To allow for consumers of the client to manage their
own caching strategy, you may initialize a client
with a previously captured auth_reference (token)
:param boolean debug: Enables debug logging of all request and responses
to keystone. default False (option)
.. warning::
If debug is enabled, it may show passwords in plain text as a part of its
output.
The client can be created and used like a user or in a strictly
bootstrap mode. Normal operation expects a username, password, auth_url,
and tenant_name or id to be provided. Other values will be lazily loaded
as needed from the service catalog.
Example::
@@ -62,51 +89,105 @@ class Client(client.HTTPClient):
>>> user = keystone.users.get(USER_ID)
>>> user.delete()
Once authenticated, you can store and attempt to re-use the
authenticated token. the auth_ref property on the client
returns as a dictionary-like-object so that you can export and
cache it, re-using it when initiating another client::
>>> from keystoneclient.v2_0 import client
>>> keystone = client.Client(username=USER,
password=PASS,
tenant_name=TENANT_NAME,
auth_url=KEYSTONE_URL)
>>> auth_ref = keystone.auth_ref
>>> # pickle or whatever you like here
>>> new_client = client.Client(auth_ref=auth_ref)
Alternatively, you can provide the administrative token configured in
keystone and an endpoint to communicate with directly. See
(``admin_token`` in ``keystone.conf``) In this case, authenticate()
is not needed, and no service catalog will be loaded.
Example::
>>> from keystoneclient.v2_0 import client
>>> admin_client = client.Client(
token='12345secret7890',
endpoint='http://localhost:35357/v2.0')
>>> keystone.tenants.list()
"""
def __init__(self, endpoint=None, **kwargs):
def __init__(self, **kwargs):
""" Initialize a new client for the Keystone v2.0 API. """
super(Client, self).__init__(endpoint=endpoint, **kwargs)
super(Client, self).__init__(**kwargs)
self.endpoints = endpoints.EndpointManager(self)
self.roles = roles.RoleManager(self)
self.services = services.ServiceManager(self)
self.tenants = tenants.TenantManager(self)
self.tokens = tokens.TokenManager(self)
self.users = users.UserManager(self)
# NOTE(gabriel): If we have a pre-defined endpoint then we can
# get away with lazy auth. Otherwise auth immediately.
# extensions
self.ec2 = ec2.CredentialsManager(self)
self.management_url = endpoint
if endpoint is None:
if self.management_url is None:
self.authenticate()
#TODO(heckj): move to a method on auth_ref
def has_service_catalog(self):
"""Returns True if this client provides a service catalog."""
return hasattr(self, 'service_catalog')
def authenticate(self):
""" Authenticate against the Identity API.
def authenticate(self, username=None, password=None, tenant_name=None,
tenant_id=None, auth_url=None, token=None):
""" Authenticate against the Keystone API.
Uses the data provided at instantiation to authenticate against
the Keystone server. This may use either a username and password
or token for authentication. If a tenant id was provided
or token for authentication. If a tenant name or id was provided
then the resulting authenticated client will be scoped to that
tenant and contain a service catalog of available endpoints.
Returns ``True`` if authentication was successful.
With the v2.0 API, if a tenant name or ID is not provided, the
authenication token returned will be 'unscoped' and limited in
capabilities until a fully-scoped token is acquired.
If successful, sets the self.auth_ref and self.auth_token with
the returned token. If not already set, will also set
self.management_url from the details provided in the token.
:returns: ``True`` if authentication was successful.
:raises: AuthorizationFailure if unable to authenticate or validate
the existing authorization token
:raises: ValueError if insufficient parameters are used.
"""
self.management_url = self.auth_url
auth_url = auth_url or self.auth_url
username = username or self.username
password = password or self.password
tenant_name = tenant_name or self.tenant_name
tenant_id = tenant_id or self.tenant_id
token = token or self.auth_token
try:
raw_token = self.tokens.authenticate(username=self.username,
tenant_id=self.tenant_id,
tenant_name=self.tenant_name,
password=self.password,
token=self.auth_token,
return_raw=True)
self._extract_service_catalog(self.auth_url, raw_token)
raw_token = self._base_authN(auth_url,
username=username,
tenant_id=tenant_id,
tenant_name=tenant_name,
password=password,
token=token)
self.auth_ref = access.AccessInfo(**raw_token)
# if we got a response without a service catalog, set the local
# list of tenants for introspection, and leave to client user
# to determine what to do. Otherwise, load up the service catalog
self.auth_token = self.auth_ref.auth_token
if self.auth_ref.scoped:
if self.management_url is None:
self.management_url = self.auth_ref.management_url[0]
self.tenant_name = self.auth_ref.tenant_name
self.tenant_id = self.auth_ref.tenant_id
self.user_id = self.auth_ref.user_id
self._extract_service_catalog(self.auth_url, self.auth_ref)
return True
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
_logger.debug("Authorization Failed.")
@@ -115,31 +196,38 @@ class Client(client.HTTPClient):
raise exceptions.AuthorizationFailure("Authorization Failed: "
"%s" % e)
def _base_authN(self, auth_url, username=None, password=None,
tenant_name=None, tenant_id=None, token=None):
""" Takes a username, password, and optionally a tenant_id or
tenant_name to get an authentication token from keystone.
May also take a token and a tenant_id to re-scope a token
to a tenant."""
headers = {}
url = auth_url + "/tokens"
if token:
headers['X-Auth-Token'] = token
params = {"auth": {"token": {"id": token}}}
elif username and password:
params = {"auth": {"passwordCredentials": {"username": username,
"password": password}}}
else:
raise ValueError('A username and password or token is required.')
if tenant_id:
params['auth']['tenantId'] = tenant_id
elif tenant_name:
params['auth']['tenantName'] = tenant_name
resp, body = self.request(url, 'POST', body=params, headers=headers)
return body['access']
# TODO(heckj): remove entirely in favor of access.AccessInfo and
# associated methods
def _extract_service_catalog(self, url, body):
""" Set the client's service catalog from the response data. """
self.service_catalog = service_catalog.ServiceCatalog(body)
try:
sc = self.service_catalog.get_token()
self.auth_token = sc['id']
# Save these since we have them and they'll be useful later
self.auth_tenant_id = sc.get('tenant_id')
self.auth_user_id = sc.get('user_id')
except KeyError:
raise exceptions.AuthorizationFailure()
# FIXME(ja): we should be lazy about setting managment_url.
# in fact we should rewrite the client to support the service
# catalog (api calls should be directable to any endpoints)
try:
self.management_url = self.service_catalog.url_for(
attr='region', filter_value=self.region_name,
endpoint_type='adminURL')
except exceptions.EmptyCatalog:
# Unscoped tokens don't return a service catalog;
# allow those to pass while any other errors bubble up.
pass
except exceptions.EndpointNotFound:
# the client shouldn't expect the authenticating user to
# be authorized to view adminURL's, nor expect the identity
# endpoint to publish one
pass

View File

@@ -107,7 +107,16 @@ class TenantManager(base.ManagerWithFind):
if params:
query = "?" + urllib.urlencode(params)
return self._list("/tenants%s" % query, "tenants")
reset = 0
if self.api.management_url is None:
# special casing to allow tenant lists on the auth_url
# for unscoped tokens
reset = 1
self.api.management_url = self.api.auth_url
tenant_list = self._list("/tenants%s" % query, "tenants")
if reset:
self.api.management_url = None
return tenant_list
def update(self, tenant_id, tenant_name=None, description=None,
enabled=None):

View File

@@ -34,7 +34,15 @@ class TokenManager(base.ManagerWithFind):
params['auth']['tenantId'] = tenant_id
elif tenant_name:
params['auth']['tenantName'] = tenant_name
return self._create('/tokens', params, "access", return_raw=return_raw)
reset = 0
if self.api.management_url is None:
reset = 1
self.api.management_url = self.api.auth_url
token_ref = self._create('/tokens', params, "access",
return_raw=return_raw)
if reset:
self.api.management_url = None
return token_ref
def delete(self, token):
return self._delete("/tokens/%s" % base.getid(token))

125
tests/test_access.py Normal file
View File

@@ -0,0 +1,125 @@
from keystoneclient import access
from tests import utils
UNSCOPED_TOKEN = {
u'access': {u'serviceCatalog': {},
u'token': {u'expires': u'2012-10-03T16:58:01Z',
u'id': u'3e2813b7ba0b4006840c3825860b86ed'},
u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
u'roles': [],
u'roles_links': [],
u'username': u'exampleuser'}
}
}
PROJECT_SCOPED_TOKEN = {
u'access': {
u'serviceCatalog': [{
u'endpoints': [{
u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'internalURL':
u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'publicURL':
u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne'
}],
u'endpoints_links': [],
u'name': u'Volume Service',
u'type': u'volume'},
{u'endpoints': [{
u'adminURL': u'http://admin:9292/v1',
u'internalURL': u'http://internal:9292/v1',
u'publicURL': u'http://public.com:9292/v1',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Image Service',
u'type': u'image'},
{u'endpoints': [{
u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Compute Service',
u'type': u'compute'},
{u'endpoints': [{
u'adminURL': u'http://admin:8773/services/Admin',
u'internalURL': u'http://internal:8773/services/Cloud',
u'publicURL': u'http://public.com:8773/services/Cloud',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'EC2 Service',
u'type': u'ec2'},
{u'endpoints': [{
u'adminURL': u'http://admin:35357/v2.0',
u'internalURL': u'http://internal:5000/v2.0',
u'publicURL': u'http://public.com:5000/v2.0',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Identity Service',
u'type': u'identity'}],
u'token': {u'expires': u'2012-10-03T16:53:36Z',
u'id': u'04c7d5ffaeef485f9dc69c06db285bdb',
u'tenant': {u'description': u'',
u'enabled': True,
u'id': u'225da22d3ce34b15877ea70b2a575f58',
u'name': u'exampleproject'}},
u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74',
u'name': u'Member'}],
u'roles_links': [],
u'username': u'exampleuser'}
}
}
class AccessInfoTest(utils.TestCase):
def test_building_unscoped_accessinfo(self):
auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access'])
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
self.assertIn('serviceCatalog', auth_ref)
self.assertFalse(auth_ref['serviceCatalog'])
self.assertEquals(auth_ref.auth_token,
'3e2813b7ba0b4006840c3825860b86ed')
self.assertEquals(auth_ref.username, 'exampleuser')
self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(auth_ref.tenant_name, None)
self.assertEquals(auth_ref.tenant_id, None)
self.assertEquals(auth_ref.auth_url, None)
self.assertEquals(auth_ref.management_url, None)
self.assertFalse(auth_ref.scoped)
def test_building_scoped_accessinfo(self):
auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
self.assertIn('serviceCatalog', auth_ref)
self.assertTrue(auth_ref['serviceCatalog'])
self.assertEquals(auth_ref.auth_token,
'04c7d5ffaeef485f9dc69c06db285bdb')
self.assertEquals(auth_ref.username, 'exampleuser')
self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(auth_ref.tenant_name, 'exampleproject')
self.assertEquals(auth_ref.tenant_id,
'225da22d3ce34b15877ea70b2a575f58')
self.assertEquals(auth_ref.tenant_name, auth_ref.project_name)
self.assertEquals(auth_ref.tenant_id, auth_ref.project_id)
self.assertEquals(auth_ref.auth_url,
('http://public.com:5000/v2.0',))
self.assertEquals(auth_ref.management_url,
('http://admin:35357/v2.0',))
self.assertTrue(auth_ref.scoped)

View File

@@ -26,6 +26,13 @@ def get_authed_client():
class ClientTest(utils.TestCase):
def test_unauthorized_client_requests(self):
cl = get_client()
self.assertRaises(exceptions.AuthorizationFailure, cl.get, '/hi')
self.assertRaises(exceptions.AuthorizationFailure, cl.post, '/hi')
self.assertRaises(exceptions.AuthorizationFailure, cl.put, '/hi')
self.assertRaises(exceptions.AuthorizationFailure, cl.delete, '/hi')
def test_get(self):
cl = get_authed_client()

View File

@@ -65,12 +65,6 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
}),
})
# Implicit retry on API calls, so it gets called twice
httplib2.Http.request(self.TEST_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
httplib2.Http.request(self.TEST_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),