barbican/barbican/plugin/util/translations.py

205 lines
7.1 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from OpenSSL import crypto
from oslo_serialization import base64
import six
from barbican import i18n as u # noqa
from barbican.plugin.interface import secret_store as s
from barbican.plugin.util import mime_types
def normalize_before_encryption(unencrypted, content_type, content_encoding,
secret_type, enforce_text_only=False):
"""Normalize unencrypted prior to plugin encryption processing.
This normalizes the secrets before they are handed off to the SecretStore
for storage. This converts all data to Base64 data. If the data is plain
text then it encoded using utf-8 first and then Base64 encoded. Binary
data is simply converted to Base64.
:param str unencrypted: Raw payload
:param str content_type: The media type for the payload
:param str content_encoding: Transfer encoding
:param str secret_type: The type of secret
:param bool enforce_text_only: Require text content_type or base64
content_encoding
:returns: Tuple containing the normalized (base64 encoded) payload and
the normalized media type.
"""
if not unencrypted:
raise s.SecretNoPayloadProvidedException()
# Validate and normalize content-type.
normalized_media_type = normalize_content_type(content_type)
# Process plain-text type.
if normalized_media_type in mime_types.PLAIN_TEXT:
# normalize text to binary and then base64 encode it
if six.PY3:
b64payload = base64.encode_as_bytes(unencrypted)
else:
unencrypted_bytes = unencrypted.encode('utf-8')
b64payload = base64.encode_as_bytes(unencrypted_bytes)
# Process binary type.
else:
if not content_encoding:
b64payload = base64.encode_as_bytes(unencrypted)
elif content_encoding.lower() == 'base64':
if not isinstance(unencrypted, six.binary_type):
b64payload = unencrypted.encode('utf-8')
else:
b64payload = unencrypted
elif enforce_text_only:
# For text-based protocols (such as the one-step secret POST),
# only 'base64' encoding is possible/supported.
raise s.SecretContentEncodingMustBeBase64()
else:
# Unsupported content-encoding request.
raise s.SecretContentEncodingNotSupportedException(
content_encoding
)
return b64payload, normalized_media_type
def normalize_content_type(content_type):
"""Normalize the content type and validate that it is supported."""
normalized_mime = mime_types.normalize_content_type(content_type)
if not mime_types.is_supported(normalized_mime):
raise s.SecretContentTypeNotSupportedException(content_type)
return normalized_mime
def analyze_before_decryption(content_type):
"""Determine support for desired content type."""
if not mime_types.is_supported(content_type):
raise s.SecretAcceptNotSupportedException(content_type)
def denormalize_after_decryption(unencrypted, content_type):
"""Translate the decrypted data into the desired content type.
This is called when the raw keys are requested by the user. The secret
returned from the SecretStore is the unencrypted parameter. This
'denormalizes' the data back to its binary format.
"""
# Process plain-text type.
if content_type in mime_types.PLAIN_TEXT:
# normalize text to binary string
try:
unencrypted = base64.decode_as_text(unencrypted)
except UnicodeDecodeError:
raise s.SecretAcceptNotSupportedException(content_type)
# Process binary type.
elif content_type in mime_types.BINARY:
unencrypted = base64.decode_as_bytes(unencrypted)
else:
raise s.SecretContentTypeNotSupportedException(content_type)
return unencrypted
def convert_pem_to_der(pem, secret_type):
if secret_type == s.SecretType.PRIVATE:
return _convert_private_pem_to_der(pem)
elif secret_type == s.SecretType.PUBLIC:
return _convert_public_pem_to_der(pem)
elif secret_type == s.SecretType.CERTIFICATE:
return _convert_certificate_pem_to_der(pem)
else:
reason = u._("Secret type can not be converted to DER")
raise s.SecretGeneralException(reason=reason)
def convert_der_to_pem(der, secret_type):
if secret_type == s.SecretType.PRIVATE:
return _convert_private_der_to_pem(der)
elif secret_type == s.SecretType.PUBLIC:
return _convert_public_der_to_pem(der)
elif secret_type == s.SecretType.CERTIFICATE:
return _convert_certificate_der_to_pem(der)
else:
reason = u._("Secret type can not be converted to PEM")
raise s.SecretGeneralException(reason=reason)
def _convert_private_pem_to_der(pem):
private_key = serialization.load_pem_private_key(
pem,
password=None,
backend=default_backend()
)
der = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return der
def _convert_private_der_to_pem(der):
private_key = serialization.load_der_private_key(
der,
password=None,
backend=default_backend()
)
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return pem
def _convert_public_pem_to_der(pem):
public_key = serialization.load_pem_public_key(
pem,
backend=default_backend()
)
der = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return der
def _convert_public_der_to_pem(der):
public_key = serialization.load_der_public_key(
der,
backend=default_backend()
)
pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return pem
def _convert_certificate_pem_to_der(pem):
cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem)
der = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
return der
def _convert_certificate_der_to_pem(der):
cert = crypto.load_certificate(crypto.FILETYPE_ASN1, der)
pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
return pem