Refactored Client to use Keystone Sessions

This CR changes the Client to use Keystone Sessions exclusively. All
authentication that was previously available in barbicanclient is
removed so that we can defer all authentication to keystoneclient.

Unit tests for the client module have been refactored to test the new
functionality.

The README file has been updated to give an example of how to
authenticate with Identity v3 using Sessions.

The Client will now give priority to an endpoint that is passed in over
endpoints found in the service catalog, which closes bug #1276002.  This
is helpful for development, where you might be running an instance of
Barbican locally that uses a Keystone instance for authentication that
does not have the Barbican service in its catalog.

The CLI uses Keystone API v3 as the default authentication method, but
can also use v2.0.

DocImpact
Implements: blueprint use-keystone-sessions
Closes-Bug: #1276002

Change-Id: I9d07cb50143c1f5cafea48318018a205f745999f
This commit is contained in:
Douglas Mendizabal
2014-10-12 20:25:07 -05:00
parent a096e67401
commit ddec180c41
9 changed files with 438 additions and 960 deletions

View File

@@ -26,69 +26,134 @@ with keystone authentication:
.. code:: pycon
>>> from barbicanclient.common import auth
>>> from keystoneclient.auth import identity
>>> from keystoneclient import session
>>> from barbicanclient import client
>>> # We'll use keystone for authentication
>>> keystone = auth.KeystoneAuthV2(auth_url='http://keystone-int.cloudkeep.io:5000/v2.0',
... username='USER', password='PASSWORD', tenant_name='TENANT')
>>> barbican = client.Client(auth_plugin=keystone)
>>> # Let's store some sensitive data, Barbican encrypts it and stores it securely in the cloud
>>> secret_uri = barbican.secrets.store(name='Self destruction sequence',
... payload='the magic words are squeamish ossifrage',
... payload_content_type='text/plain')
>>> # Let's look at some properties of a barbican Secret
>>> secret = barbican.secrets.get(secret_uri)
>>> # We'll use Keystone API v3 for authentication
>>> auth = identity.v3.Password(auth_url='http://localhost:5000/v3',
... username='admin_user',
... user_domain_name='Default',
... password='password',
... project_name='demo',
... project_domain_name='Default')
>>> # Next we'll create a Keystone session using the auth plugin we just created
>>> sess = session.Session(auth=auth)
>>> # Now we use the session to create a Barbican client
>>> barbican = client.Client(session=sess)
>>> # Let's create a Secret to store some sensitive data
>>> secret = barbican.secrets.create(name='Self destruction sequence',
... payload='the magic words are squeamish ossifrage',
... payload_content_type='text/plain')
>>> # Now let's store the secret by using its store() method. This will send the secret data
>>> # to Barbican, where it will be encrypted and stored securely in the cloud.
>>> secret.store()
u'http://localhost:9311/v1/secrets/85b220fd-f414-483f-94e4-2f422480f655'
>>> # The URI returned by store() uniquely identifies your secret in the Barbican service.
>>> # After a secret is stored, the URI is also avaiable by accessing the secret_ref attribute.
>>> print(secret.secret_ref)
u'http://api-01-int.cloudkeep.io:9311/v1/test_tenant/secrets/49496a6d-c674-4384-b208-7cf4988f84ee'
>>> print(secret.name)
Self destruction sequence
>>> # Now let's retrieve the secret payload. Barbican decrypts it and sends it back.
>>> print(barbican.secrets.decrypt(secret.secret_ref))
http://localhost:9311/v1/secrets/091adb32-4050-4980-8558-90833c531413
>>> # When we need to retrieve our secret at a later time, we can use the secret_ref
>>> retrieved_secret = barbican.secrets.get(u'http://localhost:9311/v1/secrets/091adb32-4050-4980-8558-90833c531413')
>>> # We can access the secret payload by using the payload attribute.
>>> # Barbican decrypts the secret and sends it back.
>>> print(retrieved_secret.payload)
the magic words are squeamish ossifrage
barbican - Command Line Client
------------------------------
Command line client configuration and usage is `documented in the wiki <https://github.com/cloudkeep/python-barbicanclient/wiki/Command-Line-Client>`__.
The command line client is self-documenting. Use the --help flag to access the usage options
.. code:: console
$ barbican
usage: barbican [-h] [--no-auth | --os-auth-url <auth-url>]
[--os-username <auth-user-name>] [--os-password <auth-password>]
[--os-tenant-name <auth-tenant-name>] [--os-tenant-id <tenant-id>]
[--endpoint <barbican-url>]
<entity> <action> ...
$ barbican --help
usage: barbican [--version] [-v] [--log-file LOG_FILE] [-q] [-h] [--debug]
[--no-auth] [--os-identity-api-version <identity-api-version>]
[--os-auth-url <auth-url>] [--os-username <auth-user-name>]
[--os-user-id <auth-user-id>] [--os-password <auth-password>]
[--os-user-domain-id <auth-user-domain-id>]
[--os-user-domain-name <auth-user-domain-name>]
[--os-tenant-name <auth-tenant-name>]
[--os-tenant-id <tenant-id>]
[--os-project-id <auth-project-id>]
[--os-project-name <auth-project-name>]
[--os-project-domain-id <auth-project-domain-id>]
[--os-project-domain-name <auth-project-domain-name>]
[--endpoint <barbican-url>] [--insecure]
[--os-cacert <ca-certificate>] [--os-cert <certificate>]
[--os-key <key>] [--timeout <seconds>]
Command-line interface to the Barbican API.
positional arguments:
<entity> Entity used for command, e.g., order, secret or verification.
optional arguments:
--version show program's version number and exit
-v, --verbose Increase verbosity of output. Can be repeated.
--log-file LOG_FILE Specify a file to log output. Disabled by default.
-q, --quiet suppress output except warnings and errors
-h, --help show this help message and exit
--debug show tracebacks on errors
--no-auth, -N Do not use authentication.
--os-identity-api-version <identity-api-version>
Specify Identity API version to use. Defaults to
env[OS_IDENTITY_API_VERSION] or 3.0.
--os-auth-url <auth-url>, -A <auth-url>
Defaults to env[OS_AUTH_URL].
--os-username <auth-user-name>, -U <auth-user-name>
Defaults to env[OS_USERNAME].
--os-user-id <auth-user-id>
Defaults to env[OS_USER_ID].
--os-password <auth-password>, -P <auth-password>
Defaults to env[OS_PASSWORD].
--os-user-domain-id <auth-user-domain-id>
Defaults to env[OS_USER_DOMAIN_ID].
--os-user-domain-name <auth-user-domain-name>
Defaults to env[OS_USER_DOMAIN_NAME].
--os-tenant-name <auth-tenant-name>, -T <auth-tenant-name>
Defaults to env[OS_TENANT_NAME].
--os-tenant-id <tenant-id>, -I <tenant-id>
Defaults to env[OS_TENANT_ID].
--os-project-id <auth-project-id>
Another way to specify tenant ID. This option is
mutually exclusive with --os-tenant-id. Defaults to
env[OS_PROJECT_ID].
--os-project-name <auth-project-name>
Another way to specify tenant name. This option is
mutually exclusive with --os-tenant-name. Defaults to
env[OS_PROJECT_NAME].
--os-project-domain-id <auth-project-domain-id>
Defaults to env[OS_PROJECT_DOMAIN_ID].
--os-project-domain-name <auth-project-domain-name>
Defaults to env[OS_PROJECT_DOMAIN_NAME].
--endpoint <barbican-url>, -E <barbican-url>
--endpoint <barbican-url>, -E <barbican-url>
Defaults to env[BARBICAN_ENDPOINT].
--insecure Explicitly allow client to perform "insecure" TLS
(https) requests. The server's certificate will not be
verified against any certificate authorities. This
option should be used with caution.
--os-cacert <ca-certificate>
Specify a CA bundle file to use in verifying a TLS
(https) server certificate. Defaults to
env[OS_CACERT].
--os-cert <certificate>
Defaults to env[OS_CERT].
--os-key <key> Defaults to env[OS_KEY].
--timeout <seconds> Set request timeout (in seconds).
subcommands:
Action to perform
Commands:
complete print bash completion command
container create Store a container in Barbican.
container delete Delete a container by providing its href.
container get Retrieve a container by providing its URI.
container list List containers.
help print detailed help for another command
order create Create a new order.
order delete Delete an order by providing its href.
order get Retrieve an order by providing its URI.
order list List orders.
secret delete Delete an secret by providing its href.
secret get Retrieve a secret by providing its URI.
secret list List secrets.
secret store Store a secret in Barbican.
<action>
create Create a new order.
store Store a secret in barbican.
verify Begin a verification process in barbican.
get Retrieve a secret, an order or a verification result by providing its URI.
list List secrets, orders or verifications.
delete Delete a secret, order or verification by providing its href.

