Refactor client models in python-barbicanclient

Switching to an OOP approach.
Also did some minor cleanup (removing old Verifications code and some
unused imports).

Change-Id: Ifb443eb0a3ca7bab570d9181b35f6fd5f09e727c
Implements: blueprint client-refactor-models
This commit is contained in:
Adam Harwell
2014-08-18 14:22:39 -05:00
parent f9644702ad
commit b179a01122
11 changed files with 636 additions and 535 deletions

View File

@@ -66,12 +66,11 @@ class CreateOrder(show.ShowOne, OrderFormatter):
return parser
def take_action(self, args):
entity = self.app.client.orders.create(args.name,
args.payload_content_type,
args.algorithm,
args.bit_length,
args.mode,
args.expiration)
entity = self.app.client.orders.Order(
name=args.name, payload_content_type=args.payload_content_type,
algorithm=args.algorithm, bit_length=args.bit_length,
mode=args.mode, expiration=args.expiration)
entity.submit()
return self._get_formatted_entity(entity)
@@ -96,7 +95,7 @@ class GetOrder(show.ShowOne, OrderFormatter):
return parser
def take_action(self, args):
entity = self.app.client.orders.get(args.URI)
entity = self.app.client.orders.Order(order_ref=args.URI)
return self._get_formatted_entity(entity)

View File

@@ -85,7 +85,7 @@ class GetSecret(show.ShowOne, SecretFormatter):
return (('Secret',),
(entity,))
else:
entity = self.app.client.secrets.get(args.URI)
entity = self.app.client.secrets.Secret(secret_ref=args.URI)
return self._get_formatted_entity(entity)
@@ -161,9 +161,11 @@ class StoreSecret(show.ShowOne, SecretFormatter):
return parser
def take_action(self, args):
entity = self.app.client.secrets.store(
args.name, args.payload, args.payload_content_type,
args.payload_content_encoding, args.algorithm,
args.bit_length, args.mode, args.expiration)
return (('Secret',),
(entity,))
entity = self.app.client.secrets.Secret(
name=args.name, payload=args.payload,
payload_content_type=args.payload_content_type,
payload_content_encoding=args.payload_content_encoding,
algorithm=args.algorithm, bit_length=args.bit_length,
mode=args.mode, expiration=args.expiration)
entity.store()
return self._get_formatted_entity(entity)

View File

@@ -13,8 +13,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base utilites to build API operation managers.
Base utilities to build API operation managers.
"""
import six
import uuid
def filter_empty_keys(dictionary):
return dict(((k, v) for k, v in dictionary.items() if v))
def validate_ref(ref, entity):
try:
# Split out the UUID from the ref URL
url = six.moves.urllib.parse.urlparse(ref)
parts = url.path.rstrip('/').split('/')
# Attempt to load the UUID with uuid, which will raise if invalid
uuid.UUID(parts[-1])
except:
raise ValueError('{0} incorrectly specified.'.format(entity))
class ImmutableException(Exception):
def __init__(self, attribute=None):
message = "This object is immutable!"
super(ImmutableException, self).__init__(message)
class BaseEntityManager(object):
@@ -22,12 +45,6 @@ class BaseEntityManager(object):
self.api = api
self.entity = entity
def _remove_empty_keys(self, dictionary):
copied_dict = dictionary.copy()
for k in copied_dict.keys():
if dictionary[k] is None:
dictionary.pop(k)
def total(self):
"""
Returns the total number of entities stored in Barbican.

View File

@@ -24,7 +24,6 @@ from barbicanclient.common.auth import KeystoneAuthPluginWrapper
from barbicanclient.openstack.common.gettextutils import _
from barbicanclient import orders
from barbicanclient import secrets
from barbicanclient import verifications
LOG = logging.getLogger(__name__)
@@ -101,7 +100,6 @@ class Client(object):
self.base_url = '{0}'.format(self._barbican_url)
self.secrets = secrets.SecretManager(self)
self.orders = orders.OrderManager(self)
self.verifications = verifications.VerificationManager(self)
def _wrap_session_with_keystone_if_required(self, session, insecure):
# if session is not a keystone session, wrap it

View File

