Add Containers to python-barbicanclient

Adding Container support to the client and CLI. Also adding Consumer
Registration support at the same time (though not in the CLI).

Change-Id: Id34ac86cfae3f155e97760f14d8cf256722c7d6f
Implements: blueprint client-add-containers
This commit is contained in:
Adam Harwell
2014-08-20 19:05:26 -05:00
parent 34565d7132
commit 87ca6d750a
8 changed files with 1385 additions and 12 deletions

View File

@@ -17,8 +17,6 @@
Command-line interface to the Barbican API.
"""
import argparse
import logging
import sys
from cliff import app
@@ -30,7 +28,7 @@ from barbicanclient import version
class Barbican(app.App):
"""Barbican comand line interface."""
"""Barbican command line interface."""
def __init__(self, **kwargs):
super(Barbican, self).__init__(

View File

@@ -0,0 +1,167 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Command-line interface sub-commands related to containers.
"""
from cliff import command
from cliff import lister
from cliff import show
import six
from barbicanclient.barbican_cli.formatter import EntityFormatter
from barbicanclient.containers import RSAContainer, CertificateContainer
class ContainerFormatter(EntityFormatter):
columns = ("Container href",
"Name",
"Created",
"Status",
"Type",
"Secrets",
"Consumers",
)
def _get_formatted_data(self, entity):
formatted_secrets = '\n'.join(
(s.secret_ref for n, s in six.iteritems(entity.secrets))
)
formatted_consumers = '\n'.join(
(str(c) for c in entity.consumers)
)
data = (entity.container_ref,
entity.name,
entity.created,
entity.status,
entity._type,
formatted_secrets,
formatted_consumers,
)
return data
class DeleteContainer(command.Command):
"""Delete a container by providing its href."""
def get_parser(self, prog_name):
parser = super(DeleteContainer, self).get_parser(prog_name)
parser.add_argument('URI', help='The URI reference for the container')
return parser
def take_action(self, args):
self.app.client.containers.delete(args.URI)
class GetContainer(show.ShowOne, ContainerFormatter):
"""Retrieve a container by providing its URI."""
def get_parser(self, prog_name):
parser = super(GetContainer, self).get_parser(prog_name)
parser.add_argument('URI', help='The URI reference for the container.')
return parser
def take_action(self, args):
entity = self.app.client.containers.get(args.URI)
return self._get_formatted_entity(entity)
class ListContainer(lister.Lister, ContainerFormatter):
"""List containers."""
def get_parser(self, prog_name):
parser = super(ListContainer, self).get_parser(prog_name)
parser.add_argument('--limit', '-l', default=10,
help='specify the limit to the number of items '
'to list per page (default: %(default)s; '
'maximum: 100)',
type=int)
parser.add_argument('--offset', '-o', default=0,
help='specify the page offset '
'(default: %(default)s)',
type=int)
parser.add_argument('--name', '-n', default=None,
help='specify the container name '
'(default: %(default)s)')
parser.add_argument('--type', '-t', default=None,
help='specify the type filter for the list '
'(default: %(default)s).')
return parser
def take_action(self, args):
obj_list = self.app.client.containers.list(args.limit, args.offset,
args.name, args.type)
return self._list_objects(obj_list)
class CreateContainer(show.ShowOne, ContainerFormatter):
"""Store a container in Barbican."""
def get_parser(self, prog_name):
parser = super(CreateContainer, self).get_parser(prog_name)
parser.add_argument('--name', '-n',
help='a human-friendly name.')
parser.add_argument('--type', default='generic',
help='type of container to create (default: '
'%(default)s).')
parser.add_argument('--secret', '-s', action='append',
help='one secret to store in a container '
'(can be set multiple times). Example: '
'--secret "private_key='
'https://url.test/v1/secrets/1-2-3-4"')
return parser
def take_action(self, args):
container_type = self.app.client.containers.container_map.get(
args.type)
if not container_type:
raise ValueError('Invalid container type specified.')
secret_refs = CreateContainer._parse_secrets(args.secret)
if container_type is RSAContainer:
public_key_ref = secret_refs.get('public_key')
private_key_ref = secret_refs.get('private_key')
private_key_pass_ref = secret_refs.get('private_key_passphrase')
entity = RSAContainer(
api=self.app.client,
name=args.name,
public_key_ref=public_key_ref,
private_key_ref=private_key_ref,
private_key_passphrase_ref=private_key_pass_ref,
)
elif container_type is CertificateContainer:
certificate_ref = secret_refs.get('certificate')
intermediates_ref = secret_refs.get('intermediates')
private_key_ref = secret_refs.get('private_key')
private_key_pass_ref = secret_refs.get('private_key_passphrase')
entity = CertificateContainer(
api=self.app.client,
name=args.name,
certificate_ref=certificate_ref,
intermediates_ref=intermediates_ref,
private_key_ref=private_key_ref,
private_key_passphrase_ref=private_key_pass_ref,
)
else:
entity = container_type(api=self.app.client, name=args.name,
secret_refs=secret_refs)
entity.store()
return self._get_formatted_entity(entity)
@staticmethod
def _parse_secrets(secrets):
if not secrets:
raise ValueError("Must supply at least one secret.")
return dict(
(s.split('=')[0], s.split('=')[1])
for s in secrets if s.count('=') is 1
)