View File

@@ -21,12 +21,16 @@ import sys
from cliff import app
from cliff import commandmanager
from keystoneclient.auth import identity
from keystoneclient import session
from barbicanclient.common import auth
from barbicanclient import client
from barbicanclient import version
_DEFAULT_IDENTITY_API_VERSION = '3.0'
class Barbican(app.App):
"""Barbican command line interface."""
@@ -46,6 +50,12 @@ class Barbican(app.App):
description, version, argparse_kwargs)
parser.add_argument('--no-auth', '-N', action='store_true',
help='Do not use authentication.')
parser.add_argument('--os-identity-api-version',
metavar='<identity-api-version>',
default=client.env('OS_IDENTITY_API_VERSION'),
help='Specify Identity API version to use. '
'Defaults to env[OS_IDENTITY_API_VERSION]'
' or 3.0.')
parser.add_argument('--os-auth-url', '-A',
metavar='<auth-url>',
default=client.env('OS_AUTH_URL'),
@@ -104,14 +114,7 @@ class Barbican(app.App):
metavar='<barbican-url>',
default=client.env('BARBICAN_ENDPOINT'),
help='Defaults to env[BARBICAN_ENDPOINT].')
parser.add_argument('--insecure',
default=False,
action="store_true",
help='Explicitly allow barbicanclient to perform '
'"insecure" TLS (https) requests. The '
'server\'s certificate will not be verified '
'against any certificate authorities. This '
'option should be used with caution.')
session.Session.register_cli_options(parser)
return parser
def _assert_no_auth_and_auth_url_mutually_exclusive(self, no_auth,
@@ -136,18 +139,46 @@ class Barbican(app.App):
'ERROR: please specify --endpoint and '
'--os-project-id(or --os-tenant-id)')
self.client = client.Client(endpoint=args.endpoint,
tenant_id=args.os_tenant_id or
project_id=args.os_tenant_id or
args.os_project_id,
insecure=args.insecure)
verify=not args.insecure)
elif all([args.os_auth_url, args.os_user_id or args.os_username,
args.os_password, args.os_tenant_name or args.os_tenant_id or
args.os_project_name or args.os_project_id]):
ks_session = auth.create_keystone_auth_session(args)
kwargs = dict()
kwargs['auth_url'] = args.os_auth_url
kwargs['password'] = args.os_password
if args.os_user_id:
kwargs['user_id'] = args.os_user_id
if args.os_username:
kwargs['username'] = args.os_username
api_version = args.os_identity_api_version
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
if args.os_project_id:
kwargs['project_id'] = args.os_project_id
if args.os_project_name:
kwargs['project_name'] = args.os_project_name
if args.os_user_domain_id:
kwargs['user_domain_id'] = args.os_user_domain_id
if args.os_user_domain_name:
kwargs['user_domain_name'] = args.os_user_domain_name
if args.os_project_domain_id:
kwargs['project_domain_id'] = args.os_project_domain_id
if args.os_project_domain_name:
kwargs['project_domain_name'] = args.os_project_domain_name
auth = identity.v3.Password(**kwargs)
else:
if args.os_tenant_id:
kwargs['tenant_id'] = args.os_tenant_id
if args.os_tenant_name:
kwargs['tenant_name'] = args.os_tenant_name
auth = identity.v2.Password(**kwargs)
ks_session = session.Session(auth=auth, verify=not args.insecure)
self.client = client.Client(session=ks_session,
endpoint=args.endpoint,
tenant_id=args.os_tenant_id or
args.os_project_id,
insecure=args.insecure)
endpoint=args.endpoint)
else:
self.stderr.write(self.parser.format_usage())
raise Exception('ERROR: please specify authentication credentials')

View File

