Standardized Secret Encoding

Defined the different types a secret can be as well as the encoding
for the secret. A secret can now be one of symmetric, public, private,
certificate, passphrase, or opaque. Each secret type now a specific
encoding that is defined.

The Secrets REST API has been updated to allow a secret type to be
specified when storing a secret.

store_crypto is now storing public and private keys in Distinguished
Encoding Rules (DER) format using the simple_crypto plugin. This makes
it consistant with the rest of the keys that are stored in binary
format. It also makes the store_crypto get_secret call easier because
it can assume that it needs to base64 encode everything before
returning it to the caller.

All secrets returned from the secret stores are to be base64 encoded.
Updated the translations module to account for this.

Co-authored-by: Kaitlin Farr <kaitlin.farr@jhuapl.edu>
Change-Id: I5e91c16d0031b3b4791e9dcee4aa48fcc821cf66
Implements: blueprint content-types
This commit is contained in:
Nathan Reller
2015-02-12 17:21:42 -05:00
parent 707c037ab2
commit 7b1d25fae1
19 changed files with 881 additions and 132 deletions

View File

@@ -26,7 +26,9 @@ from barbican.common import utils
from barbican import i18n as u
from barbican.model import models
from barbican.openstack.common import timeutils
from barbican.plugin.interface import secret_store
from barbican.plugin.util import mime_types
from barbican.plugin.util import translations
LOG = utils.getLogger(__name__)
@@ -130,6 +132,16 @@ class NewSecretValidator(ValidatorBase):
},
"expiration": {"type": "string", "maxLength": 255},
"payload": {"type": "string"},
"secret_type": {
"type": "string",
"maxLength": 80,
"enum": [secret_store.SecretType.SYMMETRIC,
secret_store.SecretType.PASSPHRASE,
secret_store.SecretType.PRIVATE,
secret_store.SecretType.PUBLIC,
secret_store.SecretType.CERTIFICATE,
secret_store.SecretType.OPAQUE]
},
"payload_content_type": {"type": "string", "maxLength": 255},
"payload_content_encoding": {
"type": "string",
@@ -178,8 +190,13 @@ class NewSecretValidator(ValidatorBase):
"payload")
if content_type:
self._validate_payload_content_type_is_supported(content_type,
schema_name)
self._assert_validity(
mime_types.is_supported(content_type),
schema_name,
u._("payload_content_type is not one of {supported}"
).format(supplied=content_type,
supported=mime_types.SUPPORTED),
"payload_content_type")
return json_data
@@ -233,39 +250,32 @@ class NewSecretValidator(ValidatorBase):
"be supplied."),
"payload_content_type")
self._validate_payload_content_type_is_supported(content_type,
schema_name)
if content_type == 'application/octet-stream':
self._assert_validity(
content_encoding is not None,
schema_name,
u._("payload_content_encoding must be specified when "
"payload_content_type is application/octet-stream."),
"payload_content_encoding")
if content_type.startswith('text/plain'):
self._assert_validity(
content_encoding is None,
schema_name,
u._("payload_content_encoding must not be specified when "
"payload_content_type is text/plain"),
"payload_content_encoding")
def _validate_payload_content_type_is_supported(self, content_type,
schema_name):
self._assert_validity(
content_type.lower() in mime_types.SUPPORTED,
mime_types.is_supported(content_type),
schema_name,
u._("payload_content_type is not one of {supported}").format(
supported=mime_types.SUPPORTED),
u._("payload_content_type {supplied} is not one of {supported}"
).format(supplied=content_type,
supported=mime_types.SUPPORTED),
"payload_content_type")
self._assert_validity(
mime_types.is_content_type_with_encoding_supported(
content_type,
content_encoding),
schema_name,
u._("payload_content_encoding is not one of {supported}").format(
supported=mime_types.get_supported_encodings(content_type)),
"payload_content_encoding")
def _validate_payload_by_content_encoding(self, payload_content_encoding,
payload, schema_name):
if payload_content_encoding == 'base64':
try:
base64.b64decode(payload)
secret_payload = payload
if translations.is_pem_payload(payload):
pems = translations.get_pem_components(payload)
secret_payload = pems[1]
base64.b64decode(secret_payload)
except TypeError:
LOG.exception("Problem parsing payload")
raise exception.InvalidObject(schema=schema_name,

View File

@@ -170,10 +170,10 @@ class SimpleCryptoPlugin(c.CryptoPluginBase):
def _wrap_key(self, public_key, private_key,
passphrase):
pkcs = 8
key_wrap_format = 'PEM'
key_wrap_format = 'DER'
private_key = private_key.exportKey(key_wrap_format, passphrase, pkcs)
public_key = public_key.exportKey()
public_key = public_key.exportKey(key_wrap_format)
return public_key, private_key
@@ -182,16 +182,12 @@ class SimpleCryptoPlugin(c.CryptoPluginBase):
pub_seq = asn1.DerSequence()
pub_seq[:] = [0, public_key.p, public_key.q,
public_key.g, public_key.y]
public_key = ("-----BEGIN DSA PUBLIC KEY-----\n{0}"
"-----END DSA PUBLIC KEY-----"
.format(pub_seq.encode().encode("base64")))
public_key = pub_seq.encode()
prv_seq = asn1.DerSequence()
prv_seq[:] = [0, private_key.p, private_key.q,
private_key.g, private_key.y, private_key.x]
private_key = ("-----BEGIN DSA PRIVATE KEY-----\n{0}"
"-----END DSA PRIVATE KEY-----"
.format(prv_seq.encode().encode("base64")))
private_key = prv_seq.encode()
return public_key, private_key

View File

@@ -117,7 +117,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
ALG = "alg"
BIT_LENGTH = "bit_length"
KEY_ID = "key_id"
SECRET_TYPE = "secret_type"
SECRET_MODE = "secret_mode"
PASSPHRASE_KEY_ID = "passphrase_key_id"
CONVERT_TO_PEM = "convert_to_pem"
@@ -199,7 +198,7 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
self._store_secret_attributes(meta_dict, secret_dto)
return meta_dict
def get_secret(self, secret_metadata):
def get_secret(self, secret_type, secret_metadata):
"""Retrieve a secret from the KRA
The secret_metadata is simply the dict returned by a store_secret() or
@@ -233,7 +232,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
PEM format, is returned
"""
key_id = secret_metadata[DogtagKRAPlugin.KEY_ID]
secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None)
key_spec = sstore.KeySpec(
alg=secret_metadata.get(DogtagKRAPlugin.ALG, None),
@@ -243,10 +241,11 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
)
passphrase = self._get_passphrase_for_a_private_key(
secret_metadata, key_spec)
secret_type, secret_metadata, key_spec)
recovered_key = None
twsk = DogtagKRAPlugin._get_trans_wrapped_session_key(secret_metadata)
twsk = DogtagKRAPlugin._get_trans_wrapped_session_key(secret_type,
secret_metadata)
if DogtagKRAPlugin.CONVERT_TO_PEM in secret_metadata:
# Case for returning the asymmetric keys generated in KRA.
@@ -367,7 +366,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
return {DogtagKRAPlugin.ALG: key_spec.alg,
DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
DogtagKRAPlugin.SECRET_MODE: key_spec.mode,
DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.SYMMETRIC,
DogtagKRAPlugin.KEY_ID: response.get_key_id()}
def generate_asymmetric_key(self, key_spec):
@@ -411,7 +409,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
public_key_metadata = {
DogtagKRAPlugin.ALG: key_spec.alg,
DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.PUBLIC,
DogtagKRAPlugin.KEY_ID: response.get_key_id(),
DogtagKRAPlugin.CONVERT_TO_PEM: "true"
}
@@ -419,7 +416,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
private_key_metadata = {
DogtagKRAPlugin.ALG: key_spec.alg,
DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.PRIVATE,
DogtagKRAPlugin.KEY_ID: response.get_key_id(),
DogtagKRAPlugin.CONVERT_TO_PEM: "true"
}
@@ -489,12 +485,10 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
meta_dict[DogtagKRAPlugin.BIT_LENGTH] = key_spec.bit_length
if key_spec.mode is not None:
meta_dict[DogtagKRAPlugin.SECRET_MODE] = key_spec.mode
if secret_dto.type is not None:
meta_dict[DogtagKRAPlugin.SECRET_TYPE] = secret_dto.type
def _get_passphrase_for_a_private_key(self, secret_metadata, key_spec):
def _get_passphrase_for_a_private_key(self, secret_type, secret_metadata,
key_spec):
"""Retrieve the passphrase for the private key stored in the KRA."""
secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None)
if secret_type is None:
return None
if key_spec.alg is None:
@@ -520,9 +514,8 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
return passphrase
@staticmethod
def _get_trans_wrapped_session_key(secret_metadata):
def _get_trans_wrapped_session_key(secret_type, secret_metadata):
twsk = secret_metadata.get('trans_wrapped_session_key', None)
secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None)
if secret_type in [sstore.SecretType.PUBLIC,
sstore.SecretType.PRIVATE]:
if twsk:

View File

@@ -302,7 +302,7 @@ class SecretDTO(object):
secret has been encrypted using a transport key. The transport
key is a base64 encoded x509 transport certificate.
"""
self.type = type
self.type = type or SecretType.OPAQUE
self.secret = secret
self.key_spec = key_spec
self.content_type = content_type
@@ -386,7 +386,7 @@ class SecretStoreBase(object):
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def get_secret(self, secret_metadata):
def get_secret(self, secret_type, secret_metadata):
"""Retrieves a secret from the secret store.
Retrieves a secret from the secret store and returns a SecretDTO that
@@ -396,6 +396,12 @@ class SecretStoreBase(object):
generate or store methods. This data is used by the plugins to retrieve
the key.
The secret_type parameter may be useful for secret stores to know the
expected format of the secret. For instance if the type is
SecretDTO.PRIVATE then a PKCS8 structure is returned. This way secret
stores do not need to manage the secret type on their own.
:param secret_type: secret type
:param secret_metadata: secret metadata
:returns: SecretDTO that contains secret
"""

View File

@@ -250,9 +250,10 @@ class KMIPSecretStore(ss.SecretStoreBase):
self.client.close()
LOG.debug("Closed connection to KMIP client for secret storage")
def get_secret(self, secret_metadata):
def get_secret(self, secret_type, secret_metadata):
"""Gets a secret
:param secret_type: secret type
:param secret_metadata: Dictionary of key metadata, requires:
{'key_uuid': <uuid of key>}
:returns: SecretDTO of the retrieved Secret
@@ -273,9 +274,6 @@ class KMIPSecretStore(ss.SecretStoreBase):
if result.result_status.enum == enums.ResultStatus.SUCCESS:
secret_block = result.secret.key_block
secret_type = self._map_type_kmip_to_ss(
result.object_type.enum)
key_value_type = type(secret_block.key_value.key_material)
if key_value_type == kmip_objects.KeyMaterialStruct:
secret_value = self._convert_byte_array_to_base64(
@@ -464,19 +462,6 @@ class KMIPSecretStore(ss.SecretStoreBase):
else:
return None
def _map_type_kmip_to_ss(self, object_type):
"""Map KMIP type enum to SecretType enum
Returns None if the type is not supported. The KMIP plugin only
supports symmetric keys for now.
:param object_type: KMIP type enum
:returns: SecretType enum if type is supported, None if not supported
"""
if object_type == enums.ObjectType.SYMMETRIC_KEY:
return ss.SecretType.SYMMETRIC
else:
return None
def _map_algorithm_ss_to_kmip(self, algorithm):
"""Map SecretStore enum value to the KMIP algorithm enum

View File

@@ -101,17 +101,12 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,
key_spec=key_spec, plugin_name=plugin_name)
# Normalize inputs prior to storage.
# TODO(john-wood-w) Normalize all secrets to base64, so we don't have to
# pass in 'content' type to the store_secret() call below.
unencrypted, content_type = tr.normalize_before_encryption(
unencrypted_raw, content_type_raw, content_encoding,
enforce_text_only=True)
secret_model.secret_type, enforce_text_only=True)
# Store the secret securely.
secret_type = None
if key_spec is not None:
secret_type = secret_store.KeyAlgorithm().get_secret_type(key_spec.alg)
secret_dto = secret_store.SecretDTO(type=secret_type,
secret_dto = secret_store.SecretDTO(type=secret_model.secret_type,
secret=unencrypted,
key_spec=key_spec,
content_type=content_type,
@@ -181,6 +176,7 @@ def generate_secret(spec, content_type, project_model):
# Create secret model to eventually save metadata to.
secret_model = models.Secret(spec)
secret_model['secret_type'] = secret_store.SecretType.SYMMETRIC
# Generate the secret.
secret_metadata = _generate_symmetric_key(
@@ -206,9 +202,14 @@ def generate_asymmetric_secret(spec, content_type, project_model):
# Create secret models to eventually save metadata to.
private_secret_model = models.Secret(spec)
private_secret_model['secret_type'] = secret_store.SecretType.PRIVATE
public_secret_model = models.Secret(spec)
public_secret_model['secret_type'] = secret_store.SecretType.PUBLIC
passphrase_secret_model = (models.Secret(spec)
if spec.get('passphrase') else None)
if passphrase_secret_model:
passphrase_type = secret_store.SecretType.PASSPHRASE
passphrase_secret_model['secret_type'] = passphrase_type
# Generate the secret.
asymmetric_meta_dto = _generate_asymmetric_key(
@@ -319,9 +320,12 @@ def _get_secret(retrieve_plugin, secret_metadata, secret_model, project_model):
context = store_crypto.StoreCryptoContext(
project_model,
secret_model=secret_model)
secret_dto = retrieve_plugin.get_secret(secret_metadata, context)
secret_dto = retrieve_plugin.get_secret(secret_model.secret_type,
secret_metadata,
context)
else:
secret_dto = retrieve_plugin.get_secret(secret_metadata)
secret_dto = retrieve_plugin.get_secret(secret_model.secret_type,
secret_metadata)
return secret_dto

View File

@@ -21,6 +21,7 @@ from barbican.model import repositories
from barbican.plugin.crypto import crypto
from barbican.plugin.crypto import manager
from barbican.plugin.interface import secret_store as sstore
from barbican.plugin.util import translations
CONF = cfg.CONF
@@ -82,7 +83,14 @@ class StoreCryptoAdapterPlugin(object):
encrypting_plugin, context.project_model)
# Secrets are base64 encoded before being passed to the secret stores.
normalized_secret = base64.b64decode(secret_dto.secret)
normalized_secret = secret_dto.secret
secret_type = secret_dto.type
if (secret_type == sstore.SecretType.PRIVATE or
secret_type == sstore.SecretType.PUBLIC or
secret_type == sstore.SecretType.CERTIFICATE):
normalized_secret = translations.get_pem_components(
normalized_secret)[1]
normalized_secret = base64.b64decode(normalized_secret)
encrypt_dto = crypto.EncryptDTO(normalized_secret)
# Enhance the context with content_type, This is needed to build
@@ -101,10 +109,11 @@ class StoreCryptoAdapterPlugin(object):
return None
def get_secret(self, secret_metadata, context):
def get_secret(self, secret_type, metadata, context):
"""Retrieve a secret.
:param secret_metadata: secret metadata
:param secret_type: secret type
:param metadata: secret metadata
:param context: StoreCryptoContext for secret
:returns: SecretDTO that contains secret
"""
@@ -131,11 +140,16 @@ class StoreCryptoAdapterPlugin(object):
kek_meta_dto,
datum_model.kek_meta_extended,
context.project_model.external_id)
secret = base64.b64encode(secret)
if (secret_type == sstore.SecretType.PRIVATE or
secret_type == sstore.SecretType.PUBLIC or
secret_type == sstore.SecretType.CERTIFICATE):
secret = translations.to_pem(secret_type, secret, True)
key_spec = sstore.KeySpec(alg=context.secret_model.algorithm,
bit_length=context.secret_model.bit_length,
mode=context.secret_model.mode)
return sstore.SecretDTO(sstore.SecretType.SYMMETRIC,
return sstore.SecretDTO(secret_type,
secret, key_spec,
datum_model.content_type)

View File

@@ -26,7 +26,9 @@ PLAIN_TEXT = ['text/plain',
'text/plain;charset=utf-8',
'text/plain; charset=utf-8']
PLAIN_TEXT_CHARSETS = ['utf-8']
BINARY = ['application/octet-stream']
BINARY = ['application/octet-stream',
'application/pkcs8',
'application/pkix-cert']
SUPPORTED = PLAIN_TEXT + BINARY
# Normalizes client types to internal types.
@@ -34,6 +36,8 @@ INTERNAL_CTYPES = {'text/plain': 'text/plain',
'text/plain;charset=utf-8': 'text/plain',
'text/plain; charset=utf-8': 'text/plain',
'application/octet-stream': 'application/octet-stream',
'application/pkcs8': 'application/pkcs8',
'application/pkix-cert': 'application/pkix-cert',
'application/aes': 'application/aes'}
# Maps mime-types used to specify secret data formats to the types that can
@@ -42,9 +46,13 @@ INTERNAL_CTYPES = {'text/plain': 'text/plain',
# which are then used as the keys to the 'CTYPES_MAPPINGS' below.
CTYPES_PLAIN = {'default': 'text/plain'}
CTYPES_BINARY = {'default': 'application/octet-stream'}
CTYPES_PKCS8 = {'default': 'application/pkcs8'}
CTYPES_PKIX_CERT = {'default': 'application/pkix-cert'}
CTYPES_AES = {'default': 'application/aes'}
CTYPES_MAPPINGS = {'text/plain': CTYPES_PLAIN,
'application/octet-stream': CTYPES_BINARY,
'application/pkcs8': CTYPES_PKCS8,
'application/pkix-cert': CTYPES_PKIX_CERT,
'application/aes': CTYPES_AES}
# Supported encodings
@@ -53,6 +61,8 @@ ENCODINGS = ['base64']
# Maps normalized content-types to supported encoding(s)
CTYPES_TO_ENCODINGS = {'text/plain': None,
'application/octet-stream': ['base64', 'binary'],
'application/pkcs8': ['base64', 'binary'],
'application/pkix-cert': ['base64', 'binary'],
'application/aes': None}
@@ -75,7 +85,8 @@ def normalize_content_type(mime_type):
def is_supported(mime_type):
return mime_type in SUPPORTED
normalized_type = normalize_content_type(mime_type)
return normalized_type in SUPPORTED
def is_base64_encoding_supported(mime_type):
@@ -85,6 +96,22 @@ def is_base64_encoding_supported(mime_type):
return False
def is_content_type_with_encoding_supported(content_type, content_encoding):
if not is_supported(content_type):
return False
normalized_type = normalize_content_type(content_type)
encodings = CTYPES_TO_ENCODINGS[INTERNAL_CTYPES[normalized_type]]
if encodings:
return content_encoding in encodings
else:
return content_encoding is None
def get_supported_encodings(content_type):
normalized_type = normalize_content_type(content_type)
return CTYPES_TO_ENCODINGS[INTERNAL_CTYPES[normalized_type]]
def is_base64_processing_needed(content_type, content_encoding):
content_encodings = utils.get_accepted_encodings_direct(content_encoding)
if content_encodings:

View File

@@ -18,15 +18,21 @@ from barbican.plugin.util import mime_types
def normalize_before_encryption(unencrypted, content_type, content_encoding,
enforce_text_only=False):
"""Normalize unencrypted prior to plugin encryption processing."""
secret_type, enforce_text_only=False):
"""Normalize unencrypted prior to plugin encryption processing.
This normalizes the secrets before they are handed off to the SecretStore
for storage. This converts all data to Base64 data. If the data is plain
text then it encoded using utf-8 first and then Base64 encoded. Binary
data is simply converted to Base64. In addition if the secret type is
one of private, public, or certificate then the PEM headers are added
to the Base64 encoding.
"""
if not unencrypted:
raise s.SecretNoPayloadProvidedException()
# Validate and normalize content-type.
normalized_mime = mime_types.normalize_content_type(content_type)
if not mime_types.is_supported(normalized_mime):
raise s.SecretContentTypeNotSupportedException(content_type)
normalized_mime = normalize_content_type(content_type)
# Process plain-text type.
if normalized_mime in mime_types.PLAIN_TEXT:
@@ -39,14 +45,22 @@ def normalize_before_encryption(unencrypted, content_type, content_encoding,
if content_encoding:
content_encoding = content_encoding.lower()
if content_encoding == 'base64':
b64payload = unencrypted
if is_pem_payload(unencrypted):
pem_components = get_pem_components(unencrypted)
b64payload = pem_components[1]
try:
base64.b64decode(unencrypted)
base64.b64decode(b64payload)
except TypeError:
raise s.SecretPayloadDecodingError()
unencrypted = unencrypted
elif mime_types.use_binary_content_as_is(content_type,
content_encoding):
unencrypted = base64.b64encode(unencrypted)
if (secret_type == s.SecretType.PRIVATE or
secret_type == s.SecretType.PUBLIC or
secret_type == s.SecretType.CERTIFICATE):
unencrypted = to_pem(secret_type, unencrypted)
else:
unencrypted = base64.b64encode(unencrypted)
elif enforce_text_only:
# For text-based protocols (such as the one-step secret POST),
# only 'base64' encoding is possible/supported.
@@ -60,6 +74,14 @@ def normalize_before_encryption(unencrypted, content_type, content_encoding,
return unencrypted, normalized_mime
def normalize_content_type(content_type):
"""Normalize the content type and validate that it is supported."""
normalized_mime = mime_types.normalize_content_type(content_type)
if not mime_types.is_supported(normalized_mime):
raise s.SecretContentTypeNotSupportedException(content_type)
return normalized_mime
def analyze_before_decryption(content_type):
"""Determine support for desired content type."""
if not mime_types.is_supported(content_type):
@@ -67,17 +89,90 @@ def analyze_before_decryption(content_type):
def denormalize_after_decryption(unencrypted, content_type):
"""Translate the decrypted data into the desired content type."""
"""Translate the decrypted data into the desired content type.
This is called when the raw keys are requested by the user. The secret
returned from the SecretStore is the unencrypted parameter. This
'denormalizes' the data back to its binary format.
"""
# Process plain-text type.
if content_type in mime_types.PLAIN_TEXT:
# normalize text to binary string
try:
unencrypted = base64.b64decode(unencrypted)
unencrypted = unencrypted.decode('utf-8')
except UnicodeDecodeError:
raise s.SecretAcceptNotSupportedException(content_type)
# Process binary type.
elif content_type not in mime_types.BINARY:
elif content_type in mime_types.BINARY:
if is_pem_payload(unencrypted):
unencrypted = get_pem_components(unencrypted)[1]
unencrypted = base64.b64decode(unencrypted)
else:
raise s.SecretContentTypeNotSupportedException(content_type)
return unencrypted
def get_pem_components(pem):
"""Returns the PEM content, header, and footer.
This parses the PEM string and returns the PEM header, content, and footer.
The content is the base64 encoded bytes without the header and footer. This
is returned as a list. The order of the list is header, content, footer.
"""
delim = "-----"
splits = pem.split(delim)
if len(splits) != 5 or splits[0] != "" or splits[4] != "":
raise s.SecretPayloadDecodingError()
header = delim + splits[1] + delim
content = splits[2]
footer = delim + splits[3] + delim
return (header, content, footer)
def is_pem_payload(payload):
"""Tests whether payload is in PEM format.
This parses the payload for the PEM header and footer strings. If it finds
the header and footer strings then it is assumed to be a PEM payload.
"""
delim = "-----"
splits = payload.split(delim)
if len(splits) != 5 or splits[0] != "" or splits[4] != "":
return False
else:
return True
def to_pem(secret_type, payload, payload_encoded=False):
"""Converts payload to PEM format.
This converts the payload to Base 64 encoding if payload_encoded is False
and then adds PEM headers. This uses the secret_type to determined the PEM
header.
"""
pem = payload
if payload_encoded:
pem_content = payload
else:
pem_content = base64.b64encode(payload)
if secret_type == s.SecretType.PRIVATE:
headers = _get_pem_headers("PRIVATE KEY")
pem = headers[0] + pem_content + headers[1]
elif secret_type == s.SecretType.PUBLIC:
headers = _get_pem_headers("PUBLIC KEY")
pem = headers[0] + pem_content + headers[1]
elif secret_type == s.SecretType.CERTIFICATE:
headers = _get_pem_headers("CERTIFICATE")
pem = headers[0] + pem_content + headers[1]
return pem
def _get_pem_headers(pem_name):
header = "-----BEGIN {}-----".format(pem_name)
footer = "-----END {}-----".format(pem_name)
return (header, footer)

View File

@@ -330,7 +330,7 @@ def _save_plugin_metadata(order_model, plugin_meta):
def _save_secrets(result, project_model):
cert_secret_model, transport_key_model = plugin.store_secret(
unencrypted_raw=result.certificate,
content_type_raw='text/plain',
content_type_raw='application/pkix-cert',
content_encoding='base64',
spec={},
secret_model=None,
@@ -340,7 +340,7 @@ def _save_secrets(result, project_model):
if result.intermediates:
intermediates_secret_model, transport_key_model = plugin.store_secret(
unencrypted_raw=result.intermediates,
content_type_raw='text/plain',
content_type_raw='application/pkix-cert',
content_encoding='base64',
spec={},
secret_model=None,

View File

@@ -27,6 +27,53 @@ VALID_EXTENSIONS = "valid extensions"
VALID_FULL_CMC = "valid CMC"
def get_symmetric_key_req():
return {'name': 'mysymmetrickey',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64',
'algorithm': 'aes',
'bit_length': 256,
'secret_type': 'symmetric',
'payload': 'gF6+lLoF3ohA9aPRpt+6bQ=='}
def get_private_key_req():
return {'name': 'myprivatekey',
'payload_content_type': 'application/pkcs8',
'payload_content_encoding': 'base64',
'algorithm': 'rsa',
'bit_length': 1024,
'secret_type': 'private',
'payload': utils.get_private_key()}
def get_public_key_req():
return {'name': 'mypublickey',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64',
'algorithm': 'rsa',
'bit_length': 1024,
'secret_type': 'public',
'payload': utils.get_public_key()}
def get_certificate_req():
return {'name': 'mycertificate',
'payload_content_type': 'application/pkix-cert',
'payload_content_encoding': 'base64',
'algorithm': 'rsa',
'bit_length': 1024,
'secret_type': 'certificate',
'payload': utils.get_certificate()}
def get_passphrase_req():
return {'name': 'mypassphrase',
'payload_content_type': 'text/plain',
'secret_type': 'passphrase',
'payload': 'mysecretpassphrase'}
def suite():
suite = unittest.TestSuite()
@@ -62,6 +109,7 @@ class WhenTestingValidatorsFunctions(utils.BaseTestCase):
self.assertTrue(is_too_big)
@utils.parameterized_test_case
class WhenTestingSecretValidator(utils.BaseTestCase):
def setUp(self):
@@ -72,12 +120,14 @@ class WhenTestingSecretValidator(utils.BaseTestCase):
self.payload_content_type = 'text/plain'
self.secret_algorithm = 'algo'
self.secret_bit_length = 512
self.secret_type = 'opaque'
self.secret_mode = 'cytype'
self.secret_req = {'name': self.name,
'payload_content_type': self.payload_content_type,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'secret_type': self.secret_type,
'mode': self.secret_mode,
'payload': self.payload}
@@ -333,6 +383,59 @@ class WhenTestingSecretValidator(utils.BaseTestCase):
)
self.assertEqual('payload', exception.invalid_property)
def test_should_pass_with_no_secret_type(self):
request = dict(self.secret_req)
del request['secret_type']
self.validator.validate(request)
def test_should_fail_with_unknown_secret_type(self):
self.secret_req['secret_type'] = 'unknown_type'
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
@utils.parameterized_dataset({
'symmetric': [get_symmetric_key_req()],
'private': [get_private_key_req()],
'public': [get_public_key_req()],
'certificate': [get_certificate_req()],
'passphrase': [get_passphrase_req()],
})
def test_should_pass_with_secret_type(self, request):
self.validator.validate(request)
@utils.parameterized_dataset({
'symmetric': [get_symmetric_key_req(), 'foo'],
'private': [get_private_key_req(), 'foo'],
'public': [get_public_key_req(), 'foo'],
'certificate': [get_certificate_req(), 'foo'],
'passphrase': [get_passphrase_req(), 'base64'],
})
def test_should_fail_with_bad_encoding(self, request, content_encoding):
request['payload_content_encoding'] = content_encoding
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
request,
)
@utils.parameterized_dataset({
'symmetric': [get_symmetric_key_req(), 'text/plain'],
'private': [get_private_key_req(), 'text/plain'],
'public': [get_public_key_req(), 'text/plain'],
'certificate': [get_certificate_req(), 'text/plain'],
'passphrase': [get_passphrase_req(), 'application/octet-stream'],
})
def test_should_fail_with_bad_content_type(self, request, content_type):
request['payload_content_type'] = content_type
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
request,
)
class WhenTestingContainerValidator(utils.BaseTestCase):
@@ -1182,5 +1285,6 @@ class WhenTestingStoredKeyOrderValidator(utils.BaseTestCase):
self.validator.validate,
self.order_req)
if __name__ == '__main__':
unittest.main()

View File

@@ -370,9 +370,7 @@ class WhenTestingSimpleCryptoPlugin(utils.BaseTestCase):
mock.MagicMock())
prv_seq = asn1.DerSequence()
data = "\n".join(private_dto.strip().split("\n")
[1:-1]).decode("base64")
prv_seq.decode(data)
prv_seq.decode(private_dto)
p, q, g, y, x = prv_seq[1:]
private_dto = DSA.construct((y, g, p, q, x))

View File

@@ -48,6 +48,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
self.secret_store = kss.KMIPSecretStore(CONF)
self.secret_store.client = self.kmipclient_mock
self.secret_store.credential = self.credential
self.symmetric_type = secret_store.SecretType.SYMMETRIC
self.sample_secret_features = {
'key_format_type': enums.KeyFormatType.RAW,
@@ -280,13 +281,14 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
def test_get_secret_symmetric_assert_called(self):
metadata = {kss.KMIPSecretStore.KEY_UUID: 'uuid'}
self.secret_store.get_secret(metadata)
self.secret_store.get_secret(self.symmetric_type, metadata)
self.kmipclient_mock.get.assert_called_once_with('uuid',
self.credential)
def test_get_secret_symmetric_return_value_key_material_struct(self):
metadata = {kss.KMIPSecretStore.KEY_UUID: 'uuid'}
return_value = self.secret_store.get_secret(metadata)
return_value = self.secret_store.get_secret(self.symmetric_type,
metadata)
self.assertEqual(secret_store.SecretDTO, type(return_value))
self.assertEqual(secret_store.SecretType.SYMMETRIC, return_value.type)
# The plugin returns a base64 string for the secret
@@ -306,7 +308,8 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
secret=sample_secret))
metadata = {kss.KMIPSecretStore.KEY_UUID: 'uuid'}
return_value = self.secret_store.get_secret(metadata)
return_value = self.secret_store.get_secret(self.symmetric_type,
metadata)
self.assertEqual(secret_store.SecretDTO, type(return_value))
self.assertEqual(secret_store.SecretType.SYMMETRIC, return_value.type)
# The plugin returns a base64 string for the secret
@@ -328,7 +331,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.get_secret,
metadata)
self.symmetric_type, metadata)
def test_get_secret_symmetric_error_occurs(self):
self.secret_store.client.get = mock.create_autospec(
@@ -338,7 +341,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.get_secret,
metadata)
self.symmetric_type, metadata)
def test_get_secret_symmetric_error_opening_connection(self):
self.secret_store.client.open = mock.Mock(side_effect=socket.error)
@@ -347,7 +350,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
self.assertRaises(
secret_store.SecretGeneralException,
self.secret_store.get_secret,
metadata)
self.symmetric_type, metadata)
# ---------------- TEST DELETE -------------------------------------------
@@ -396,16 +399,6 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
self.assertIsNone(
self.secret_store._map_type_ss_to_kmip('bad_type'))
def test_map_type_kmip_to_ss_valid_type(self):
kmip_types = [enums.ObjectType.SYMMETRIC_KEY]
for kmip_type in kmip_types:
self.assertIsNotNone(
self.secret_store._map_type_kmip_to_ss(kmip_type))
def test_map_type_kmip_to_ss_invalid_type(self):
self.assertIsNone(
self.secret_store._map_type_kmip_to_ss('bad_type'))
def test_map_algorithm_ss_to_kmip_valid_alg(self):
ss_algs = [secret_store.KeyAlgorithm.AES,
secret_store.KeyAlgorithm.DES,

View File

@@ -17,11 +17,14 @@ import base64
import mock
import testtools
from barbican.model import models
from barbican.plugin.interface import secret_store
from barbican.plugin import resources
from barbican.plugin import store_crypto
from barbican.tests import utils
@utils.parameterized_test_case
class WhenTestingPluginResource(testtools.TestCase,
utils.MockModelRepositoryMixin):
@@ -51,11 +54,10 @@ class WhenTestingPluginResource(testtools.TestCase,
}
self.moc_plugin_patcher = mock.patch(
'barbican.plugin.interface.secret_store'
'.get_manager',
'barbican.plugin.interface.secret_store.get_manager',
**moc_plugin_config
)
self.moc_plugin_patcher.start()
self.moc_plugin_manager = self.moc_plugin_patcher.start()
self.addCleanup(self.moc_plugin_patcher.stop)
self.setup_project_repository_mock()
@@ -85,7 +87,8 @@ class WhenTestingPluginResource(testtools.TestCase,
super(WhenTestingPluginResource, self).tearDown()
def test_store_secret_dto(self):
spec = {'algorithm': 'AES', 'bit_length': 256}
spec = {'algorithm': 'AES', 'bit_length': 256,
'secret_type': 'symmetric'}
secret = base64.b64encode('ABCDEFABCDEFABCDEFABCDEF')
self.plugin_resource.store_secret(
@@ -103,6 +106,67 @@ class WhenTestingPluginResource(testtools.TestCase,
self.assertEqual(spec['bit_length'], dto.key_spec.bit_length)
self.assertEqual(self.content_type, dto.content_type)
@utils.parameterized_dataset({
'general_secret_store': {
'moc_plugin': None
},
'store_crypto': {
'moc_plugin': mock.MagicMock(store_crypto.StoreCryptoAdapterPlugin)
}
})
def test_get_secret_dto(self, moc_plugin):
def mock_secret_store_store_secret(dto):
self.secret_dto = dto
def mock_secret_store_get_secret(secret_type, secret_metadata):
return self.secret_dto
def mock_store_crypto_store_secret(dto, context):
self.secret_dto = dto
def mock_store_crypto_get_secret(
secret_type, secret_metadata, context):
return self.secret_dto
if moc_plugin:
self.moc_plugin = moc_plugin
self.moc_plugin.store_secret.return_value = {}
self.moc_plugin.store_secret.side_effect = (
mock_store_crypto_store_secret)
self.moc_plugin.get_secret.side_effect = (
mock_store_crypto_get_secret)
moc_plugin_config = {
'return_value.get_plugin_store.return_value':
self.moc_plugin,
'return_value.get_plugin_retrieve_delete.return_value':
self.moc_plugin
}
self.moc_plugin_manager.configure_mock(**moc_plugin_config)
else:
self.moc_plugin.store_secret.side_effect = (
mock_secret_store_store_secret)
self.moc_plugin.get_secret.side_effect = (
mock_secret_store_get_secret)
raw_secret = 'ABCDEFABCDEFABCDEFABCDEF'
spec = {'name': 'testsecret', 'algorithm': 'AES', 'bit_length': 256,
'secret_type': 'symmetric'}
self.plugin_resource.store_secret(
base64.b64encode(raw_secret),
self.content_type,
'base64',
spec,
None,
self.project_model)
secret = self.plugin_resource.get_secret(
'application/octet-stream',
models.Secret(spec),
None)
self.assertEqual(raw_secret, secret)
def test_generate_asymmetric_with_passphrase(self):
"""test asymmetric secret generation with passphrase."""
secret_container = self.plugin_resource.generate_asymmetric_secret(

View File

@@ -22,9 +22,39 @@ from barbican.model import models
from barbican.plugin.crypto import crypto
from barbican.plugin.interface import secret_store
from barbican.plugin import store_crypto
from barbican.plugin.util import translations
from barbican.tests import utils as test_utils
def get_private_dto():
spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 1024)
return secret_store.SecretDTO(secret_store.SecretType.PRIVATE,
test_utils.get_private_key(),
spec,
'application/pkcs8')
def get_public_dto():
spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 1024)
return secret_store.SecretDTO(secret_store.SecretType.PUBLIC,
test_utils.get_public_key(),
spec,
'application/octet-stream')
def get_certificate_dto():
spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 1024)
return secret_store.SecretDTO(secret_store.SecretType.CERTIFICATE,
test_utils.get_certificate(),
spec,
'application/pkix-cert')
def get_pem_content(pem):
b64_content = translations.get_pem_components(pem)[1]
return base64.b64decode(b64_content)
class TestSecretStoreBase(testtools.TestCase,
test_utils.MockModelRepositoryMixin):
"""Define common configurations for testing store_crypto.py."""
@@ -48,7 +78,7 @@ class TestSecretStoreBase(testtools.TestCase,
self.project_model.id = 'project-model-id'
self.project_model.external_id = self.project_id
self.secret_dto = secret_store.SecretDTO(
secret_store.SecretType.SYMMETRIC,
secret_store.SecretType.OPAQUE,
self.secret,
secret_store.KeySpec(),
self.content_type
@@ -132,8 +162,15 @@ class TestSecretStoreBase(testtools.TestCase,
self.setup_kek_datum_repository_mock(self.kek_repo)
@test_utils.parameterized_test_case
class WhenTestingStoreCrypto(TestSecretStoreBase):
dataset_for_pem = {
'private': [get_private_dto()],
'public': [get_public_dto()],
'certificate': [get_certificate_dto()]
}
def setUp(self):
super(WhenTestingStoreCrypto, self).setUp()
@@ -170,17 +207,41 @@ class WhenTestingStoreCrypto(TestSecretStoreBase):
self.assertEqual(self.content_type, self.context.content_type)
@test_utils.parameterized_dataset(dataset_for_pem)
def test_store_pem_secret(self, secret_dto):
"""Test storing a secret that is PEM encoded."""
response_dict = self.plugin_to_test.store_secret(
secret_dto, self.context)
self.assertEqual(None, response_dict)
content = translations.get_pem_components(secret_dto.secret)[1]
raw_content = base64.b64decode(content)
# Verify encrypt plugin and method where invoked.
encrypt_mock = self.encrypting_plugin.encrypt
self.assertEqual(encrypt_mock.call_count, 1)
args, kwargs = encrypt_mock.call_args
test_encrypt_dto, test_kek_meta_dto, test_project_id = tuple(args)
self.assertIsInstance(test_encrypt_dto, crypto.EncryptDTO)
self.assertEqual(raw_content, test_encrypt_dto.unencrypted)
self.assertEqual(self.kek_meta_dto, test_kek_meta_dto)
self.assertEqual(self.project_id, test_project_id)
def test_get_secret(self):
"""Test getting a secret."""
secret_dto = self.plugin_to_test.get_secret(
secret_store.SecretType.OPAQUE,
None, # Secret metadata is not relevant to store_crypto process.
self.context)
# Verify response.
self.assertIsInstance(secret_dto, secret_store.SecretDTO)
self.assertEqual(secret_store.SecretType.SYMMETRIC, secret_dto.type)
self.assertEqual(self.decrypted_secret, secret_dto.secret)
self.assertEqual(secret_store.SecretType.OPAQUE, secret_dto.type)
self.assertEqual(base64.b64encode(self.decrypted_secret),
secret_dto.secret)
self.assertEqual(
self.encrypted_datum_model.content_type, secret_dto.content_type)
self.assertIsInstance(secret_dto.key_spec, secret_store.KeySpec)
@@ -217,6 +278,39 @@ class WhenTestingStoreCrypto(TestSecretStoreBase):
self.assertEqual(self.project_id, test_project_id)
@test_utils.parameterized_dataset(dataset_for_pem)
def test_get_secret_encoding(self, input_secret_dto):
"""Test getting a secret that should be returend in PEM format."""
secret = input_secret_dto.secret
key_spec = input_secret_dto.key_spec
secret_type = input_secret_dto.type
decrypt_mock = self.retrieving_plugin.decrypt
content = translations.get_pem_components(secret)[1]
decrypt_mock.return_value = base64.b64decode(content)
secret_model = self.context.secret_model
secret_model.algorithm = key_spec.alg
secret_model.bit_length = key_spec.bit_length
secret_model.mode = key_spec.mode
secret_dto = self.plugin_to_test.get_secret(
secret_type,
None, # Secret metadata is not relevant to store_crypto process.
self.context)
# Verify response.
self.assertIsInstance(secret_dto, secret_store.SecretDTO)
self.assertEqual(secret_type, secret_dto.type)
self.assertEqual(secret, secret_dto.secret)
self.assertIsInstance(secret_dto.key_spec, secret_store.KeySpec)
self.assertEqual(
secret_model.algorithm, secret_dto.key_spec.alg)
self.assertEqual(
secret_model.bit_length, secret_dto.key_spec.bit_length)
self.assertEqual(
secret_model.mode, secret_dto.key_spec.mode)
def test_generate_symmetric_key(self):
"""test symmetric secret generation."""
generation_type = crypto.PluginSupportTypes.SYMMETRIC_KEY_GENERATION
@@ -289,6 +383,7 @@ class WhenTestingStoreCrypto(TestSecretStoreBase):
self.assertRaises(
secret_store.SecretNotFoundException,
self.plugin_to_test.get_secret,
secret_store.SecretType.OPAQUE,
None, # get_secret() doesn't use the secret metadata argument
self.context
)
@@ -299,6 +394,7 @@ class WhenTestingStoreCrypto(TestSecretStoreBase):
self.assertRaises(
secret_store.SecretNotFoundException,
self.plugin_to_test.get_secret,
secret_store.SecretType.OPAQUE,
None, # get_secret() doesn't use the secret metadata argument
self.context
)

View File

@@ -111,8 +111,8 @@ class WhenTestingAugmentFieldsWithContentTypes(utils.BaseTestCase):
def test_static_supported_binary(self):
for bin in mime_types.BINARY:
self.assertEqual('application/octet-stream',
mime_types.INTERNAL_CTYPES[bin])
self.assertTrue(mime_types.INTERNAL_CTYPES[bin] in
mime_types.BINARY)
def test_static_content_to_encodings(self):
self.assertIn('text/plain', mime_types.CTYPES_TO_ENCODINGS)
@@ -195,3 +195,31 @@ class WhenTestingNormalizationOfMIMETypes(utils.BaseTestCase):
mime = 'something/bogus'
r = mime_types.normalize_content_type(mime)
self.assertEqual(r, 'something/bogus')
@utils.parameterized_test_case
class WhenTestingIfContentTypeAndEncodingSupported(utils.BaseTestCase):
@utils.parameterized_dataset({
'plaintext_none': ['text/plain', None],
'octectstream_base64': ['application/octet-stream', 'base64'],
'pkcs8_base64': ['application/pkcs8', 'base64'],
'pkix_base64': ['application/pkix-cert', 'base64'],
})
def test_content_type_encoding_supported(self, content_type, encoding):
self.assertTrue(mime_types.is_content_type_with_encoding_supported(
content_type,
encoding))
@utils.parameterized_dataset({
'plaintext_none': ['text/plain', 'base64'],
'octectstream_no_encoding': ['application/octet-stream', None],
'pkcs8_no_encoding': ['application/pkcs8', None],
'pkix_no_encoding': ['application/pkix-cert', None],
'unknown_with_valid_encoding': ['application/uknown-content-type',
'base64']
})
def test_content_type_encoding_not_supported(self, content_type, encoding):
self.assertFalse(mime_types.is_content_type_with_encoding_supported(
content_type,
encoding))

View File

@@ -26,24 +26,28 @@ class WhenNormalizingBeforeEncryption(utils.BaseTestCase):
'non_encrypted_content': {
'exception': s.SecretNoPayloadProvidedException,
'unencrypted': None,
'secret_type': s.SecretType.OPAQUE,
'content_type': '',
'content_encoding': ''
},
'invalid_content_type': {
'exception': s.SecretContentTypeNotSupportedException,
'unencrypted': 'stuff',
'secret_type': s.SecretType.OPAQUE,
'content_type': 'nope',
'content_encoding': ''
},
'invalid_base64_content': {
'exception': s.SecretPayloadDecodingError,
'unencrypted': 'stuff',
'secret_type': s.SecretType.OPAQUE,
'content_type': 'application/octet-stream',
'content_encoding': 'base64'
},
'content_encoding_isnt_base64': {
'exception': s.SecretContentEncodingMustBeBase64,
'unencrypted': 'stuff',
'secret_type': s.SecretType.OPAQUE,
'content_type': 'application/octet-stream',
'content_encoding': 'other_stuff',
'enforce_text_only': True
@@ -51,22 +55,119 @@ class WhenNormalizingBeforeEncryption(utils.BaseTestCase):
'unsupported_content_encoding': {
'exception': s.SecretContentEncodingNotSupportedException,
'unencrypted': 'stuff',
'secret_type': s.SecretType.OPAQUE,
'content_type': 'application/octet-stream',
'content_encoding': 'other_stuff'
}
}
dataset_for_normalization = {
'plain_text': {
'unencrypted': 'stuff',
'secret_type': s.SecretType.OPAQUE,
'content_type': 'text/plain',
'content_encoding': '',
'expected': base64.b64encode('stuff'.encode('utf-8'))
},
'binary_base64': {
'unencrypted': base64.b64encode('stuff'),
'secret_type': s.SecretType.OPAQUE,
'content_type': 'application/octet-stream',
'content_encoding': 'base64',
'expected': base64.b64encode('stuff')
},
'binary': {
'unencrypted': 'stuff',
'secret_type': s.SecretType.OPAQUE,
'content_type': 'application/octet-stream',
'content_encoding': 'binary',
'expected': base64.b64encode('stuff')
},
'symmetric_base64': {
'unencrypted': base64.b64encode('stuff'),
'secret_type': s.SecretType.SYMMETRIC,
'content_type': 'application/octet-stream',
'content_encoding': 'base64',
'expected': base64.b64encode('stuff')
},
'symmetric': {
'unencrypted': 'stuff',
'secret_type': s.SecretType.SYMMETRIC,
'content_type': 'application/octet-stream',
'content_encoding': 'binary',
'expected': base64.b64encode('stuff')
},
'private_base64': {
'unencrypted': utils.get_private_key(),
'secret_type': s.SecretType.PRIVATE,
'content_type': 'application/pkcs8',
'content_encoding': 'base64',
'expected': utils.get_private_key()
},
'private': {
'unencrypted': base64.b64decode(
translations.get_pem_components(utils.get_private_key())[1]),
'secret_type': s.SecretType.PRIVATE,
'content_type': 'application/pkcs8',
'content_encoding': 'binary',
'expected': utils.get_private_key()
},
'public_base64': {
'unencrypted': utils.get_public_key(),
'secret_type': s.SecretType.PUBLIC,
'content_type': 'application/octet-stream',
'content_encoding': 'base64',
'expected': utils.get_public_key()
},
'public': {
'unencrypted': base64.b64decode(
translations.get_pem_components(utils.get_public_key())[1]),
'secret_type': s.SecretType.PUBLIC,
'content_type': 'application/octet-stream',
'content_encoding': 'binary',
'expected': utils.get_public_key()
},
'certificate_base64': {
'unencrypted': utils.get_certificate(),
'secret_type': s.SecretType.CERTIFICATE,
'content_type': 'application/pkix-cert',
'content_encoding': 'base64',
'expected': utils.get_certificate()
},
'certificate': {
'unencrypted': base64.b64decode(
translations.get_pem_components(utils.get_certificate())[1]),
'secret_type': s.SecretType.CERTIFICATE,
'content_type': 'application/pkix-cert',
'content_encoding': 'binary',
'expected': utils.get_certificate()
},
}
def setUp(self):
super(WhenNormalizingBeforeEncryption, self).setUp()
# Aliasing to reduce the number of line continuations
self.normalize = translations.normalize_before_encryption
def test_can_normalize_plain_text(self):
@utils.parameterized_dataset(dataset_for_normalization)
def test_can_normalize(self, **kwargs):
unencrypted, content_type = self.normalize(
unencrypted=kwargs['unencrypted'],
content_type=kwargs['content_type'],
content_encoding=kwargs['content_encoding'],
secret_type=kwargs['secret_type']
)
self.assertEqual(kwargs['expected'], unencrypted)
self.assertEqual(kwargs['content_type'], content_type)
def test_can_normalize_tmp_plain_text(self):
unencrypted, content_type = self.normalize(
unencrypted='stuff',
content_type='text/plain',
content_encoding=''
content_encoding='',
secret_type=s.SecretType.OPAQUE
)
self.assertEqual(unencrypted, base64.b64encode('stuff'))
@@ -76,11 +177,12 @@ class WhenNormalizingBeforeEncryption(utils.BaseTestCase):
unencrypted, content_type = self.normalize(
unencrypted='bam',
content_type='application/octet-stream',
content_encoding=None
content_encoding=None,
secret_type=s.SecretType.OPAQUE
)
self.assertEqual(unencrypted, base64.b64encode('bam'))
self.assertEqual(content_type, 'application/octet-stream')
self.assertEqual(base64.b64encode('bam'), unencrypted)
self.assertEqual('application/octet-stream', content_type)
@utils.parameterized_dataset(dataset_for_raised_exceptions)
def test_normalize_raising_exceptions_with(self, exception, **kwargs):
@@ -107,7 +209,96 @@ class WhenAnalyzingBeforeDecryption(utils.BaseTestCase):
self.assertRaises(exception, self.analyze, **kwargs)
@utils.parameterized_test_case
class WhenNormalizingPemSecrets(utils.BaseTestCase):
dataset_for_pem_normalize = {
'private_key': {
'pem': utils.get_private_key()
},
'public_key': {
'pem': utils.get_public_key()
},
'certificate': {
'pem': utils.get_certificate()
}
}
dataset_for_bad_pems = {
'raw_string': {
'pem': 'fooandstuff'
},
'no_header': {
'pem': utils.get_private_key()[27:]
},
'no_footer': {
'pem': utils.get_private_key()[:-15]
},
'no_header_no_footer': {
'pem': utils.get_private_key()[27:-15]
}
}
dataset_for_to_pem = {
'private_key': {
'secret_type': s.SecretType.PRIVATE,
'pem': utils.get_private_key()
},
'public_key': {
'secret_type': s.SecretType.PUBLIC,
'pem': utils.get_public_key()
},
'certificate': {
'secret_type': s.SecretType.CERTIFICATE,
'pem': utils.get_certificate()
}
}
@utils.parameterized_dataset(dataset_for_pem_normalize)
def test_pem_normalized(self, pem):
pem_components = translations.get_pem_components(pem)
self.assertEqual(3, len(pem_components))
pem_msg = pem_components[0] + pem_components[1] + pem_components[2]
self.assertEqual(pem, pem_msg)
@utils.parameterized_dataset(dataset_for_to_pem)
def test_to_pem(self, secret_type, pem):
pem_components = translations.get_pem_components(pem)
content = base64.b64decode(pem_components[1])
pem_msg = translations.to_pem(secret_type, content, False)
self.assertEqual(pem, pem_msg)
@utils.parameterized_dataset(dataset_for_to_pem)
def test_to_pem_payload_encoded(self, secret_type, pem):
pem_components = translations.get_pem_components(pem)
content = pem_components[1]
pem_msg = translations.to_pem(secret_type, content, True)
self.assertEqual(pem, pem_msg)
@utils.parameterized_dataset(dataset_for_bad_pems)
def test_pem_normalize_raising_exceptions_with(self, pem):
self.assertRaises(s.SecretPayloadDecodingError,
translations.get_pem_components, pem)
@utils.parameterized_test_case
class WhenDenormalizingAfterDecryption(utils.BaseTestCase):
dataset_for_pem_denormalize = {
'private_key': {
'pem': utils.get_private_key(),
'content_type': 'application/pkcs8'
},
'public_key': {
'pem': utils.get_public_key(),
'content_type': 'application/octet-stream'
},
'certificate': {
'pem': utils.get_certificate(),
'content_type': 'application/pkix-cert'
}
}
def setUp(self):
super(WhenDenormalizingAfterDecryption, self).setUp()
@@ -115,17 +306,21 @@ class WhenDenormalizingAfterDecryption(utils.BaseTestCase):
self.denormalize = translations.denormalize_after_decryption
def test_ascii_characters_to_utf8_with_plain_text(self):
unencrypted = self.denormalize('bam', 'text/plain')
secret = 'bam'
normalized_secret = secret.encode('utf-8')
normalized_secret = base64.b64encode(normalized_secret)
unencrypted = self.denormalize(normalized_secret, 'text/plain')
self.assertEqual(unencrypted, 'bam')
def test_ascii_characters_to_utf8_with_app_octet_stream(self):
unencrypted = self.denormalize('bam', 'application/octet-stream')
unencrypted = self.denormalize(base64.b64encode('bam'),
'application/octet-stream')
self.assertEqual(unencrypted, 'bam')
def test_non_ascii_character_with_plain_text_raises_exception(self):
exception = s.SecretAcceptNotSupportedException
kwargs = {
'unencrypted': '\xff',
'unencrypted': base64.b64encode('\xff'),
'content_type': 'text/plain'
}
@@ -139,3 +334,10 @@ class WhenDenormalizingAfterDecryption(utils.BaseTestCase):
}
self.assertRaises(exception, self.denormalize, **kwargs)
@utils.parameterized_dataset(dataset_for_pem_denormalize)
def test_denormalize_pem(self, pem, content_type):
pem_components = translations.get_pem_components(pem)
secret = base64.b64decode(pem_components[1])
denorm_secret = self.denormalize(pem, content_type)
self.assertEqual(secret, denorm_secret)

View File

@@ -416,3 +416,57 @@ def generate_test_uuid(tail_value=0):
"""Returns a blank uuid with the given value added to the end segment."""
return '00000000-0000-0000-0000-{value:0>{pad}}'.format(value=tail_value,
pad=12)
def get_private_key():
s = ("-----BEGIN PRIVATE KEY-----"
"MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAMxOUcg4eiBTZnIy"
"4XhEV+IoBbye/ZkXnxWQPnz9Cm+2C3rIYBev6WLqztSfi1EHnn3jM9p36KJuVgvA"
"Jr4wfn19hM9pw5Cq5hcnkVlBCAKoCM7p/jf7G2qv0yxlhXK3eZVzR/8Km3wImKN5"
"mJRqCv89I1LXyiuHYlVrznx8hjTZAgMBAAECgYAYyVu0rd1rNJMWQcQHAkujPXXM"
"t4FO5IXBaPmb0ltEyFJGOC8oUIhBHvmu5BhT4VfCMCFnXOrVYguAa3SH2HxP54Wb"
"xfycCNow5ikujEfdvsAZi1tnKedFRnJhdANCAM+6+fTNUzNElUW6kjuvwWWnRq7C"
"iCHqhd5ssVa8vMjPjQJBAPpMz0rXo2DDtYqQLOnWwAbV+djM1+ldmBdh/Q4zETDO"
"xgPfUvLBhU40LJt8NQeia6Ce4oYH+W4WRyNYvvmcGz8CQQDQ9V/8IuMJN6vyAKrc"
"WMPyLfYFu3wJ74/DX0EZ7lf+UhTlCEwEQaVQ78El1oFJDl7cjnH3Ay5GNsFfHOfd"
"uaHnAkAa21MCvNCS+WzpST8IeDpygVMlqBUivSAsoh78/w3BJu6oS7YixhD/qyl+"
"JX2rLApQWbwElaZ14X4QlN0o+08RAkEAi79vIDtcx69Z6ZGUs6UR9wfR/+dxf1ue"
"NDWiXmtOoiHXI422+SnGHCkcbefVszxPKQaDJYYVDpRUIv47+8fIhQJAIPlfqUpN"
"0c23beUUWItd+fSVLH+bxTUv+FUqLQIC2VlXVecg7+eAOMNzF2CqcpWstIcrYkV7"
"lECxnorexnYA8g=="
"-----END PRIVATE KEY-----")
return s
def get_public_key():
s = ("-----BEGIN PUBLIC KEY-----"
"MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDMTlHIOHogU2ZyMuF4RFfiKAW8"
"nv2ZF58VkD58/Qpvtgt6yGAXr+li6s7Un4tRB5594zPad+iiblYLwCa+MH59fYTP"
"acOQquYXJ5FZQQgCqAjO6f43+xtqr9MsZYVyt3mVc0f/Cpt8CJijeZiUagr/PSNS"
"18orh2JVa858fIY02QIDAQAB"
"-----END PUBLIC KEY-----")
return s
def get_certificate():
s = ("-----BEGIN CERTIFICATE-----"
"MIIDTzCCArigAwIBAgIJANwgT2i4cVRAMA0GCSqGSIb3DQEBBQUAMHkxCzAJBgNV"
"BAYTAlVTMQswCQYDVQQIEwJUWDEPMA0GA1UEBxMGQXVzdGluMRYwFAYDVQQKEw1t"
"eWNvbXBhbnkuY29tMQ8wDQYDVQQDEwZjb21tb24xIzAhBgkqhkiG9w0BCQEWFGNv"
"bW1vbkBteWNvbXBhbnkuY29tMB4XDTE1MDIxNzIxMDA1N1oXDTE4MDIxNjIxMDA1"
"N1oweTELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAlRYMQ8wDQYDVQQHEwZBdXN0aW4x"
"FjAUBgNVBAoTDW15Y29tcGFueS5jb20xDzANBgNVBAMTBmNvbW1vbjEjMCEGCSqG"
"SIb3DQEJARYUY29tbW9uQG15Y29tcGFueS5jb20wgZ8wDQYJKoZIhvcNAQEBBQAD"
"gY0AMIGJAoGBAMxOUcg4eiBTZnIy4XhEV+IoBbye/ZkXnxWQPnz9Cm+2C3rIYBev"
"6WLqztSfi1EHnn3jM9p36KJuVgvAJr4wfn19hM9pw5Cq5hcnkVlBCAKoCM7p/jf7"
"G2qv0yxlhXK3eZVzR/8Km3wImKN5mJRqCv89I1LXyiuHYlVrznx8hjTZAgMBAAGj"
"gd4wgdswHQYDVR0OBBYEFBxIlJZjp3+TkIwy8G3dqfCgL6GfMIGrBgNVHSMEgaMw"
"gaCAFBxIlJZjp3+TkIwy8G3dqfCgL6GfoX2kezB5MQswCQYDVQQGEwJVUzELMAkG"
"A1UECBMCVFgxDzANBgNVBAcTBkF1c3RpbjEWMBQGA1UEChMNbXljb21wYW55LmNv"
"bTEPMA0GA1UEAxMGY29tbW9uMSMwIQYJKoZIhvcNAQkBFhRjb21tb25AbXljb21w"
"YW55LmNvbYIJANwgT2i4cVRAMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQAD"
"gYEANTlbvNaoADYMzGOoLgaAVt7tjQ6EZVXjswax1jnj8dEoQpUNCAzkkwFJDFqt"
"mOTTZxpWNbDm9AcbCubrLXwN22eBqYz02cBGoBnN/h2qINSL2caM08OMmMDm1g1Q"
"+iH+eUsCmvkTnylw8FJwN7TYV0No6V9/+aWvf6h1NqDiiLc="
"-----END CERTIFICATE-----")
return s

View File

@@ -21,11 +21,54 @@ import time
from testtools import testcase
from barbican.plugin.util import translations
from barbican.tests import utils
from functionaltests.api import base
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import secret_models
def get_pem_content(pem):
b64_content = translations.get_pem_components(pem)[1]
return base64.b64decode(b64_content)
def get_private_key_req():
return {'name': 'myprivatekey',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64',
'algorithm': 'rsa',
'bit_length': 1024,
'secret_type': 'private',
'payload': utils.get_private_key()}
def get_public_key_req():
return {'name': 'mypublickey',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64',
'algorithm': 'rsa',
'bit_length': 1024,
'secret_type': 'public',
'payload': utils.get_public_key()}
def get_certificate_req():
return {'name': 'mycertificate',
'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64',
'algorithm': 'rsa',
'bit_length': 1024,
'secret_type': 'certificate',
'payload': utils.get_certificate()}
def get_passphrase_req():
return {'name': 'mypassphrase',
'payload_content_type': 'text/plain',
'secret_type': 'passphrase',
'payload': 'mysecretpassphrase'}
# TODO(tdink) Move to a config file
secret_create_defaults_data = {
"name": "AES key",
@@ -401,7 +444,7 @@ class SecretsTestCase(base.TestCase):
self.assertIn(
"Provided object does not match schema 'Secret': "
"payload_content_type is not one of ['text/plain', "
"payload_content_type plain-text is not one of ['text/plain', "
"'text/plain;charset=utf-8', 'text/plain; charset=utf-8', "
"'application/octet-stream'", resp_dict['description'])
self.assertIn("Bad Request", resp_dict['title'])
@@ -815,3 +858,40 @@ class SecretsTestCase(base.TestCase):
# malicious one.
regex = '.*{0}.*'.format(malicious_hostname)
self.assertNotRegexpMatches(resp.headers['location'], regex)
@utils.parameterized_dataset({
'symmetric': ['symmetric',
base64.b64decode(secret_create_defaults_data['payload']),
secret_create_defaults_data],
'private': ['private',
get_pem_content(utils.get_private_key()),
get_private_key_req()],
'public': ['public',
get_pem_content(utils.get_public_key()),
get_public_key_req()],
'certificate': ['certificate',
get_pem_content(utils.get_certificate()),
get_certificate_req()],
'passphrase': ['passphrase',
'mysecretpassphrase',
get_passphrase_req()]
})
@testcase.attr('positive')
def test_secret_create_with_secret_type(self, secret_type, expected, spec):
"""Create secrets with various secret types."""
test_model = secret_models.SecretModel(**spec)
overrides = {"secret_type": secret_type}
test_model.override_values(**overrides)
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(201, resp.status_code)
resp = self.behaviors.get_secret_metadata(secret_ref)
secret_type_response = resp.model.secret_type
self.assertIsNotNone(secret_type_response)
self.assertEqual(secret_type, secret_type_response)
content_type = spec['payload_content_type']
get_resp = self.behaviors.get_secret(secret_ref,
content_type)
self.assertEqual(expected, get_resp.content)