Mime Type Revamp

Implements: blueprint mime-type-revamp
Change-Id: I43f8ec16a3b2839174abb0989c950a6bb8a5b685
This commit is contained in:
Douglas Mendizabal 2013-07-25 16:44:22 -05:00
parent 719da9246d
commit 1bb6adc8fd
17 changed files with 686 additions and 395 deletions

View File

@ -16,6 +16,7 @@
"""
API-facing resource controllers.
"""
import base64
import falcon
@ -28,6 +29,7 @@ from barbican.model import models
from barbican.model import repositories as repo
from barbican.common import exception
from barbican.crypto import extension_manager as em
from barbican.crypto import mime_types
from barbican.openstack.common.gettextutils import _
from barbican.openstack.common import jsonutils as json
from barbican.queue import get_queue_api
@ -80,16 +82,6 @@ def _secret_mime_type_not_supported(mt, req, resp):
_("Mime-type of '{0}' is not supported.").format(mt), req, resp)
def _client_content_mismatch_to_secret(expected, actual, req, res):
"""
Throw exception indicating client content-type doesn't match
secret's mime-type.
"""
api.abort(falcon.HTTP_400,
_("Request content-type of '{0}' doesn't match "
"secret's of '{1}'.").format(actual, expected), req, res)
def _secret_data_too_large(req, resp):
"""Throw exception indicating plain-text was too big."""
api.abort(falcon.HTTP_413,
@ -322,15 +314,13 @@ class SecretsResource(api.ApiResource):
LOG.debug('Start secrets on_get '
'for tenant-ID {0}:'.format(keystone_id))
params = req._params
result = self.secret_repo.get_by_create_date(
keystone_id,
offset_arg=req.get_param('offset'),
limit_arg=req.get_param('limit'),
suppress_exception=True
)
result = self.secret_repo \
.get_by_create_date(keystone_id,
offset_arg=params.get('offset',
None),
limit_arg=params.get('limit',
None),
suppress_exception=True)
secrets, offset, limit = result
if not secrets:
@ -383,6 +373,7 @@ class SecretResource(api.ApiResource):
else:
tenant = res.get_or_create_tenant(keystone_id, self.tenant_repo)
resp.set_header('Content-Type', req.accept)
try:
resp.body = self.crypto_manager.decrypt(req.accept, secret,
tenant)
@ -390,14 +381,34 @@ class SecretResource(api.ApiResource):
LOG.exception('Secret decryption failed - '
'accept not supported')
_get_accept_not_supported(canse.accept, req, resp)
except em.CryptoNoSecretOrDataException as cnsode:
LOG.exception('Secret information of type {0} not '
'found for decryption.'.format(cnsode.mime_type))
_get_secret_info_not_found(cnsode.mime_type, req, resp)
except em.CryptoNoSecretOrDataException:
LOG.exception('Secret information of type not '
'found for decryption.')
_get_secret_info_not_found(req.accept, req, resp)
except Exception:
LOG.exception('Secret decryption failed - unknown')
_failed_to_decrypt_data(req, resp)
acceptable = utils.get_accepted_encodings(req)
LOG.debug('Acceptable: {0}'.format(acceptable))
if acceptable:
encodings = [enc for enc in acceptable if
enc in mime_types.ENCODINGS]
if encodings:
if 'base64' in encodings:
resp.body = base64.b64encode(resp.body)
else:
# encoding not supported
LOG.exception('Accept-Encoding not supported:'
' {0}'.format(str(encodings)))
_get_accept_not_supported(str(encodings), req, resp)
else:
# encoding not supported
LOG.exception('Accept-Encoding not supported:'
' {0}'.format(str(encodings)))
_get_accept_not_supported(str(encodings), req, resp)
@handle_exceptions(_('Secret update'))
def on_put(self, req, resp, keystone_id, secret_id):
@ -408,24 +419,35 @@ class SecretResource(api.ApiResource):
suppress_exception=True)
if not secret:
_secret_not_found(req, resp)
if secret.mime_type != req.content_type:
_client_content_mismatch_to_secret(secret.mime_type,
req.content_type, req, resp)
if secret.encrypted_data:
_secret_already_has_data(req, resp)
tenant = res.get_or_create_tenant(keystone_id, self.tenant_repo)
payload = None
content_type = req.content_type
content_encoding = req.get_header('Content-Encoding')
try:
plain_text = req.stream.read(api.MAX_BYTES_REQUEST_INPUT_ACCEPTED)
payload = req.stream.read(api.MAX_BYTES_REQUEST_INPUT_ACCEPTED)
except IOError:
api.abort(falcon.HTTP_500, 'Read Error')
resp.status = falcon.HTTP_200
if content_type in mime_types.BINARY and \
content_encoding in mime_types.ENCODINGS:
if content_encoding == 'base64':
payload = base64.b64decode(payload)
elif content_encoding is not None:
LOG.exception(
'Content-Encoding not supported {0}'.format(content_encoding)
)
_put_accept_incorrect(content_encoding, req, resp)
try:
res.create_encrypted_datum(secret,
plain_text,
payload,
content_type,
content_encoding,
tenant,
self.crypto_manager,
self.tenant_secret_repo,
@ -443,6 +465,8 @@ class SecretResource(api.ApiResource):
LOG.exception('Secret creation failed - unknown')
_failed_to_create_encrypted_datum(req, resp)
resp.status = falcon.HTTP_200
@handle_exceptions(_('Secret deletion'))
def on_delete(self, req, resp, keystone_id, secret_id):

View File

@ -209,6 +209,14 @@ class InvalidContentType(BarbicanException):
message = _("Invalid content type %(content_type)s")
class InvalidContentEncoding(BarbicanException):
message = _("Invalid content encoding %(content_encoding)s")
class PayloadDecodingError(BarbicanException):
message = _("Error while attempting to decode payload.")
class BadRegistryConnectionConfiguration(BarbicanException):
message = _("Registry was not configured correctly on API server. "
"Reason: %(reason)s")

View File

@ -16,9 +16,12 @@
"""
Shared business logic.
"""
import base64
from barbican.common import exception, validators
from barbican.model import models
from barbican.common import utils
from barbican.crypto import mime_types
LOG = utils.getLogger(__name__)
@ -51,15 +54,32 @@ def create_secret(data, tenant, crypto_manager,
time_keeper.mark('after Secret model create')
new_datum = None
if 'plain_text' in data:
plain_text = data['plain_text']
if not plain_text:
if 'payload' in data:
payload = data.get('payload')
content_type = data.get('payload_content_type')
content_encoding = data.get('payload_content_encoding')
if not payload:
raise exception.NoDataToProcess()
LOG.debug('Encrypting plain_text secret...')
new_datum = crypto_manager.encrypt(data['plain_text'],
LOG.debug('Pre-processing payload...')
if content_type in mime_types.PLAIN_TEXT:
LOG.debug('Plain text payload. No pre-processing needed.')
elif content_type in mime_types.BINARY:
# payload has to be decoded
if content_encoding not in ['base64']:
raise exception.InvalidContentEncoding(content_encoding)
try:
payload = base64.b64decode(payload)
except TypeError:
raise exception.PayloadDecodingError()
else:
raise exception.InvalidContentType()
time_keeper.mark('after pre-processing')
LOG.debug('Encrypting payload...')
new_datum = crypto_manager.encrypt(payload,
content_type,
new_secret,
tenant)
time_keeper.mark('after encrypt')
@ -73,8 +93,6 @@ def create_secret(data, tenant, crypto_manager,
else:
LOG.debug('Creating metadata only for the new secret. '
'A subsequent PUT is required')
crypto_manager.supports(new_secret, tenant)
time_keeper.mark('after supports check')
# Create Secret entities in datastore.
secret_repo.create_from(new_secret)
@ -97,34 +115,39 @@ def create_secret(data, tenant, crypto_manager,
return new_secret
def create_encrypted_datum(secret, plain_text, tenant, crypto_manager,
def create_encrypted_datum(secret, payload,
content_type, content_encoding,
tenant, crypto_manager,
tenant_secret_repo, datum_repo):
"""
Modifies the secret to add the plain_text secret information.
:param secret: the secret entity to associate the secret data to
:param plain_text: plain-text of the secret data to store
:param payload: secret data to store
:param content_type: payload content mime type
:param content_encoding: payload content encoding
:param tenant: the tenant (entity) who owns the secret
:param crypto_manager: the crypto plugin manager
:param tenant_secret_repo: the tenant/secret association repository
:param datum_repo: the encrypted datum repository
:retval The response body, None if N/A
"""
if not plain_text:
if not payload:
raise exception.NoDataToProcess()
if validators.secret_too_big(plain_text):
if validators.secret_too_big(payload):
raise exception.LimitExceeded()
if secret.encrypted_data:
raise ValueError('Secret already has encrypted data stored for it.')
fields = secret.to_dict_fields()
fields['plain_text'] = plain_text
fields['payload'] = payload
# Encrypt plain_text
LOG.debug('Encrypting plain_text secret')
new_datum = crypto_manager.encrypt(plain_text,
# Encrypt payload
LOG.debug('Encrypting secret payload...')
new_datum = crypto_manager.encrypt(payload,
content_type,
secret,
tenant)
datum_repo.create_from(new_datum)

View File

@ -18,7 +18,9 @@ Common utilities for Barbican.
"""
import time
from oslo.config import cfg
import barbican.openstack.common.log as logging
@ -54,6 +56,40 @@ def getLogger(name):
return logging.getLogger(name)
def get_accepted_encodings(req):
"""Returns a list of client acceptable encodings sorted by q value.
For details see: http://tools.ietf.org/html/rfc2616#section-14.3
:param req: request object
:returns: list of client acceptable encodings sorted by q value.
"""
header = req.get_header('Accept-Encoding')
if header is None:
return None
encodings = list()
for enc in header.split(','):
if ';' in enc:
encoding, q = enc.split(';')
try:
q = q.split('=')[1]
quality = float(q.strip())
except ValueError:
# can't convert quality to float
return None
if quality > 1.0 or quality < 0.0:
# quality is outside valid range
return None
if quality > 0.0:
encodings.append((encoding.strip(), quality))
else:
encodings.append((enc.strip(), 1))
return [enc[0] for enc in sorted(encodings,
cmp=lambda a, b: cmp(b[1], a[1]))]
class TimeKeeper(object):
"""
Keeps track of elapsed times and then allows for dumping a smmary to

View File

@ -3,11 +3,14 @@ API JSON validators.
"""
import abc
import jsonschema as schema
from oslo.config import cfg
from barbican.common import exception
from barbican.openstack.common import timeutils
from barbican.common import utils
from barbican.crypto import mime_types
from barbican.openstack.common import timeutils
from barbican.openstack.common.gettextutils import _
@ -71,16 +74,18 @@ class NewSecretValidator(ValidatorBase):
"cypher_type": {"type": "string"},
"bit_length": {"type": "integer", "minimum": 0},
"expiration": {"type": "string"},
"plain_text": {"type": "string"},
"mime_type": {
"payload": {"type": "string"},
"payload_content_type": {
"type": "string",
'enum': [
'text/plain',
'application/octet-stream'
"enum": mime_types.SUPPORTED
},
"payload_content_encoding": {
"type": "string",
"enum": [
"base64"
]
},
},
"required": ["mime_type"]
}
def validate(self, json_data, parent_schema=None):
@ -108,27 +113,46 @@ class NewSecretValidator(ValidatorBase):
"before current time"))
json_data['expiration'] = expiration
# Validate/convert 'plain_text' if provided.
if 'plain_text' in json_data:
if json_data['mime_type'] != 'text/plain':
raise exception.InvalidObject(schema=schema_name,
reason=_("If 'plain_text' is "
"supplied, 'mime_type' "
"must be 'text/plain'"))
# Validate/convert 'payload' if provided.
if 'payload' in json_data:
content_type = json_data.get('payload_content_type')
if content_type is None:
raise exception.InvalidObject(
schema=schema_name,
reason=_("If 'payload' is supplied, 'payload_content_type'"
" must also be supplied.")
)
plain_text = json_data['plain_text']
if secret_too_big(plain_text):
content_encoding = json_data.get('payload_content_encoding')
if content_type == 'application/octet-stream' and \
content_encoding is None:
raise exception.InvalidObject(
schema=schema_name,
reason=_("payload_content_encoding must be specified "
"when payload_content_type is application/"
"octet-stream.")
)
if content_type.startswith('text/plain') and \
content_encoding is not None:
raise exception.InvalidObject(
schema=schema_name,
reason=_("payload_content_encoding must not be specified "
"when payload_content_type is text/plain")
)
payload = json_data['payload']
if secret_too_big(payload):
raise exception.LimitExceeded()
plain_text = plain_text.strip()
if not plain_text:
payload = payload.strip()
if not payload:
raise exception.InvalidObject(schema=schema_name,
reason=_("If 'plain_text' "
reason=_("If 'payload' "
"specified, must be "
"non empty"))
json_data['plain_text'] = plain_text
# TODO: Add validation of 'mime_type' based on loaded plugins.
json_data['payload'] = payload
return json_data
@ -169,35 +193,29 @@ class NewOrderValidator(ValidatorBase):
except schema.ValidationError as e:
raise exception.InvalidObject(schema=schema_name, reason=str(e))
# If secret group is provided, validate it now.
if 'secret' in json_data:
secret = json_data['secret']
self.secret_validator.validate(secret, parent_schema=self.name)
if 'plain_text' in secret:
raise exception.InvalidObject(schema=schema_name,
reason=_("'plain_text' not "
"allowed for secret "
"generation"))
else:
secret = json_data.get('secret')
if secret is None:
raise exception.InvalidObject(schema=schema_name,
reason=_("'secret' attributes "
"are required"))
# If secret group is provided, validate it now.
self.secret_validator.validate(secret, parent_schema=self.name)
if 'payload' in secret:
raise exception.InvalidObject(schema=schema_name,
reason=_("'payload' not "
"allowed for secret "
"generation"))
# Validation secret generation related fields.
# TODO: Invoke the crypto plugin for this purpose
if secret.get('mime_type') != 'application/octet-stream':
raise exception.UnsupportedField(field="mime_type",
schema=schema_name,
reason=_("Only 'application/octe"
"t-stream' supported"))
if secret.get('cypher_type') != 'cbc':
if secret.get('cypher_type').lower() != 'cbc':
raise exception.UnsupportedField(field="cypher_type",
schema=schema_name,
reason=_("Only 'cbc' "
"supported"))
if secret.get('algorithm') != 'aes':
if secret.get('algorithm').lower() != 'aes':
raise exception.UnsupportedField(field="algorithm",
schema=schema_name,
reason=_("Only 'aes' "

View File

@ -17,6 +17,7 @@ from oslo.config import cfg
from stevedore import named
from barbican.common.exception import BarbicanException
from barbican.crypto import mime_types
from barbican.model.models import EncryptedDatum
from barbican.openstack.common.gettextutils import _
@ -64,12 +65,17 @@ class CryptoAcceptNotSupportedException(BarbicanException):
class CryptoNoSecretOrDataException(BarbicanException):
"""Raised when secret information is not available for the specified
secret mime-type."""
def __init__(self, mime_type):
def __init__(self, secret_id):
super(CryptoNoSecretOrDataException, self).__init__(
_('No secret information available for '
'Mime Type of {0}').format(mime_type)
'secret {0}').format(secret_id)
)
self.mime_type = mime_type
self.secret_id = secret_id
class CryptoPluginNotFound(BarbicanException):
"""Raised when no plugins are installed."""
message = "Crypto plugin not found."
class CryptoExtensionManager(named.NamedExtensionManager):
@ -83,68 +89,67 @@ class CryptoExtensionManager(named.NamedExtensionManager):
invoke_kwds=invoke_kwargs
)
def encrypt(self, unencrypted, secret, tenant):
"""Delegates encryption to active plugins."""
if secret.mime_type == 'text/plain':
def encrypt(self, unencrypted, content_type, secret, tenant):
"""Delegates encryption to first active plugin."""
if len(self.extensions) < 1:
raise CryptoPluginNotFound()
encrypting_plugin = self.extensions[0].obj
if content_type in mime_types.PLAIN_TEXT:
# normalize text to binary string
unencrypted = unencrypted.encode('utf-8')
for ext in self.extensions:
if ext.obj.supports(secret.mime_type):
datum = EncryptedDatum(secret)
datum.cypher_text, datum.kek_metadata = ext.obj.encrypt(
unencrypted, tenant
)
return datum
else:
raise CryptoMimeTypeNotSupportedException(secret.mime_type)
datum = EncryptedDatum(secret)
datum.content_type = content_type
datum.cypher_text, datum.kek_metadata = encrypting_plugin.encrypt(
unencrypted, tenant
)
return datum
def decrypt(self, accept, secret, tenant):
"""Delegates decryption to active plugins."""
if not secret or not secret.encrypted_data:
raise CryptoNoSecretOrDataException(accept)
raise CryptoNoSecretOrDataException(secret.id)
if not mime_types.is_supported(accept):
raise CryptoAcceptNotSupportedException(accept)
for ext in self.extensions:
if ext.obj.supports(accept):
for datum in secret.encrypted_data:
if accept == datum.mime_type:
unencrypted = ext.obj.decrypt(datum.cypher_text,
datum.kek_metadata,
tenant)
if accept == 'text/plain':
unencrypted = unencrypted.decode('utf-8')
return unencrypted
plugin = ext.obj
for datum in secret.encrypted_data:
if plugin.supports(datum.kek_metadata):
unencrypted = plugin.decrypt(datum.cypher_text,
datum.kek_metadata,
tenant)
if datum.content_type in mime_types.PLAIN_TEXT:
unencrypted = unencrypted.decode('utf-8')
return unencrypted
else:
raise CryptoAcceptNotSupportedException(accept)
raise CryptoPluginNotFound()
def generate_data_encryption_key(self, secret, tenant):
"""
Delegates generating a data-encryption key to active plugins.
Delegates generating a data-encryption key to first active plugin.
Note that this key can be used by clients for their encryption
processes. This generated key is then be encrypted via
the plug-in key encryption process, and that encrypted datum
is then returned from this method.
"""
for ext in self.extensions:
if ext.obj.supports(secret.mime_type):
# TODO: Call plugin's key generation processes.
# Note: It could be the *data* key to generate (for the
# secret algo type) uses a different plug in than that
# used to encrypted the key.
data_key = ext.obj.create(secret.algorithm, secret.bit_length)
datum = EncryptedDatum(secret)
datum.cypher_text, datum.kek_metadata = ext.obj.encrypt(
data_key, tenant
)
return datum
else:
raise CryptoMimeTypeNotSupportedException(secret.mime_type)
if len(self.extensions) < 1:
raise CryptoPluginNotFound()
encrypting_plugin = self.extensions[0].obj
def supports(self, secret, tenant):
"""Tests if at least one plug-in supports the secret type."""
for ext in self.extensions:
if ext.obj.supports(secret.mime_type):
return True
else:
raise CryptoMimeTypeNotSupportedException(secret.mime_type)
# TODO: Call plugin's key generation processes.
# Note: It could be the *data* key to generate (for the
# secret algo type) uses a different plug in than that
# used to encrypted the key.
data_key = encrypting_plugin.create(secret.algorithm,
secret.bit_length)
datum = EncryptedDatum(secret)
datum.cypher_text, datum.kek_metadata = encrypting_plugin.encrypt(
data_key, tenant
)
return datum

View File

@ -17,6 +17,14 @@
Barbican defined mime-types
"""
# Supported mime types
PLAIN_TEXT = ['text/plain',
'text/plain;charset=utf-8',
'text/plain; charset=utf-8']
BINARY = ['application/octet-stream']
ENCODINGS = ['base64']
SUPPORTED = PLAIN_TEXT + BINARY
# Maps mime-types used to specify secret data formats to the types that can
# be requested for secrets via GET calls.
CTYPES_PLAIN = {'default': 'text/plain'}
@ -27,6 +35,10 @@ CTYPES_MAPPINGS = {'text/plain': CTYPES_PLAIN,
'application/aes': CTYPES_AES}
def is_supported(mime_type):
return mime_type in SUPPORTED
def augment_fields_with_content_types(secret):
"""Generate a dict of content types based on the data associated
with the specified secret."""
@ -38,8 +50,10 @@ def augment_fields_with_content_types(secret):
# TODO: How deal with merging more than one datum instance?
for datum in secret.encrypted_data:
if datum.mime_type in CTYPES_MAPPINGS:
fields.update({'content_types': CTYPES_MAPPINGS[datum.mime_type]})
if datum.content_type in CTYPES_MAPPINGS:
fields.update(
{'content_types': CTYPES_MAPPINGS[datum.content_type]}
)
break
return fields

View File

@ -68,15 +68,14 @@ class CryptoPluginBase(object):
"""Create a new key."""
@abc.abstractmethod
def supports(self, secret_type):
"""Whether the plugin supports the specified secret type."""
def supports(self, kek_metadata):
"""Used to decide whether the plugin supports decoding the secret."""
class SimpleCryptoPlugin(CryptoPluginBase):
"""Insecure implementation of the crypto plugin."""
def __init__(self, conf=CONF):
self.supported_types = ['text/plain', 'application/octet-stream']
self.kek = conf.simple_crypto_plugin.kek
self.block_size = AES.block_size
@ -131,5 +130,6 @@ class SimpleCryptoPlugin(CryptoPluginBase):
raise ValueError('At this time you must supply 128/192/256 as'
'bit length')
def supports(self, secret_type):
return secret_type in self.supported_types
def supports(self, kek_metadata):
metadata = json.loads(kek_metadata)
return metadata['plugin'] == 'SimpleCryptoPlugin'

View File

@ -0,0 +1,40 @@
"""bp/mime-type-revamp
Revision ID: b4ba6a2a663
Revises: 53c2ae2df15d
Create Date: 2013-07-26 15:25:44.193889
"""
# revision identifiers, used by Alembic.
revision = 'b4ba6a2a663'
down_revision = '53c2ae2df15d'
from alembic import op
import sqlalchemy as sa
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.add_column('encrypted_data', sa.Column('content_type',
sa.String(length=255),
nullable=True))
op.alter_column('secrets', u'mime_type',
existing_type=sa.VARCHAR(length=255),
nullable=True)
op.alter_column('orders', u'secret_mime_type',
existing_type=sa.VARCHAR(length=255),
nullable=True)
### end Alembic commands ###
def downgrade():
### commands auto generated by Alembic - please adjust! ###
op.alter_column('secrets', u'mime_type',
existing_type=sa.VARCHAR(length=255),
nullable=False)
op.alter_column('orders', u'secret_mime_type',
existing_type=sa.VARCHAR(length=255),
nullable=True)
op.drop_column('encrypted_data', 'content_type')
### end Alembic commands ###

View File

@ -191,7 +191,7 @@ class Secret(BASE, ModelBase):
name = Column(String(255))
expiration = Column(DateTime, default=None)
mime_type = Column(String(255), nullable=False)
mime_type = Column(String(255), nullable=True)
algorithm = Column(String(255))
bit_length = Column(Integer)
cypher_type = Column(String(255))
@ -204,15 +204,13 @@ class Secret(BASE, ModelBase):
"""Creates secret from a dict."""
super(Secret, self).__init__()
self.name = parsed_request.get('name', None)
self.mime_type = parsed_request['mime_type']
self.expiration = parsed_request.get('expiration', None)
self.algorithm = parsed_request.get('algorithm', None)
self.bit_length = parsed_request.get('bit_length', None)
self.cypher_type = parsed_request.get('cypher_type', None)
self.name = parsed_request.get('name')
self.expiration = parsed_request.get('expiration')
self.algorithm = parsed_request.get('algorithm')
self.bit_length = parsed_request.get('bit_length')
self.cypher_type = parsed_request.get('cypher_type')
self.status = States.ACTIVE
self.mime_type = None
def _do_delete_children(self, session):
"""
@ -226,7 +224,6 @@ class Secret(BASE, ModelBase):
return {'secret_id': self.id,
'name': self.name or self.id,
'expiration': self.expiration,
'mime_type': self.mime_type,
'algorithm': self.algorithm,
'bit_length': self.bit_length,
'cypher_type': self.cypher_type}
@ -244,7 +241,7 @@ class EncryptedDatum(BASE, ModelBase):
secret_id = Column(String(36), ForeignKey('secrets.id'),
nullable=False)
content_type = Column(String(255))
mime_type = Column(String(255))
cypher_text = Column(LargeBinary)
kek_metadata = Column(Text)
@ -255,15 +252,14 @@ class EncryptedDatum(BASE, ModelBase):
if secret:
self.secret_id = secret.id
self.mime_type = secret.mime_type
self.status = States.ACTIVE
def _do_extra_dict_fields(self):
"""Sub-class hook method: return dict of fields."""
return {'name': self.name,
'mime_type': self.mime_type,
'cypher_text': self.secret,
'content_type': self.content_type,
'kek_metadata': self.kek_metadata}
@ -285,7 +281,7 @@ class Order(BASE, ModelBase):
secret_algorithm = Column(String(255))
secret_bit_length = Column(Integer)
secret_cypher_type = Column(String(255))
secret_mime_type = Column(String(255), nullable=False)
secret_mime_type = Column(String(255), nullable=True)
secret_expiration = Column(DateTime, default=None)
secret_id = Column(String(36), ForeignKey('secrets.id'),
@ -294,7 +290,6 @@ class Order(BASE, ModelBase):
def _do_extra_dict_fields(self):
"""Sub-class hook method: return dict of fields."""
return {'secret': {'name': self.secret_name or self.secret_id,
'mime_type': self.secret_mime_type,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,

View File

@ -13,16 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import MagicMock
import falcon
import base64
import json
import unittest
import falcon
import mock
from mock import MagicMock
from barbican.api import resources as res
from barbican.crypto.extension_manager import CryptoExtensionManager
from barbican.model import models
from barbican.common import exception as excep
from barbican.common.validators import DEFAULT_MAX_SECRET_BYTES
from barbican.crypto.extension_manager import CryptoExtensionManager
from barbican.model import models
from barbican.openstack.common import jsonutils
@ -34,19 +37,18 @@ def suite():
suite.addTest(WhenGettingSecretsListUsingSecretsResource())
suite.addTest(WhenGettingPuttingOrDeletingSecretUsingSecretResource())
suite.addTest(WhenCreatingOrdersUsingOrdersResource())
suite.addText(WhenGettingOrdersListUsingOrdersResource())
suite.addTest(WhenGettingOrdersListUsingOrdersResource())
suite.addTest(WhenGettingOrDeletingOrderUsingOrderResource())
return suite
def create_secret(mime_type, id="id", name="name",
def create_secret(id="id", name="name",
algorithm=None, bit_length=None,
cypher_type=None, encrypted_datum=None):
"""Generate a Secret entity instance."""
info = {'id': id,
'name': name,
'mime_type': mime_type,
'algorithm': algorithm,
'bit_length': bit_length,
'cypher_type': cypher_type}
@ -56,14 +58,15 @@ def create_secret(mime_type, id="id", name="name",
return secret
def create_order(mime_type, id="id", name="name",
algorithm=None, bit_length=None,
def create_order(id="id",
name="name",
algorithm=None,
bit_length=None,
cypher_type=None):
"""Generate an Order entity instance."""
order = models.Order()
order.id = id
order.secret_name = name
order.secret_mime_type = mime_type
order.secret_algorithm = algorithm
order.secret_bit_length = bit_length
order.secret_cypher_type = cypher_type
@ -95,18 +98,17 @@ class WhenTestingVersionResource(unittest.TestCase):
class WhenCreatingSecretsUsingSecretsResource(unittest.TestCase):
def setUp(self):
self.name = 'name'
self.plain_text = 'not-encrypted'.decode('utf-8')
self.mime_type = 'text/plain'
self.secret_algorithm = 'algo'
self.secret_bit_length = 512
self.secret_cypher_type = 'cytype'
self.payload = b'not-encrypted'
self.payload_content_type = 'text/plain'
self.secret_algorithm = 'AES'
self.secret_bit_length = 256
self.secret_cypher_type = 'CBC'
self.secret_req = {'name': self.name,
'mime_type': self.mime_type,
'payload': self.payload,
'payload_content_type': self.payload_content_type,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'plain_text': self.plain_text}
'cypher_type': self.secret_cypher_type}
self.json = json.dumps(self.secret_req)
self.keystone_id = 'keystone1234'
@ -153,16 +155,15 @@ class WhenCreatingSecretsUsingSecretsResource(unittest.TestCase):
args, kwargs = self.secret_repo.create_from.call_args
secret = args[0]
self.assertTrue(isinstance(secret, models.Secret))
self.assertIsInstance(secret, models.Secret)
self.assertEqual(secret.name, self.name)
self.assertEqual(secret.algorithm, self.secret_algorithm)
self.assertEqual(secret.bit_length, self.secret_bit_length)
self.assertEqual(secret.cypher_type, self.secret_cypher_type)
self.assertEqual(secret.mime_type, self.mime_type)
args, kwargs = self.tenant_secret_repo.create_from.call_args
tenant_secret = args[0]
self.assertTrue(isinstance(tenant_secret, models.TenantSecret))
self.assertIsInstance(tenant_secret, models.TenantSecret)
self.assertEqual(tenant_secret.tenant_id, self.tenant_entity_id)
self.assertEqual(tenant_secret.secret_id, secret.id)
@ -170,7 +171,7 @@ class WhenCreatingSecretsUsingSecretsResource(unittest.TestCase):
datum = args[0]
self.assertIsInstance(datum, models.EncryptedDatum)
self.assertEqual('cypher_text', datum.cypher_text)
self.assertEqual(self.mime_type, datum.mime_type)
self.assertEqual(self.payload_content_type, datum.content_type)
self.assertIsNotNone(datum.kek_metadata)
def test_should_add_new_secret_with_expiration(self):
@ -187,19 +188,19 @@ class WhenCreatingSecretsUsingSecretsResource(unittest.TestCase):
expected = expiration[:-6].replace('12', '17', 1)
self.assertEqual(expected, str(secret.expiration))
def test_should_add_new_secret_tenant_not_exist(self):
def test_should_add_new_secret_if_tenant_does_not_exist(self):
self.tenant_repo.get.return_value = None
self.resource.on_post(self.req, self.resp, self.keystone_id)
args, kwargs = self.secret_repo.create_from.call_args
secret = args[0]
self.assertTrue(isinstance(secret, models.Secret))
self.assertIsInstance(secret, models.Secret)
self.assertEqual(secret.name, self.name)
args, kwargs = self.tenant_secret_repo.create_from.call_args
tenant_secret = args[0]
self.assertTrue(isinstance(tenant_secret, models.TenantSecret))
self.assertIsInstance(tenant_secret, models.TenantSecret)
self.assertEqual(self.tenant_entity_id, tenant_secret.tenant_id)
self.assertEqual(secret.id, tenant_secret.secret_id)
@ -207,54 +208,52 @@ class WhenCreatingSecretsUsingSecretsResource(unittest.TestCase):
datum = args[0]
self.assertTrue(isinstance(datum, models.EncryptedDatum))
self.assertEqual('cypher_text', datum.cypher_text)
self.assertEqual(self.mime_type, datum.mime_type)
self.assertEqual(self.payload_content_type, datum.content_type)
self.assertIsNotNone(datum.kek_metadata)
def test_should_add_new_secret_no_plain_text(self):
json_template = u'{{"name":"{0}", "mime_type":"{1}"}}'
json = json_template.format(self.name, self.mime_type)
self.stream.read.return_value = json
def test_should_add_new_secret_metadata_without_payload(self):
self.stream.read.return_value = json.dumps({'name': self.name})
self.resource.on_post(self.req, self.resp, self.keystone_id)
args, kwargs = self.secret_repo.create_from.call_args
secret = args[0]
self.assertTrue(isinstance(secret, models.Secret))
self.assertIsInstance(secret, models.Secret)
self.assertEqual(secret.name, self.name)
args, kwargs = self.tenant_secret_repo.create_from.call_args
tenant_secret = args[0]
self.assertTrue(isinstance(tenant_secret, models.TenantSecret))
self.assertIsInstance(tenant_secret, models.TenantSecret)
self.assertEqual(tenant_secret.tenant_id, self.tenant_entity_id)
self.assertEqual(tenant_secret.secret_id, secret.id)
self.assertFalse(self.datum_repo.create_from.called)
def test_should_add_new_secret_plain_text_almost_too_large(self):
def test_should_add_new_secret_payload_almost_too_large(self):
big_text = ''.join(['A' for x
in xrange(DEFAULT_MAX_SECRET_BYTES - 10)])
self.secret_req = {'name': self.name,
'mime_type': self.mime_type,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'plain_text': big_text}
self.stream.read.return_value = json.dumps(self.secret_req)
secret_req = {'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'payload': big_text,
'payload_content_type': 'text/plain'}
self.stream.read.return_value = json.dumps(secret_req)
self.resource.on_post(self.req, self.resp, self.keystone_id)
def test_should_fail_due_to_plain_text_too_large(self):
def test_should_fail_due_to_payload_too_large(self):
big_text = ''.join(['A' for x
in xrange(DEFAULT_MAX_SECRET_BYTES + 10)])
self.secret_req = {'name': self.name,
'mime_type': self.mime_type,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'plain_text': big_text}
self.stream.read.return_value = json.dumps(self.secret_req)
secret_req = {'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'payload': big_text,
'payload_content_type': 'text/plain'}
self.stream.read.return_value = json.dumps(secret_req)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
@ -262,13 +261,28 @@ class WhenCreatingSecretsUsingSecretsResource(unittest.TestCase):
exception = cm.exception
self.assertEqual(falcon.HTTP_413, exception.status)
def test_should_fail_due_to_empty_plain_text(self):
def test_should_fail_due_to_empty_payload(self):
secret_req = {'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'payload': '',
'payload_content_type': 'text/plain'}
self.stream.read.return_value = json.dumps(secret_req)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_fail_due_to_unsupported_payload_content_type(self):
self.secret_req = {'name': self.name,
'mime_type': self.mime_type,
'payload_content_type': 'somethingbogushere',
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'plain_text': ''}
'payload': self.payload}
self.stream.read.return_value = json.dumps(self.secret_req)
with self.assertRaises(falcon.HTTPError) as cm:
@ -277,20 +291,67 @@ class WhenCreatingSecretsUsingSecretsResource(unittest.TestCase):
exception = cm.exception
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_fail_due_to_unsupported_mime(self):
self.secret_req = {'name': self.name,
'mime_type': 'somethingbogushere',
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'plain_text': self.plain_text}
self.stream.read.return_value = json.dumps(self.secret_req)
def test_create_secret_with_encoded_binary_payload(self):
self.stream.read.return_value = json.dumps(
{'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'payload': 'lOtfqHaUUpe6NqLABgquYQ==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64'}
)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
self.assertEqual(falcon.HTTP_201, self.resp.status)
args, kwargs = self.datum_repo.create_from.call_args
datum = args[0]
self.assertIsInstance(datum, models.EncryptedDatum)
self.assertEqual('cypher_text', datum.cypher_text)
self.assertEqual('application/octet-stream', datum.content_type)
self.assertIsNotNone(datum.kek_metadata)
def test_create_secret_fails_with_binary_payload_no_encoding(self):
self.stream.read.return_value = json.dumps(
{'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'payload': 'lOtfqHaUUpe6NqLABgquYQ==',
'payload_content_type': 'application/octet-stream'}
)
with self.assertRaises(falcon.HTTPError):
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
self.assertEqual(falcon.HTTP_400, exception.status)
def test_create_secret_fails_with_binary_payload_bad_encoding(self):
self.stream.read.return_value = json.dumps(
{'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'payload': 'lOtfqHaUUpe6NqLABgquYQ==',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'bogus64'}
)
with self.assertRaises(falcon.HTTPError):
self.resource.on_post(self.req, self.resp, self.keystone_id)
def test_create_secret_fails_with_binary_payload_no_content_type(self):
self.stream.read.return_value = json.dumps(
{'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'payload': 'lOtfqHaUUpe6NqLABgquYQ==',
'payload_content_encoding': 'base64'}
)
with self.assertRaises(falcon.HTTPError):
self.resource.on_post(self.req, self.resp, self.keystone_id)
class WhenGettingSecretsListUsingSecretsResource(unittest.TestCase):
@ -298,18 +359,15 @@ class WhenGettingSecretsListUsingSecretsResource(unittest.TestCase):
self.tenant_id = 'tenant1234'
self.keystone_id = 'keystone1234'
self.name = 'name1234'
self.mime_type = 'text/plain'
self.secret_algorithm = "algo"
self.secret_bit_length = 512
self.secret_cypher_type = "cytype"
self.params = {'offset': 2, 'limit': 2}
self.secret_algorithm = "AES"
self.secret_bit_length = 256
self.secret_cypher_type = "CBC"
self.num_secrets = 10
self.offset = 2
self.limit = 2
secret_params = {'mime_type': self.mime_type,
'name': self.name,
secret_params = {'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
@ -340,7 +398,8 @@ class WhenGettingSecretsListUsingSecretsResource(unittest.TestCase):
self.req = MagicMock()
self.req.accept = 'application/json'
self.req._params = self.params
self.req.get_param = mock.Mock()
self.req.get_param.side_effect = [self.offset, self.limit]
self.resp = MagicMock()
self.resource = res.SecretsResource(self.crypto_mgr, self.tenant_repo,
self.secret_repo,
@ -352,10 +411,8 @@ class WhenGettingSecretsListUsingSecretsResource(unittest.TestCase):
self.secret_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=self.params.get('offset',
self.offset),
limit_arg=self.params.get('limit',
self.limit),
offset_arg=self.offset,
limit_arg=self.limit,
suppress_exception=True)
resp_body = jsonutils.loads(self.resp.body)
@ -382,10 +439,8 @@ class WhenGettingSecretsListUsingSecretsResource(unittest.TestCase):
self.secret_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=self.params.get('offset',
self.offset),
limit_arg=self.params.get('limit',
self.limit),
offset_arg=self.offset,
limit_arg=self.limit,
suppress_exception=True)
resp_body = jsonutils.loads(self.resp.body)
@ -408,22 +463,22 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.tenant_id = 'tenantid1234'
self.keystone_id = 'keystone1234'
self.name = 'name1234'
self.mime_type = 'text/plain'
secret_id = "idsecret1"
datum_id = "iddatum1"
self.secret_algorithm = "algo"
self.secret_bit_length = 512
self.secret_cypher_type = "cytype"
self.secret_algorithm = "AES"
self.secret_bit_length = 256
self.secret_cypher_type = "CBC"
self.datum = models.EncryptedDatum()
self.datum.id = datum_id
self.datum.secret_id = secret_id
self.datum.mime_type = self.mime_type
self.datum.content_type = "text/plain"
self.datum.cypher_text = "cypher_text"
self.datum.kek_metadata = "kekedata"
self.datum.kek_metadata = json.dumps({'plugin': 'TestCryptoPlugin'})
self.secret = create_secret(self.mime_type,
id=secret_id,
self.secret = create_secret(id=secret_id,
name=self.name,
algorithm=self.secret_algorithm,
bit_length=self.secret_bit_length,
@ -476,9 +531,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.assertEquals(self.resp.status, falcon.HTTP_200)
resp_body = jsonutils.loads(self.resp.body)
self.assertTrue('content_types' in resp_body)
self.assertTrue(self.datum.mime_type in
resp_body['content_types'].itervalues())
self.assertIn('content_types', resp_body)
self.assertIn(self.datum.content_type,
resp_body['content_types'].itervalues())
self.assertNotIn('mime_type', resp_body)
def test_should_get_secret_as_plain(self):
self.req.accept = 'text/plain'
@ -496,6 +552,29 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
resp_body = self.resp.body
self.assertIsNotNone(resp_body)
def test_should_get_secret_as_binary(self):
self.req.accept = 'application/octet-stream'
# mock Content-Encoding header
self.req.get_header.return_value = None
self.datum.content_type = 'application/octet-stream'
self.resource.on_get(self.req, self.resp, self.keystone_id,
self.secret.id)
self.assertEqual(self.resp.body, 'unencrypted_data')
def test_should_get_secret_as_encoded_binary(self):
encoded_data = base64.b64encode(b'unencrypted_data')
self.req.accept = 'application/octet-stream'
# mock Content-Encoding header
self.req.get_header.return_value = 'base64'
self.datum.content_type = 'application/octet-stream'
self.resource.on_get(self.req, self.resp, self.keystone_id,
self.secret.id)
self.assertEqual(self.resp.body, encoded_data)
def test_should_throw_exception_for_get_when_secret_not_found(self):
self.secret_repo.get.return_value = None
@ -516,6 +595,17 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
exception = cm.exception
self.assertEqual(falcon.HTTP_406, exception.status)
def test_should_throw_exeption_for_get_with_wrong_accept_encoding(self):
self.req.accept = 'application/octet-stream'
# mock Content-Encoding header
self.req.get_header.return_value = 'bogusencoding'
with self.assertRaises(falcon.HTTPError) as e:
self.resource.on_get(self.req, self.resp, self.keystone_id,
self.secret.id)
self.assertEqual(falcon.HTTP_406, e.exception.status)
def test_should_throw_exception_for_get_when_datum_not_available(self):
self.req.accept = 'text/plain'
self.secret.encrypted_data = []
@ -533,20 +623,52 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
self.assertEquals(self.resp.status, falcon.HTTP_200)
self.assertEqual(self.resp.status, falcon.HTTP_200)
args, kwargs = self.datum_repo.create_from.call_args
datum = args[0]
self.assertTrue(isinstance(datum, models.EncryptedDatum))
self.assertIsInstance(datum, models.EncryptedDatum)
self.assertEqual('cypher_text', datum.cypher_text)
self.assertEqual(self.mime_type, datum.mime_type)
self.assertIsNotNone(datum.kek_metadata)
def test_should_put_secret_as_binary(self):
self._setup_for_puts()
self.req.content_type = 'application/octet-stream'
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
self.assertEqual(self.resp.status, falcon.HTTP_200)
args, kwargs = self.datum_repo.create_from.call_args
datum = args[0]
self.assertIsInstance(datum, models.EncryptedDatum)
def test_should_put_encoded_secret_as_binary(self):
self._setup_for_puts()
self.stream.read.return_value = base64.b64encode(self.payload)
self.req.content_type = 'application/octet-stream'
# mock Content-Encoding header
self.req.get_header.return_value = 'base64'
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
self.assertEqual(self.resp.status, falcon.HTTP_200)
def test_should_fail_to_put_secret_with_unsupported_encoding(self):
self._setup_for_puts()
self.req.content_type = 'application/octet-stream'
# mock Content-Encoding header
self.req.get_header.return_value = 'bogusencoding'
with self.assertRaises(falcon.HTTPError):
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
def test_should_fail_put_secret_as_json(self):
self._setup_for_puts()
# Force error, as content_type of PUT doesn't match
# the secret's mime-type.
self.req.content_type = 'application/json'
with self.assertRaises(falcon.HTTPError) as cm:
@ -569,7 +691,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
exception = cm.exception
self.assertEqual(falcon.HTTP_404, exception.status)
def test_should_fail_put_secret_no_plain_text(self):
def test_should_fail_put_secret_no_payload(self):
self._setup_for_puts()
# Force error due to no data passed in the request.
@ -595,7 +717,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
exception = cm.exception
self.assertEqual(falcon.HTTP_409, exception.status)
def test_should_fail_due_to_empty_plain_text(self):
def test_should_fail_due_to_empty_payload(self):
self._setup_for_puts()
self.stream.read.return_value = ''
@ -640,9 +762,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.assertEqual(falcon.HTTP_404, exception.status)
def _setup_for_puts(self):
self.plain_text = "plain_text"
self.req.accept = self.mime_type
self.req.content_type = self.mime_type
self.payload = "plain_text"
self.req.accept = "text/plain"
self.req.content_type = "text/plain"
# mock Content-Encoding header
self.req.get_header.return_value = None
self.secret.encrypted_data = []
@ -650,7 +774,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.tenant_secret_repo.create_from.return_value = None
self.stream = MagicMock()
self.stream.read.return_value = self.plain_text
self.stream.read.return_value = self.payload
self.req.stream = self.stream
@ -745,8 +869,7 @@ class WhenGettingOrdersListUsingOrdersResource(unittest.TestCase):
self.offset = 2
self.limit = 2
order_params = {'mime_type': self.mime_type,
'name': self.name,
order_params = {'name': self.name,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type}
@ -834,8 +957,7 @@ class WhenGettingOrDeletingOrderUsingOrderResource(unittest.TestCase):
self.tenant_keystone_id = 'keystoneid1234'
self.requestor = 'requestor1234'
self.order = create_order(id="id1", name="name",
mime_type="name")
self.order = create_order(id="id1", name="name")
self.order_repo = MagicMock()
self.order_repo.get.return_value = self.order

View File

View File

@ -0,0 +1,52 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
from barbican.common import utils
class WhenTestingAcceptEncodingGetter(unittest.TestCase):
def setUp(self):
self.req = mock.Mock()
def test_parses_accept_encoding_header(self):
self.req.get_header.return_value = '*'
ae = utils.get_accepted_encodings(self.req)
self.req.get_header.assert_called_once_with('Accept-Encoding')
self.assertEqual(ae, ['*'])
def test_returns_none_for_empty_encoding(self):
self.req.get_header.return_value = None
ae = utils.get_accepted_encodings(self.req)
self.assertIsNone(ae)
def test_parses_single_accept_with_quality_value(self):
self.req.get_header.return_value = 'base64;q=0.7'
ae = utils.get_accepted_encodings(self.req)
self.assertEqual(ae, ['base64'])
def test_parses_more_than_one_encoding(self):
self.req.get_header.return_value = 'base64, gzip'
ae = utils.get_accepted_encodings(self.req)
self.assertEqual(ae, ['base64', 'gzip'])
def test_can_sort_by_quality_value(self):
self.req.get_header.return_value = 'base64;q=0.5, gzip;q=0.6, compress'
ae = utils.get_accepted_encodings(self.req)
self.assertEqual(ae, ['compress', 'gzip', 'base64'])

View File

@ -31,18 +31,18 @@ class WhenTestingSecretValidator(unittest.TestCase):
def setUp(self):
self.name = 'name'
self.plain_text = 'not-encrypted'.decode('utf-8')
self.mime_type = 'text/plain'
self.payload = b'not-encrypted'
self.payload_content_type = 'text/plain'
self.secret_algorithm = 'algo'
self.secret_bit_length = 512
self.secret_cypher_type = 'cytype'
self.secret_req = {'name': self.name,
'mime_type': self.mime_type,
'payload_content_type': self.payload_content_type,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type,
'plain_text': self.plain_text}
'payload': self.payload}
self.validator = validators.NewSecretValidator()
@ -57,17 +57,17 @@ class WhenTestingSecretValidator(unittest.TestCase):
self.secret_req['name'] = ' '
self.validator.validate(self.secret_req)
def test_should_validate_no_plain_text(self):
del self.secret_req['plain_text']
def test_should_validate_no_payload(self):
del self.secret_req['payload']
result = self.validator.validate(self.secret_req)
self.assertFalse('plain_text' in result)
self.assertFalse('payload' in result)
def test_should_validate_plain_text_with_whitespace(self):
self.secret_req['plain_text'] = ' ' + self.plain_text + ' '
def test_should_validate_payload_with_whitespace(self):
self.secret_req['payload'] = ' ' + self.payload + ' '
result = self.validator.validate(self.secret_req)
self.assertEqual(self.plain_text, result['plain_text'])
self.assertEqual(self.payload, result['payload'])
def test_should_validate_future_expiration(self):
self.secret_req['expiration'] = '2114-02-28T19:14:44.180394'
@ -128,24 +128,6 @@ class WhenTestingSecretValidator(unittest.TestCase):
exception = e.exception
self.assertTrue('name' in str(exception))
def test_should_fail_no_mime(self):
del self.secret_req['mime_type']
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_invalid_mime(self):
self.secret_req['mime_type'] = 'application/octet-stream'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
exception = e.exception
self.assertTrue('plain_text' in str(exception))
def test_should_fail_negative_bit_length(self):
self.secret_req['bit_length'] = -23
@ -164,23 +146,14 @@ class WhenTestingSecretValidator(unittest.TestCase):
exception = e.exception
self.assertTrue('bit_length' in str(exception))
def test_should_fail_empty_plain_text(self):
self.secret_req['plain_text'] = ' '
def test_validation_should_fail_with_empty_payload(self):
self.secret_req['payload'] = ' '
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
exception = e.exception
self.assertTrue('plain_text' in str(exception))
def test_should_fail_bad_mime(self):
self.secret_req['mime_type'] = 'badmime'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
self.assertTrue('payload' in str(exception))
def test_should_fail_already_expired(self):
self.secret_req['expiration'] = '2004-02-28T19:14:44.180394'
@ -206,36 +179,53 @@ class WhenTestingSecretValidator(unittest.TestCase):
'bit_length': None,
'cypher_type': None}
with self.assertRaises(excep.InvalidObject) as e:
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_all_empties(self):
self.secret_req = {'name': '',
'algorithm': '',
'bit_length': '',
'cypher_type': ''}
with self.assertRaises(excep.InvalidObject) as e:
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_no_payload_content_type(self):
del self.secret_req['payload_content_type']
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
def test_should_fail_with_plain_text_and_encoding(self):
self.secret_req['payload_content_encoding'] = 'base64'
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
def test_should_fail_with_wrong_encoding(self):
self.secret_req['payload_content_type'] = 'application/octet-stream'
self.secret_req['payload_content_encoding'] = 'unsupported'
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
def test_should_validate_with_wrong_encoding(self):
self.secret_req['payload_content_type'] = 'application/octet-stream'
self.secret_req['payload_content_encoding'] = 'base64'
self.validator.validate(self.secret_req)
class WhenTestingOrderValidator(unittest.TestCase):
def setUp(self):
self.name = 'name'
self.mime_type = 'application/octet-stream'
self.secret_algorithm = 'aes'
self.secret_bit_length = 128
self.secret_cypher_type = 'cbc'
self.secret_req = {'name': self.name,
'mime_type': self.mime_type,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'cypher_type': self.secret_cypher_type}
@ -294,51 +284,6 @@ class WhenTestingOrderValidator(unittest.TestCase):
exception = e.exception
self.assertTrue('cypher_type' in str(exception))
def test_should_fail_no_mime(self):
del self.secret_req['mime_type']
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_bad_mime(self):
self.secret_req['mime_type'] = 'badmimehere'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_wrong_mime(self):
self.secret_req['mime_type'] = 'text/plain'
with self.assertRaises(excep.UnsupportedField) as e:
self.validator.validate(self.order_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_bad_mime_empty(self):
self.secret_req['mime_type'] = ''
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_bad_mime_whitespace(self):
self.secret_req['mime_type'] = ' '
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_negative_bit_length(self):
self.secret_req['bit_length'] = -23
@ -364,14 +309,14 @@ class WhenTestingOrderValidator(unittest.TestCase):
exception = e.exception
self.assertTrue('secret' in str(exception))
def test_should_fail_plain_text_provided(self):
self.secret_req['plain_text'] = ' '
def test_should_fail_payload_provided(self):
self.secret_req['payload'] = ' '
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = e.exception
self.assertTrue('plain_text' in str(exception))
self.assertTrue('payload' in str(exception))
def test_should_fail_already_expired(self):
self.secret_req['expiration'] = '2004-02-28T19:14:44.180394'
@ -398,12 +343,9 @@ class WhenTestingOrderValidator(unittest.TestCase):
'cypher_type': None}
self.order_req = {'secret': self.secret_req}
with self.assertRaises(excep.InvalidObject) as e:
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.order_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
def test_should_fail_all_empties(self):
self.secret_req = {'name': '',
'algorithm': '',
@ -411,12 +353,9 @@ class WhenTestingOrderValidator(unittest.TestCase):
'cypher_type': ''}
self.order_req = {'secret': self.secret_req}
with self.assertRaises(excep.InvalidObject) as e:
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.order_req)
exception = e.exception
self.assertTrue('mime_type' in str(exception))
if __name__ == '__main__':
unittest.main()

View File

@ -30,13 +30,14 @@ class TestCryptoPlugin(CryptoPluginBase):
return cypher_text, kek_metadata
def decrypt(self, encrypted, kek_metadata, tenant):
return 'plain-data'
return b'unencrypted_data'
def create(self, algorithm, bit_length):
return "insecure_key"
def supports(self, secret_type):
return secret_type == 'text/plain'
def supports(self, kek_metadata):
metadata = json.loads(kek_metadata)
return metadata['plugin'] == 'TestCryptoPlugin'
class WhenTestingSimpleCryptoPlugin(unittest.TestCase):
@ -104,3 +105,19 @@ class WhenTestingSimpleCryptoPlugin(unittest.TestCase):
def test_create_unsupported_bit_key(self):
with self.assertRaises(ValueError):
self.plugin.create("aes", 129)
def test_supports_decoding_metadata(self):
kek_metadata = json.dumps({
'plugin': 'SimpleCryptoPlugin',
'encryption': 'aes-128-cbc',
'kek': 'kek_id'
})
self.assertTrue(self.plugin.supports(kek_metadata))
def test_does_not_support_decoding_metadata(self):
kek_metadata = json.dumps({
'plugin': 'MuchFancierPlugin',
'encryption': 'aes-128-cbc',
'kek': 'kek_id'
})
self.assertFalse(self.plugin.supports(kek_metadata))

View File

@ -15,22 +15,22 @@
import unittest
from barbican.model.models import Secret
from barbican.model import models
class WhenCreatingNewSecret(unittest.TestCase):
def setUp(self):
self.parsed_body = {'name': 'name',
'mime_type': 'text/plain',
'algorithm': 'algorithm',
'bit_length': 512,
'cypher_type': 'cypher_type',
'plain_text': 'not-encrypted'}
self.parsed_secret = {'name': 'name',
'algorithm': 'algorithm',
'bit_length': 512,
'cypher_type': 'cypher_type',
'plain_text': 'not-encrypted'}
self.parsed_order = {'secret': self.parsed_secret}
def test_new_secret_is_created_from_dict(self):
secret = Secret(self.parsed_body)
self.assertEqual(secret.name, self.parsed_body['name'])
self.assertEqual(secret.mime_type, self.parsed_body['mime_type'])
self.assertEqual(secret.algorithm, self.parsed_body['algorithm'])
self.assertEqual(secret.bit_length, self.parsed_body['bit_length'])
self.assertEqual(secret.cypher_type, self.parsed_body['cypher_type'])
secret = models.Secret(self.parsed_secret)
self.assertEqual(secret.name, self.parsed_secret['name'])
self.assertEqual(secret.algorithm, self.parsed_secret['algorithm'])
self.assertEqual(secret.bit_length, self.parsed_secret['bit_length'])
self.assertEqual(secret.cypher_type, self.parsed_secret['cypher_type'])

View File

@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import MagicMock
import unittest
from mock import MagicMock
from barbican.crypto.extension_manager import CryptoExtensionManager
from barbican.tasks.resources import BeginOrder
from barbican.model.models import (Tenant, Secret, TenantSecret,
@ -39,10 +40,9 @@ class WhenBeginningOrder(unittest.TestCase):
self.order.requestor = self.requestor
self.secret_name = "name"
self.secret_algorithm = "algo"
self.secret_bit_length = 512
self.secret_cypher_type = "cytype"
self.secret_mime_type = "text/plain"
self.secret_algorithm = "AES"
self.secret_bit_length = 256
self.secret_cypher_type = "CBC"
self.secret_expiration = timeutils.utcnow()
self.keystone_id = 'keystone1234'
@ -60,7 +60,6 @@ class WhenBeginningOrder(unittest.TestCase):
self.order.secret_bit_length = self.secret_bit_length
self.order.secret_cypher_type = self.secret_cypher_type
self.order.secret_expiration = self.secret_expiration
self.order.secret_mime_type = self.secret_mime_type
self.order_repo = MagicMock()
self.order_repo.get.return_value = self.order
@ -90,26 +89,25 @@ class WhenBeginningOrder(unittest.TestCase):
self.order_repo.get \
.assert_called_once_with(entity_id=self.order.id,
keystone_id=self.keystone_id)
assert self.order.status == States.ACTIVE
self.assertEqual(self.order.status, States.ACTIVE)
args, kwargs = self.secret_repo.create_from.call_args
secret = args[0]
assert isinstance(secret, Secret)
assert secret.name == self.secret_name
assert secret.expiration == self.secret_expiration
self.assertIsInstance(secret, Secret)
self.assertEqual(secret.name, self.secret_name)
self.assertEqual(secret.expiration, self.secret_expiration)
args, kwargs = self.tenant_secret_repo.create_from.call_args
tenant_secret = args[0]
assert isinstance(tenant_secret, TenantSecret)
assert tenant_secret.tenant_id == self.tenant_id
assert tenant_secret.secret_id == secret.id
self.assertIsInstance(tenant_secret, TenantSecret)
self.assertEqual(tenant_secret.tenant_id, self.tenant_id)
self.assertEqual(tenant_secret.secret_id, secret.id)
args, kwargs = self.datum_repo.create_from.call_args
datum = args[0]
self.assertIsInstance(datum, EncryptedDatum)
self.assertEqual(self.secret_mime_type, datum.mime_type)
assert datum.cypher_text is not None
assert datum.kek_metadata is not None
self.assertIsNotNone(datum.cypher_text)
self.assertIsNotNone(datum.kek_metadata)
if __name__ == '__main__':