@@ -19,7 +19,6 @@ import os
from keystoneclient.auth.base import BaseAuthPlugin
from keystoneclient import session as ks_session
from barbicanclient.common.auth import KeystoneAuthPluginWrapper
from barbicanclient import containers
from barbicanclient._i18n import _
from barbicanclient import orders
@@ -27,6 +26,9 @@ from barbicanclient import secrets
LOG = logging.getLogger(__name__)
_DEFAULT_SERVICE_TYPE = 'key-manager'
_DEFAULT_SERVICE_INTERFACE = 'public'
_DEFAULT_API_VERSION = 'v1'
class HTTPError(Exception):
@@ -57,133 +59,109 @@ class HTTPAuthError(HTTPError):
class Client(object):
def __init__(self, session=None, auth_plugin=None, endpoint=None,
tenant_id=None, insecure=False, service_type='keystore',
interface='public'):
def __init__(self, session=None, endpoint=None, project_id=None,
verify=True, service_type=_DEFAULT_SERVICE_TYPE,
service_name=None, interface=_DEFAULT_SERVICE_INTERFACE,
region_name=None):
"""
Barbican client object used to interact with barbican service.
:param session: This can be either requests.Session or
keystoneclient.session.Session
:param auth_plugin: Authentication backend plugin
defaults to None. This can also be a keystoneclient authentication
plugin.
:param endpoint: Barbican endpoint url. Required when not using
an auth_plugin. When not provided, the client will try to
fetch this from the auth service catalog
:param tenant_id: The tenant ID used for context in barbican.
Required when not using auth_plugin. When not provided,
the client will try to get this from the auth_plugin.
:param insecure: Explicitly allow barbicanclient to perform
"insecure" TLS (https) requests. The server's certificate
will not be verified against any certificate authorities.
This option should be used with caution.
:param service_type: Used as an endpoint filter when using a
keystone auth plugin. Defaults to 'keystore'
:param interface: Another endpoint filter. Defaults to 'public'
:param session: An instance of keystoneclient.session.Session that
can be either authenticated, or not authenticated. When using
a non-authenticated Session, you must provide some additional
parameters. When no session is provided it will default to a
non-authenticated Session.
:param endpoint: Barbican endpoint url. Required when a session is not
given, or when using a non-authentciated session.
When using an authenticated session, the client will attempt
to get an endpoint from the session.
:param project_id: The project ID used for context in Barbican.
Required when a session is not given, or when using a
non-authenticated session.
When using an authenticated session, the project ID will be
provided by the authentication mechanism.
:param verify: When a session is not given, the client will create
a non-authenticated session. This parameter is passed to the
session that is created. If set to False, it allows
barbicanclient to perform "insecure" TLS (https) requests.
The server's certificate will not be verified against any
certificate authorities.
WARNING: This option should be used with caution.
:param service_type: Used as an endpoint filter when using an
authenticated keystone session. Defaults to 'key-management'.
:param service_name: Used as an endpoint filter when using an
authenticated keystone session.
:param interface: Used as an endpoint filter when using an
authenticated keystone session. Defaults to 'public'.
:param region_name: Used as an endpoint filter when using an
authenticated keystone session.
"""
LOG.debug(_("Creating Client object"))
self._wrap_session_with_keystone_if_required(session, insecure)
auth_plugin = self._update_session_auth_plugin(auth_plugin)
LOG.debug("Creating Client object")
if auth_plugin:
self._barbican_url = self._session.get_endpoint(
service_type=service_type, interface=interface)
self._tenant_id = self._get_tenant_id(self._session, auth_plugin)
self._session = session or ks_session.Session(verify=verify)
if self._session.auth is None:
self._validate_endpoint_and_project_id(endpoint, project_id)
if endpoint is not None:
self._barbican_endpoint = self._get_normalized_endpoint(endpoint)
else:
# neither auth_plugin is provided nor it is available from session
# fallback to passed in parameters
self._validate_endpoint_and_tenant_id(endpoint, tenant_id)
self._barbican_url = self._get_normalized_endpoint(endpoint)
self._tenant_id = tenant_id
self._barbican_endpoint = self._get_normalized_endpoint(
self._session.get_endpoint(
service_type=service_type, service_name=service_name,
interface=interface, region_name=region_name
)
)
if project_id is None:
self._default_headers = dict()
else:
# If provided we'll include the project ID in all requests.
self._default_headers = {'X-Project-Id': project_id}
self._base_url = '{0}/{1}'.format(self._barbican_endpoint,
_DEFAULT_API_VERSION)
self._base_url = '{0}'.format(self._barbican_url)
self.secrets = secrets.SecretManager(self)
self.orders = orders.OrderManager(self)
self.containers = containers.ContainerManager(self)
def _wrap_session_with_keystone_if_required(self, session, insecure):
# if session is not a keystone session, wrap it
if not isinstance(session, ks_session.Session):
self._session = ks_session.Session(
session=session, verify=not insecure)
else:
self._session = session
def _update_session_auth_plugin(self, auth_plugin):
# if auth_plugin is not provided and the session
# has one, use it
using_auth_from_session = False
if auth_plugin is None and self._session.auth is not None:
auth_plugin = self._session.auth
using_auth_from_session = True
ks_auth_plugin = auth_plugin
# if auth_plugin is not a keystone plugin, wrap it
if auth_plugin and not isinstance(auth_plugin, BaseAuthPlugin):
ks_auth_plugin = KeystoneAuthPluginWrapper(auth_plugin)
# if auth_plugin is provided, override the session's auth with it
if not using_auth_from_session:
self._session.auth = ks_auth_plugin
return auth_plugin
def _validate_endpoint_and_tenant_id(self, endpoint, tenant_id):
def _validate_endpoint_and_project_id(self, endpoint, project_id):
if endpoint is None:
raise ValueError('Barbican endpoint url must be provided, or '
'must be available from auth_plugin or '
'keystone_client')
if tenant_id is None:
raise ValueError('Tenant ID must be provided, or must be '
'available from the auth_plugin or '
'keystone-client')
raise ValueError('Barbican endpoint url must be provided when not '
'using auth in the Keystone Session.')
if project_id is None:
raise ValueError('Project ID must be provided when not using auth '
'in the Keystone Session')
def _get_normalized_endpoint(self, endpoint):
if endpoint.endswith('/'):
endpoint = endpoint[:-1]
return endpoint
def _get_tenant_id(self, session, auth_plugin):
if isinstance(auth_plugin, BaseAuthPlugin):
# this is a keystoneclient auth plugin
if hasattr(auth_plugin, 'get_access'):
return auth_plugin.get_access(session).project_id
else:
# not an identity auth plugin and we don't know how to lookup
# the tenant_id
raise ValueError('Unable to obtain tenant_id from auth plugin')
else:
# this is a Barbican auth plugin
return auth_plugin.tenant_id
def _prepare_auth(self, headers):
if isinstance(headers, dict) and not self._session.auth:
headers['X-Project-Id'] = self._tenant_id
def _get(self, href, params=None):
headers = {'Accept': 'application/json'}
self._prepare_auth(headers)
headers.update(self._default_headers)
resp = self._session.get(href, params=params, headers=headers)
self._check_status_code(resp)
return resp.json()
def _get_raw(self, href, headers):
self._prepare_auth(headers)
headers.update(self._default_headers)
resp = self._session.get(href, headers=headers)
self._check_status_code(resp)
return resp.content
def _delete(self, href, json=None):
headers = {}
self._prepare_auth(headers)
headers = dict()
headers.update(self._default_headers)
resp = self._session.delete(href, headers=headers, json=json)
self._check_status_code(resp)
def _post(self, path, data):
url = '{0}/{1}/'.format(self._base_url, path)
headers = {'content-type': 'application/json'}
self._prepare_auth(headers)
headers = {'Content-Type': 'application/json'}
headers.update(self._default_headers)
resp = self._session.post(url, data=json.dumps(data), headers=headers)
self._check_status_code(resp)
return resp.json()
@@ -207,7 +185,8 @@ class Client(object):
def _get_error_message(self, resp):
try:
message = resp.json()['title']
response_data = resp.json()
message = response_data['title']
except ValueError:
message = resp.content
return message

View File