View File

@@ -34,6 +34,13 @@ def validate_ref(ref, entity):
raise ValueError('{0} incorrectly specified.'.format(entity))
def indent_object_string(string, spaces=8):
return '\n'.join(
['{0}{1}'.format(' ' * spaces, line)
for line in str(string).split('\n') if line]
)
class ImmutableException(Exception):
def __init__(self, attribute=None):
message = "This object is immutable!"

View File

@@ -17,11 +17,11 @@ import logging
import os
from keystoneclient.auth.base import BaseAuthPlugin
from keystoneclient import exceptions
from keystoneclient import session as ks_session
from barbicanclient.common.auth import KeystoneAuthPluginWrapper
from barbicanclient.openstack.common.gettextutils import _
from barbicanclient import containers
from barbicanclient import orders
from barbicanclient import secrets
@@ -100,6 +100,7 @@ class Client(object):
self.base_url = '{0}'.format(self._barbican_url)
self.secrets = secrets.SecretManager(self)
self.orders = orders.OrderManager(self)
self.containers = containers.ContainerManager(self)
def _wrap_session_with_keystone_if_required(self, session, insecure):
# if session is not a keystone session, wrap it
@@ -157,7 +158,7 @@ class Client(object):
return auth_plugin.tenant_id
def _prepare_auth(self, headers):
if headers and not self._session.auth:
if isinstance(headers, dict) and not self._session.auth:
headers['X-Project-Id'] = self._tenant_id
def get(self, href, params=None):
@@ -173,10 +174,10 @@ class Client(object):
self._check_status_code(resp)
return resp.content
def delete(self, href):
def delete(self, href, json=None):
headers = {}
self._prepare_auth(headers)
resp = self._session.delete(href, headers=headers)
resp = self._session.delete(href, headers=headers, json=json)
self._check_status_code(resp)
def post(self, path, data):

View File

