diff --git a/barbican/common/validators.py b/barbican/common/validators.py index 5b5a5778d..20a662eb7 100644 --- a/barbican/common/validators.py +++ b/barbican/common/validators.py @@ -26,7 +26,9 @@ from barbican.common import utils from barbican import i18n as u from barbican.model import models from barbican.openstack.common import timeutils +from barbican.plugin.interface import secret_store from barbican.plugin.util import mime_types +from barbican.plugin.util import translations LOG = utils.getLogger(__name__) @@ -130,6 +132,16 @@ class NewSecretValidator(ValidatorBase): }, "expiration": {"type": "string", "maxLength": 255}, "payload": {"type": "string"}, + "secret_type": { + "type": "string", + "maxLength": 80, + "enum": [secret_store.SecretType.SYMMETRIC, + secret_store.SecretType.PASSPHRASE, + secret_store.SecretType.PRIVATE, + secret_store.SecretType.PUBLIC, + secret_store.SecretType.CERTIFICATE, + secret_store.SecretType.OPAQUE] + }, "payload_content_type": {"type": "string", "maxLength": 255}, "payload_content_encoding": { "type": "string", @@ -178,8 +190,13 @@ class NewSecretValidator(ValidatorBase): "payload") if content_type: - self._validate_payload_content_type_is_supported(content_type, - schema_name) + self._assert_validity( + mime_types.is_supported(content_type), + schema_name, + u._("payload_content_type is not one of {supported}" + ).format(supplied=content_type, + supported=mime_types.SUPPORTED), + "payload_content_type") return json_data @@ -233,39 +250,32 @@ class NewSecretValidator(ValidatorBase): "be supplied."), "payload_content_type") - self._validate_payload_content_type_is_supported(content_type, - schema_name) - - if content_type == 'application/octet-stream': - self._assert_validity( - content_encoding is not None, - schema_name, - u._("payload_content_encoding must be specified when " - "payload_content_type is application/octet-stream."), - "payload_content_encoding") - - if content_type.startswith('text/plain'): - self._assert_validity( - content_encoding is None, - schema_name, - u._("payload_content_encoding must not be specified when " - "payload_content_type is text/plain"), - "payload_content_encoding") - - def _validate_payload_content_type_is_supported(self, content_type, - schema_name): self._assert_validity( - content_type.lower() in mime_types.SUPPORTED, + mime_types.is_supported(content_type), schema_name, - u._("payload_content_type is not one of {supported}").format( - supported=mime_types.SUPPORTED), + u._("payload_content_type {supplied} is not one of {supported}" + ).format(supplied=content_type, + supported=mime_types.SUPPORTED), "payload_content_type") + self._assert_validity( + mime_types.is_content_type_with_encoding_supported( + content_type, + content_encoding), + schema_name, + u._("payload_content_encoding is not one of {supported}").format( + supported=mime_types.get_supported_encodings(content_type)), + "payload_content_encoding") + def _validate_payload_by_content_encoding(self, payload_content_encoding, payload, schema_name): if payload_content_encoding == 'base64': try: - base64.b64decode(payload) + secret_payload = payload + if translations.is_pem_payload(payload): + pems = translations.get_pem_components(payload) + secret_payload = pems[1] + base64.b64decode(secret_payload) except TypeError: LOG.exception("Problem parsing payload") raise exception.InvalidObject(schema=schema_name, diff --git a/barbican/plugin/crypto/simple_crypto.py b/barbican/plugin/crypto/simple_crypto.py index 57ed7d720..26823469b 100644 --- a/barbican/plugin/crypto/simple_crypto.py +++ b/barbican/plugin/crypto/simple_crypto.py @@ -170,10 +170,10 @@ class SimpleCryptoPlugin(c.CryptoPluginBase): def _wrap_key(self, public_key, private_key, passphrase): pkcs = 8 - key_wrap_format = 'PEM' + key_wrap_format = 'DER' private_key = private_key.exportKey(key_wrap_format, passphrase, pkcs) - public_key = public_key.exportKey() + public_key = public_key.exportKey(key_wrap_format) return public_key, private_key @@ -182,16 +182,12 @@ class SimpleCryptoPlugin(c.CryptoPluginBase): pub_seq = asn1.DerSequence() pub_seq[:] = [0, public_key.p, public_key.q, public_key.g, public_key.y] - public_key = ("-----BEGIN DSA PUBLIC KEY-----\n{0}" - "-----END DSA PUBLIC KEY-----" - .format(pub_seq.encode().encode("base64"))) + public_key = pub_seq.encode() prv_seq = asn1.DerSequence() prv_seq[:] = [0, private_key.p, private_key.q, private_key.g, private_key.y, private_key.x] - private_key = ("-----BEGIN DSA PRIVATE KEY-----\n{0}" - "-----END DSA PRIVATE KEY-----" - .format(prv_seq.encode().encode("base64"))) + private_key = prv_seq.encode() return public_key, private_key diff --git a/barbican/plugin/dogtag.py b/barbican/plugin/dogtag.py index cd250796a..252c2da19 100644 --- a/barbican/plugin/dogtag.py +++ b/barbican/plugin/dogtag.py @@ -117,7 +117,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): ALG = "alg" BIT_LENGTH = "bit_length" KEY_ID = "key_id" - SECRET_TYPE = "secret_type" SECRET_MODE = "secret_mode" PASSPHRASE_KEY_ID = "passphrase_key_id" CONVERT_TO_PEM = "convert_to_pem" @@ -199,7 +198,7 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): self._store_secret_attributes(meta_dict, secret_dto) return meta_dict - def get_secret(self, secret_metadata): + def get_secret(self, secret_type, secret_metadata): """Retrieve a secret from the KRA The secret_metadata is simply the dict returned by a store_secret() or @@ -233,7 +232,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): PEM format, is returned """ key_id = secret_metadata[DogtagKRAPlugin.KEY_ID] - secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None) key_spec = sstore.KeySpec( alg=secret_metadata.get(DogtagKRAPlugin.ALG, None), @@ -243,10 +241,11 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): ) passphrase = self._get_passphrase_for_a_private_key( - secret_metadata, key_spec) + secret_type, secret_metadata, key_spec) recovered_key = None - twsk = DogtagKRAPlugin._get_trans_wrapped_session_key(secret_metadata) + twsk = DogtagKRAPlugin._get_trans_wrapped_session_key(secret_type, + secret_metadata) if DogtagKRAPlugin.CONVERT_TO_PEM in secret_metadata: # Case for returning the asymmetric keys generated in KRA. @@ -367,7 +366,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): return {DogtagKRAPlugin.ALG: key_spec.alg, DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length, DogtagKRAPlugin.SECRET_MODE: key_spec.mode, - DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.SYMMETRIC, DogtagKRAPlugin.KEY_ID: response.get_key_id()} def generate_asymmetric_key(self, key_spec): @@ -411,7 +409,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): public_key_metadata = { DogtagKRAPlugin.ALG: key_spec.alg, DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length, - DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.PUBLIC, DogtagKRAPlugin.KEY_ID: response.get_key_id(), DogtagKRAPlugin.CONVERT_TO_PEM: "true" } @@ -419,7 +416,6 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): private_key_metadata = { DogtagKRAPlugin.ALG: key_spec.alg, DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length, - DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.PRIVATE, DogtagKRAPlugin.KEY_ID: response.get_key_id(), DogtagKRAPlugin.CONVERT_TO_PEM: "true" } @@ -489,12 +485,10 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): meta_dict[DogtagKRAPlugin.BIT_LENGTH] = key_spec.bit_length if key_spec.mode is not None: meta_dict[DogtagKRAPlugin.SECRET_MODE] = key_spec.mode - if secret_dto.type is not None: - meta_dict[DogtagKRAPlugin.SECRET_TYPE] = secret_dto.type - def _get_passphrase_for_a_private_key(self, secret_metadata, key_spec): + def _get_passphrase_for_a_private_key(self, secret_type, secret_metadata, + key_spec): """Retrieve the passphrase for the private key stored in the KRA.""" - secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None) if secret_type is None: return None if key_spec.alg is None: @@ -520,9 +514,8 @@ class DogtagKRAPlugin(sstore.SecretStoreBase): return passphrase @staticmethod - def _get_trans_wrapped_session_key(secret_metadata): + def _get_trans_wrapped_session_key(secret_type, secret_metadata): twsk = secret_metadata.get('trans_wrapped_session_key', None) - secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None) if secret_type in [sstore.SecretType.PUBLIC, sstore.SecretType.PRIVATE]: if twsk: diff --git a/barbican/plugin/interface/secret_store.py b/barbican/plugin/interface/secret_store.py index ccadc770f..5f09605ec 100644 --- a/barbican/plugin/interface/secret_store.py +++ b/barbican/plugin/interface/secret_store.py @@ -302,7 +302,7 @@ class SecretDTO(object): secret has been encrypted using a transport key. The transport key is a base64 encoded x509 transport certificate. """ - self.type = type + self.type = type or SecretType.OPAQUE self.secret = secret self.key_spec = key_spec self.content_type = content_type @@ -386,7 +386,7 @@ class SecretStoreBase(object): raise NotImplementedError # pragma: no cover @abc.abstractmethod - def get_secret(self, secret_metadata): + def get_secret(self, secret_type, secret_metadata): """Retrieves a secret from the secret store. Retrieves a secret from the secret store and returns a SecretDTO that @@ -396,6 +396,12 @@ class SecretStoreBase(object): generate or store methods. This data is used by the plugins to retrieve the key. + The secret_type parameter may be useful for secret stores to know the + expected format of the secret. For instance if the type is + SecretDTO.PRIVATE then a PKCS8 structure is returned. This way secret + stores do not need to manage the secret type on their own. + + :param secret_type: secret type :param secret_metadata: secret metadata :returns: SecretDTO that contains secret """ diff --git a/barbican/plugin/kmip_secret_store.py b/barbican/plugin/kmip_secret_store.py index a48934c15..0adc0d772 100644 --- a/barbican/plugin/kmip_secret_store.py +++ b/barbican/plugin/kmip_secret_store.py @@ -250,9 +250,10 @@ class KMIPSecretStore(ss.SecretStoreBase): self.client.close() LOG.debug("Closed connection to KMIP client for secret storage") - def get_secret(self, secret_metadata): + def get_secret(self, secret_type, secret_metadata): """Gets a secret + :param secret_type: secret type :param secret_metadata: Dictionary of key metadata, requires: {'key_uuid': } :returns: SecretDTO of the retrieved Secret @@ -273,9 +274,6 @@ class KMIPSecretStore(ss.SecretStoreBase): if result.result_status.enum == enums.ResultStatus.SUCCESS: secret_block = result.secret.key_block - secret_type = self._map_type_kmip_to_ss( - result.object_type.enum) - key_value_type = type(secret_block.key_value.key_material) if key_value_type == kmip_objects.KeyMaterialStruct: secret_value = self._convert_byte_array_to_base64( @@ -464,19 +462,6 @@ class KMIPSecretStore(ss.SecretStoreBase): else: return None - def _map_type_kmip_to_ss(self, object_type): - """Map KMIP type enum to SecretType enum - - Returns None if the type is not supported. The KMIP plugin only - supports symmetric keys for now. - :param object_type: KMIP type enum - :returns: SecretType enum if type is supported, None if not supported - """ - if object_type == enums.ObjectType.SYMMETRIC_KEY: - return ss.SecretType.SYMMETRIC - else: - return None - def _map_algorithm_ss_to_kmip(self, algorithm): """Map SecretStore enum value to the KMIP algorithm enum diff --git a/barbican/plugin/resources.py b/barbican/plugin/resources.py index d4592776d..743ee1e5c 100644 --- a/barbican/plugin/resources.py +++ b/barbican/plugin/resources.py @@ -101,17 +101,12 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding, key_spec=key_spec, plugin_name=plugin_name) # Normalize inputs prior to storage. - # TODO(john-wood-w) Normalize all secrets to base64, so we don't have to - # pass in 'content' type to the store_secret() call below. unencrypted, content_type = tr.normalize_before_encryption( unencrypted_raw, content_type_raw, content_encoding, - enforce_text_only=True) + secret_model.secret_type, enforce_text_only=True) # Store the secret securely. - secret_type = None - if key_spec is not None: - secret_type = secret_store.KeyAlgorithm().get_secret_type(key_spec.alg) - secret_dto = secret_store.SecretDTO(type=secret_type, + secret_dto = secret_store.SecretDTO(type=secret_model.secret_type, secret=unencrypted, key_spec=key_spec, content_type=content_type, @@ -181,6 +176,7 @@ def generate_secret(spec, content_type, project_model): # Create secret model to eventually save metadata to. secret_model = models.Secret(spec) + secret_model['secret_type'] = secret_store.SecretType.SYMMETRIC # Generate the secret. secret_metadata = _generate_symmetric_key( @@ -206,9 +202,14 @@ def generate_asymmetric_secret(spec, content_type, project_model): # Create secret models to eventually save metadata to. private_secret_model = models.Secret(spec) + private_secret_model['secret_type'] = secret_store.SecretType.PRIVATE public_secret_model = models.Secret(spec) + public_secret_model['secret_type'] = secret_store.SecretType.PUBLIC passphrase_secret_model = (models.Secret(spec) if spec.get('passphrase') else None) + if passphrase_secret_model: + passphrase_type = secret_store.SecretType.PASSPHRASE + passphrase_secret_model['secret_type'] = passphrase_type # Generate the secret. asymmetric_meta_dto = _generate_asymmetric_key( @@ -319,9 +320,12 @@ def _get_secret(retrieve_plugin, secret_metadata, secret_model, project_model): context = store_crypto.StoreCryptoContext( project_model, secret_model=secret_model) - secret_dto = retrieve_plugin.get_secret(secret_metadata, context) + secret_dto = retrieve_plugin.get_secret(secret_model.secret_type, + secret_metadata, + context) else: - secret_dto = retrieve_plugin.get_secret(secret_metadata) + secret_dto = retrieve_plugin.get_secret(secret_model.secret_type, + secret_metadata) return secret_dto diff --git a/barbican/plugin/store_crypto.py b/barbican/plugin/store_crypto.py index ee4f44ff5..e976d708b 100644 --- a/barbican/plugin/store_crypto.py +++ b/barbican/plugin/store_crypto.py @@ -21,6 +21,7 @@ from barbican.model import repositories from barbican.plugin.crypto import crypto from barbican.plugin.crypto import manager from barbican.plugin.interface import secret_store as sstore +from barbican.plugin.util import translations CONF = cfg.CONF @@ -82,7 +83,14 @@ class StoreCryptoAdapterPlugin(object): encrypting_plugin, context.project_model) # Secrets are base64 encoded before being passed to the secret stores. - normalized_secret = base64.b64decode(secret_dto.secret) + normalized_secret = secret_dto.secret + secret_type = secret_dto.type + if (secret_type == sstore.SecretType.PRIVATE or + secret_type == sstore.SecretType.PUBLIC or + secret_type == sstore.SecretType.CERTIFICATE): + normalized_secret = translations.get_pem_components( + normalized_secret)[1] + normalized_secret = base64.b64decode(normalized_secret) encrypt_dto = crypto.EncryptDTO(normalized_secret) # Enhance the context with content_type, This is needed to build @@ -101,10 +109,11 @@ class StoreCryptoAdapterPlugin(object): return None - def get_secret(self, secret_metadata, context): + def get_secret(self, secret_type, metadata, context): """Retrieve a secret. - :param secret_metadata: secret metadata + :param secret_type: secret type + :param metadata: secret metadata :param context: StoreCryptoContext for secret :returns: SecretDTO that contains secret """ @@ -131,11 +140,16 @@ class StoreCryptoAdapterPlugin(object): kek_meta_dto, datum_model.kek_meta_extended, context.project_model.external_id) + secret = base64.b64encode(secret) + if (secret_type == sstore.SecretType.PRIVATE or + secret_type == sstore.SecretType.PUBLIC or + secret_type == sstore.SecretType.CERTIFICATE): + secret = translations.to_pem(secret_type, secret, True) key_spec = sstore.KeySpec(alg=context.secret_model.algorithm, bit_length=context.secret_model.bit_length, mode=context.secret_model.mode) - return sstore.SecretDTO(sstore.SecretType.SYMMETRIC, + return sstore.SecretDTO(secret_type, secret, key_spec, datum_model.content_type) diff --git a/barbican/plugin/util/mime_types.py b/barbican/plugin/util/mime_types.py index 98bd448ec..76a43625a 100644 --- a/barbican/plugin/util/mime_types.py +++ b/barbican/plugin/util/mime_types.py @@ -26,7 +26,9 @@ PLAIN_TEXT = ['text/plain', 'text/plain;charset=utf-8', 'text/plain; charset=utf-8'] PLAIN_TEXT_CHARSETS = ['utf-8'] -BINARY = ['application/octet-stream'] +BINARY = ['application/octet-stream', + 'application/pkcs8', + 'application/pkix-cert'] SUPPORTED = PLAIN_TEXT + BINARY # Normalizes client types to internal types. @@ -34,6 +36,8 @@ INTERNAL_CTYPES = {'text/plain': 'text/plain', 'text/plain;charset=utf-8': 'text/plain', 'text/plain; charset=utf-8': 'text/plain', 'application/octet-stream': 'application/octet-stream', + 'application/pkcs8': 'application/pkcs8', + 'application/pkix-cert': 'application/pkix-cert', 'application/aes': 'application/aes'} # Maps mime-types used to specify secret data formats to the types that can @@ -42,9 +46,13 @@ INTERNAL_CTYPES = {'text/plain': 'text/plain', # which are then used as the keys to the 'CTYPES_MAPPINGS' below. CTYPES_PLAIN = {'default': 'text/plain'} CTYPES_BINARY = {'default': 'application/octet-stream'} +CTYPES_PKCS8 = {'default': 'application/pkcs8'} +CTYPES_PKIX_CERT = {'default': 'application/pkix-cert'} CTYPES_AES = {'default': 'application/aes'} CTYPES_MAPPINGS = {'text/plain': CTYPES_PLAIN, 'application/octet-stream': CTYPES_BINARY, + 'application/pkcs8': CTYPES_PKCS8, + 'application/pkix-cert': CTYPES_PKIX_CERT, 'application/aes': CTYPES_AES} # Supported encodings @@ -53,6 +61,8 @@ ENCODINGS = ['base64'] # Maps normalized content-types to supported encoding(s) CTYPES_TO_ENCODINGS = {'text/plain': None, 'application/octet-stream': ['base64', 'binary'], + 'application/pkcs8': ['base64', 'binary'], + 'application/pkix-cert': ['base64', 'binary'], 'application/aes': None} @@ -75,7 +85,8 @@ def normalize_content_type(mime_type): def is_supported(mime_type): - return mime_type in SUPPORTED + normalized_type = normalize_content_type(mime_type) + return normalized_type in SUPPORTED def is_base64_encoding_supported(mime_type): @@ -85,6 +96,22 @@ def is_base64_encoding_supported(mime_type): return False +def is_content_type_with_encoding_supported(content_type, content_encoding): + if not is_supported(content_type): + return False + normalized_type = normalize_content_type(content_type) + encodings = CTYPES_TO_ENCODINGS[INTERNAL_CTYPES[normalized_type]] + if encodings: + return content_encoding in encodings + else: + return content_encoding is None + + +def get_supported_encodings(content_type): + normalized_type = normalize_content_type(content_type) + return CTYPES_TO_ENCODINGS[INTERNAL_CTYPES[normalized_type]] + + def is_base64_processing_needed(content_type, content_encoding): content_encodings = utils.get_accepted_encodings_direct(content_encoding) if content_encodings: diff --git a/barbican/plugin/util/translations.py b/barbican/plugin/util/translations.py index acaa58857..65ad90d9d 100644 --- a/barbican/plugin/util/translations.py +++ b/barbican/plugin/util/translations.py @@ -18,15 +18,21 @@ from barbican.plugin.util import mime_types def normalize_before_encryption(unencrypted, content_type, content_encoding, - enforce_text_only=False): - """Normalize unencrypted prior to plugin encryption processing.""" + secret_type, enforce_text_only=False): + """Normalize unencrypted prior to plugin encryption processing. + + This normalizes the secrets before they are handed off to the SecretStore + for storage. This converts all data to Base64 data. If the data is plain + text then it encoded using utf-8 first and then Base64 encoded. Binary + data is simply converted to Base64. In addition if the secret type is + one of private, public, or certificate then the PEM headers are added + to the Base64 encoding. + """ if not unencrypted: raise s.SecretNoPayloadProvidedException() # Validate and normalize content-type. - normalized_mime = mime_types.normalize_content_type(content_type) - if not mime_types.is_supported(normalized_mime): - raise s.SecretContentTypeNotSupportedException(content_type) + normalized_mime = normalize_content_type(content_type) # Process plain-text type. if normalized_mime in mime_types.PLAIN_TEXT: @@ -39,14 +45,22 @@ def normalize_before_encryption(unencrypted, content_type, content_encoding, if content_encoding: content_encoding = content_encoding.lower() if content_encoding == 'base64': + b64payload = unencrypted + if is_pem_payload(unencrypted): + pem_components = get_pem_components(unencrypted) + b64payload = pem_components[1] try: - base64.b64decode(unencrypted) + base64.b64decode(b64payload) except TypeError: raise s.SecretPayloadDecodingError() - unencrypted = unencrypted elif mime_types.use_binary_content_as_is(content_type, content_encoding): - unencrypted = base64.b64encode(unencrypted) + if (secret_type == s.SecretType.PRIVATE or + secret_type == s.SecretType.PUBLIC or + secret_type == s.SecretType.CERTIFICATE): + unencrypted = to_pem(secret_type, unencrypted) + else: + unencrypted = base64.b64encode(unencrypted) elif enforce_text_only: # For text-based protocols (such as the one-step secret POST), # only 'base64' encoding is possible/supported. @@ -60,6 +74,14 @@ def normalize_before_encryption(unencrypted, content_type, content_encoding, return unencrypted, normalized_mime +def normalize_content_type(content_type): + """Normalize the content type and validate that it is supported.""" + normalized_mime = mime_types.normalize_content_type(content_type) + if not mime_types.is_supported(normalized_mime): + raise s.SecretContentTypeNotSupportedException(content_type) + return normalized_mime + + def analyze_before_decryption(content_type): """Determine support for desired content type.""" if not mime_types.is_supported(content_type): @@ -67,17 +89,90 @@ def analyze_before_decryption(content_type): def denormalize_after_decryption(unencrypted, content_type): - """Translate the decrypted data into the desired content type.""" + """Translate the decrypted data into the desired content type. + + This is called when the raw keys are requested by the user. The secret + returned from the SecretStore is the unencrypted parameter. This + 'denormalizes' the data back to its binary format. + """ # Process plain-text type. if content_type in mime_types.PLAIN_TEXT: # normalize text to binary string try: + unencrypted = base64.b64decode(unencrypted) unencrypted = unencrypted.decode('utf-8') except UnicodeDecodeError: raise s.SecretAcceptNotSupportedException(content_type) # Process binary type. - elif content_type not in mime_types.BINARY: + elif content_type in mime_types.BINARY: + if is_pem_payload(unencrypted): + unencrypted = get_pem_components(unencrypted)[1] + unencrypted = base64.b64decode(unencrypted) + else: raise s.SecretContentTypeNotSupportedException(content_type) return unencrypted + + +def get_pem_components(pem): + """Returns the PEM content, header, and footer. + + This parses the PEM string and returns the PEM header, content, and footer. + The content is the base64 encoded bytes without the header and footer. This + is returned as a list. The order of the list is header, content, footer. + """ + delim = "-----" + splits = pem.split(delim) + if len(splits) != 5 or splits[0] != "" or splits[4] != "": + raise s.SecretPayloadDecodingError() + header = delim + splits[1] + delim + content = splits[2] + footer = delim + splits[3] + delim + return (header, content, footer) + + +def is_pem_payload(payload): + """Tests whether payload is in PEM format. + + This parses the payload for the PEM header and footer strings. If it finds + the header and footer strings then it is assumed to be a PEM payload. + """ + delim = "-----" + splits = payload.split(delim) + if len(splits) != 5 or splits[0] != "" or splits[4] != "": + return False + else: + return True + + +def to_pem(secret_type, payload, payload_encoded=False): + """Converts payload to PEM format. + + This converts the payload to Base 64 encoding if payload_encoded is False + and then adds PEM headers. This uses the secret_type to determined the PEM + header. + """ + pem = payload + if payload_encoded: + pem_content = payload + else: + pem_content = base64.b64encode(payload) + + if secret_type == s.SecretType.PRIVATE: + headers = _get_pem_headers("PRIVATE KEY") + pem = headers[0] + pem_content + headers[1] + elif secret_type == s.SecretType.PUBLIC: + headers = _get_pem_headers("PUBLIC KEY") + pem = headers[0] + pem_content + headers[1] + elif secret_type == s.SecretType.CERTIFICATE: + headers = _get_pem_headers("CERTIFICATE") + pem = headers[0] + pem_content + headers[1] + + return pem + + +def _get_pem_headers(pem_name): + header = "-----BEGIN {}-----".format(pem_name) + footer = "-----END {}-----".format(pem_name) + return (header, footer) diff --git a/barbican/tasks/certificate_resources.py b/barbican/tasks/certificate_resources.py index f66df3281..1de1029e6 100644 --- a/barbican/tasks/certificate_resources.py +++ b/barbican/tasks/certificate_resources.py @@ -330,7 +330,7 @@ def _save_plugin_metadata(order_model, plugin_meta): def _save_secrets(result, project_model): cert_secret_model, transport_key_model = plugin.store_secret( unencrypted_raw=result.certificate, - content_type_raw='text/plain', + content_type_raw='application/pkix-cert', content_encoding='base64', spec={}, secret_model=None, @@ -340,7 +340,7 @@ def _save_secrets(result, project_model): if result.intermediates: intermediates_secret_model, transport_key_model = plugin.store_secret( unencrypted_raw=result.intermediates, - content_type_raw='text/plain', + content_type_raw='application/pkix-cert', content_encoding='base64', spec={}, secret_model=None, diff --git a/barbican/tests/common/test_validators.py b/barbican/tests/common/test_validators.py index 797731c58..2aa6e7a53 100644 --- a/barbican/tests/common/test_validators.py +++ b/barbican/tests/common/test_validators.py @@ -27,6 +27,53 @@ VALID_EXTENSIONS = "valid extensions" VALID_FULL_CMC = "valid CMC" +def get_symmetric_key_req(): + return {'name': 'mysymmetrickey', + 'payload_content_type': 'application/octet-stream', + 'payload_content_encoding': 'base64', + 'algorithm': 'aes', + 'bit_length': 256, + 'secret_type': 'symmetric', + 'payload': 'gF6+lLoF3ohA9aPRpt+6bQ=='} + + +def get_private_key_req(): + return {'name': 'myprivatekey', + 'payload_content_type': 'application/pkcs8', + 'payload_content_encoding': 'base64', + 'algorithm': 'rsa', + 'bit_length': 1024, + 'secret_type': 'private', + 'payload': utils.get_private_key()} + + +def get_public_key_req(): + return {'name': 'mypublickey', + 'payload_content_type': 'application/octet-stream', + 'payload_content_encoding': 'base64', + 'algorithm': 'rsa', + 'bit_length': 1024, + 'secret_type': 'public', + 'payload': utils.get_public_key()} + + +def get_certificate_req(): + return {'name': 'mycertificate', + 'payload_content_type': 'application/pkix-cert', + 'payload_content_encoding': 'base64', + 'algorithm': 'rsa', + 'bit_length': 1024, + 'secret_type': 'certificate', + 'payload': utils.get_certificate()} + + +def get_passphrase_req(): + return {'name': 'mypassphrase', + 'payload_content_type': 'text/plain', + 'secret_type': 'passphrase', + 'payload': 'mysecretpassphrase'} + + def suite(): suite = unittest.TestSuite() @@ -62,6 +109,7 @@ class WhenTestingValidatorsFunctions(utils.BaseTestCase): self.assertTrue(is_too_big) +@utils.parameterized_test_case class WhenTestingSecretValidator(utils.BaseTestCase): def setUp(self): @@ -72,12 +120,14 @@ class WhenTestingSecretValidator(utils.BaseTestCase): self.payload_content_type = 'text/plain' self.secret_algorithm = 'algo' self.secret_bit_length = 512 + self.secret_type = 'opaque' self.secret_mode = 'cytype' self.secret_req = {'name': self.name, 'payload_content_type': self.payload_content_type, 'algorithm': self.secret_algorithm, 'bit_length': self.secret_bit_length, + 'secret_type': self.secret_type, 'mode': self.secret_mode, 'payload': self.payload} @@ -333,6 +383,59 @@ class WhenTestingSecretValidator(utils.BaseTestCase): ) self.assertEqual('payload', exception.invalid_property) + def test_should_pass_with_no_secret_type(self): + request = dict(self.secret_req) + del request['secret_type'] + self.validator.validate(request) + + def test_should_fail_with_unknown_secret_type(self): + self.secret_req['secret_type'] = 'unknown_type' + self.assertRaises( + excep.InvalidObject, + self.validator.validate, + self.secret_req, + ) + + @utils.parameterized_dataset({ + 'symmetric': [get_symmetric_key_req()], + 'private': [get_private_key_req()], + 'public': [get_public_key_req()], + 'certificate': [get_certificate_req()], + 'passphrase': [get_passphrase_req()], + }) + def test_should_pass_with_secret_type(self, request): + self.validator.validate(request) + + @utils.parameterized_dataset({ + 'symmetric': [get_symmetric_key_req(), 'foo'], + 'private': [get_private_key_req(), 'foo'], + 'public': [get_public_key_req(), 'foo'], + 'certificate': [get_certificate_req(), 'foo'], + 'passphrase': [get_passphrase_req(), 'base64'], + }) + def test_should_fail_with_bad_encoding(self, request, content_encoding): + request['payload_content_encoding'] = content_encoding + self.assertRaises( + excep.InvalidObject, + self.validator.validate, + request, + ) + + @utils.parameterized_dataset({ + 'symmetric': [get_symmetric_key_req(), 'text/plain'], + 'private': [get_private_key_req(), 'text/plain'], + 'public': [get_public_key_req(), 'text/plain'], + 'certificate': [get_certificate_req(), 'text/plain'], + 'passphrase': [get_passphrase_req(), 'application/octet-stream'], + }) + def test_should_fail_with_bad_content_type(self, request, content_type): + request['payload_content_type'] = content_type + self.assertRaises( + excep.InvalidObject, + self.validator.validate, + request, + ) + class WhenTestingContainerValidator(utils.BaseTestCase): @@ -1182,5 +1285,6 @@ class WhenTestingStoredKeyOrderValidator(utils.BaseTestCase): self.validator.validate, self.order_req) + if __name__ == '__main__': unittest.main() diff --git a/barbican/tests/plugin/crypto/test_crypto.py b/barbican/tests/plugin/crypto/test_crypto.py index e9c64c9b3..7bed79355 100644 --- a/barbican/tests/plugin/crypto/test_crypto.py +++ b/barbican/tests/plugin/crypto/test_crypto.py @@ -370,9 +370,7 @@ class WhenTestingSimpleCryptoPlugin(utils.BaseTestCase): mock.MagicMock()) prv_seq = asn1.DerSequence() - data = "\n".join(private_dto.strip().split("\n") - [1:-1]).decode("base64") - prv_seq.decode(data) + prv_seq.decode(private_dto) p, q, g, y, x = prv_seq[1:] private_dto = DSA.construct((y, g, p, q, x)) diff --git a/barbican/tests/plugin/test_kmip.py b/barbican/tests/plugin/test_kmip.py index c323acc59..e2c8903ae 100644 --- a/barbican/tests/plugin/test_kmip.py +++ b/barbican/tests/plugin/test_kmip.py @@ -48,6 +48,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase): self.secret_store = kss.KMIPSecretStore(CONF) self.secret_store.client = self.kmipclient_mock self.secret_store.credential = self.credential + self.symmetric_type = secret_store.SecretType.SYMMETRIC self.sample_secret_features = { 'key_format_type': enums.KeyFormatType.RAW, @@ -280,13 +281,14 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase): def test_get_secret_symmetric_assert_called(self): metadata = {kss.KMIPSecretStore.KEY_UUID: 'uuid'} - self.secret_store.get_secret(metadata) + self.secret_store.get_secret(self.symmetric_type, metadata) self.kmipclient_mock.get.assert_called_once_with('uuid', self.credential) def test_get_secret_symmetric_return_value_key_material_struct(self): metadata = {kss.KMIPSecretStore.KEY_UUID: 'uuid'} - return_value = self.secret_store.get_secret(metadata) + return_value = self.secret_store.get_secret(self.symmetric_type, + metadata) self.assertEqual(secret_store.SecretDTO, type(return_value)) self.assertEqual(secret_store.SecretType.SYMMETRIC, return_value.type) # The plugin returns a base64 string for the secret @@ -306,7 +308,8 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase): secret=sample_secret)) metadata = {kss.KMIPSecretStore.KEY_UUID: 'uuid'} - return_value = self.secret_store.get_secret(metadata) + return_value = self.secret_store.get_secret(self.symmetric_type, + metadata) self.assertEqual(secret_store.SecretDTO, type(return_value)) self.assertEqual(secret_store.SecretType.SYMMETRIC, return_value.type) # The plugin returns a base64 string for the secret @@ -328,7 +331,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase): self.assertRaises( secret_store.SecretGeneralException, self.secret_store.get_secret, - metadata) + self.symmetric_type, metadata) def test_get_secret_symmetric_error_occurs(self): self.secret_store.client.get = mock.create_autospec( @@ -338,7 +341,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase): self.assertRaises( secret_store.SecretGeneralException, self.secret_store.get_secret, - metadata) + self.symmetric_type, metadata) def test_get_secret_symmetric_error_opening_connection(self): self.secret_store.client.open = mock.Mock(side_effect=socket.error) @@ -347,7 +350,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase): self.assertRaises( secret_store.SecretGeneralException, self.secret_store.get_secret, - metadata) + self.symmetric_type, metadata) # ---------------- TEST DELETE ------------------------------------------- @@ -396,16 +399,6 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase): self.assertIsNone( self.secret_store._map_type_ss_to_kmip('bad_type')) - def test_map_type_kmip_to_ss_valid_type(self): - kmip_types = [enums.ObjectType.SYMMETRIC_KEY] - for kmip_type in kmip_types: - self.assertIsNotNone( - self.secret_store._map_type_kmip_to_ss(kmip_type)) - - def test_map_type_kmip_to_ss_invalid_type(self): - self.assertIsNone( - self.secret_store._map_type_kmip_to_ss('bad_type')) - def test_map_algorithm_ss_to_kmip_valid_alg(self): ss_algs = [secret_store.KeyAlgorithm.AES, secret_store.KeyAlgorithm.DES, diff --git a/barbican/tests/plugin/test_resource.py b/barbican/tests/plugin/test_resource.py index 8dcfcdbec..887074ed0 100644 --- a/barbican/tests/plugin/test_resource.py +++ b/barbican/tests/plugin/test_resource.py @@ -17,11 +17,14 @@ import base64 import mock import testtools +from barbican.model import models from barbican.plugin.interface import secret_store from barbican.plugin import resources +from barbican.plugin import store_crypto from barbican.tests import utils +@utils.parameterized_test_case class WhenTestingPluginResource(testtools.TestCase, utils.MockModelRepositoryMixin): @@ -51,11 +54,10 @@ class WhenTestingPluginResource(testtools.TestCase, } self.moc_plugin_patcher = mock.patch( - 'barbican.plugin.interface.secret_store' - '.get_manager', + 'barbican.plugin.interface.secret_store.get_manager', **moc_plugin_config ) - self.moc_plugin_patcher.start() + self.moc_plugin_manager = self.moc_plugin_patcher.start() self.addCleanup(self.moc_plugin_patcher.stop) self.setup_project_repository_mock() @@ -85,7 +87,8 @@ class WhenTestingPluginResource(testtools.TestCase, super(WhenTestingPluginResource, self).tearDown() def test_store_secret_dto(self): - spec = {'algorithm': 'AES', 'bit_length': 256} + spec = {'algorithm': 'AES', 'bit_length': 256, + 'secret_type': 'symmetric'} secret = base64.b64encode('ABCDEFABCDEFABCDEFABCDEF') self.plugin_resource.store_secret( @@ -103,6 +106,67 @@ class WhenTestingPluginResource(testtools.TestCase, self.assertEqual(spec['bit_length'], dto.key_spec.bit_length) self.assertEqual(self.content_type, dto.content_type) + @utils.parameterized_dataset({ + 'general_secret_store': { + 'moc_plugin': None + }, + 'store_crypto': { + 'moc_plugin': mock.MagicMock(store_crypto.StoreCryptoAdapterPlugin) + } + }) + def test_get_secret_dto(self, moc_plugin): + + def mock_secret_store_store_secret(dto): + self.secret_dto = dto + + def mock_secret_store_get_secret(secret_type, secret_metadata): + return self.secret_dto + + def mock_store_crypto_store_secret(dto, context): + self.secret_dto = dto + + def mock_store_crypto_get_secret( + secret_type, secret_metadata, context): + return self.secret_dto + + if moc_plugin: + self.moc_plugin = moc_plugin + self.moc_plugin.store_secret.return_value = {} + self.moc_plugin.store_secret.side_effect = ( + mock_store_crypto_store_secret) + self.moc_plugin.get_secret.side_effect = ( + mock_store_crypto_get_secret) + + moc_plugin_config = { + 'return_value.get_plugin_store.return_value': + self.moc_plugin, + 'return_value.get_plugin_retrieve_delete.return_value': + self.moc_plugin + } + self.moc_plugin_manager.configure_mock(**moc_plugin_config) + else: + self.moc_plugin.store_secret.side_effect = ( + mock_secret_store_store_secret) + self.moc_plugin.get_secret.side_effect = ( + mock_secret_store_get_secret) + + raw_secret = 'ABCDEFABCDEFABCDEFABCDEF' + spec = {'name': 'testsecret', 'algorithm': 'AES', 'bit_length': 256, + 'secret_type': 'symmetric'} + + self.plugin_resource.store_secret( + base64.b64encode(raw_secret), + self.content_type, + 'base64', + spec, + None, + self.project_model) + secret = self.plugin_resource.get_secret( + 'application/octet-stream', + models.Secret(spec), + None) + self.assertEqual(raw_secret, secret) + def test_generate_asymmetric_with_passphrase(self): """test asymmetric secret generation with passphrase.""" secret_container = self.plugin_resource.generate_asymmetric_secret( diff --git a/barbican/tests/plugin/test_store_crypto.py b/barbican/tests/plugin/test_store_crypto.py index 6809cbe40..5104165d4 100644 --- a/barbican/tests/plugin/test_store_crypto.py +++ b/barbican/tests/plugin/test_store_crypto.py @@ -22,9 +22,39 @@ from barbican.model import models from barbican.plugin.crypto import crypto from barbican.plugin.interface import secret_store from barbican.plugin import store_crypto +from barbican.plugin.util import translations from barbican.tests import utils as test_utils +def get_private_dto(): + spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 1024) + return secret_store.SecretDTO(secret_store.SecretType.PRIVATE, + test_utils.get_private_key(), + spec, + 'application/pkcs8') + + +def get_public_dto(): + spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 1024) + return secret_store.SecretDTO(secret_store.SecretType.PUBLIC, + test_utils.get_public_key(), + spec, + 'application/octet-stream') + + +def get_certificate_dto(): + spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 1024) + return secret_store.SecretDTO(secret_store.SecretType.CERTIFICATE, + test_utils.get_certificate(), + spec, + 'application/pkix-cert') + + +def get_pem_content(pem): + b64_content = translations.get_pem_components(pem)[1] + return base64.b64decode(b64_content) + + class TestSecretStoreBase(testtools.TestCase, test_utils.MockModelRepositoryMixin): """Define common configurations for testing store_crypto.py.""" @@ -48,7 +78,7 @@ class TestSecretStoreBase(testtools.TestCase, self.project_model.id = 'project-model-id' self.project_model.external_id = self.project_id self.secret_dto = secret_store.SecretDTO( - secret_store.SecretType.SYMMETRIC, + secret_store.SecretType.OPAQUE, self.secret, secret_store.KeySpec(), self.content_type @@ -132,8 +162,15 @@ class TestSecretStoreBase(testtools.TestCase, self.setup_kek_datum_repository_mock(self.kek_repo) +@test_utils.parameterized_test_case class WhenTestingStoreCrypto(TestSecretStoreBase): + dataset_for_pem = { + 'private': [get_private_dto()], + 'public': [get_public_dto()], + 'certificate': [get_certificate_dto()] + } + def setUp(self): super(WhenTestingStoreCrypto, self).setUp() @@ -170,17 +207,41 @@ class WhenTestingStoreCrypto(TestSecretStoreBase): self.assertEqual(self.content_type, self.context.content_type) + @test_utils.parameterized_dataset(dataset_for_pem) + def test_store_pem_secret(self, secret_dto): + """Test storing a secret that is PEM encoded.""" + + response_dict = self.plugin_to_test.store_secret( + secret_dto, self.context) + + self.assertEqual(None, response_dict) + + content = translations.get_pem_components(secret_dto.secret)[1] + raw_content = base64.b64decode(content) + + # Verify encrypt plugin and method where invoked. + encrypt_mock = self.encrypting_plugin.encrypt + self.assertEqual(encrypt_mock.call_count, 1) + args, kwargs = encrypt_mock.call_args + test_encrypt_dto, test_kek_meta_dto, test_project_id = tuple(args) + self.assertIsInstance(test_encrypt_dto, crypto.EncryptDTO) + self.assertEqual(raw_content, test_encrypt_dto.unencrypted) + self.assertEqual(self.kek_meta_dto, test_kek_meta_dto) + self.assertEqual(self.project_id, test_project_id) + def test_get_secret(self): """Test getting a secret.""" secret_dto = self.plugin_to_test.get_secret( + secret_store.SecretType.OPAQUE, None, # Secret metadata is not relevant to store_crypto process. self.context) # Verify response. self.assertIsInstance(secret_dto, secret_store.SecretDTO) - self.assertEqual(secret_store.SecretType.SYMMETRIC, secret_dto.type) - self.assertEqual(self.decrypted_secret, secret_dto.secret) + self.assertEqual(secret_store.SecretType.OPAQUE, secret_dto.type) + self.assertEqual(base64.b64encode(self.decrypted_secret), + secret_dto.secret) self.assertEqual( self.encrypted_datum_model.content_type, secret_dto.content_type) self.assertIsInstance(secret_dto.key_spec, secret_store.KeySpec) @@ -217,6 +278,39 @@ class WhenTestingStoreCrypto(TestSecretStoreBase): self.assertEqual(self.project_id, test_project_id) + @test_utils.parameterized_dataset(dataset_for_pem) + def test_get_secret_encoding(self, input_secret_dto): + """Test getting a secret that should be returend in PEM format.""" + secret = input_secret_dto.secret + key_spec = input_secret_dto.key_spec + secret_type = input_secret_dto.type + + decrypt_mock = self.retrieving_plugin.decrypt + content = translations.get_pem_components(secret)[1] + decrypt_mock.return_value = base64.b64decode(content) + + secret_model = self.context.secret_model + secret_model.algorithm = key_spec.alg + secret_model.bit_length = key_spec.bit_length + secret_model.mode = key_spec.mode + + secret_dto = self.plugin_to_test.get_secret( + secret_type, + None, # Secret metadata is not relevant to store_crypto process. + self.context) + + # Verify response. + self.assertIsInstance(secret_dto, secret_store.SecretDTO) + self.assertEqual(secret_type, secret_dto.type) + self.assertEqual(secret, secret_dto.secret) + self.assertIsInstance(secret_dto.key_spec, secret_store.KeySpec) + self.assertEqual( + secret_model.algorithm, secret_dto.key_spec.alg) + self.assertEqual( + secret_model.bit_length, secret_dto.key_spec.bit_length) + self.assertEqual( + secret_model.mode, secret_dto.key_spec.mode) + def test_generate_symmetric_key(self): """test symmetric secret generation.""" generation_type = crypto.PluginSupportTypes.SYMMETRIC_KEY_GENERATION @@ -289,6 +383,7 @@ class WhenTestingStoreCrypto(TestSecretStoreBase): self.assertRaises( secret_store.SecretNotFoundException, self.plugin_to_test.get_secret, + secret_store.SecretType.OPAQUE, None, # get_secret() doesn't use the secret metadata argument self.context ) @@ -299,6 +394,7 @@ class WhenTestingStoreCrypto(TestSecretStoreBase): self.assertRaises( secret_store.SecretNotFoundException, self.plugin_to_test.get_secret, + secret_store.SecretType.OPAQUE, None, # get_secret() doesn't use the secret metadata argument self.context ) diff --git a/barbican/tests/plugin/util/test_mime_types.py b/barbican/tests/plugin/util/test_mime_types.py index dd9ef3c79..dc8262e73 100644 --- a/barbican/tests/plugin/util/test_mime_types.py +++ b/barbican/tests/plugin/util/test_mime_types.py @@ -111,8 +111,8 @@ class WhenTestingAugmentFieldsWithContentTypes(utils.BaseTestCase): def test_static_supported_binary(self): for bin in mime_types.BINARY: - self.assertEqual('application/octet-stream', - mime_types.INTERNAL_CTYPES[bin]) + self.assertTrue(mime_types.INTERNAL_CTYPES[bin] in + mime_types.BINARY) def test_static_content_to_encodings(self): self.assertIn('text/plain', mime_types.CTYPES_TO_ENCODINGS) @@ -195,3 +195,31 @@ class WhenTestingNormalizationOfMIMETypes(utils.BaseTestCase): mime = 'something/bogus' r = mime_types.normalize_content_type(mime) self.assertEqual(r, 'something/bogus') + + +@utils.parameterized_test_case +class WhenTestingIfContentTypeAndEncodingSupported(utils.BaseTestCase): + + @utils.parameterized_dataset({ + 'plaintext_none': ['text/plain', None], + 'octectstream_base64': ['application/octet-stream', 'base64'], + 'pkcs8_base64': ['application/pkcs8', 'base64'], + 'pkix_base64': ['application/pkix-cert', 'base64'], + }) + def test_content_type_encoding_supported(self, content_type, encoding): + self.assertTrue(mime_types.is_content_type_with_encoding_supported( + content_type, + encoding)) + + @utils.parameterized_dataset({ + 'plaintext_none': ['text/plain', 'base64'], + 'octectstream_no_encoding': ['application/octet-stream', None], + 'pkcs8_no_encoding': ['application/pkcs8', None], + 'pkix_no_encoding': ['application/pkix-cert', None], + 'unknown_with_valid_encoding': ['application/uknown-content-type', + 'base64'] + }) + def test_content_type_encoding_not_supported(self, content_type, encoding): + self.assertFalse(mime_types.is_content_type_with_encoding_supported( + content_type, + encoding)) diff --git a/barbican/tests/plugin/util/test_translations.py b/barbican/tests/plugin/util/test_translations.py index ae7d4e754..24c9d2b21 100644 --- a/barbican/tests/plugin/util/test_translations.py +++ b/barbican/tests/plugin/util/test_translations.py @@ -26,24 +26,28 @@ class WhenNormalizingBeforeEncryption(utils.BaseTestCase): 'non_encrypted_content': { 'exception': s.SecretNoPayloadProvidedException, 'unencrypted': None, + 'secret_type': s.SecretType.OPAQUE, 'content_type': '', 'content_encoding': '' }, 'invalid_content_type': { 'exception': s.SecretContentTypeNotSupportedException, 'unencrypted': 'stuff', + 'secret_type': s.SecretType.OPAQUE, 'content_type': 'nope', 'content_encoding': '' }, 'invalid_base64_content': { 'exception': s.SecretPayloadDecodingError, 'unencrypted': 'stuff', + 'secret_type': s.SecretType.OPAQUE, 'content_type': 'application/octet-stream', 'content_encoding': 'base64' }, 'content_encoding_isnt_base64': { 'exception': s.SecretContentEncodingMustBeBase64, 'unencrypted': 'stuff', + 'secret_type': s.SecretType.OPAQUE, 'content_type': 'application/octet-stream', 'content_encoding': 'other_stuff', 'enforce_text_only': True @@ -51,22 +55,119 @@ class WhenNormalizingBeforeEncryption(utils.BaseTestCase): 'unsupported_content_encoding': { 'exception': s.SecretContentEncodingNotSupportedException, 'unencrypted': 'stuff', + 'secret_type': s.SecretType.OPAQUE, 'content_type': 'application/octet-stream', 'content_encoding': 'other_stuff' } } + dataset_for_normalization = { + 'plain_text': { + 'unencrypted': 'stuff', + 'secret_type': s.SecretType.OPAQUE, + 'content_type': 'text/plain', + 'content_encoding': '', + 'expected': base64.b64encode('stuff'.encode('utf-8')) + }, + 'binary_base64': { + 'unencrypted': base64.b64encode('stuff'), + 'secret_type': s.SecretType.OPAQUE, + 'content_type': 'application/octet-stream', + 'content_encoding': 'base64', + 'expected': base64.b64encode('stuff') + }, + 'binary': { + 'unencrypted': 'stuff', + 'secret_type': s.SecretType.OPAQUE, + 'content_type': 'application/octet-stream', + 'content_encoding': 'binary', + 'expected': base64.b64encode('stuff') + }, + 'symmetric_base64': { + 'unencrypted': base64.b64encode('stuff'), + 'secret_type': s.SecretType.SYMMETRIC, + 'content_type': 'application/octet-stream', + 'content_encoding': 'base64', + 'expected': base64.b64encode('stuff') + }, + 'symmetric': { + 'unencrypted': 'stuff', + 'secret_type': s.SecretType.SYMMETRIC, + 'content_type': 'application/octet-stream', + 'content_encoding': 'binary', + 'expected': base64.b64encode('stuff') + }, + 'private_base64': { + 'unencrypted': utils.get_private_key(), + 'secret_type': s.SecretType.PRIVATE, + 'content_type': 'application/pkcs8', + 'content_encoding': 'base64', + 'expected': utils.get_private_key() + }, + 'private': { + 'unencrypted': base64.b64decode( + translations.get_pem_components(utils.get_private_key())[1]), + 'secret_type': s.SecretType.PRIVATE, + 'content_type': 'application/pkcs8', + 'content_encoding': 'binary', + 'expected': utils.get_private_key() + }, + 'public_base64': { + 'unencrypted': utils.get_public_key(), + 'secret_type': s.SecretType.PUBLIC, + 'content_type': 'application/octet-stream', + 'content_encoding': 'base64', + 'expected': utils.get_public_key() + }, + 'public': { + 'unencrypted': base64.b64decode( + translations.get_pem_components(utils.get_public_key())[1]), + 'secret_type': s.SecretType.PUBLIC, + 'content_type': 'application/octet-stream', + 'content_encoding': 'binary', + 'expected': utils.get_public_key() + }, + 'certificate_base64': { + 'unencrypted': utils.get_certificate(), + 'secret_type': s.SecretType.CERTIFICATE, + 'content_type': 'application/pkix-cert', + 'content_encoding': 'base64', + 'expected': utils.get_certificate() + }, + 'certificate': { + 'unencrypted': base64.b64decode( + translations.get_pem_components(utils.get_certificate())[1]), + 'secret_type': s.SecretType.CERTIFICATE, + 'content_type': 'application/pkix-cert', + 'content_encoding': 'binary', + 'expected': utils.get_certificate() + }, + } + def setUp(self): super(WhenNormalizingBeforeEncryption, self).setUp() # Aliasing to reduce the number of line continuations self.normalize = translations.normalize_before_encryption - def test_can_normalize_plain_text(self): + @utils.parameterized_dataset(dataset_for_normalization) + def test_can_normalize(self, **kwargs): + unencrypted, content_type = self.normalize( + unencrypted=kwargs['unencrypted'], + content_type=kwargs['content_type'], + content_encoding=kwargs['content_encoding'], + secret_type=kwargs['secret_type'] + ) + + self.assertEqual(kwargs['expected'], unencrypted) + self.assertEqual(kwargs['content_type'], content_type) + + def test_can_normalize_tmp_plain_text(self): unencrypted, content_type = self.normalize( unencrypted='stuff', content_type='text/plain', - content_encoding='' + content_encoding='', + secret_type=s.SecretType.OPAQUE ) self.assertEqual(unencrypted, base64.b64encode('stuff')) @@ -76,11 +177,12 @@ class WhenNormalizingBeforeEncryption(utils.BaseTestCase): unencrypted, content_type = self.normalize( unencrypted='bam', content_type='application/octet-stream', - content_encoding=None + content_encoding=None, + secret_type=s.SecretType.OPAQUE ) - self.assertEqual(unencrypted, base64.b64encode('bam')) - self.assertEqual(content_type, 'application/octet-stream') + self.assertEqual(base64.b64encode('bam'), unencrypted) + self.assertEqual('application/octet-stream', content_type) @utils.parameterized_dataset(dataset_for_raised_exceptions) def test_normalize_raising_exceptions_with(self, exception, **kwargs): @@ -107,7 +209,96 @@ class WhenAnalyzingBeforeDecryption(utils.BaseTestCase): self.assertRaises(exception, self.analyze, **kwargs) +@utils.parameterized_test_case +class WhenNormalizingPemSecrets(utils.BaseTestCase): + + dataset_for_pem_normalize = { + 'private_key': { + 'pem': utils.get_private_key() + }, + 'public_key': { + 'pem': utils.get_public_key() + }, + 'certificate': { + 'pem': utils.get_certificate() + } + } + + dataset_for_bad_pems = { + 'raw_string': { + 'pem': 'fooandstuff' + }, + 'no_header': { + 'pem': utils.get_private_key()[27:] + }, + 'no_footer': { + 'pem': utils.get_private_key()[:-15] + }, + 'no_header_no_footer': { + 'pem': utils.get_private_key()[27:-15] + } + } + + dataset_for_to_pem = { + 'private_key': { + 'secret_type': s.SecretType.PRIVATE, + 'pem': utils.get_private_key() + }, + 'public_key': { + 'secret_type': s.SecretType.PUBLIC, + 'pem': utils.get_public_key() + }, + 'certificate': { + 'secret_type': s.SecretType.CERTIFICATE, + 'pem': utils.get_certificate() + } + } + + @utils.parameterized_dataset(dataset_for_pem_normalize) + def test_pem_normalized(self, pem): + pem_components = translations.get_pem_components(pem) + self.assertEqual(3, len(pem_components)) + pem_msg = pem_components[0] + pem_components[1] + pem_components[2] + self.assertEqual(pem, pem_msg) + + @utils.parameterized_dataset(dataset_for_to_pem) + def test_to_pem(self, secret_type, pem): + pem_components = translations.get_pem_components(pem) + content = base64.b64decode(pem_components[1]) + pem_msg = translations.to_pem(secret_type, content, False) + self.assertEqual(pem, pem_msg) + + @utils.parameterized_dataset(dataset_for_to_pem) + def test_to_pem_payload_encoded(self, secret_type, pem): + pem_components = translations.get_pem_components(pem) + content = pem_components[1] + pem_msg = translations.to_pem(secret_type, content, True) + self.assertEqual(pem, pem_msg) + + @utils.parameterized_dataset(dataset_for_bad_pems) + def test_pem_normalize_raising_exceptions_with(self, pem): + self.assertRaises(s.SecretPayloadDecodingError, + translations.get_pem_components, pem) + + +@utils.parameterized_test_case class WhenDenormalizingAfterDecryption(utils.BaseTestCase): + + dataset_for_pem_denormalize = { + 'private_key': { + 'pem': utils.get_private_key(), + 'content_type': 'application/pkcs8' + }, + 'public_key': { + 'pem': utils.get_public_key(), + 'content_type': 'application/octet-stream' + }, + 'certificate': { + 'pem': utils.get_certificate(), + 'content_type': 'application/pkix-cert' + } + } + def setUp(self): super(WhenDenormalizingAfterDecryption, self).setUp() @@ -115,17 +306,21 @@ class WhenDenormalizingAfterDecryption(utils.BaseTestCase): self.denormalize = translations.denormalize_after_decryption def test_ascii_characters_to_utf8_with_plain_text(self): - unencrypted = self.denormalize('bam', 'text/plain') + secret = 'bam' + normalized_secret = secret.encode('utf-8') + normalized_secret = base64.b64encode(normalized_secret) + unencrypted = self.denormalize(normalized_secret, 'text/plain') self.assertEqual(unencrypted, 'bam') def test_ascii_characters_to_utf8_with_app_octet_stream(self): - unencrypted = self.denormalize('bam', 'application/octet-stream') + unencrypted = self.denormalize(base64.b64encode('bam'), + 'application/octet-stream') self.assertEqual(unencrypted, 'bam') def test_non_ascii_character_with_plain_text_raises_exception(self): exception = s.SecretAcceptNotSupportedException kwargs = { - 'unencrypted': '\xff', + 'unencrypted': base64.b64encode('\xff'), 'content_type': 'text/plain' } @@ -139,3 +334,10 @@ class WhenDenormalizingAfterDecryption(utils.BaseTestCase): } self.assertRaises(exception, self.denormalize, **kwargs) + + @utils.parameterized_dataset(dataset_for_pem_denormalize) + def test_denormalize_pem(self, pem, content_type): + pem_components = translations.get_pem_components(pem) + secret = base64.b64decode(pem_components[1]) + denorm_secret = self.denormalize(pem, content_type) + self.assertEqual(secret, denorm_secret) diff --git a/barbican/tests/utils.py b/barbican/tests/utils.py index 53470f641..36c5b50d4 100644 --- a/barbican/tests/utils.py +++ b/barbican/tests/utils.py @@ -416,3 +416,57 @@ def generate_test_uuid(tail_value=0): """Returns a blank uuid with the given value added to the end segment.""" return '00000000-0000-0000-0000-{value:0>{pad}}'.format(value=tail_value, pad=12) + + +def get_private_key(): + s = ("-----BEGIN PRIVATE KEY-----" + "MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAMxOUcg4eiBTZnIy" + "4XhEV+IoBbye/ZkXnxWQPnz9Cm+2C3rIYBev6WLqztSfi1EHnn3jM9p36KJuVgvA" + "Jr4wfn19hM9pw5Cq5hcnkVlBCAKoCM7p/jf7G2qv0yxlhXK3eZVzR/8Km3wImKN5" + "mJRqCv89I1LXyiuHYlVrznx8hjTZAgMBAAECgYAYyVu0rd1rNJMWQcQHAkujPXXM" + "t4FO5IXBaPmb0ltEyFJGOC8oUIhBHvmu5BhT4VfCMCFnXOrVYguAa3SH2HxP54Wb" + "xfycCNow5ikujEfdvsAZi1tnKedFRnJhdANCAM+6+fTNUzNElUW6kjuvwWWnRq7C" + "iCHqhd5ssVa8vMjPjQJBAPpMz0rXo2DDtYqQLOnWwAbV+djM1+ldmBdh/Q4zETDO" + "xgPfUvLBhU40LJt8NQeia6Ce4oYH+W4WRyNYvvmcGz8CQQDQ9V/8IuMJN6vyAKrc" + "WMPyLfYFu3wJ74/DX0EZ7lf+UhTlCEwEQaVQ78El1oFJDl7cjnH3Ay5GNsFfHOfd" + "uaHnAkAa21MCvNCS+WzpST8IeDpygVMlqBUivSAsoh78/w3BJu6oS7YixhD/qyl+" + "JX2rLApQWbwElaZ14X4QlN0o+08RAkEAi79vIDtcx69Z6ZGUs6UR9wfR/+dxf1ue" + "NDWiXmtOoiHXI422+SnGHCkcbefVszxPKQaDJYYVDpRUIv47+8fIhQJAIPlfqUpN" + "0c23beUUWItd+fSVLH+bxTUv+FUqLQIC2VlXVecg7+eAOMNzF2CqcpWstIcrYkV7" + "lECxnorexnYA8g==" + "-----END PRIVATE KEY-----") + return s + + +def get_public_key(): + s = ("-----BEGIN PUBLIC KEY-----" + "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDMTlHIOHogU2ZyMuF4RFfiKAW8" + "nv2ZF58VkD58/Qpvtgt6yGAXr+li6s7Un4tRB5594zPad+iiblYLwCa+MH59fYTP" + "acOQquYXJ5FZQQgCqAjO6f43+xtqr9MsZYVyt3mVc0f/Cpt8CJijeZiUagr/PSNS" + "18orh2JVa858fIY02QIDAQAB" + "-----END PUBLIC KEY-----") + return s + + +def get_certificate(): + s = ("-----BEGIN CERTIFICATE-----" + "MIIDTzCCArigAwIBAgIJANwgT2i4cVRAMA0GCSqGSIb3DQEBBQUAMHkxCzAJBgNV" + "BAYTAlVTMQswCQYDVQQIEwJUWDEPMA0GA1UEBxMGQXVzdGluMRYwFAYDVQQKEw1t" + "eWNvbXBhbnkuY29tMQ8wDQYDVQQDEwZjb21tb24xIzAhBgkqhkiG9w0BCQEWFGNv" + "bW1vbkBteWNvbXBhbnkuY29tMB4XDTE1MDIxNzIxMDA1N1oXDTE4MDIxNjIxMDA1" + "N1oweTELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAlRYMQ8wDQYDVQQHEwZBdXN0aW4x" + "FjAUBgNVBAoTDW15Y29tcGFueS5jb20xDzANBgNVBAMTBmNvbW1vbjEjMCEGCSqG" + "SIb3DQEJARYUY29tbW9uQG15Y29tcGFueS5jb20wgZ8wDQYJKoZIhvcNAQEBBQAD" + "gY0AMIGJAoGBAMxOUcg4eiBTZnIy4XhEV+IoBbye/ZkXnxWQPnz9Cm+2C3rIYBev" + "6WLqztSfi1EHnn3jM9p36KJuVgvAJr4wfn19hM9pw5Cq5hcnkVlBCAKoCM7p/jf7" + "G2qv0yxlhXK3eZVzR/8Km3wImKN5mJRqCv89I1LXyiuHYlVrznx8hjTZAgMBAAGj" + "gd4wgdswHQYDVR0OBBYEFBxIlJZjp3+TkIwy8G3dqfCgL6GfMIGrBgNVHSMEgaMw" + "gaCAFBxIlJZjp3+TkIwy8G3dqfCgL6GfoX2kezB5MQswCQYDVQQGEwJVUzELMAkG" + "A1UECBMCVFgxDzANBgNVBAcTBkF1c3RpbjEWMBQGA1UEChMNbXljb21wYW55LmNv" + "bTEPMA0GA1UEAxMGY29tbW9uMSMwIQYJKoZIhvcNAQkBFhRjb21tb25AbXljb21w" + "YW55LmNvbYIJANwgT2i4cVRAMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQAD" + "gYEANTlbvNaoADYMzGOoLgaAVt7tjQ6EZVXjswax1jnj8dEoQpUNCAzkkwFJDFqt" + "mOTTZxpWNbDm9AcbCubrLXwN22eBqYz02cBGoBnN/h2qINSL2caM08OMmMDm1g1Q" + "+iH+eUsCmvkTnylw8FJwN7TYV0No6V9/+aWvf6h1NqDiiLc=" + "-----END CERTIFICATE-----") + return s diff --git a/functionaltests/api/v1/functional/test_secrets.py b/functionaltests/api/v1/functional/test_secrets.py index 981986c59..bf3c760a0 100644 --- a/functionaltests/api/v1/functional/test_secrets.py +++ b/functionaltests/api/v1/functional/test_secrets.py @@ -21,11 +21,54 @@ import time from testtools import testcase +from barbican.plugin.util import translations from barbican.tests import utils from functionaltests.api import base from functionaltests.api.v1.behaviors import secret_behaviors from functionaltests.api.v1.models import secret_models + +def get_pem_content(pem): + b64_content = translations.get_pem_components(pem)[1] + return base64.b64decode(b64_content) + + +def get_private_key_req(): + return {'name': 'myprivatekey', + 'payload_content_type': 'application/octet-stream', + 'payload_content_encoding': 'base64', + 'algorithm': 'rsa', + 'bit_length': 1024, + 'secret_type': 'private', + 'payload': utils.get_private_key()} + + +def get_public_key_req(): + return {'name': 'mypublickey', + 'payload_content_type': 'application/octet-stream', + 'payload_content_encoding': 'base64', + 'algorithm': 'rsa', + 'bit_length': 1024, + 'secret_type': 'public', + 'payload': utils.get_public_key()} + + +def get_certificate_req(): + return {'name': 'mycertificate', + 'payload_content_type': 'application/octet-stream', + 'payload_content_encoding': 'base64', + 'algorithm': 'rsa', + 'bit_length': 1024, + 'secret_type': 'certificate', + 'payload': utils.get_certificate()} + + +def get_passphrase_req(): + return {'name': 'mypassphrase', + 'payload_content_type': 'text/plain', + 'secret_type': 'passphrase', + 'payload': 'mysecretpassphrase'} + # TODO(tdink) Move to a config file secret_create_defaults_data = { "name": "AES key", @@ -401,7 +444,7 @@ class SecretsTestCase(base.TestCase): self.assertIn( "Provided object does not match schema 'Secret': " - "payload_content_type is not one of ['text/plain', " + "payload_content_type plain-text is not one of ['text/plain', " "'text/plain;charset=utf-8', 'text/plain; charset=utf-8', " "'application/octet-stream'", resp_dict['description']) self.assertIn("Bad Request", resp_dict['title']) @@ -815,3 +858,40 @@ class SecretsTestCase(base.TestCase): # malicious one. regex = '.*{0}.*'.format(malicious_hostname) self.assertNotRegexpMatches(resp.headers['location'], regex) + + @utils.parameterized_dataset({ + 'symmetric': ['symmetric', + base64.b64decode(secret_create_defaults_data['payload']), + secret_create_defaults_data], + 'private': ['private', + get_pem_content(utils.get_private_key()), + get_private_key_req()], + 'public': ['public', + get_pem_content(utils.get_public_key()), + get_public_key_req()], + 'certificate': ['certificate', + get_pem_content(utils.get_certificate()), + get_certificate_req()], + 'passphrase': ['passphrase', + 'mysecretpassphrase', + get_passphrase_req()] + }) + @testcase.attr('positive') + def test_secret_create_with_secret_type(self, secret_type, expected, spec): + """Create secrets with various secret types.""" + test_model = secret_models.SecretModel(**spec) + overrides = {"secret_type": secret_type} + test_model.override_values(**overrides) + + resp, secret_ref = self.behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + resp = self.behaviors.get_secret_metadata(secret_ref) + secret_type_response = resp.model.secret_type + self.assertIsNotNone(secret_type_response) + self.assertEqual(secret_type, secret_type_response) + + content_type = spec['payload_content_type'] + get_resp = self.behaviors.get_secret(secret_ref, + content_type) + self.assertEqual(expected, get_resp.content)