@@ -1,256 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import json
import logging
from keystoneclient.auth.base import BaseAuthPlugin
from keystoneclient.v2_0 import client as ksclient
from keystoneclient import exceptions
from keystoneclient import session as ks_session
from keystoneclient import discover
import requests
import six
LOG = logging.getLogger(__name__)
class KeystoneAuthPluginWrapper(BaseAuthPlugin):
"""
This class is for backward compatibility only and is an
adapter for using barbican style auth_plugin in place of
the recommended keystone auth_plugin.
"""
def __init__(self, barbican_auth_plugin):
self.barbican_auth_plugin = barbican_auth_plugin
def get_token(self, session, **kwargs):
return self.barbican_auth_plugin.auth_token
def get_endpoint(self, session, **kwargs):
# NOTE(gyee): this is really a hack as Barbican auth plugin only
# cares about Barbican endpoint.
return self.barbican_auth_plugin.barbican_url
def _discover_keystone_info(auth_url):
# From the auth_url, figure the keystone client version to use
try:
disco = discover.Discover(auth_url=auth_url)
versions = disco.available_versions()
except:
error_msg = 'Error: failed to discover keystone version '\
'using auth_url: %s' % auth_url
raise ValueError(error_msg)
else:
# use the first one in the list
if len(versions) > 0:
version = versions[0]['id']
else:
error_msg = 'Error: Unable to discover a keystone plugin '\
'for the specified --os-auth-url.\n'\
'Please provide a valid auth url'
raise ValueError(error_msg)
try:
# the input auth_url may not have the version info in the
# url. get the correct auth_url from the versions
auth_url = versions[0]['links'][0]['href']
except:
raise ValueError('Error: Unable to discover the correct auth url')
return version, auth_url
def create_keystone_auth_session(args):
"""
Creates an authenticated keystone session using
the supplied arguments.
"""
version, auth_url = _discover_keystone_info(args.os_auth_url)
project_name = args.os_project_name or args.os_tenant_name
project_id = args.os_project_id or args.os_tenant_id
# FIXME(tsv): we are depending on the keystone version interface here.
# If keystone changes it, this code will need to be changed accordingly
if version == 'v2.0':
# create a V2 Password plugin
from keystoneclient.auth.identity import v2
auth_plugin = v2.Password(auth_url=auth_url,
username=args.os_username,
password=args.os_password,
tenant_name=project_name,
tenant_id=project_id)
elif version == 'v3.0':
# create a V3 Password plugin
from keystoneclient.auth.identity import v3
auth_plugin = v3.Password(auth_url=auth_url,
username=args.os_username,
user_id=args.os_user_id,
user_domain_name=args.os_user_domain_name,
user_domain_id=args.os_user_domain_id,
password=args.os_password,
project_id=project_id,
project_name=project_name,
project_domain_id=args.os_project_domain_id,
project_domain_name=args.
os_project_domain_name)
else:
raise ValueError('Error: unsupported keystone version!')
return ks_session.Session(auth=auth_plugin, verify=not args.insecure)
class AuthException(Exception):
"""Raised when authorization fails."""
pass
@six.add_metaclass(abc.ABCMeta)
class AuthPluginBase(object):
"""Base class for Auth plugins."""
@abc.abstractproperty
def auth_token(self):
"""
Returns a valid token to be used in X-Auth-Token header for
api requests.
"""
@abc.abstractproperty
def barbican_url(self):
"""
Returns the barbican endpoint url, including the version.
"""
class KeystoneAuthV2(AuthPluginBase):
def __init__(self, auth_url='', username='', password='',
tenant_name='', tenant_id='', insecure=False, keystone=None):
if not keystone:
tenant_info = tenant_name or tenant_id
if not all([auth_url, username, password, tenant_info]):
raise ValueError('Please provide auth_url, username, password,'
' and tenant_id or tenant_name.')
self._barbican_url = None
# TODO(dmend): make these configurable
self._service_type = 'keystore'
self._endpoint_type = 'publicURL'
self._keystone = keystone or ksclient.Client(username=username,
password=password,
tenant_name=tenant_name,
tenant_id=tenant_id,
auth_url=auth_url,
insecure=insecure)
self.tenant_name = self._keystone.tenant_name
self.tenant_id = self._keystone.tenant_id
@property
def auth_token(self):
return self._keystone.auth_token
@property
def barbican_url(self):
if not self._barbican_url:
try:
self._barbican_url = self._keystone.service_catalog.url_for(
attr='region',
filter_value=self._keystone.region_name,
service_type=self._service_type,
endpoint_type=self._endpoint_type
)
except exceptions.EmptyCatalog:
LOG.error('Keystone is reporting an empty catalog.')
raise AuthException('Empty keystone catalog.')
except exceptions.EndpointNotFound:
LOG.error('Barbican endpoint not found in keystone catalog.')
raise AuthException('Barbican endpoint not found.')
return self._barbican_url
class RackspaceAuthV2(AuthPluginBase):
def __init__(self, auth_url='', username='', api_key='', password=''):
if not all([auth_url, username, api_key or password]):
raise ValueError('Please provide auth_url, username, api_key or '
'password.')
self._auth_url = auth_url
self._username = username
self._api_key = api_key
self._password = password
self._auth_token = None
self._barbican_url = None
self.tenant_id = None
self._authenticate()
@property
def auth_token(self):
return self._auth_token
@property
def barbican_url(self):
return self._barbican_url
def _authenticate(self):
auth_url = '{0}/tokens'.format(self._auth_url)
headers = {'Accept': 'application/json',
'Content-Type': 'application/json'}
if self._api_key:
payload = self._authenticate_with_api_key()
else:
payload = self._authenticate_with_password()
r = requests.post(auth_url, data=json.dumps(payload), headers=headers)
try:
r.raise_for_status()
except requests.HTTPError:
msg = 'HTTPError ({0}): Unable to authenticate with Rackspace.'
msg = msg.format(r.status_code)
LOG.error(msg)
raise AuthException(msg)
try:
data = r.json()
except ValueError:
msg = 'Error parsing response from Rackspace Identity.'
LOG.error(msg)
raise AuthException(msg)
else:
# TODO(dmend): get barbican_url from catalog
self._auth_token = data['access']['token']['id']
self.tenant_id = data['access']['token']['tenant']['id']
def _authenticate_with_api_key(self):
return {
'auth': {
'RAX-KSKEY:apiKeyCredentials': {
'username': self._username,
'apiKey': self._api_key
}
}
}
def _authenticate_with_password(self):
return {
'auth': {
'passwordCredentials': {
'username': self._username,
'password': self._password
}
}
}

View File

