keystone/keystone/common/cms.py

174 lines
6.5 KiB
Python

# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
from keystone.common import environment
from keystone.openstack.common import log
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
# Leading characters that is_ans1_token() compares a token against to decide
# whether it is PKI-based (that function's docstring explains the derivation).
PKI_ANS1_PREFIX = 'MII'
def cms_verify(formatted, signing_cert_file_name, ca_file_name):
    """Check the CMS signature on ``formatted`` with the openssl CLI.

    :param formatted: PEM-framed CMS document; written to openssl's stdin.
    :param signing_cert_file_name: path handed to openssl as ``-certfile``.
    :param ca_file_name: path handed to openssl as ``-CAfile``.
    :returns: whatever openssl wrote to stdout.
    :raises CalledProcessError: when openssl exits with a non-zero status;
        the exception's ``output`` attribute carries openssl's stderr.
    """
    subprocess_mod = environment.subprocess
    command = ["openssl", "cms", "-verify",
               "-certfile", signing_cert_file_name,
               "-CAfile", ca_file_name,
               "-inform", "PEM",
               "-nosmimecap", "-nodetach",
               "-nocerts", "-noattr"]
    openssl = subprocess_mod.Popen(command,
                                   stdin=subprocess_mod.PIPE,
                                   stdout=subprocess_mod.PIPE,
                                   stderr=subprocess_mod.PIPE)
    verified, errors = openssl.communicate(formatted)
    status = openssl.poll()

    # A falsy exit status means openssl accepted the signature.
    if not status:
        return verified

    LOG.error(_('Verify error: %s'), errors)
    raise subprocess_mod.CalledProcessError(status, "openssl", output=errors)
def token_to_cms(signed_text):
    """Re-frame a token as a PEM-style CMS document.

    Every ``-`` in the token is turned back into ``/``, the result is split
    into lines of at most 64 characters, and the lines are wrapped between
    ``-----BEGIN CMS-----`` and ``-----END CMS-----`` markers.

    :param signed_text: token text as produced by ``cms_to_token``.
    :returns: the framed document; every line, including the closing
        marker, ends with a newline. An empty token yields just the two
        marker lines.
    """
    line_length = 64
    body = signed_text.replace('-', '/')

    # Collect the pieces and join once instead of repeatedly re-slicing the
    # remaining text and growing the result with ``+=``.
    lines = ["-----BEGIN CMS-----"]
    lines.extend(body[start:start + line_length]
                 for start in range(0, len(body), line_length))
    lines.append("-----END CMS-----")
    return "\n".join(lines) + "\n"
def verify_token(token, signing_cert_file_name, ca_file_name):
    """Verify a token by re-framing it as CMS and checking its signature.

    :param token: token text, converted with ``token_to_cms`` before
        verification.
    :param signing_cert_file_name: forwarded unchanged to ``cms_verify``.
    :param ca_file_name: forwarded unchanged to ``cms_verify``.
    :returns: the value returned by ``cms_verify``.
    """
    cms_document = token_to_cms(token)
    return cms_verify(cms_document, signing_cert_file_name, ca_file_name)
def is_ans1_token(token):
    """Report whether ``token`` appears to be a PKI-based token.

    The test is a comparison of the first three characters with ``MII``.
    Background (thanks to ayoung for sorting this out):

    ``MII`` is the base64 form of the octets ``30 82``::

        In [3]: binascii.hexlify(base64.b64decode('MII='))
        Out[3]: '3082'

    Per X.690
    (http://www.itu.int/ITU-T/studygroups/com17/languages/X.690-0207.pdf):

    * pg4: for tags from 0 to 30 the first octet is the identifier.
    * pg10: hex 30 means "sequence", followed by the length of that
      sequence.
    * pg5: the second octet is the length octet. Its first bit selects the
      short or long form; the remaining 7 bits give, as an unsigned binary
      int, the number of subsequent octets that make up the content length.

    So for ``82``::

        82 = 10000010    (first bit set: long form)
             0000010     = 2 octets of content length

    i.e. the next 2 octets hold the length of the content.

    A very large content length could need more than 2 length octets, which
    would mean also checking for ``MIM``, ``MIQ``, and so on::

        In [4]: base64.b64encode(binascii.a2b_hex('3083'))
        Out[4]: 'MIM='
        In [5]: base64.b64encode(binascii.a2b_hex('3084'))
        Out[5]: 'MIQ='

    Checking only for ``MI`` would become invalid at 16 octets of content
    length (10010000 = 90)::

        In [6]: base64.b64encode(binascii.a2b_hex('3090'))
        Out[6]: 'MJA='

    and checking only for ``M`` is insufficient.

    Nevertheless only ``MII`` is checked. The original author's note gives
    the maximum content length with 2 octets as 7FFF (32767) and considers
    tokens of that size or larger impractical over http, so larger tokens
    are deliberately ignored.
    NOTE(review): an unsigned 2-octet length would reach FFFF; confirm the
    intended bound.

    :param token: token text to inspect.
    :returns: True when the token starts with ``MII``, False otherwise.
    """
    leading = token[:3]
    return leading == PKI_ANS1_PREFIX