@@ -0,0 +1,653 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
import six
from barbicanclient import base
from barbicanclient import secrets
from barbicanclient.openstack.common.timeutils import parse_isotime
LOG = logging.getLogger(__name__)
def _immutable_after_save(func):
@functools.wraps(func)
def wrapper(self, *args):
if hasattr(self, '_container_ref') and self._container_ref:
raise base.ImmutableException()
return func(self, *args)
return wrapper
class Container(object):
"""
Containers are used to keep track of the data stored in Barbican.
"""
entity = 'containers'
_type = 'generic'
def __init__(self, api, name=None, secrets=None, consumers=None,
container_ref=None, created=None, updated=None, status=None,
secret_refs=None):
self._api = api
self._name = name
self._secret_refs = secret_refs
self._cached_secrets = dict()
self._initialize_secrets(secrets)
self._consumers = consumers if consumers else list()
self._container_ref = container_ref
self._created = parse_isotime(created) if created else None
self._updated = parse_isotime(updated) if updated else None
self._status = status
def _initialize_secrets(self, secrets):
try:
self._fill_secrets_from_secret_refs()
except Exception:
raise ValueError("One or more of the provided secret_refs could "
"not be retrieved!")
if secrets:
try:
for name, secret in six.iteritems(secrets):
self.add(name, secret)
except Exception:
raise ValueError("One or more of the provided secrets are not "
"valid Secret objects!")
def _fill_secrets_from_secret_refs(self):
if self._secret_refs:
self._cached_secrets = dict(
(name.lower(), self._api.secrets.Secret(secret_ref=secret_ref))
for name, secret_ref in six.iteritems(self._secret_refs)
)
@property
def container_ref(self):
return self._container_ref
@property
def name(self):
if self._container_ref and not self._name:
self._reload()
return self._name
@property
def created(self):
return self._created
@property
def updated(self):
return self._updated
@property
def status(self):
if self._container_ref and not self._status:
self._reload()
return self._status
@property
def secret_refs(self):
if self._cached_secrets:
self._secret_refs = dict(
(name, secret.secret_ref)
for name, secret in six.iteritems(self._cached_secrets)
)
return self._secret_refs
@property
def secrets(self, cache=True):
if not self._cached_secrets or not cache:
self._fill_secrets_from_secret_refs()
return self._cached_secrets
@property
def consumers(self):
return self._consumers
@name.setter
@_immutable_after_save
def name(self, value):
self._name = value
@_immutable_after_save
def add(self, name, secret):
if not isinstance(secret, secrets.Secret):
raise ValueError("Must provide a valid Secret object")
if name.lower() in self.secrets:
raise KeyError("A secret with this name already exists!")
self._cached_secrets[name.lower()] = secret
@_immutable_after_save
def remove(self, name):
self._cached_secrets.pop(name.lower(), None)
if self._secret_refs:
self._secret_refs.pop(name.lower(), None)
@_immutable_after_save
def store(self):
secret_refs = self._get_secrets_and_store_them_if_necessary()
container_dict = base.filter_empty_keys({
'name': self.name,
'type': self._type,
'secret_refs': secret_refs
})
LOG.debug("Request body: {0}".format(container_dict))
# Save, store container_ref and return
response = self._api.post(self.entity, container_dict)
if response:
self._container_ref = response['container_ref']
return self.container_ref
def delete(self):
if self._container_ref:
self._api.delete(self._container_ref)
self._container_ref = None
self._status = None
self._created = None
self._updated = None
else:
raise LookupError("Secret is not yet stored.")
def _get_secrets_and_store_them_if_necessary(self):
# Save all secrets if they are not yet saved
LOG.debug("Storing secrets: {0}".format(self.secrets))
secret_refs = []
for name, secret in six.iteritems(self.secrets):
if secret and not secret.secret_ref:
secret.store()
secret_refs.append({'name': name, 'secret_ref': secret.secret_ref})
return secret_refs
def _reload(self):
if not self._container_ref:
raise AttributeError("container_ref not set, cannot reload data.")
LOG.debug('Getting container - Container href: {0}'
.format(self._container_ref))
base.validate_ref(self._container_ref, 'Container')
try:
response = self._api.get(self._container_ref)
except AttributeError:
raise LookupError('Container {0} could not be found.'
.format(self._container_ref))
self._name = response.get('name')
self._consumers = response.get('consumers', [])
created = response.get('created')
updated = response.get('updated')
self._created = parse_isotime(created) if created else None
self._updated = parse_isotime(updated) if updated else None
self._status = response.get('status')
def _get_named_secret(self, name):
return self.secrets.get(name)
def __str__(self):
sec = ['"{0}":\n{1}'.format(s.get('name'),
base.indent_object_string(s.get('secret'),
4))
for s in six.iteritems(self.secrets)]
return ("Container:\n"
" href: {0}\n"
" name: {1}\n"
" created: {2}\n"
" status: {3}\n"
" type: {4}\n"
" secrets:\n"
"{5}\n"
" consumers: {6}\n"
.format(self.container_ref, self.name, self.created,
self.status, self._type,
base.indent_object_string('\n'.join(sec)),
self.consumers)
)
def __repr__(self):
return 'Container(name="{0}")'.format(self.name)
class RSAContainer(Container):
_required_secrets = ["public_key", "private_key"]
_optional_secrets = ["private_key_passphrase"]
_type = 'rsa'
def __init__(self, api, name=None, public_key=None, private_key=None,
private_key_passphrase=None, consumers=[], container_ref=None,
created=None, updated=None, status=None, public_key_ref=None,
private_key_ref=None, private_key_passphrase_ref=None):
secret_refs = {}
if public_key_ref:
secret_refs['public_key'] = public_key_ref
if private_key_ref:
secret_refs['private_key'] = private_key_ref
if private_key_passphrase_ref:
secret_refs['private_key_passphrase'] = private_key_passphrase_ref
super(RSAContainer, self).__init__(
api=api,
name=name,
consumers=consumers,
container_ref=container_ref,
created=created,
updated=updated,
status=status,
secret_refs=secret_refs
)
if public_key:
self.public_key = public_key
if private_key:
self.private_key = private_key
if private_key_passphrase:
self.private_key_passphrase = private_key_passphrase
@property
def public_key(self):
return self._get_named_secret("public_key")
@property
def private_key(self):
return self._get_named_secret("private_key")
@property
def private_key_passphrase(self):
return self._get_named_secret("private_key_passphrase")
@public_key.setter
@_immutable_after_save
def public_key(self, value):
super(RSAContainer, self).remove("public_key")
super(RSAContainer, self).add("public_key", value)
@private_key.setter
@_immutable_after_save
def private_key(self, value):
super(RSAContainer, self).remove("private_key")
super(RSAContainer, self).add("private_key", value)
@private_key_passphrase.setter
@_immutable_after_save
def private_key_passphrase(self, value):
super(RSAContainer, self).remove("private_key_passphrase")
super(RSAContainer, self).add("private_key_passphrase", value)
def add(self, name, sec):
raise NotImplementedError("`add()` is not implemented for "
"Typed Containers")
def __repr__(self):
return 'RSAContainer(name="{0}")'.format(self.name)
def __str__(self):
return ("RSAContainer:\n"
" href: {0}\n"
" name: {1}\n"
" created: {2}\n"
" status: {3}\n"
" public_key:\n"
"{4}\n"
" private_key:\n"
"{5}\n"
" private_key_passphrase:\n"
"{6}\n"
" consumers: {7}\n"
.format(self.container_ref, self.name, self.created,
self.status,
base.indent_object_string(self.public_key),
base.indent_object_string(self.private_key),
base.indent_object_string(self.private_key_passphrase),
self.consumers)
)
class CertificateContainer(Container):
_required_secrets = ["certificate", "private_key"]
_optional_secrets = ["private_key_passphrase", "intermediates"]
_type = 'certificate'
def __init__(self, api, name=None, certificate=None, intermediates=None,
private_key=None, private_key_passphrase=None, consumers=[],
container_ref=None, created=None, updated=None, status=None,
certificate_ref=None, intermediates_ref=None,
private_key_ref=None, private_key_passphrase_ref=None):
secret_refs = {}
if certificate_ref:
secret_refs['certificate'] = certificate_ref
if intermediates_ref:
secret_refs['intermediates'] = intermediates_ref
if private_key_ref:
secret_refs['private_key'] = private_key_ref
if private_key_passphrase_ref:
secret_refs['private_key_passphrase'] = private_key_passphrase_ref
super(CertificateContainer, self).__init__(
api=api,
name=name,
consumers=consumers,
container_ref=container_ref,
created=created,
updated=updated,
status=status,
secret_refs=secret_refs
)
if certificate:
self.certificate = certificate
if intermediates:
self.intermediates = intermediates
if private_key:
self.private_key = private_key
if private_key_passphrase:
self.private_key_passphrase = private_key_passphrase
@property
def certificate(self):
return self._get_named_secret("certificate")
@property
def private_key(self):
return self._get_named_secret("private_key")
@property
def private_key_passphrase(self):
return self._get_named_secret("private_key_passphrase")
@property
def intermediates(self):
return self._get_named_secret("intermediates")
@certificate.setter
@_immutable_after_save
def certificate(self, value):
super(CertificateContainer, self).remove("certificate")
super(CertificateContainer, self).add("certificate", value)
@private_key.setter
@_immutable_after_save
def private_key(self, value):
super(CertificateContainer, self).remove("private_key")
super(CertificateContainer, self).add("private_key", value)
@private_key_passphrase.setter
@_immutable_after_save
def private_key_passphrase(self, value):
super(CertificateContainer, self).remove("private_key_passphrase")
super(CertificateContainer, self).add("private_key_passphrase", value)
@intermediates.setter
@_immutable_after_save
def intermediates(self, value):
super(CertificateContainer, self).remove("intermediates")
super(CertificateContainer, self).add("intermediates", value)
def add(self, name, sec):
raise NotImplementedError("`add()` is not implemented for "
"Typed Containers")
def __repr__(self):
return 'CertificateContainer(name="{0}")'.format(self.name)
def __str__(self):
return ("CertificateContainer:\n"
" href: {0}\n"
" name: {1}\n"
" created: {2}\n"
" status: {3}\n"
" certificate:\n"
"{4}\n"
" private_key:\n"
"{5}\n"
" private_key_passphrase:\n"
"{6}\n"
" intermediates:\n"
"{7}\n"
" consumers: {8}\n"
.format(self.container_ref, self.name, self.created,
self.status,
base.indent_object_string(self.certificate),
base.indent_object_string(self.private_key),
base.indent_object_string(self.private_key_passphrase),
base.indent_object_string(self.intermediates),
self.consumers)
)
class ContainerManager(base.BaseEntityManager):
container_map = {
'generic': Container,
'rsa': RSAContainer,
'certificate': CertificateContainer
}
def __init__(self, api):
super(ContainerManager, self).__init__(api, 'containers')
def get(self, container_ref):
"""Get a Container
:param container_ref: Full HATEOAS reference to a Container
:returns: Container object or a subclass of the appropriate type
"""
LOG.debug('Getting container - Container href: {0}'
.format(container_ref))
base.validate_ref(container_ref, 'Container')
try:
response = self.api.get(container_ref)
except AttributeError:
raise LookupError('Container {0} could not be found.'
.format(container_ref))
return self._generate_typed_container(response)
def _generate_typed_container(self, response):
resp_type = response.get('type', '').lower()
container_type = self.container_map.get(resp_type)
if not container_type:
raise TypeError('Unknown container type "{0}".'
.format(resp_type))
name = response.get('name')
consumers = response.get('consumers', [])
container_ref = response.get('container_ref')
created = response.get('created')
updated = response.get('updated')
status = response.get('status')
secret_refs = self._translate_secret_refs_from_json(
response.get('secret_refs')
)
if container_type is RSAContainer:
public_key_ref = secret_refs.get('public_key')
private_key_ref = secret_refs.get('private_key')
private_key_pass_ref = secret_refs.get('private_key_passphrase')
return RSAContainer(
api=self.api,
name=name,
consumers=consumers,
container_ref=container_ref,
created=created,
updated=updated,
status=status,
public_key_ref=public_key_ref,
private_key_ref=private_key_ref,
private_key_passphrase_ref=private_key_pass_ref,
)
elif container_type is CertificateContainer:
certificate_ref = secret_refs.get('certificate')
intermediates_ref = secret_refs.get('intermediates')
private_key_ref = secret_refs.get('private_key')
private_key_pass_ref = secret_refs.get('private_key_passphrase')
return CertificateContainer(
api=self.api,
name=name,
consumers=consumers,
container_ref=container_ref,
created=created,
updated=updated,
status=status,
certificate_ref=certificate_ref,
intermediates_ref=intermediates_ref,
private_key_ref=private_key_ref,
private_key_passphrase_ref=private_key_pass_ref,
)
return container_type(
api=self.api,
name=name,
secret_refs=secret_refs,
consumers=consumers,
container_ref=container_ref,
created=created,
updated=updated,
status=status
)
@staticmethod
def _translate_secret_refs_from_json(json_refs):
return dict(
(ref_pack.get('name'), ref_pack.get('secret_ref'))
for ref_pack in json_refs
)
def create(self, name=None, secrets=None):
"""
Container creation method
:param name: A friendly name for the Container
:param secrets: Secrets to populate when creating a Container
:returns: Container
"""
return Container(
api=self.api,
name=name,
secrets=secrets
)
def create_rsa(self, name=None, public_key=None, private_key=None,
private_key_passphrase=None):
"""
RSAContainer creation method
:param name: A friendly name for the RSAContainer
:param public_key: Secret object containing a Public Key
:param private_key: Secret object containing a Private Key
:param private_key_passphrase: Secret object containing a passphrase
:returns: RSAContainer
"""
return RSAContainer(
api=self.api,
name=name,
public_key=public_key,
private_key=private_key,
private_key_passphrase=private_key_passphrase
)
def create_certificate(self, name=None, certificate=None,
intermediates=None, private_key=None,
private_key_passphrase=None):
"""
CertificateContainer creation method
:param name: A friendly name for the CertificateContainer
:param certificate: Secret object containing a Certificate
:param intermediates: Secret object containing Intermediate Certs
:param private_key: Secret object containing a Private Key
:param private_key_passphrase: Secret object containing a passphrase
:returns: CertificateContainer
"""
return CertificateContainer(
api=self.api,
name=name,
certificate=certificate,
intermediates=intermediates,
private_key=private_key,
private_key_passphrase=private_key_passphrase
)
def delete(self, container_ref):
"""
Deletes a container
:param container_ref: Full HATEOAS reference to a Container
"""
if not container_ref:
raise ValueError('container_ref is required.')
try:
self.api.delete(container_ref)
except AttributeError:
raise LookupError('Container {0} could not be deleted. '
'Does it still exist?'.format(container_ref))
def list(self, limit=10, offset=0, name=None, type=None):
"""
List all containers for the tenant
:param limit: Max number of containers returned
:param offset: Offset containers to begin list
:param name: Name filter for the list
:param type: Type filter for the list
:returns: list of Container metadata objects
"""
LOG.debug('Listing containers - offset {0} limit {1} name {2} type {3}'
.format(offset, limit, name, type))
href = '{0}/{1}'.format(self.api.base_url, self.entity)
params = {'limit': limit, 'offset': offset}
if name:
params['name'] = name
if type:
params['type'] = type
response = self.api.get(href, params)
return [self._generate_typed_container(container)
for container in response.get('containers', [])]
def register_consumer(self, container_ref, name, url):
"""
Add a consumer to the container
:param container_ref: Full HATEOAS reference to a Container
:param name: Name of the consuming service
:param url: URL of the consuming resource
:returns: A container object per the get() method
"""
LOG.debug('Creating consumer registration for container '
'{0} as {1}: {2}'.format(container_ref, name, url))
href = '{0}/{1}/consumers'.format(self.entity,
container_ref.split('/')[-1])
consumer_dict = dict()
consumer_dict['name'] = name
consumer_dict['URL'] = url
response = self.api.post(href, consumer_dict)
return self._generate_typed_container(response)
def remove_consumer(self, container_ref, name, url):
"""
Remove a consumer from the container
:param container_ref: Full HATEOAS reference to a Container
:param name: Name of the previously consuming service
:param url: URL of the previously consuming resource
"""
LOG.debug('Deleting consumer registration for container '
'{0} as {1}: {2}'.format(container_ref, name, url))
href = '{0}/{1}/{2}/consumers'.format(self.api.base_url, self.entity,
container_ref.split('/')[-1])
consumer_dict = {
'name': name,
'URL': url
}
self.api.delete(href, json=consumer_dict)