@@ -1,161 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
import requests
import testtools
from barbicanclient.common import auth
class WhenTestingKeystoneAuthentication(testtools.TestCase):
def setUp(self):
super(WhenTestingKeystoneAuthentication, self).setUp()
self.keystone_client = mock.MagicMock()
self.auth_url = 'https://www.yada.com'
self.username = 'user'
self.password = 'pw'
self.tenant_name = 'tenant'
self.tenant_id = '1234'
self.keystone_auth = auth.KeystoneAuthV2(auth_url=self.auth_url,
username=self.username,
password=self.password,
tenant_name=self.tenant_name,
tenant_id=self.tenant_id,
keystone=self.keystone_client)
def test_endpoint_username_password_tenant_are_required(self):
with testtools.ExpectedException(ValueError):
auth.KeystoneAuthV2()
def test_nothing_is_required_if_keystone_is_present(self):
fake_keystone = mock.Mock(tenant_name='foo', tenant_id='bar')
keystone_auth = auth.KeystoneAuthV2(keystone=fake_keystone)
self.assertEqual('foo', keystone_auth.tenant_name)
self.assertEqual('bar', keystone_auth.tenant_id)
def test_get_barbican_url(self):
barbican_url = 'https://www.barbican.com'
self.keystone_auth._barbican_url = barbican_url
self.assertEquals(barbican_url, self.keystone_auth.barbican_url)
class WhenTestingRackspaceAuthentication(testtools.TestCase):
def setUp(self):
super(WhenTestingRackspaceAuthentication, self).setUp()
self._auth_url = 'https://auth.url.com'
self._username = 'username'
self._api_key = 'api_key'
self._auth_token = '078a50dcdc984a639bb287c8d4adf541'
self._tenant_id = '123456'
self._response = requests.Response()
self._response._content = json.dumps({
'access': {
'token': {
'id': self._auth_token,
'expires': '2013-12-19T23:06:17.047Z',
'tenant': {
'id': self._tenant_id,
'name': '123456'
}
}
}
}).encode("ascii")
patcher = mock.patch('barbicanclient.common.auth.requests.post')
self._mock_post = patcher.start()
self._mock_post.return_value = self._response
self.addCleanup(patcher.stop)
def test_auth_url_username_and_api_key_are_required(self):
with testtools.ExpectedException(ValueError):
auth.RackspaceAuthV2()
with testtools.ExpectedException(ValueError):
auth.RackspaceAuthV2(self._auth_url)
with testtools.ExpectedException(ValueError):
auth.RackspaceAuthV2(self._auth_url,
self._username)
with testtools.ExpectedException(ValueError):
auth.RackspaceAuthV2(self._auth_url,
api_key=self._api_key)
def test_tokens_is_appended_to_auth_url(self):
self._response.status_code = 200
identity = auth.RackspaceAuthV2(self._auth_url,
self._username,
api_key=self._api_key)
self._mock_post.assert_called_with(
'https://auth.url.com/tokens',
data=mock.ANY,
headers=mock.ANY)
def test_authenticate_with_api_key(self):
self._response.status_code = 200
with mock.patch(
'barbicanclient.common.auth.RackspaceAuthV2.'
'_authenticate_with_api_key'
) as mock_authenticate:
mock_authenticate.return_value = {}
identity = auth.RackspaceAuthV2(self._auth_url,
self._username,
api_key=self._api_key)
mock_authenticate.assert_called_once_with()
def test_authenticate_with_password(self):
self._response.status_code = 200
with mock.patch(
'barbicanclient.common.auth.RackspaceAuthV2.'
'_authenticate_with_password'
) as mock_authenticate:
mock_authenticate.return_value = {}
identity = auth.RackspaceAuthV2(self._auth_url,
self._username,
password='password')
mock_authenticate.assert_called_once_with()
def test_auth_exception_thrown_for_bad_status(self):
self._response.status_code = 400
with testtools.ExpectedException(auth.AuthException):
auth.RackspaceAuthV2(self._auth_url,
self._username,
api_key=self._api_key)
def test_error_raised_for_bad_response_from_server(self):
self._response._content = b'Not JSON'
self._response.status_code = 200
with testtools.ExpectedException(auth.AuthException):
auth.RackspaceAuthV2(self._auth_url,
self._username,
api_key=self._api_key)
def test_auth_token_is_set(self):
self._response.status_code = 200
identity = auth.RackspaceAuthV2(self._auth_url,
self._username,
api_key=self._api_key)
self.assertEqual(identity.auth_token, self._auth_token)
def test_tenant_id_is_set(self):
self._response.status_code = 200
identity = auth.RackspaceAuthV2(self._auth_url,
self._username,
api_key=self._api_key)
self.assertEqual(identity.tenant_id, self._tenant_id)

View File

@@ -102,13 +102,13 @@ class WhenTestingBarbicanCLI(test_client.BaseEntityResource):
@httpretty.activate
def test_should_succeed_if_noauth_with_valid_args_specified(self):
list_secrets_content = '{"secrets": [], "total": 0}'
list_secrets_url = '{0}secrets'.format(self.endpoint)
list_secrets_url = '{0}/v1/secrets'.format(self.endpoint)
httpretty.register_uri(
httpretty.GET, list_secrets_url,
body=list_secrets_content)
self._expect_success_code(
"--no-auth --endpoint {0} --os-tenant-id {1} secret list".
format(self.endpoint, self.tenant_id))
format(self.endpoint, self.project_id))
def test_should_error_if_required_keystone_auth_arguments_are_missing(
self):

View File