def cms_sign_text(text, signing_cert_file_name, signing_key_file_name):
    """Use the openssl CLI to sign a document.

    Produces a Base64 encoding of a DER formatted CMS Document
    http://en.wikipedia.org/wiki/Cryptographic_Message_Syntax

    :param text: document to sign; written to openssl's stdin.
    :param signing_cert_file_name: path handed to openssl as ``-signer``.
    :param signing_key_file_name: path handed to openssl as ``-inkey``.
    :returns: openssl's stdout (requested with ``-outform PEM``).
    :raises CalledProcessError: when openssl exits with a non-zero status or
        its stderr contains ``Error``. The exception's ``output`` attribute
        carries openssl's stderr, matching what ``cms_verify`` raises.
    """
    process = environment.subprocess.Popen(["openssl", "cms", "-sign",
                                            "-signer", signing_cert_file_name,
                                            "-inkey", signing_key_file_name,
                                            "-outform", "PEM",
                                            "-nosmimecap", "-nodetach",
                                            "-nocerts", "-noattr"],
                                           stdin=environment.subprocess.PIPE,
                                           stdout=environment.subprocess.PIPE,
                                           stderr=environment.subprocess.PIPE)
    output, err = process.communicate(text)
    retcode = process.poll()
    # The stderr text is inspected as well as the exit status, so a run that
    # reports "Error" is treated as a failure even with a zero status.
    if retcode or "Error" in err:
        if retcode == 3:
            LOG.error(_("Signing error: Unable to load certificate - "
                        "ensure you've configured PKI with "
                        "'keystone-manage pki_setup'"))
        else:
            LOG.error(_('Signing error: %s'), err)
        # Attach stderr so callers can see why signing failed, as
        # cms_verify already does.
        raise environment.subprocess.CalledProcessError(retcode, "openssl",
                                                        output=err)
    return output
def cms_sign_token(text, signing_cert_file_name, signing_key_file_name):
    """Sign ``text`` and return the result in token form.

    :param text: document to sign; forwarded to ``cms_sign_text``.
    :param signing_cert_file_name: forwarded to ``cms_sign_text``.
    :param signing_key_file_name: forwarded to ``cms_sign_text``.
    :returns: the signed CMS document after conversion by ``cms_to_token``.
    """
    signed_cms = cms_sign_text(text,
                               signing_cert_file_name,
                               signing_key_file_name)
    return cms_to_token(signed_cms)
def cms_to_token(cms_text):
    """Flatten a framed CMS document into a single-line token.

    Each ``/`` becomes ``-``, the BEGIN/END marker lines are removed, and
    all newlines are dropped.

    :param cms_text: framed CMS text, e.g. the output of ``cms_sign_text``.
    :returns: the token text.
    """
    # Applied strictly in this order: '/' is rewritten before the markers
    # (which themselves contain '-') are stripped.
    substitutions = (
        ('/', '-'),
        ("-----BEGIN CMS-----", ''),
        ("-----END CMS-----", ''),
        ('\n', ''),
    )
    token = cms_text
    for old, new in substitutions:
        token = token.replace(old, new)
    return token
def cms_hash_token(token_id):
    """Return the identifier under which a token is stored or looked up.

    :param token_id: token text, or None.
    :returns: None when given None; the MD5 hex digest of ``token_id`` when
        ``is_ans1_token`` recognises it as a PKI token; otherwise
        ``token_id`` unchanged.
    """
    if token_id is None:
        return None
    if not is_ans1_token(token_id):
        return token_id
    return hashlib.md5(token_id).hexdigest()