View File

@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import httpretty
import json
import mock
import requests
import testtools
import json
from barbicanclient import client
from barbicanclient.test import keystone_client_fixtures
@@ -453,16 +453,47 @@ class WhenTestingClientWithKeystoneV3(WhenTestingClientWithSession):
class BaseEntityResource(testtools.TestCase):
# TODO: The compatibility of unittest between versions is horrible
# Reported as https://bugs.launchpad.net/testtools/+bug/1373139
if hasattr(testtools.TestCase, 'assertItemsEqual'):
# If this function is available, do nothing (PY27)
pass
elif hasattr(testtools.TestCase, 'assertCountEqual'):
# If this function is available, alias it (PY32+)
assertItemsEqual = testtools.TestCase.assertCountEqual
else:
# If neither is available, make our own version (PY26, PY30-31)
def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
first_seq, second_seq = list(expected_seq), list(actual_seq)
differences = []
for item in first_seq:
if item not in second_seq:
differences.append(item)
for item in second_seq:
if item not in first_seq:
differences.append(item)
if differences:
if not msg:
msg = "Items differ: {0}".format(differences)
self.fail(msg)
if len(first_seq) != len(second_seq):
if not msg:
msg = "Size of collection differs: {0} != {1}".format(
len(first_seq), len(second_seq)
)
self.fail(msg)
def _setUp(self, entity):
super(BaseEntityResource, self).setUp()
self.endpoint = 'https://localhost:9311/v1/'
self.tenant_id = '1234567'
self.entity = entity
base = self.endpoint + self.tenant_id + "/"
self.entity_base = base + self.entity + "/"
self.entity_base = self.endpoint + self.entity + "/"
self.entity_href = self.entity_base + \
'abcd1234-eabc-5678-9abc-abcdef012345'
self.api = mock.MagicMock()
self.api.base_url = base[:-1]
self.api.base_url = self.endpoint[:-1]