@@ -12,443 +12,263 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import httpretty
import json
import mock
from oslo.serialization import jsonutils
import requests
import testtools
from barbicanclient import client
from barbicanclient.test import keystone_client_fixtures
from keystoneclient.auth.identity import v2
from keystoneclient.auth.identity import v3
class FakeAuth(object):
def __init__(self, auth_token, barbican_url, tenant_name, tenant_id):
self.auth_token = auth_token
self.barbican_url = barbican_url
self.tenant_name = tenant_name
self.tenant_id = tenant_id
class FakeResp(object):
def __init__(self, status_code, response_dict=None, content=None):
self.status_code = status_code
self.response_dict = response_dict
self.content = content
def json(self):
if self.response_dict is None:
return None
resp = self.response_dict
resp['title'] = 'some title here'
return resp
def content(self):
return self.content
class KeystonePasswordPlugins(object):
v2_auth_url = keystone_client_fixtures.V2_URL
v3_auth_url = keystone_client_fixtures.V3_URL
username = 'username'
password = 'password'
project_name = tenant_name = 'tenantname'
tenant_id = project_id = 'tenantid'
user_domain_name = 'udomain_name'
user_domain_id = 'udomain_id'
project_domain_name = 'pdomain_name'
project_domain_id = 'pdomain_id'
@classmethod
def get_v2_plugin(cls):
return v2.Password(auth_url=cls.v2_auth_url, username=cls.username,
password=cls.password, tenant_name=cls.tenant_name)
@classmethod
def get_v3_plugin(cls):
return v3.Password(auth_url=cls.v3_auth_url, username=cls.username,
password=cls.password,
project_name=cls.project_name,
user_domain_name=cls.user_domain_name,
project_domain_name=cls.project_domain_name)
class WhenTestingClientInit(testtools.TestCase):
class TestClient(testtools.TestCase):
def setUp(self):
super(WhenTestingClientInit, self).setUp()
self.auth_endpoint = 'https://localhost:5000/v2.0/'
self.auth_token = 'fake_auth_token'
self.user = 'user'
self.password = 'password'
self.tenant_name = 'tenant'
self.tenant_id = 'tenant_id'
super(TestClient, self).setUp()
self.endpoint = 'http://localhost:9311'
self.project_id = 'project_id'
self.client = client.Client(endpoint=self.endpoint,
project_id=self.project_id)
self.endpoint = 'http://localhost:9311/v1/'
self.fake_auth = FakeAuth(self.auth_token, self.endpoint,
self.tenant_name, self.tenant_id)
class WhenTestingClientInit(TestClient):
def _mock_response(self, content=None, status_code=200):
resp = requests.Response()
resp._content = content or b'{"title": "generic mocked response"}'
resp.status_code = status_code
return resp
def _get_fake_session(self):
sess = mock.MagicMock()
sess.get_endpoint.return_value = self.endpoint
return sess
def test_can_be_used_without_auth_plugin(self):
c = client.Client(auth_plugin=None, endpoint=self.endpoint,
tenant_id=self.tenant_id)
expected = self.endpoint.rstrip('/')
self.assertEqual(expected, c._base_url)
def test_can_be_used_without_a_session(self):
c = client.Client(endpoint=self.endpoint,
project_id=self.project_id)
self.assertIsNotNone(c._session)
def test_auth_token_header_is_set_when_using_auth_plugin(self):
c = client.Client(auth_plugin=self.fake_auth)
self.assertEqual(c._session.get_token(),
self.auth_token)
def test_api_version_is_appended_to_endpoint(self):
c = client.Client(endpoint=self.endpoint,
project_id=self.project_id)
self.assertEqual(c._base_url, 'http://localhost:9311/v1')
def test_error_thrown_when_no_auth_and_no_endpoint(self):
def test_default_headers_are_empty(self):
c = client.Client(session=self._get_fake_session())
self.assertIsInstance(c._default_headers, dict)
self.assertFalse(bool(c._default_headers))
def test_project_id_is_added_to_default_headers(self):
c = client.Client(endpoint=self.endpoint,
project_id=self.project_id)
self.assertIn('X-Project-Id', c._default_headers.keys())
self.assertEqual(c._default_headers['X-Project-Id'], self.project_id)
def test_error_thrown_when_no_session_and_no_endpoint(self):
self.assertRaises(ValueError, client.Client,
**{"tenant_id": self.tenant_id})
**{"project_id": self.project_id})
def test_error_thrown_when_no_auth_and_no_tenant_id(self):
def test_error_thrown_when_no_session_and_no_project_id(self):
self.assertRaises(ValueError, client.Client,
**{"endpoint": self.endpoint})
def test_client_strips_trailing_slash_from_endpoint(self):
c = client.Client(endpoint=self.endpoint, tenant_id=self.tenant_id)
self.assertEqual(c._barbican_url, self.endpoint.strip('/'))
c = client.Client(endpoint=self.endpoint + '/',
project_id=self.project_id)
self.assertEqual(c._barbican_endpoint, self.endpoint)
def test_base_url_starts_with_endpoint_url(self):
c = client.Client(auth_plugin=self.fake_auth)
c = client.Client(endpoint=self.endpoint, project_id=self.project_id)
self.assertTrue(c._base_url.startswith(self.endpoint))
def test_base_url_has_no_tenant_id(self):
c = client.Client(auth_plugin=self.fake_auth)
self.assertNotIn(self.tenant_id, c._base_url)
def test_base_url_ends_with_default_api_version(self):
c = client.Client(endpoint=self.endpoint, project_id=self.project_id)
self.assertTrue(c._base_url.endswith(client._DEFAULT_API_VERSION))
def test_should_raise_for_unauthorized_response(self):
resp = self._mock_response(status_code=401)
c = client.Client(auth_plugin=self.fake_auth)
self.assertRaises(client.HTTPAuthError, c._check_status_code, resp)
def test_should_raise_for_server_error(self):
resp = self._mock_response(status_code=500)
c = client.Client(auth_plugin=self.fake_auth)
self.assertRaises(client.HTTPServerError, c._check_status_code, resp)
def test_should_raise_for_client_errors(self):
resp = self._mock_response(status_code=400)
c = client.Client(auth_plugin=self.fake_auth)
self.assertRaises(client.HTTPClientError, c._check_status_code, resp)
def test_gets_endpoint_from_keystone_session(self):
c = client.Client(session=self._get_fake_session())
self.assertEqual(c._barbican_endpoint, self.endpoint)
class WhenTestingClientWithSession(testtools.TestCase):
class TestClientWithSession(testtools.TestCase):
def setUp(self):
super(WhenTestingClientWithSession, self).setUp()
self.endpoint = 'https://localhost:9311/v1/'
self.tenant_id = '1234567'
super(TestClientWithSession, self).setUp()
self.endpoint = 'http://localhost:9311'
self.entity = 'dummy-entity'
base = self.endpoint
self.entity_base = base + self.entity + "/"
self.entity_href = self.entity_base + \
'abcd1234-eabc-5678-9abc-abcdef012345'
self.entity_name = 'name'
self.entity_dict = {'name': self.entity_name}
self.session = mock.MagicMock()
self.client = client.Client(session=self.session,
endpoint=self.endpoint,
tenant_id=self.tenant_id)
def test_should_post(self):
self.session.request.return_value = mock.MagicMock(status_code=200)
self.session.request.return_value.json.return_value = {
'entity_ref': self.entity_href}
resp_dict = self.client._post(self.entity, self.entity_dict)
self.assertEqual(self.entity_href, resp_dict['entity_ref'])
# Verify the correct URL was used to make the call.
args, kwargs = self.session.request.call_args
url = args[1]
self.assertEqual(self.entity_base, url)
# Verify that correct information was sent in the call.
data = jsonutils.loads(kwargs['data'])
self.assertEqual(self.entity_name, data['name'])
def test_should_get(self):
self.session.request.return_value = mock.MagicMock(status_code=200)
self.session.request.return_value.json.return_value = {
'name': self.entity_name}
resp_dict = self.client._get(self.entity_href)
self.assertEqual(self.entity_name, resp_dict['name'])
# Verify the correct URL was used to make the call.
args, kwargs = self.session.request.call_args
url = args[1]
self.assertEqual(self.entity_href, url)
# Verify that correct information was sent in the call.
headers = kwargs['headers']
self.assertEqual('application/json', headers['Accept'])
def test_should_get_raw(self):
self.session.request.return_value = mock.MagicMock(status_code=200,
content='content')
headers = {'Accept': 'application/octet-stream'}
content = self.client._get_raw(self.entity_href, headers)
self.assertEqual('content', content)
# Verify the correct URL was used to make the call.
args, kwargs = self.session.request.call_args
url = args[1]
self.assertEqual(self.entity_href, url)
# Verify that correct information was sent in the call.
headers = kwargs['headers']
self.assertEqual('application/octet-stream', headers['Accept'])
def test_should_delete(self):
self.session.request.return_value = mock.MagicMock(status_code=200)
self.client._delete(self.entity_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.session.request.call_args
url = args[1]
self.assertEqual(self.entity_href, url)
def _get_fake_session_with_status_code(self, status_code):
resp = mock.MagicMock()
resp.status_code = status_code
sess = mock.MagicMock()
sess.get.return_value = resp
sess.post.return_value = resp
sess.delete.return_value = resp
sess.get_endpoint.return_value = self.endpoint
return sess
class WhenTestingClientWithKeystoneV2(WhenTestingClientWithSession):
class WhenTestingClientPost(TestClientWithSession):
def setUp(self):
super(WhenTestingClientWithKeystoneV2, self).setUp()
super(WhenTestingClientPost, self).setUp()
self.session = self._get_fake_session_with_status_code(201)
self.client = client.Client(session=self.session)
@httpretty.activate
def test_should_get(self):
# emulate Keystone version discovery
httpretty.register_uri(httpretty.GET,
keystone_client_fixtures.V2_URL,
body=keystone_client_fixtures.V2_VERSION_ENTRY)
# emulate Keystone v2 token request
v2_token = keystone_client_fixtures.generate_v2_project_scoped_token()
httpretty.register_uri(httpretty.POST,
'{0}/tokens'.format(
keystone_client_fixtures.V2_URL),
body=json.dumps(v2_token))
auth_plugin = KeystonePasswordPlugins.get_v2_plugin()
c = client.Client(auth_plugin=auth_plugin)
# emulate list secrets
list_secrets_url = '{0}/secrets'.format(c._base_url)
httpretty.register_uri(
httpretty.GET,
list_secrets_url,
status=200,
body='{{"name": "{0}", "secret_ref": "{1}"}}'.format(
self.entity_name, self.entity_href))
resp = c._get(list_secrets_url)
self.assertEqual(self.entity_name, resp['name'])
self.assertEqual(self.entity_href, resp['secret_ref'])
def test_post_normalizes_url_with_traling_slash(self):
self.client._post(path='secrets', data={'test_data': 'test'})
args, kwargs = self.session.post.call_args
url = args[0]
self.assertTrue(url.endswith('/'))
@httpretty.activate
def test_should_post(self):
# emulate Keystone version discovery
httpretty.register_uri(httpretty.GET,
keystone_client_fixtures.V2_URL,
body=keystone_client_fixtures.V2_VERSION_ENTRY)
# emulate Keystone v2 token request
v2_token = keystone_client_fixtures.generate_v2_project_scoped_token()
httpretty.register_uri(httpretty.POST,
'{0}/tokens'.format(
keystone_client_fixtures.V2_URL),
body=json.dumps(v2_token))
auth_plugin = KeystonePasswordPlugins.get_v2_plugin()
c = client.Client(auth_plugin=auth_plugin)
# emulate list secrets
post_secret_url = '{0}/secrets/'.format(c._base_url)
httpretty.register_uri(
httpretty.POST,
post_secret_url,
status=200,
body='{{"name": "{0}", "secret_ref": "{1}"}}'.format(
self.entity_name, self.entity_href))
resp = c._post('secrets', '{"name":"test"}')
self.assertEqual(self.entity_name, resp['name'])
self.assertEqual(self.entity_href, resp['secret_ref'])
def test_post_includes_content_type_header_of_application_json(self):
self.client._post(path='secrets', data={'test_data': 'test'})
args, kwargs = self.session.post.call_args
headers = kwargs.get('headers')
self.assertIn('Content-Type', headers.keys())
self.assertEqual(headers['Content-Type'], 'application/json')
@httpretty.activate
def test_should_get_raw(self):
# emulate Keystone version discovery
httpretty.register_uri(httpretty.GET,
keystone_client_fixtures.V2_URL,
body=keystone_client_fixtures.V2_VERSION_ENTRY)
# emulate Keystone v2 token request
v2_token = keystone_client_fixtures.generate_v2_project_scoped_token()
httpretty.register_uri(httpretty.POST,
'{0}/tokens'.format(
keystone_client_fixtures.V2_URL),
body=json.dumps(v2_token))
auth_plugin = KeystonePasswordPlugins.get_v2_plugin()
c = client.Client(auth_plugin=auth_plugin)
# emulate list secrets
get_secret_url = '{0}/secrets/s1'.format(c._base_url)
httpretty.register_uri(
httpretty.GET,
get_secret_url,
status=200, body='content')
headers = {"Content-Type": "application/json"}
resp = c._get_raw(get_secret_url, headers)
self.assertEqual(b'content', resp)
def test_post_includes_default_headers(self):
self.client._default_headers = {'Test-Default-Header': 'test'}
self.client._post(path='secrets', data={'test_data': 'test'})
args, kwargs = self.session.post.call_args
headers = kwargs.get('headers')
self.assertIn('Test-Default-Header', headers.keys())
@httpretty.activate
def test_should_delete(self):
# emulate Keystone version discovery
httpretty.register_uri(httpretty.GET,
keystone_client_fixtures.V2_URL,
body=keystone_client_fixtures.V2_VERSION_ENTRY)
# emulate Keystone v2 token request
v2_token = keystone_client_fixtures.generate_v2_project_scoped_token()
httpretty.register_uri(httpretty.POST,
'{0}/tokens'.format(
keystone_client_fixtures.V2_URL),
body=json.dumps(v2_token))
auth_plugin = KeystonePasswordPlugins.get_v2_plugin()
c = client.Client(auth_plugin=auth_plugin)
# emulate list secrets
delete_secret_url = '{0}/secrets/s1'.format(c._base_url)
httpretty.register_uri(
httpretty.DELETE,
delete_secret_url,
status=201)
c._delete(delete_secret_url)
def test_post_checks_status_code(self):
self.client._check_status_code = mock.MagicMock()
self.client._post(path='secrets', data={'test_data': 'test'})
resp = self.session.post()
self.client._check_status_code.assert_called_with(resp)
class WhenTestingClientWithKeystoneV3(WhenTestingClientWithSession):
class WhenTestingClientGet(TestClientWithSession):
def setUp(self):
super(WhenTestingClientWithKeystoneV3, self).setUp()
super(WhenTestingClientGet, self).setUp()
self.session = self._get_fake_session_with_status_code(200)
self.client = client.Client(session=self.session)
self.headers = dict()
self.href = 'http://test_href'
@httpretty.activate
def test_should_get(self):
# emulate Keystone version discovery
httpretty.register_uri(httpretty.GET,
keystone_client_fixtures.V3_URL,
body=keystone_client_fixtures.V3_VERSION_ENTRY)
# emulate Keystone v3 token request
id, v3_token = keystone_client_fixtures.\
generate_v3_project_scoped_token()
httpretty.register_uri(httpretty.POST,
'{0}/auth/tokens'.format(
keystone_client_fixtures.V3_URL),
body=json.dumps(v3_token), x_subject_token=id)
auth_plugin = KeystonePasswordPlugins.get_v3_plugin()
c = client.Client(auth_plugin=auth_plugin)
# emulate list secrets
list_secrets_url = '{0}/secrets'.format(c._base_url)
httpretty.register_uri(
httpretty.GET,
list_secrets_url,
status=200,
body='{{"name": "{0}", "secret_ref": "{1}"}}'.format(
self.entity_name, self.entity_href))
resp = c._get(list_secrets_url)
self.assertEqual(self.entity_name, resp['name'])
self.assertEqual(self.entity_href, resp['secret_ref'])
def test_get_uses_href_as_is(self):
self.client._get(self.href)
args, kwargs = self.session.get.call_args
url = args[0]
self.assertEqual(url, self.href)
@httpretty.activate
def test_should_post(self):
# emulate Keystone version discovery
httpretty.register_uri(httpretty.GET,
keystone_client_fixtures.V3_URL,
body=keystone_client_fixtures.V3_VERSION_ENTRY)
# emulate Keystone v3 token request
id, v3_token = keystone_client_fixtures.\
generate_v3_project_scoped_token()
httpretty.register_uri(httpretty.POST,
'{0}/auth/tokens'.format(
keystone_client_fixtures.V3_URL),
body=json.dumps(v3_token),
x_subject_token=id)
auth_plugin = KeystonePasswordPlugins.get_v3_plugin()
c = client.Client(auth_plugin=auth_plugin)
# emulate list secrets
post_secret_url = '{0}/secrets/'.format(c._base_url)
httpretty.register_uri(
httpretty.POST,
post_secret_url,
status=200,
x_subject_token=id,
body='{{"name": "{0}", "secret_ref": "{1}"}}'.format(
self.entity_name, self.entity_href))
resp = c._post('secrets', '{"name":"test"}')
self.assertEqual(self.entity_name, resp['name'])
self.assertEqual(self.entity_href, resp['secret_ref'])
def test_get_passes_params(self):
params = object()
self.client._get(self.href, params=params)
args, kwargs = self.session.get.call_args
passed_params = kwargs.get('params')
self.assertIs(params, passed_params)
@httpretty.activate
def test_should_get_raw(self):
# emulate Keystone version discovery
httpretty.register_uri(httpretty.GET,
keystone_client_fixtures.V3_URL,
body=keystone_client_fixtures.V3_VERSION_ENTRY)
# emulate Keystone v3 token request
id, v3_token = keystone_client_fixtures.\
generate_v3_project_scoped_token()
httpretty.register_uri(httpretty.POST,
'{0}/auth/tokens'.format(
keystone_client_fixtures.V3_URL),
body=json.dumps(v3_token),
x_subject_token=id)
auth_plugin = KeystonePasswordPlugins.get_v3_plugin()
c = client.Client(auth_plugin=auth_plugin)
# emulate list secrets
get_secret_url = '{0}/secrets/s1'.format(c._base_url)
httpretty.register_uri(
httpretty.GET,
get_secret_url,
status=200, body='content')
headers = {"Content-Type": "application/json"}
resp = c._get_raw(get_secret_url, headers)
self.assertEqual(b'content', resp)
def test_get_includes_accept_header_of_application_json(self):
self.client._get(self.href)
args, kwargs = self.session.get.call_args
headers = kwargs.get('headers')
self.assertIn('Accept', headers.keys())
self.assertEqual(headers['Accept'], 'application/json')
@httpretty.activate
def test_should_delete(self):
# emulate Keystone version discovery
httpretty.register_uri(httpretty.GET,
keystone_client_fixtures.V3_URL,
body=keystone_client_fixtures.V3_VERSION_ENTRY)
# emulate Keystone v3 token request
id, v3_token = keystone_client_fixtures.\
generate_v3_project_scoped_token()
httpretty.register_uri(httpretty.POST,
'{0}/auth/tokens'.format(
keystone_client_fixtures.V3_URL),
body=json.dumps(v3_token),
x_subject_token=id)
auth_plugin = KeystonePasswordPlugins.get_v3_plugin()
c = client.Client(auth_plugin=auth_plugin)
# emulate list secrets
delete_secret_url = '{0}/secrets/s1'.format(c._base_url)
httpretty.register_uri(
httpretty.DELETE,
delete_secret_url,
status=201)
c._delete(delete_secret_url)
def test_get_includes_default_headers(self):
self.client._default_headers = {'Test-Default-Header': 'test'}
self.client._get(self.href)
args, kwargs = self.session.get.call_args
headers = kwargs.get('headers')
self.assertIn('Test-Default-Header', headers.keys())
def test_get_checks_status_code(self):
self.client._check_status_code = mock.MagicMock()
self.client._get(self.href)
resp = self.session.get()
self.client._check_status_code.assert_called_with(resp)
def test_get_raw_uses_href_as_is(self):
self.client._get_raw(self.href, self.headers)
args, kwargs = self.session.get.call_args
url = args[0]
self.assertEqual(url, self.href)
def test_get_raw_passes_headers(self):
self.client._get_raw(self.href, self.headers)
args, kwargs = self.session.get.call_args
headers = kwargs.get('headers')
self.assertIs(headers, self.headers)
def test_get_raw_includes_default_headers(self):
self.client._default_headers = {'Test-Default-Header': 'test'}
self.client._get_raw(self.href, self.headers)
self.assertIn('Test-Default-Header', self.headers.keys())
def test_get_raw_checks_status_code(self):
self.client._check_status_code = mock.MagicMock()
self.client._get_raw(self.href, self.headers)
resp = self.session.get()
self.client._check_status_code.assert_called_with(resp)
class WhenTestingClientDelete(TestClientWithSession):
def setUp(self):
super(WhenTestingClientDelete, self).setUp()
self.session = self._get_fake_session_with_status_code(200)
self.client = client.Client(session=self.session)
self.href = 'http://test_href'
def test_delete_uses_href_as_is(self):
self.client._delete(self.href)
args, kwargs = self.session.delete.call_args
url = args[0]
self.assertEqual(url, self.href)
def test_delete_passes_json(self):
json = '{"test": "test"}'
self.client._delete(self.href, json=json)
args, kwargs = self.session.delete.call_args
passed_json = kwargs.get('json')
self.assertEqual(passed_json, json)
def test_delete_includes_default_headers(self):
self.client._default_headers = {'Test-Default-Header': 'test'}
self.client._delete(self.href)
args, kwargs = self.session.delete.call_args
headers = kwargs.get('headers')
self.assertIn('Test-Default-Header', headers.keys())
def test_delete_checks_status_code(self):
self.client._check_status_code = mock.MagicMock()
self.client._delete(self.href)
resp = self.session.get()
self.client._check_status_code.assert_called_with(resp)
class WhenTestingCheckStatusCodes(TestClient):
def test_raises_http_auth_error_for_401_response(self):
resp = mock.MagicMock()
resp.status_code = 401
self.assertRaises(client.HTTPAuthError, self.client._check_status_code,
resp)
def test_raises_http_server_error_for_500_response(self):
resp = mock.MagicMock()
resp.status_code = 500
self.assertRaises(client.HTTPServerError,
self.client._check_status_code, resp)
def test_raises_http_client_error_for_400_response(self):
resp = mock.MagicMock()
resp.status_code = 400
self.assertRaises(client.HTTPClientError,
self.client._check_status_code, resp)
class WhenTestingGetErrorMessage(TestClient):
def test_gets_error_message_from_title_in_json(self):
resp = mock.MagicMock()
resp.json.return_value = {'title': 'test_text'}
msg = self.client._get_error_message(resp)
self.assertEqual(msg, 'test_text')
def test_gets_error_message_from_content_when_no_json(self):
resp = mock.MagicMock()
resp.json.side_effect = ValueError()
resp.content = content = 'content'
msg = self.client._get_error_message(resp)
self.assertEqual(msg, content)
class BaseEntityResource(testtools.TestCase):
@@ -487,13 +307,13 @@ class BaseEntityResource(testtools.TestCase):
def _setUp(self, entity):
super(BaseEntityResource, self).setUp()
self.endpoint = 'https://localhost:9311/v1/'
self.tenant_id = '1234567'
self.endpoint = 'http://localhost:9311'
self.project_id = '1234567'
self.entity = entity
self.entity_base = self.endpoint + self.entity + "/"
self.entity_base = self.endpoint + "/" + self.entity + "/"
self.entity_href = self.entity_base + \
'abcd1234-eabc-5678-9abc-abcdef012345'
self.api = mock.MagicMock()
self.api._base_url = self.endpoint[:-1]
self.api._base_url = self.endpoint

View File

@@ -92,7 +92,7 @@ class WhenTestingContainers(test_client.BaseEntityResource):
self.api.secrets.Secret.return_value = self.container.secret
self.manager = containers.ContainerManager(self.api)
self.consumers_post_resource = (
self.entity_href.replace(self.endpoint, '') + '/consumers'
self.entity_href.replace(self.endpoint + '/', '') + '/consumers'
)
self.consumers_delete_resource = (
self.entity_href + '/consumers'