@@ -12,48 +12,205 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
from barbicanclient import base
from barbicanclient.openstack.common.gettextutils import _
from barbicanclient.openstack.common import timeutils
from barbicanclient.openstack.common.timeutils import parse_isotime
LOG = logging.getLogger(__name__)
def immutable_after_save(func):
@functools.wraps(func)
def wrapper(self, *args):
if self._order_ref:
raise base.ImmutableException()
return func(self, *args)
return wrapper
class Order(object):
"""
Orders are used to request the generation of a Secret in Barbican.
"""
entity = 'orders'
def __init__(self, order_dict):
"""
Builds an order object from a dictionary.
"""
self.order_ref = order_dict['order_ref']
self.error_status_code = order_dict.get('error_status_code', None)
self.error_reason = order_dict.get('error_reason', None)
self.status = order_dict.get('status')
self.created = timeutils.parse_isotime(order_dict['created'])
if order_dict.get('updated') is not None:
self.updated = timeutils.parse_isotime(order_dict['updated'])
def __init__(self, api, name=None, algorithm=None, bit_length=None,
mode=None, payload_content_type='application/octet-stream',
order_ref=None, secret_ref=None, status=None,
created=None, updated=None, expiration=None,
error_status_code=None, error_reason=None, secret=None,
meta=None, type=None):
self._api = api
self._order_ref = order_ref
self._type = type
self._meta = meta
if order_ref:
self._error_status_code = error_status_code
self._error_reason = error_reason
self._status = status
self._created = created
self._updated = updated
if self._created:
self._created = parse_isotime(self._created)
if self._updated:
self._updated = parse_isotime(self._updated)
self._secret_ref = secret_ref
self._secret = secret
else:
self.updated = None
self.secret_ref = order_dict.get('secret_ref')
self._error_status_code = None
self._error_reason = None
self._status = None
self._created = None
self._updated = None
self._secret_ref = None
self._secret = base.filter_empty_keys({
'name': name,
'algorithm': algorithm,
'bit_length': bit_length,
'mode': mode,
'payload_content_type': payload_content_type,
'expiration': expiration
})
if self._secret.get("expiration"):
self._secret['expiration'] = parse_isotime(
self._secret.get('expiration'))
@property
def name(self):
return self._secret.get('name')
@property
def expiration(self):
return self._secret.get('expiration')
@property
def algorithm(self):
return self._secret.get('algorithm')
@property
def bit_length(self):
return self._secret.get('bit_length')
@property
def mode(self):
return self._secret.get('mode')
@property
def payload_content_type(self):
return self._secret.get('payload_content_type')
@property
def order_ref(self):
return self._order_ref
@property
def secret_ref(self):
return self._secret_ref
@property
def created(self):
return self._created
@property
def updated(self):
return self._updated
@property
def status(self):
return self._status
@property
def error_status_code(self):
return self._error_status_code
@property
def error_reason(self):
return self._error_reason
@property
def type(self):
return self._type
@property
def meta(self):
return self._meta
@name.setter
@immutable_after_save
def name(self, value):
self._secret['name'] = value
@expiration.setter
@immutable_after_save
def expiration(self, value):
self._secret['expiration'] = value
@algorithm.setter
@immutable_after_save
def algorithm(self, value):
self._secret['algorithm'] = value
@bit_length.setter
@immutable_after_save
def bit_length(self, value):
self._secret['bit_length'] = value
@mode.setter
@immutable_after_save
def mode(self, value):
self._secret['mode'] = value
@payload_content_type.setter
@immutable_after_save
def payload_content_type(self, value):
self._secret['payload_content_type'] = value
@type.setter
@immutable_after_save
def type(self, value):
self._type = value
@meta.setter
@immutable_after_save
def meta(self, value):
self._meta = value
@immutable_after_save
def submit(self):
order_dict = dict({
'secret': self._secret
})
LOG.debug("Request body: {0}".format(order_dict.get('secret')))
response = self._api.post(self.entity, order_dict)
if response:
self._order_ref = response.get('order_ref')
return self._order_ref
def delete(self):
if self._order_ref:
self._api.delete(self._order_ref)
self._order_ref = None
else:
raise LookupError("Order is not yet stored.")
def __str__(self):
strg = ("Order - order href: {0}\n"
" secret href: {1}\n"
" created: {2}\n"
" status: {3}\n"
).format(self.order_ref, self.secret_ref,
self.created, self.status)
str_rep = ("Order:\n"
" order href: {0}\n"
" secret href: {1}\n"
" created: {2}\n"
" status: {3}\n"
).format(self.order_ref, self.secret_ref,
self.created, self.status)
if self.error_status_code:
strg = ''.join([strg, (" error_status_code: {0}\n"
" error_reason: {1}\n"
).format(self.error_status_code,
self.error_reason)])
return strg
str_rep = ''.join([str_rep, (" error_status_code: {0}\n"
" error_reason: {1}\n"
).format(self.error_status_code,
self.error_reason)])
return str_rep
def __repr__(self):
return 'Order(order_ref={0})'.format(self.order_ref)
@@ -64,53 +221,31 @@ class OrderManager(base.BaseEntityManager):
def __init__(self, api):
super(OrderManager, self).__init__(api, 'orders')
def create(self,
name=None,
payload_content_type='application/octet-stream',
algorithm=None,
bit_length=None,
mode=None,
expiration=None):
def Order(self, order_ref=None, name=None, payload_content_type=None,
algorithm=None, bit_length=None, mode=None, expiration=None):
"""
Creates a new Order in Barbican
Factory method that either retrieves an Order from Barbican if
given an order_ref, or creates a new Order if not, and returns
the Order object.
:param order_ref: If provided, will do an Order GET in Barbican
:param name: A friendly name for the secret
:param payload_content_type: The format/type of the secret data
:param algorithm: The algorithm the secret associated with
:param bit_length: The bit length of the secret
:param mode: The algorithm mode (e.g. CBC or CTR mode)
:param expiration: The expiration time of the secret in ISO 8601
format
:returns: Order href for the created order
:param algorithm: The algorithm associated with this secret key
:param bit_length: The bit length of this secret key
:param mode: The algorithm mode used with this secret key
:param expiration: The expiration time of the secret in ISO 8601 format
:returns: Secret object
"""
LOG.debug("Creating order")
order_dict = {'secret': {}}
order_dict['secret']['name'] = name
order_dict['secret'][
'payload_content_type'] = payload_content_type
order_dict['secret']['algorithm'] = algorithm
order_dict['secret']['bit_length'] = bit_length
order_dict['secret']['mode'] = mode
order_dict['secret']['expiration'] = expiration
self._remove_empty_keys(order_dict['secret'])
LOG.debug("Request body: {0}".format(order_dict['secret']))
resp = self.api.post(self.entity, order_dict)
return resp['order_ref']
def get(self, order_ref):
"""
Returns an Order object
:param order_ref: The href for the order
"""
LOG.debug("Getting order - Order href: {0}".format(order_ref))
if not order_ref:
raise ValueError('order_ref is required.')
resp = self.api.get(order_ref)
return Order(resp)
if order_ref:
LOG.debug("Getting order - Order href: {0}".format(order_ref))
base.validate_ref(order_ref, 'Order')
response = self.api.get(order_ref)
return Order(api=self.api, **response)
return Order(api=self.api, name=name,
payload_content_type=payload_content_type,
algorithm=algorithm, bit_length=bit_length, mode=mode,
expiration=expiration)
def delete(self, order_ref):
"""
@@ -134,6 +269,6 @@ class OrderManager(base.BaseEntityManager):
limit))
href = '{0}/{1}'.format(self.api.base_url, self.entity)
params = {'limit': limit, 'offset': offset}
resp = self.api.get(href, params)
response = self.api.get(href, params)
return [Order(o) for o in resp['orders']]
return [Order(api=self.api, **o) for o in response.get('orders', [])]

View File

@@ -12,9 +12,8 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
import re
import six
from barbicanclient import base
@@ -24,47 +23,205 @@ from barbicanclient.openstack.common.timeutils import parse_isotime
LOG = logging.getLogger(__name__)
def immutable_after_save(func):
@functools.wraps(func)
def wrapper(self, *args):
if self._secret_ref:
raise base.ImmutableException()
return func(self, *args)
return wrapper
class Secret(object):
"""
Secrets are used to keep track of the data stored in Barbican.
"""
entity = 'secrets'
def __init__(self, secret_dict):
"""
Builds a secret object from a dictionary.
"""
self.secret_ref = secret_dict.get('secret_ref')
self.name = secret_dict.get('name')
self.status = secret_dict.get('status')
self.content_types = secret_dict.get('content_types')
def __init__(self, api, name=None, expiration=None, algorithm=None,
bit_length=None, mode=None, payload=None,
payload_content_type=None, payload_content_encoding=None,
secret_ref=None, created=None, updated=None,
content_types=None, status=None):
self._api = api
self._secret_ref = secret_ref
self._name = name
self._algorithm = algorithm
self._bit_length = bit_length
self._mode = mode
self._payload = payload
self._payload_content_encoding = payload_content_encoding
self._expiration = expiration
if self._expiration:
self._expiration = parse_isotime(self._expiration)
if secret_ref:
self._content_types = content_types
self._status = status
self._created = created
self._updated = updated
if self._created:
self._created = parse_isotime(self._created)
if self._updated:
self._updated = parse_isotime(self._updated)
else:
self._content_types = None
self._status = None
self._created = None
self._updated = None
if secret_dict.get('created') is not None:
self.created = parse_isotime(secret_dict['created'])
if not self._content_types:
self._payload_content_type = payload_content_type
else:
self.created = None
if secret_dict.get('expiration') is not None:
self.expiration = parse_isotime(secret_dict['expiration'])
else:
self.expiration = None
if secret_dict.get('updated') is not None:
self.updated = parse_isotime(secret_dict['updated'])
else:
self.updated = None
self._payload_content_type = self._content_types.get('default',
None)
self.algorithm = secret_dict.get('algorithm')
self.bit_length = secret_dict.get('bit_length')
self.mode = secret_dict.get('mode')
@property
def secret_ref(self):
return self._secret_ref
@property
def name(self):
return self._name
@property
def expiration(self):
return self._expiration
@property
def algorithm(self):
return self._algorithm
@property
def bit_length(self):
return self._bit_length
@property
def mode(self):
return self._mode
@property
def payload(self):
if not self._payload:
self._fetch_payload()
return self._payload
@property
def payload_content_type(self):
return self._payload_content_type
@property
def payload_content_encoding(self):
return self._payload_content_encoding
@property
def created(self):
return self._created
@property
def updated(self):
return self._updated
@property
def content_types(self):
if self._content_types:
return self._content_types
elif self._payload_content_type:
return {u'default': six.u(self.payload_content_type)}
return None
@property
def status(self):
return self._status
@name.setter
@immutable_after_save
def name(self, value):
self._name = value
@expiration.setter
@immutable_after_save
def expiration(self, value):
self._expiration = value
@algorithm.setter
@immutable_after_save
def algorithm(self, value):
self._algorithm = value
@bit_length.setter
@immutable_after_save
def bit_length(self, value):
self._bit_length = value
@mode.setter
@immutable_after_save
def mode(self, value):
self._mode = value
@payload.setter
@immutable_after_save
def payload(self, value):
self._payload = value
@payload_content_type.setter
@immutable_after_save
def payload_content_type(self, value):
self._payload_content_type = value
@payload_content_encoding.setter
@immutable_after_save
def payload_content_encoding(self, value):
self._payload_content_encoding = value
def _fetch_payload(self):
if not self._payload_content_type and not self._content_types:
raise ValueError('Secret has no encrypted data to decrypt.')
elif not self._payload_content_type:
raise ValueError("Must specify decrypt content-type as "
"secret does not specify a 'default' "
"content-type.")
headers = {'Accept': self._payload_content_type}
self._payload = self._api.get_raw(self._secret_ref, headers)
@immutable_after_save
def store(self):
secret_dict = base.filter_empty_keys({
'name': self.name,
'payload': self.payload,
'payload_content_type': self.payload_content_type,
'payload_content_encoding': self.payload_content_encoding,
'algorithm': self.algorithm,
'mode': self.mode,
'bit_length': self.bit_length,
'expiration': self.expiration
})
LOG.debug("Request body: {0}".format(secret_dict))
# Save, store secret_ref and return
response = self._api.post(self.entity, secret_dict)
if response:
self._secret_ref = response.get('secret_ref')
return self.secret_ref
def delete(self):
if self._secret_ref:
self._api.delete(self._secret_ref)
self._secret_ref = None
else:
raise LookupError("Secret is not yet stored.")
def __str__(self):
return ("Secret - href: {0}\n"
" name: {1}\n"
" created: {2}\n"
" status: {3}\n"
" content types: {4}\n"
" algorithm: {5}\n"
" bit length: {6}\n"
" mode: {7}\n"
" expiration: {8}\n"
return ("Secret:\n"
" href: {0}\n"
" name: {1}\n"
" created: {2}\n"
" status: {3}\n"
" content types: {4}\n"
" algorithm: {5}\n"
" bit length: {6}\n"
" mode: {7}\n"
" expiration: {8}\n"
.format(self.secret_ref, self.name, self.created,
self.status, self.content_types, self.algorithm,
self.bit_length, self.mode, self.expiration)
@@ -79,92 +236,37 @@ class SecretManager(base.BaseEntityManager):
def __init__(self, api):
super(SecretManager, self).__init__(api, 'secrets')
def store(self,
name=None,
payload=None,
payload_content_type=None,
payload_content_encoding=None,
algorithm=None,
bit_length=None,
mode=None,
expiration=None):
def Secret(self, secret_ref=None, name=None, payload=None,
payload_content_type=None, payload_content_encoding=None,
algorithm=None, bit_length=None, mode=None, expiration=None):
"""
Stores a new Secret in Barbican
Factory method that either retrieves a Secret from Barbican if
given a secret_ref, or creates a new Secret if not, and returns
the Secret object.
:param name: A friendly name for the secret
:param secret_ref: If provided, will do a Secret GET in Barbican
:param name: A friendly name for the Secret
:param payload: The unencrypted secret data
:param payload_content_type: The format/type of the secret data
:param payload_content_encoding: The encoding of the secret data
:param algorithm: The algorithm associated with this secret key
:param bit_length: The bit length of this secret key
:param mode: The algorithm mode used with this secret key
:param expiration: The expiration time of the secret in ISO 8601
format
:returns: Secret href for the stored secret
:param expiration: The expiration time of the secret in ISO 8601 format
:returns: Secret object
"""
LOG.debug("Creating secret of payload content type {0}".format(
payload_content_type))
secret_dict = dict()
secret_dict['name'] = name
secret_dict['payload'] = payload
secret_dict['payload_content_type'] = payload_content_type
secret_dict['payload_content_encoding'] = payload_content_encoding
secret_dict['algorithm'] = algorithm
secret_dict['mode'] = mode
secret_dict['bit_length'] = bit_length
secret_dict['expiration'] = expiration
self._remove_empty_keys(secret_dict)
LOG.debug("Request body: {0}".format(secret_dict))
resp = self.api.post(self.entity, secret_dict)
return resp['secret_ref']
def get(self, secret_ref):
"""
Returns a Secret object with metadata about the secret.
:param secret_ref: The href for the secret
"""
if not secret_ref:
raise ValueError('secret_ref is required.')
try:
url = six.moves.urllib.parse.urlparse(secret_ref)
parts = url.path.rstrip('/').split('/')
reuuid = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-'
'[0-9a-f]{4}-[0-9a-f]{12}', re.I)
if not reuuid.findall(parts[-1]):
raise ValueError('secret uuid format error.')
except:
raise ValueError('secret incorrectly specified.')
resp = self.api.get(secret_ref)
return Secret(resp)
def decrypt(self, secret_ref, content_type=None):
"""
Returns the actual secret data stored in Barbican.
:param secret_ref: The href for the secret
:param content_type: The content_type of the secret, if not
provided, the client will fetch the secret meta and use the
default content_type to decrypt the secret
:returns: secret data
"""
if not secret_ref:
raise ValueError('secret_ref is required.')
if not content_type:
secret = self.get(secret_ref)
if secret.content_types is None:
raise ValueError('Secret has no encrypted data to decrypt.')
if 'default' not in secret.content_types:
raise ValueError("Must specify decrypt content-type as "
"secret does not specify a 'default' "
"content-type.")
content_type = secret.content_types['default']
headers = {'Accept': content_type}
return self.api.get_raw(secret_ref, headers)
if secret_ref:
LOG.debug("Getting secret - Secret href: {0}".format(secret_ref))
base.validate_ref(secret_ref, 'Secret')
response = self.api.get(secret_ref)
return Secret(api=self.api,
payload_content_type=payload_content_type,
**response)
return Secret(api=self.api, name=name, payload=payload,
payload_content_type=payload_content_type,
payload_content_encoding=payload_content_encoding,
algorithm=algorithm, bit_length=bit_length, mode=mode,
expiration=expiration)
def delete(self, secret_ref):
"""
@@ -183,6 +285,10 @@ class SecretManager(base.BaseEntityManager):
:param limit: Max number of secrets returned
:param offset: Offset secrets to begin list
:param name: Name filter for the list
:param algorithm: Algorithm filter for the list
:param mode: Mode filter for the list
:param bits: Bits filter for the list
:returns: list of Secret metadata objects
"""
LOG.debug('Listing secrets - offset {0} limit {1}'.format(offset,
@@ -198,6 +304,6 @@ class SecretManager(base.BaseEntityManager):
if bits > 0:
params['bits'] = bits
resp = self.api.get(href, params)
response = self.api.get(href, params)
return [Secret(s) for s in resp['secrets']]
return [Secret(api=self.api, **s) for s in response.get('secrets', [])]

View File

@@ -18,14 +18,11 @@ import httpretty
import requests
import testtools
import json
import uuid
from barbicanclient import client
from barbicanclient.test import keystone_client_fixtures
from barbicanclient.openstack.common import timeutils
from barbicanclient.openstack.common import jsonutils
from keystoneclient import session as ks_session
from keystoneclient.auth.identity import v2
from keystoneclient.auth.identity import v3

View File

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from barbicanclient import orders
from barbicanclient import orders, base
from barbicanclient.openstack.common import timeutils
from barbicanclient.test import test_client
from barbicanclient.test import test_client_secrets as test_secrets
@@ -47,25 +47,26 @@ class WhenTestingOrders(test_client.BaseEntityResource):
self.manager = orders.OrderManager(self.api)
def test_should_entity_str(self):
order_obj = orders.Order(self.order.get_dict(self.entity_href))
order_obj.error_status_code = '500'
order_obj.error_reason = 'Something is broken'
self.assertIn('status: ' + self.order.status,
str(order_obj))
order = self.order.get_dict(self.entity_href)
order_obj = orders.Order(api=None, error_status_code='500',
error_reason='Something is broken', **order)
self.assertIn('status: ' + self.order.status, str(order_obj))
self.assertIn('error_status_code: 500', str(order_obj))
def test_should_entity_repr(self):
order_obj = orders.Order(self.order.get_dict(self.entity_href))
self.assertIn('order_ref=' + self.entity_href,
repr(order_obj))
order = self.order.get_dict(self.entity_href)
order_obj = orders.Order(api=None, **order)
self.assertIn('order_ref=' + self.entity_href, repr(order_obj))
def test_should_create(self):
def test_should_submit_via_constructor(self):
self.api.post.return_value = {'order_ref': self.entity_href}
order_href = self.manager\
.create(name=self.order.secret.name,
algorithm=self.order.secret.algorithm,
payload_content_type=self.order.secret.content)
order = self.manager.Order(
name=self.order.secret.name,
algorithm=self.order.secret.algorithm,
payload_content_type=self.order.secret.content
)
order_href = order.submit()
self.assertEqual(self.entity_href, order_href)
@@ -82,10 +83,73 @@ class WhenTestingOrders(test_client.BaseEntityResource):
self.assertEqual(self.order.secret.payload_content_type,
order_req['secret']['payload_content_type'])
def test_should_submit_via_attributes(self):
self.api.post.return_value = {'order_ref': self.entity_href}
order = self.manager.Order()
order.name = self.order.secret.name
order.algorithm = self.order.secret.algorithm
order.payload_content_type = self.order.secret.content
order_href = order.submit()
self.assertEqual(self.entity_href, order_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
order_req = args[1]
self.assertEqual(self.order.secret.name, order_req['secret']['name'])
self.assertEqual(self.order.secret.algorithm,
order_req['secret']['algorithm'])
self.assertEqual(self.order.secret.payload_content_type,
order_req['secret']['payload_content_type'])
def test_should_be_immutable_after_submit(self):
self.api.post.return_value = {'order_ref': self.entity_href}
order = self.manager.Order(
name=self.order.secret.name,
algorithm=self.order.secret.algorithm,
payload_content_type=self.order.secret.content
)
order_href = order.submit()
self.assertEqual(self.entity_href, order_href)
# Verify that attributes are immutable after store.
attributes = [
"name", "expiration", "algorithm", "bit_length", "mode",
"payload_content_type"
]
for attr in attributes:
try:
setattr(order, attr, "test")
self.fail("didn't raise an ImmutableException exception")
except base.ImmutableException:
pass
def test_should_not_be_able_to_set_generated_attributes(self):
order = self.manager.Order()
# Verify that generated attributes cannot be set.
attributes = [
"order_ref", "secret_ref", "created", "updated", "status",
"error_status_code", "error_reason"
]
for attr in attributes:
try:
setattr(order, attr, "test")
self.fail("didn't raise an AttributeError exception")
except AttributeError:
pass
def test_should_get(self):
self.api.get.return_value = self.order.get_dict(self.entity_href)
order = self.manager.get(order_ref=self.entity_href)
order = self.manager.Order(order_ref=self.entity_href)
self.assertIsInstance(order, orders.Order)
self.assertEqual(self.entity_href, order.order_ref)
@@ -122,8 +186,5 @@ class WhenTestingOrders(test_client.BaseEntityResource):
self.assertEqual(10, params['limit'])
self.assertEqual(5, params['offset'])
def test_should_fail_get_no_href(self):
self.assertRaises(ValueError, self.manager.get, None)
def test_should_fail_delete_no_href(self):
self.assertRaises(ValueError, self.manager.delete, None)

View File

@@ -14,7 +14,7 @@
# limitations under the License.
from barbicanclient.test import test_client
from barbicanclient import secrets
from barbicanclient import secrets, base
from barbicanclient.openstack.common import timeutils
@@ -51,22 +51,20 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
self.manager = secrets.SecretManager(self.api)
def test_should_entity_str(self):
secret_obj = secrets.Secret(self.secret.get_dict(self.entity_href))
self.assertIn('name: ' + self.secret.name,
str(secret_obj))
secret_obj = self.manager.Secret(name=self.secret.name)
self.assertIn('name: ' + self.secret.name, str(secret_obj))
def test_should_entity_repr(self):
secret_obj = secrets.Secret(self.secret.get_dict(self.entity_href))
secret_obj = self.manager.Secret(name=self.secret.name)
self.assertIn('name="{0}"'.format(self.secret.name), repr(secret_obj))
def test_should_store(self):
def test_should_store_via_constructor(self):
self.api.post.return_value = {'secret_ref': self.entity_href}
secret_href = self.manager\
.store(name=self.secret.name,
payload=self.secret.payload,
payload_content_type=self.secret.content)
secret = self.manager.Secret(name=self.secret.name,
payload=self.secret.payload,
payload_content_type=self.secret.content)
secret_href = secret.store()
self.assertEqual(self.entity_href, secret_href)
# Verify the correct URL was used to make the call.
@@ -81,10 +79,68 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
self.assertEqual(self.secret.payload_content_type,
secret_req['payload_content_type'])
def test_should_store_via_attributes(self):
self.api.post.return_value = {'secret_ref': self.entity_href}
secret = self.manager.Secret()
secret.name = self.secret.name
secret.payload = self.secret.payload
secret.payload_content_type = self.secret.content
secret_href = secret.store()
self.assertEqual(self.entity_href, secret_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
secret_req = args[1]
self.assertEqual(self.secret.name, secret_req['name'])
self.assertEqual(self.secret.payload, secret_req['payload'])
self.assertEqual(self.secret.payload_content_type,
secret_req['payload_content_type'])
def test_should_be_immutable_after_submit(self):
self.api.post.return_value = {'secret_ref': self.entity_href}
secret = self.manager.Secret(name=self.secret.name,
payload=self.secret.payload,
payload_content_type=self.secret.content)
secret_href = secret.store()
self.assertEqual(self.entity_href, secret_href)
# Verify that attributes are immutable after store.
attributes = [
"name", "expiration", "algorithm", "bit_length", "mode", "payload",
"payload_content_type", "payload_content_encoding"
]
for attr in attributes:
try:
setattr(secret, attr, "test")
self.fail("didn't raise an ImmutableException exception")
except base.ImmutableException:
pass
def test_should_not_be_able_to_set_generated_attributes(self):
secret = self.manager.Secret()
# Verify that generated attributes cannot be set.
attributes = [
"secret_ref", "created", "updated", "content_types", "status"
]
for attr in attributes:
try:
setattr(secret, attr, "test")
self.fail("didn't raise an AttributeError exception")
except AttributeError:
pass
def test_should_get(self):
self.api.get.return_value = self.secret.get_dict(self.entity_href)
secret = self.manager.get(secret_ref=self.entity_href)
secret = self.manager.Secret(secret_ref=self.entity_href)
self.assertIsInstance(secret, secrets.Secret)
self.assertEqual(self.entity_href, secret.secret_ref)
@@ -94,12 +150,17 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
self.assertEqual(self.entity_href, url)
def test_should_decrypt_with_content_type(self):
self.api.get.return_value = self.secret.get_dict(self.entity_href)
decrypted = 'decrypted text here'
self.api.get_raw.return_value = decrypted
secret = self.manager.decrypt(secret_ref=self.entity_href,
content_type='application/octet-stream')
self.assertEqual(decrypted, secret)
secret = self.manager.Secret(
secret_ref=self.entity_href,
payload_content_type='application/octet-stream'
)
secret_payload = secret.payload
self.assertEqual(decrypted, secret_payload)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.get_raw.call_args
@@ -117,8 +178,9 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
decrypted = 'decrypted text here'
self.api.get_raw.return_value = decrypted
secret = self.manager.decrypt(secret_ref=self.entity_href)
self.assertEqual(decrypted, secret)
secret = self.manager.Secret(secret_ref=self.entity_href)
secret_payload = secret.payload
self.assertEqual(decrypted, secret_payload)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.get.call_args
@@ -163,24 +225,28 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
self.assertEqual(5, params['offset'])
def test_should_fail_get_invalid_secret(self):
self.assertRaises(ValueError, self.manager.get, '12345')
def test_should_fail_get_no_href(self):
self.assertRaises(ValueError, self.manager.get, None)
self.assertRaises(ValueError, self.manager.Secret,
**{'secret_ref': '12345'})
def test_should_fail_decrypt_no_content_types(self):
self.api.get.return_value = self.secret.get_dict(self.entity_href)
self.assertRaises(ValueError, self.manager.decrypt,
**{"secret_ref": self.entity_href})
secret = self.manager.Secret(secret_ref=self.entity_href)
try:
secret.payload
self.fail("didn't raise a ValueError exception")
except ValueError:
pass
def test_should_fail_decrypt_no_default_content_type(self):
content_types_dict = {'no-default': 'application/octet-stream'}
self.api.get.return_value = self.secret.get_dict(self.entity_href,
content_types_dict)
self.assertRaises(ValueError, self.manager.decrypt,
**{"secret_ref": self.entity_href})
secret = self.manager.Secret(secret_ref=self.entity_href)
try:
secret.payload
self.fail("didn't raise a ValueError exception")
except ValueError:
pass
def test_should_fail_delete_no_href(self):
self.assertRaises(ValueError, self.manager.get, None)
self.assertRaises(ValueError, self.manager.delete, None)

View File

@@ -1,133 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from barbicanclient import verifications as vers
from barbicanclient.openstack.common import timeutils
from barbicanclient.test import test_client
class VerificationData(object):
def __init__(self):
self.created = str(timeutils.utcnow())
self.resource_type = 'image'
self.resource_ref = 'http://www.image.com/v1/images/1234'
self.resource_action = 'vm_attach'
self.impersonation_allowed = True
self.verification_dict = {'created': self.created,
'resource_type': self.resource_type,
'resource_ref': self.resource_ref,
'resource_action': self.resource_action,
'impersonation_allowed':
self.impersonation_allowed}
def get_dict(self, verification_ref):
verify = self.verification_dict
verify['verification_ref'] = verification_ref
return verify
class WhenTestingVerifications(test_client.BaseEntityResource):
def setUp(self):
self._setUp('verifications')
self.verify = VerificationData()
self.manager = vers.VerificationManager(self.api)
def test_should_entity_str(self):
verif_obj = vers.Verification(self.verify.get_dict(self.entity_href))
verif_obj.error_status_code = '500'
verif_obj.error_reason = 'Something is broken'
self.assertIn('resource_type: ' + self.verify.resource_type,
str(verif_obj))
self.assertIn('error_status_code: 500', str(verif_obj))
def test_should_entity_repr(self):
verif = vers.Verification(self.verify.get_dict(self.entity_href))
self.assertIn('verification_ref=' + self.entity_href,
repr(verif))
def test_should_create(self):
self.api.post.return_value = {'verification_ref': self.entity_href}
order_href = self.manager\
.create(resource_type=self.verify.resource_type,
resource_ref=self.verify.resource_ref,
resource_action=self.verify.resource_action)
self.assertEqual(self.entity_href, order_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.post.call_args
entity_resp = args[0]
self.assertEqual(self.entity, entity_resp)
# Verify that correct information was sent in the call.
verify_req = args[1]
self.assertEqual(self.verify.resource_type,
verify_req['resource_type'])
self.assertEqual(self.verify.resource_action,
verify_req['resource_action'])
self.assertEqual(self.verify.resource_ref,
verify_req['resource_ref'])
def test_should_get(self):
self.api.get.return_value = self.verify.get_dict(self.entity_href)
verify = self.manager.get(verification_ref=self.entity_href)
self.assertIsInstance(verify, vers.Verification)
self.assertEqual(self.entity_href, verify.verif_ref)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.get.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
def test_should_delete(self):
self.manager.delete(verification_ref=self.entity_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.delete.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
def test_should_get_list(self):
verify_resp = self.verify.get_dict(self.entity_href)
self.api.get.return_value = {"verifications":
[verify_resp for v in range(3)]}
verifies = self.manager.list(limit=10, offset=5)
self.assertTrue(len(verifies) == 3)
self.assertIsInstance(verifies[0], vers.Verification)
self.assertEqual(self.entity_href, verifies[0].verif_ref)
# Verify the correct URL was used to make the call.
args, kwargs = self.api.get.call_args
url = args[0]
self.assertEqual(self.entity_base[:-1], url)
# Verify that correct information was sent in the call.
params = args[1]
self.assertEqual(10, params['limit'])
self.assertEqual(5, params['offset'])
def test_should_fail_get_no_href(self):
self.assertRaises(ValueError, self.manager.get, None)
def test_should_fail_delete_no_href(self):
self.assertRaises(ValueError, self.manager.delete, None)

View File

@@ -1,147 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from barbicanclient import base
from barbicanclient.openstack.common.gettextutils import _
from barbicanclient.openstack.common import timeutils
LOG = logging.getLogger(__name__)
class Verification(object):
def __init__(self, verif_dict):
"""
Builds a verification object from a dictionary.
"""
self.verif_ref = verif_dict['verification_ref']
self.resource_type = verif_dict['resource_type']
self.resource_ref = verif_dict['resource_ref']
self.resource_action = verif_dict['resource_action']
self.impersonation_allowed = verif_dict['impersonation_allowed']
self.is_verified = verif_dict.get('is_verified', False)
self.error_status_code = verif_dict.get('error_status_code', None)
self.error_reason = verif_dict.get('error_reason', None)
self.status = verif_dict.get('status')
self.created = timeutils.parse_isotime(verif_dict['created'])
if verif_dict.get('updated') is not None:
self.updated = timeutils.parse_isotime(verif_dict['updated'])
else:
self.updated = None
def __str__(self):
strg = ("Verification - verification href: {0}\n"
" resource_type: {1}\n"
" resource_ref: {2}\n"
" resource_action: {3}\n"
" impersonation_allowed: {4}\n"
" is_verified: {5}\n"
" created: {6}\n"
" status: {7}\n"
).format(self.verif_ref,
self.resource_type,
self.resource_ref,
self.resource_action,
self.impersonation_allowed,
self.is_verified,
self.created,
self.status)
if self.error_status_code:
strg = ''.join([strg, (" error_status_code: {0}\n"
" error_reason: {1}\n"
).format(self.error_status_code,
self.error_reason)])
return strg
def __repr__(self):
return 'Verification(verification_ref={0})'.format(self.verif_ref)
class VerificationManager(base.BaseEntityManager):
def __init__(self, api):
super(VerificationManager, self).__init__(api, 'verifications')
def create(self,
resource_type=None,
resource_ref=None,
resource_action=None,
impersonation_allowed=False):
"""
Creates a new Verification in Barbican
:param resource_type: Type of resource to verify
:param resource_ref: Reference to resource
:param resource_action: Action to be performed on or with the resource
:param impersonation_allowed: True if users/projects interacting
: with resource can be impersonated
:returns: Verification href for the created verification
"""
LOG.debug("Creating verification")
verif_dict = {'resource_type': resource_type,
'resource_ref': resource_ref,
'resource_action': resource_action,
'impersonation_allowed': impersonation_allowed}
self._remove_empty_keys(verif_dict)
LOG.debug("Request body: {0}".format(verif_dict))
resp = self.api.post(self.entity, verif_dict)
return resp['verification_ref']
def get(self, verification_ref):
"""
Returns a verification object
:param verification_ref: The href for the verification instance
"""
LOG.debug("Getting verification - "
"Verification href: {0}".format(verification_ref))
if not verification_ref:
raise ValueError('verif_ref is required.')
resp = self.api.get(verification_ref)
return Verification(resp)
def delete(self, verification_ref):
"""
Deletes a verification
:param verification_ref: The href for the verification instance
"""
if not verification_ref:
raise ValueError('verif_ref is required.')
self.api.delete(verification_ref)
def list(self, limit=10, offset=0):
"""
Lists all verifications for the tenant
:param limit: Max number of verifications returned
:param offset: Offset verifications to begin list
:returns: list of Verification objects
"""
LOG.debug('Listing verifications - '
'offset {0} limit {1}'.format(offset, limit))
href = '{0}/{1}'.format(self.api.base_url, self.entity)
params = {'limit': limit, 'offset': offset}
resp = self.api.get(href, params)
return [Verification(o) for o in resp['verifications']]