View File

@@ -0,0 +1,511 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from barbicanclient.test import test_client
from barbicanclient import base, containers, secrets
from barbicanclient.openstack.common import timeutils
class ContainerData(object):
def __init__(self):
self.name = 'Self destruction sequence'
self.type = 'generic'
self.secret = mock.Mock(spec=secrets.Secret)
self.secret.__bases__ = (secrets.Secret,)
self.secret.secret_ref = 'http://a/b/1'
self.secret.name = 'thing1'
self.generic_secret_refs = {self.secret.name: self.secret.secret_ref}
self.generic_secret_refs_json = [{'name': self.secret.name,
'secret_ref': self.secret.secret_ref}]
self.generic_secrets = {self.secret.name: self.secret}
self.rsa_secret_refs = {
'private_key': self.secret.secret_ref,
'public_key': self.secret.secret_ref,
'private_key_passphrase': self.secret.secret_ref,
}
self.rsa_secret_refs_json = [
{'name': 'private_key',
'secret_ref': self.secret.secret_ref},
{'name': 'public_key',
'secret_ref': self.secret.secret_ref},
{'name': 'private_key_passphrase',
'secret_ref': self.secret.secret_ref},
]
self.certificate_secret_refs = {
'certificate': self.secret.secret_ref,
'private_key': self.secret.secret_ref,
'private_key_passphrase': self.secret.secret_ref,
'intermediates': self.secret.secret_ref,
}
self.certificate_secret_refs_json = [
{'name': 'certificate',
'secret_ref': self.secret.secret_ref},
{'name': 'private_key',
'secret_ref': self.secret.secret_ref},
{'name': 'private_key_passphrase',
'secret_ref': self.secret.secret_ref},
{'name': 'intermediates',
'secret_ref': self.secret.secret_ref},
]
self.created = str(timeutils.utcnow())
self.consumer = {'name': 'testing', 'URL': 'http://c.d/e'}
self.container_dict = {'name': self.name,
'status': 'ACTIVE',
'created': self.created}
def get_dict(self, container_ref=None, type='generic', consumers=None):
container = self.container_dict
if container_ref:
container['container_ref'] = container_ref
container['type'] = type
if type == 'rsa':
container['secret_refs'] = self.rsa_secret_refs_json
elif type == 'certificate':
container['secret_refs'] = self.certificate_secret_refs_json
else:
container['secret_refs'] = self.generic_secret_refs_json
if consumers:
container['consumers'] = consumers
return container
class WhenTestingContainers(test_client.BaseEntityResource):
def setUp(self):
self._setUp('containers')
self.container = ContainerData()
self.api.secrets.Secret.return_value = self.container.secret
self.manager = containers.ContainerManager(self.api)
self.consumers_post_resource = (
self.entity_href.replace(self.endpoint, '') + '/consumers'
)
self.consumers_delete_resource = (
self.entity_href + '/consumers'
)
def test_should_generic_container_str(self):
container_obj = self.manager.create(name=self.container.name)
self.assertIn('name: ' + self.container.name, str(container_obj))
def test_should_certificate_container_str(self):
container_obj = self.manager.create_certificate(
name=self.container.name)
self.assertIn('name: ' + self.container.name, str(container_obj))
def test_should_rsa_container_str(self):
container_obj = self.manager.create_rsa(name=self.container.name)
self.assertIn('name: ' + self.container.name, str(container_obj))
def test_should_generic_container_repr(self):
container_obj = self.manager.create(name=self.container.name)
self.assertIn('name="{0}"'.format(self.container.name),
repr(container_obj))
def test_should_certificate_container_repr(self):
container_obj = self.manager.create_certificate(
name=self.container.name)
self.assertIn('name="{0}"'.format(self.container.name),
repr(container_obj))
def test_should_rsa_container_repr(self):
container_obj = self.manager.create_rsa(name=self.container.name)
self.assertIn('name="{0}"'.format(self.container.name),
repr(container_obj))
def test_should_store_generic_via_constructor(self):
self.api.post.return_value = {'container_ref': self.entity_href}
container = self.manager.create(
name=self.container.name,
secrets=self.container.generic_secrets
)
container_href = container.store()
self.assertEqual(self.entity_href, container_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
container_req = args[1]
self.assertEqual(self.container.name, container_req['name'])
self.assertEqual(self.container.type, container_req['type'])
self.assertEqual(self.container.generic_secret_refs_json,
container_req['secret_refs'])
def test_should_store_generic_via_attributes(self):
self.api.post.return_value = {'container_ref': self.entity_href}
container = self.manager.create()
container.name = self.container.name
container.add(self.container.secret.name, self.container.secret)
container_href = container.store()
self.assertEqual(self.entity_href, container_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
container_req = args[1]
self.assertEqual(self.container.name, container_req['name'])
self.assertEqual(self.container.type, container_req['type'])
self.assertItemsEqual(self.container.generic_secret_refs_json,
container_req['secret_refs'])
def test_should_store_certificate_via_attributes(self):
self.api.post.return_value = {'container_ref': self.entity_href}
container = self.manager.create_certificate()
container.name = self.container.name
container.certificate = self.container.secret
container.private_key = self.container.secret
container.private_key_passphrase = self.container.secret
container.intermediates = self.container.secret
container_href = container.store()
self.assertEqual(self.entity_href, container_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
container_req = args[1]
self.assertEqual(self.container.name, container_req['name'])
self.assertEqual('certificate', container_req['type'])
self.assertItemsEqual(self.container.certificate_secret_refs_json,
container_req['secret_refs'])
def test_should_store_certificate_via_constructor(self):
self.api.post.return_value = {'container_ref': self.entity_href}
container = self.manager.create_certificate(
name=self.container.name,
certificate=self.container.secret,
private_key=self.container.secret,
private_key_passphrase=self.container.secret,
intermediates=self.container.secret
)
container_href = container.store()
self.assertEqual(self.entity_href, container_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
container_req = args[1]
self.assertEqual(self.container.name, container_req['name'])
self.assertEqual('certificate', container_req['type'])
self.assertItemsEqual(self.container.certificate_secret_refs_json,
container_req['secret_refs'])
def test_should_store_rsa_via_attributes(self):
self.api.post.return_value = {'container_ref': self.entity_href}
container = self.manager.create_rsa()
container.name = self.container.name
container.private_key = self.container.secret
container.private_key_passphrase = self.container.secret
container.public_key = self.container.secret
container_href = container.store()
self.assertEqual(self.entity_href, container_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
container_req = args[1]
self.assertEqual(self.container.name, container_req['name'])
self.assertEqual('rsa', container_req['type'])
self.assertItemsEqual(self.container.rsa_secret_refs_json,
container_req['secret_refs'])
def test_should_store_rsa_via_constructor(self):
self.api.post.return_value = {'container_ref': self.entity_href}
container = self.manager.create_rsa(
name=self.container.name,
private_key=self.container.secret,
private_key_passphrase=self.container.secret,
public_key=self.container.secret
)
container_href = container.store()
self.assertEqual(self.entity_href, container_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
container_req = args[1]
self.assertEqual(self.container.name, container_req['name'])
self.assertEqual('rsa', container_req['type'])
self.assertItemsEqual(self.container.rsa_secret_refs_json,
container_req['secret_refs'])
def test_should_get_secret_refs_when_created_using_secret_objects(self):
self.api.post.return_value = {'container_ref': self.entity_href}
container = self.manager.create(
name=self.container.name,
secrets=self.container.generic_secrets
)
self.assertEqual(container.secret_refs,
self.container.generic_secret_refs)
def test_should_reload_attributes_after_store(self):
self.api.post.return_value = {'container_ref': self.entity_href}
self.api.get.return_value = self.container.get_dict(self.entity_href)
container = self.manager.create(
name=self.container.name,
secrets=self.container.generic_secrets
)
self.assertIsNone(container.status)
self.assertIsNone(container.created)
self.assertIsNone(container.updated)
container_href = container.store()
self.assertEqual(self.entity_href, container_href)
self.assertIsNotNone(container.status)
self.assertIsNotNone(container.created)
def test_should_fail_add_invalid_secret_object(self):
container = self.manager.create()
self.assertRaises(ValueError, container.add, "Not-a-secret",
"Actually a string")
def test_should_fail_add_duplicate_named_secret_object(self):
container = self.manager.create()
container.add(self.container.secret.name, self.container.secret)
self.assertRaises(KeyError, container.add, self.container.secret.name,
self.container.secret)
def test_should_add_remove_add_secret_object(self):
container = self.manager.create()
container.add(self.container.secret.name, self.container.secret)
container.remove(self.container.secret.name)
container.add(self.container.secret.name, self.container.secret)
def test_should_be_immutable_after_store(self):
self.api.post.return_value = {'container_ref': self.entity_href}
container = self.manager.create(
name=self.container.name,
secrets=self.container.generic_secrets
)
container_href = container.store()
self.assertEqual(self.entity_href, container_href)
# Verify that attributes are immutable after store.
attributes = [
"name"
]
for attr in attributes:
try:
setattr(container, attr, "test")
self.fail("didn't raise an ImmutableException exception")
except base.ImmutableException:
pass
self.assertRaises(base.ImmutableException, container.add,
self.container.secret.name, self.container.secret)
def test_should_not_be_able_to_set_generated_attributes(self):
container = self.manager.create()
# Verify that generated attributes cannot be set.
attributes = [
"container_ref", "created", "updated", "status", "consumers"
]
for attr in attributes:
try:
setattr(container, attr, "test")
self.fail("didn't raise an AttributeError exception")
except AttributeError:
pass
def test_should_get_generic_container(self):
self.api.get.return_value = self.container.get_dict(self.entity_href)
container = self.manager.get(container_ref=self.entity_href)
self.assertIsInstance(container, containers.Container)
self.assertEqual(self.entity_href, container.container_ref)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.get.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
self.assertIsNotNone(container.secrets)
def test_should_get_certificate_container(self):
self.api.get.return_value = self.container.get_dict(self.entity_href,
type='certificate')
container = self.manager.get(container_ref=self.entity_href)
self.assertIsInstance(container, containers.Container)
self.assertEqual(self.entity_href, container.container_ref)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.get.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
# Verify the returned type is correct
self.assertIsInstance(container, containers.CertificateContainer)
self.assertIsNotNone(container.certificate)
self.assertIsNotNone(container.private_key)
self.assertIsNotNone(container.private_key_passphrase)
self.assertIsNotNone(container.intermediates)
def test_should_get_rsa_container(self):
self.api.get.return_value = self.container.get_dict(self.entity_href,
type='rsa')
container = self.manager.get(container_ref=self.entity_href)
self.assertIsInstance(container, containers.Container)
self.assertEqual(self.entity_href, container.container_ref)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.get.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
# Verify the returned type is correct
self.assertIsInstance(container, containers.RSAContainer)
self.assertIsNotNone(container.private_key)
self.assertIsNotNone(container.public_key)
self.assertIsNotNone(container.private_key_passphrase)
def test_should_delete_from_manager(self):
self.manager.delete(container_ref=self.entity_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.delete.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
def test_should_delete_from_object(self):
self.api.get.return_value = self.container.get_dict(self.entity_href)
container = self.manager.get(container_ref=self.entity_href)
self.assertIsNotNone(container.container_ref)
container.delete()
# Verify the correct URL was used to make the call.
args, kwargs = self.api.delete.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
# Verify that the Container no longer has a container_ref
self.assertIsNone(container.container_ref)
def test_should_store_after_delete_from_object(self):
self.api.get.return_value = self.container.get_dict(self.entity_href)
container = self.manager.get(container_ref=self.entity_href)
self.assertIsNotNone(container.container_ref)
container.delete()
# Verify the correct URL was used to make the call.
args, kwargs = self.api.delete.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
# Verify that the Container no longer has a container_ref
self.assertIsNone(container.container_ref)
container.store()
# Verify that the Container has a container_ref again
self.assertIsNotNone(container.container_ref)
def test_should_get_list(self):
container_resp = self.container.get_dict(self.entity_href)
self.api.get.return_value = {"containers":
[container_resp for v in range(3)]}
containers_list = self.manager.list(limit=10, offset=5)
self.assertTrue(len(containers_list) == 3)
self.assertIsInstance(containers_list[0], containers.Container)
self.assertEqual(self.entity_href, containers_list[0].container_ref)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.get.call_args
url = args[0]
self.assertEqual(self.entity_base[:-1], url)
# Verify that correct information was sent in the call.
params = args[1]
self.assertEqual(10, params['limit'])
self.assertEqual(5, params['offset'])
def test_should_fail_get_invalid_container(self):
self.assertRaises(ValueError, self.manager.get,
**{'container_ref': '12345'})
def test_should_fail_delete_no_href(self):
self.assertRaises(ValueError, self.manager.delete, None)
def test_should_register_consumer(self):
self.api.post.return_value = self.container.get_dict(
self.entity_href, consumers=[self.container.consumer]
)
container = self.manager.register_consumer(
self.entity_href, self.container.consumer.get('name'),
self.container.consumer.get('URL')
)
self.assertIsInstance(container, containers.Container)
self.assertEqual(self.entity_href, container.container_ref)
args, kwargs = self.api.post.call_args
url, body = args[0], args[1]
self.assertEqual(self.consumers_post_resource, url)
self.assertEqual(self.container.consumer, body)
self.assertEqual([self.container.consumer], container.consumers)
def test_should_remove_consumer(self):
self.manager.remove_consumer(
self.entity_href, self.container.consumer.get('name'),
self.container.consumer.get('URL')
)
args, kwargs = self.api.delete.call_args
url = args[0]
body = kwargs['json']
self.assertEqual(self.consumers_delete_resource, url)
self.assertEqual(self.container.consumer, body)

View File

@@ -37,6 +37,11 @@ barbican.client =
secret_list = barbicanclient.barbican_cli.secrets:ListSecret
secret_store = barbicanclient.barbican_cli.secrets:StoreSecret
container_delete = barbicanclient.barbican_cli.containers:DeleteContainer
container_get = barbicanclient.barbican_cli.containers:GetContainer
container_list = barbicanclient.barbican_cli.containers:ListContainer
container_create = barbicanclient.barbican_cli.containers:CreateContainer
[build_sphinx]
source-dir = doc/source
build-dir = doc/build