Morgan Fainberg 559ba206d4 Make common log import consistent
Across the project, make the common log import consistent.

Change-Id: I937cd5d337db5d2296dd9238685803bbba3391b3
2014-01-11 16:55:59 -08:00

175 lines
6.5 KiB

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
from keystone.common import environment
from keystone.openstack.common import log
LOG = log.getLogger(__name__)
def cms_verify(formatted, signing_cert_file_name, ca_file_name):
"""Verifies the signature of the contents IAW CMS syntax."""
process = environment.subprocess.Popen(["openssl", "cms", "-verify",
"-CAfile", ca_file_name,
"-inform", "PEM",
"-nosmimecap", "-nodetach",
"-nocerts", "-noattr"],
output, err = process.communicate(formatted)
retcode = process.poll()
if retcode:
LOG.error(_('Verify error: %s'), err)
raise environment.subprocess.CalledProcessError(retcode,
"openssl", output=err)
return output
def token_to_cms(signed_text):
copy_of_text = signed_text.replace('-', '/')
formatted = "-----BEGIN CMS-----\n"
line_length = 64
while len(copy_of_text) > 0:
if (len(copy_of_text) > line_length):
formatted += copy_of_text[:line_length]
copy_of_text = copy_of_text[line_length:]
formatted += copy_of_text
copy_of_text = ""
formatted += "\n"
formatted += "-----END CMS-----\n"
return formatted
def verify_token(token, signing_cert_file_name, ca_file_name):
return cms_verify(token_to_cms(token),
def is_ans1_token(token):
"""Determine if a token appears to be PKI-based.
thx to ayoung for sorting this out.
base64 decoded hex representation of MII is 3082
In [3]: binascii.hexlify(base64.b64decode('MII='))
Out[3]: '3082'
pg4: For tags from 0 to 30 the first octet is the identifier
pg10: Hex 30 means sequence, followed by the length of that sequence.
pg5: Second octet is the length octet
first bit indicates short or long form, next 7 bits encode the number
of subsequent octets that make up the content length octets as an
unsigned binary int
82 = 10000010 (first bit indicates long form)
0000010 = 2 octets of content length
so read the next 2 octets to get the length of the content.
In the case of a very large content length there could be a requirement to
have more than 2 octets to designate the content length, therefore
requiring us to check for MIM, MIQ, etc.
In [4]: base64.b64encode(binascii.a2b_hex('3083'))
Out[4]: 'MIM='
In [5]: base64.b64encode(binascii.a2b_hex('3084'))
Out[5]: 'MIQ='
Checking for MI would become invalid at 16 octets of content length
10010000 = 90
In [6]: base64.b64encode(binascii.a2b_hex('3090'))
Out[6]: 'MJA='
Checking for just M is insufficient
But we will only check for MII:
Max length of the content using 2 octets is 7FFF or 32767
It's not practical to support a token of this length or greater in http
therefore, we will check for MII only and ignore the case of larger tokens
return token[:3] == PKI_ANS1_PREFIX
def cms_sign_text(text, signing_cert_file_name, signing_key_file_name):
"""Uses OpenSSL to sign a document
Produces a Base64 encoding of a DER formatted CMS Document
process = environment.subprocess.Popen(["openssl", "cms", "-sign",
"-signer", signing_cert_file_name,
"-inkey", signing_key_file_name,
"-outform", "PEM",
"-nosmimecap", "-nodetach",
"-nocerts", "-noattr"],
output, err = process.communicate(text)
retcode = process.poll()
if retcode or "Error" in err:
if retcode == 3:
LOG.error(_("Signing error: Unable to load certificate - "
"ensure you've configured PKI with "
"'keystone-manage pki_setup'"))
LOG.error(_('Signing error: %s'), err)
raise environment.subprocess.CalledProcessError(retcode, "openssl")
return output
def cms_sign_token(text, signing_cert_file_name, signing_key_file_name):
output = cms_sign_text(text, signing_cert_file_name, signing_key_file_name)
return cms_to_token(output)
def cms_to_token(cms_text):
start_delim = "-----BEGIN CMS-----"
end_delim = "-----END CMS-----"
signed_text = cms_text
signed_text = signed_text.replace('/', '-')
signed_text = signed_text.replace(start_delim, '')
signed_text = signed_text.replace(end_delim, '')
signed_text = signed_text.replace('\n', '')
return signed_text
def cms_hash_token(token_id):
"""Hash PKI tokens.
return: for ans1_token, returns the hash of the passed in token
otherwise, returns what it was passed in.
if token_id is None:
return None
if is_ans1_token(token_id):
hasher = hashlib.md5()
return hasher.hexdigest()
return token_id