Utilites for manipulating base64 & PEM
Add two new utility modules - keystone.common.base64utils - keystone.common.pemutils Add two new unit tests to exercise the above. - keystone.tests.test_base64utils - keystone.tests.test_pemutils Functionality in these utilities will be utilized to fix bug #1233838 in a subsequent commit. It is hoped these modules will moved to Oslo common code. Each module has a doc string describing their purpose and functionalty. Change-Id: I3546674dcd9eddec2c378929dd3f759f667cfd5d Partial-Bug: #1233838
This commit is contained in:
parent
2ab2c62435
commit
6ba5e3683b
|
@ -0,0 +1,395 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
|
||||
Python provides the base64 module as a core module but this is mostly
|
||||
limited to encoding and decoding base64 and it's variants. It is often
|
||||
useful to be able to perform other operations on base64 text. This
|
||||
module is meant to be used in conjunction with the core base64 module.
|
||||
|
||||
Standarized base64 is defined in
|
||||
RFC-4648 "The Base16, Base32, and Base64 Data Encodings".
|
||||
|
||||
This module provides the following base64 utility functionality:
|
||||
|
||||
* tests if text is valid base64
|
||||
* filter formatting from base64
|
||||
* convert base64 between different alphabets
|
||||
* Handle padding issues
|
||||
- test if base64 is padded
|
||||
- removes padding
|
||||
- restores padding
|
||||
* wraps base64 text into formatted blocks
|
||||
- via iterator
|
||||
- return formatted string
|
||||
|
||||
"""
|
||||
|
||||
import io
|
||||
import re
|
||||
import string
|
||||
import urllib
|
||||
|
||||
|
||||
class InvalidBase64Error(ValueError):
|
||||
pass
|
||||
|
||||
base64_alphabet_re = re.compile(r'^[^A-Za-z0-9+/=]+$')
|
||||
base64url_alphabet_re = re.compile(r'^[^A-Za-z0-9---_=]+$')
|
||||
|
||||
base64_non_alphabet_re = re.compile(r'[^A-Za-z0-9+/=]+')
|
||||
base64url_non_alphabet_re = re.compile(r'[^A-Za-z0-9---_=]+')
|
||||
|
||||
_strip_formatting_re = re.compile(r'\s+')
|
||||
|
||||
_base64_to_base64url_trans = string.maketrans('+/', '-_')
|
||||
_base64url_to_base64_trans = string.maketrans('-_', '+/')
|
||||
|
||||
|
||||
def is_valid_base64(text):
|
||||
"""Test if input text can be base64 decoded.
|
||||
|
||||
:param text: input base64 text
|
||||
:type text: string
|
||||
:returns: bool -- True if text can be decoded as base64, False otherwise
|
||||
"""
|
||||
|
||||
text = filter_formatting(text)
|
||||
|
||||
if base64_non_alphabet_re.search(text):
|
||||
return False
|
||||
|
||||
try:
|
||||
return base64_is_padded(text)
|
||||
except InvalidBase64Error:
|
||||
return False
|
||||
|
||||
|
||||
def is_valid_base64url(text):
|
||||
"""Test if input text can be base64url decoded.
|
||||
|
||||
:param text: input base64 text
|
||||
:type text: string
|
||||
:returns: bool -- True if text can be decoded as base64url,
|
||||
False otherwise
|
||||
"""
|
||||
|
||||
text = filter_formatting(text)
|
||||
|
||||
if base64url_non_alphabet_re.search(text):
|
||||
return False
|
||||
|
||||
try:
|
||||
return base64_is_padded(text)
|
||||
except InvalidBase64Error:
|
||||
return False
|
||||
|
||||
|
||||
def filter_formatting(text):
|
||||
"""Return base64 text without any formatting, just the base64.
|
||||
|
||||
Base64 text is often formatted with whitespace, line endings,
|
||||
etc. This function strips out any formatting, the result will
|
||||
contain only base64 characters.
|
||||
|
||||
Note, this function does not filter out all non-base64 alphabet
|
||||
characters, it only removes characters used for formatting.
|
||||
|
||||
:param text: input text to filter
|
||||
:type text: string
|
||||
:returns: string -- filtered text without formatting
|
||||
"""
|
||||
return _strip_formatting_re.sub('', text)
|
||||
|
||||
|
||||
def base64_to_base64url(text):
|
||||
"""Convert base64 text to base64url text.
|
||||
|
||||
base64url text is designed to be safe for use in filenames and
|
||||
URL's. It is defined in RFC-4648 Section 5.
|
||||
|
||||
base64url differs from base64 in the last two alphabet characters
|
||||
at index 62 and 63, these are sometimes referred as the
|
||||
altchars. The '+' character at index 62 is replaced by '-'
|
||||
(hyphen) and the '/' character at index 63 is replaced by '_'
|
||||
(underscore).
|
||||
|
||||
This function only translates the altchars, non-alphabet
|
||||
characters are not filtered out.
|
||||
|
||||
WARNING
|
||||
-------
|
||||
|
||||
base64url continues to use the '=' pad character which is NOT URL
|
||||
safe. RFC-4648 suggests two alternate methods to deal with this.
|
||||
|
||||
percent-encode
|
||||
percent-encode the pad character (e.g. '=' becomes
|
||||
'%3D'). This makes the base64url text fully safe. But
|
||||
percent-enconding has the downside of requiring
|
||||
percent-decoding prior to feeding the base64url text into a
|
||||
base64url decoder since most base64url decoders do not
|
||||
recognize %3D as a pad character and most decoders require
|
||||
correct padding.
|
||||
|
||||
no-padding
|
||||
padding is not strictly necessary to decode base64 or
|
||||
base64url text, the pad can be computed from the input text
|
||||
length. However many decoders demand padding and will consider
|
||||
non-padded text to be malformed. If one wants to omit the
|
||||
trailing pad character(s) for use in URL's it can be added back
|
||||
using the base64_assure_padding() function.
|
||||
|
||||
This function makes no decisions about which padding methodolgy to
|
||||
use. One can either call base64_strip_padding() to remove any pad
|
||||
characters (restoring later with base64_assure_padding()) or call
|
||||
base64url_percent_encode() to percent-encode the pad characters.
|
||||
|
||||
:param text: input base64 text
|
||||
:type text: string
|
||||
:returns: string -- base64url text
|
||||
"""
|
||||
return text.translate(_base64_to_base64url_trans)
|
||||
|
||||
|
||||
def base64url_to_base64(text):
|
||||
"""Convert base64url text to base64 text.
|
||||
|
||||
See base64_to_base64url() for a description of base64url text and
|
||||
it's issues.
|
||||
|
||||
This function does NOT handle percent-encoded pad characters, they
|
||||
will be left intact. If the input base64url text is
|
||||
percent-encoded you should call
|
||||
|
||||
:param text: text in base64url alphabet
|
||||
:type text: string
|
||||
:returns: string -- text in base64 alphabet
|
||||
|
||||
"""
|
||||
return text.translate(_base64url_to_base64_trans)
|
||||
|
||||
|
||||
def base64_is_padded(text, pad='='):
|
||||
"""Test if the text is base64 padded.
|
||||
|
||||
The input text must be in a base64 alphabet. The pad must be a
|
||||
single character. If the text has been percent-encoded (e.g. pad
|
||||
is the string '%3D') you must convert the text back to a base64
|
||||
alphabet (e.g. if percent-encoded use the function
|
||||
base64url_percent_decode()).
|
||||
|
||||
:param text: text containing ONLY characters in a base64 alphabet
|
||||
:type text: string
|
||||
:param pad: pad character (must be single character) (default: '=')
|
||||
:type pad: string
|
||||
:returns: bool -- True if padded, False otherwise
|
||||
:raises: ValueError, InvalidBase64Error
|
||||
"""
|
||||
|
||||
if len(pad) != 1:
|
||||
raise ValueError(_('pad must be single character'))
|
||||
|
||||
text_len = len(text)
|
||||
if text_len > 0 and text_len % 4 == 0:
|
||||
pad_index = text.find(pad)
|
||||
if pad_index >= 0 and pad_index < text_len - 2:
|
||||
raise InvalidBase64Error(_('text is multiple of 4, '
|
||||
'but pad "%s" occurs before '
|
||||
'2nd to last char') % pad)
|
||||
if pad_index == text_len - 2 and text[-1] != pad:
|
||||
raise InvalidBase64Error(_('text is multiple of 4, '
|
||||
'but pad "%s" occurs before '
|
||||
'non-pad last char') % pad)
|
||||
return True
|
||||
|
||||
if text.find(pad) >= 0:
|
||||
raise InvalidBase64Error(_('text is not a multiple of 4, '
|
||||
'but contains pad "%s"') % pad)
|
||||
return False
|
||||
|
||||
|
||||
def base64url_percent_encode(text):
|
||||
"""Percent-encode base64url padding.
|
||||
|
||||
The input text should only contain base64url alphabet
|
||||
characters. Any non-base64url alphabet characters will also be
|
||||
subject to percent-encoding.
|
||||
|
||||
:param text: text containing ONLY characters in the base64url alphabet
|
||||
:type text: string
|
||||
:returns: string -- percent-encoded base64url text
|
||||
:raises: InvalidBase64Error
|
||||
"""
|
||||
|
||||
if len(text) % 4 != 0:
|
||||
raise InvalidBase64Error(_('padded base64url text must be '
|
||||
'multiple of 4 characters'))
|
||||
|
||||
return urllib.quote(text)
|
||||
|
||||
|
||||
def base64url_percent_decode(text):
|
||||
"""Percent-decode base64url padding.
|
||||
|
||||
The input text should only contain base64url alphabet
|
||||
characters and the percent-encoded pad character. Any other
|
||||
percent-encoded characters will be subject to percent-decoding.
|
||||
|
||||
:param text: base64url alphabet text
|
||||
:type text: string
|
||||
:returns: string -- percent-decoded base64url text
|
||||
"""
|
||||
|
||||
decoded_text = urllib.unquote(text)
|
||||
|
||||
if len(decoded_text) % 4 != 0:
|
||||
raise InvalidBase64Error(_('padded base64url text must be '
|
||||
'multiple of 4 characters'))
|
||||
|
||||
return decoded_text
|
||||
|
||||
|
||||
def base64_strip_padding(text, pad='='):
|
||||
"""Remove padding from input base64 text.
|
||||
|
||||
:param text: text containing ONLY characters in a base64 alphabet
|
||||
:type text: string
|
||||
:param pad: pad character (must be single character) (default: '=')
|
||||
:type pad: string
|
||||
:returns: string -- base64 text without padding
|
||||
:raises: ValueError
|
||||
"""
|
||||
if len(pad) != 1:
|
||||
raise ValueError(_('pad must be single character'))
|
||||
|
||||
# Can't be padded if text is less than 4 characters.
|
||||
if len(text) < 4:
|
||||
return text
|
||||
|
||||
if text[-1] == pad:
|
||||
if text[-2] == pad:
|
||||
return text[0:-2]
|
||||
else:
|
||||
return text[0:-1]
|
||||
else:
|
||||
return text
|
||||
|
||||
|
||||
def base64_assure_padding(text, pad='='):
|
||||
"""Assure the input text ends with padding.
|
||||
|
||||
Base64 text is normally expected to be a multple of 4
|
||||
characters. Each 4 character base64 sequence produces 3 octets of
|
||||
binary data. If the binary data is not a multiple of 3 the base64
|
||||
text is padded at the end with a pad character such that is is
|
||||
always a multple of 4. Padding is ignored and does not alter the
|
||||
binary data nor it's length.
|
||||
|
||||
In some circumstances is is desirable to omit the padding
|
||||
character due to transport encoding conflicts. Base64 text can
|
||||
still be correctly decoded if the length of the base64 text
|
||||
(consisting only of characters in the desired base64 alphabet) is
|
||||
known, padding is not absolutely necessary.
|
||||
|
||||
Some base64 decoders demand correct padding or one may wish to
|
||||
format RFC compliant base64, this function performs this action.
|
||||
|
||||
Input is assumed to consist only of members of a base64
|
||||
alphabet (i.e no whitepace). Iteration yields a sequence of lines.
|
||||
The line does NOT terminate with a line ending.
|
||||
|
||||
Use the filter_formatting() function to assure the input text
|
||||
contains only the members of the alphabet.
|
||||
|
||||
If the text ends with the pad it is assumed to already be
|
||||
padded. Otherwise the binary length is computed from the input
|
||||
text length and correct number of pad characters are appended.
|
||||
|
||||
:param text: text containing ONLY characters in a base64 alphabet
|
||||
:type text: string
|
||||
:param pad: pad character (must be single character) (default: '=')
|
||||
:type pad: string
|
||||
:returns: string -- input base64 text with padding
|
||||
:raises: ValueError
|
||||
"""
|
||||
|
||||
if len(pad) != 1:
|
||||
raise ValueError(_('pad must be single character'))
|
||||
|
||||
if text.endswith(pad):
|
||||
return text
|
||||
|
||||
n = len(text) % 4
|
||||
if n == 0:
|
||||
return text
|
||||
|
||||
n = 4 - n
|
||||
padding = pad * n
|
||||
return text + padding
|
||||
|
||||
|
||||
def base64_wrap_iter(text, width=64):
|
||||
"""Fold text into lines of text with max line length.
|
||||
|
||||
Input is assumed to consist only of members of a base64
|
||||
alphabet (i.e no whitepace). Iteration yields a sequence of lines.
|
||||
The line does NOT terminate with a line ending.
|
||||
|
||||
Use the filter_formatting() function to assure the input text
|
||||
contains only the members of the alphabet.
|
||||
|
||||
:param text: text containing ONLY characters in a base64 alphabet
|
||||
:type text: string
|
||||
:param width: number of characters in each wrapped line (default: 64)
|
||||
:type width: int
|
||||
:returns: generator -- sequence of lines of base64 text.
|
||||
"""
|
||||
|
||||
text = unicode(text)
|
||||
for x in xrange(0, len(text), width):
|
||||
yield text[x:x + width]
|
||||
|
||||
|
||||
def base64_wrap(text, width=64):
|
||||
"""Fold text into lines of text with max line length.
|
||||
|
||||
Input is assumed to consist only of members of a base64
|
||||
alphabet (i.e no whitepace). Fold the text into lines whose
|
||||
line length is width chars long, terminate each line with line
|
||||
ending (default is '\n'). Return the wrapped text as a single
|
||||
string.
|
||||
|
||||
Use the filter_formatting() function to assure the input text
|
||||
contains only the members of the alphabet.
|
||||
|
||||
:param text: text containing ONLY characters in a base64 alphabet
|
||||
:type text: string
|
||||
:param width: number of characters in each wrapped line (default: 64)
|
||||
:type width: int
|
||||
:returns: string -- wrapped text.
|
||||
"""
|
||||
|
||||
buf = io.StringIO()
|
||||
|
||||
for line in base64_wrap_iter(text, width):
|
||||
buf.write(line)
|
||||
buf.write(u'\n')
|
||||
|
||||
text = buf.getvalue()
|
||||
buf.close()
|
||||
return text
|
|
@ -0,0 +1,509 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
"""
|
||||
PEM formatted data is used frequenlty in conjunction with X509 PKI as
|
||||
a data exchange mechanism for binary data. The acronym PEM stands for
|
||||
Privacy Enhanced Mail as defined in RFC-1421. Contrary to expectation
|
||||
the PEM format in common use has little to do with RFC-1421. Instead
|
||||
what we know as PEM format grew out of the need for a data exchange
|
||||
mechanism largely by the influence of OpenSSL. Other X509
|
||||
implementations have adopted it.
|
||||
|
||||
Unfortunately PEM format has never been officialy standarized. It's
|
||||
basic format is as follows:
|
||||
|
||||
1) A header consisting of 5 hyphens followed by the word BEGIN and a
|
||||
single space. Then an upper case string describing the contents of the
|
||||
PEM block, this is followed by 5 hyphens and a newline.
|
||||
|
||||
2) Binary data (typically in DER ASN.1 format) encoded in base64. The
|
||||
base64 text is line wrapped so that each line of base64 is 64
|
||||
characters long and terminated with a newline. The last line of base64
|
||||
text may be less than 64 characters. The content and format of the
|
||||
binary data is entirely dependent upon the type of data announced in
|
||||
the header and footer.
|
||||
|
||||
3) A footer in the exact same as the header execpt the word BEGIN is
|
||||
replaced by END. The content name in both the header and footer should
|
||||
exactly match.
|
||||
|
||||
The above is called a PEM block. It is permissible for multiple PEM
|
||||
blocks to appear in a single file or block of text. This is often used
|
||||
when specifying multiple X509 certificates.
|
||||
|
||||
An example PEM block for a certificate is:
|
||||
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIC0TCCAjqgAwIBAgIJANsHKV73HYOwMA0GCSqGSIb3DQEBBQUAMIGeMQowCAYD
|
||||
VQQFEwE1MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVN1bm55
|
||||
dmFsZTESMBAGA1UEChMJT3BlblN0YWNrMREwDwYDVQQLEwhLZXlzdG9uZTElMCMG
|
||||
CSqGSIb3DQEJARYWa2V5c3RvbmVAb3BlbnN0YWNrLm9yZzEUMBIGA1UEAxMLU2Vs
|
||||
ZiBTaWduZWQwIBcNMTIxMTA1MTgxODI0WhgPMjA3MTA0MzAxODE4MjRaMIGeMQow
|
||||
CAYDVQQFEwE1MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVN1
|
||||
bm55dmFsZTESMBAGA1UEChMJT3BlblN0YWNrMREwDwYDVQQLEwhLZXlzdG9uZTEl
|
||||
MCMGCSqGSIb3DQEJARYWa2V5c3RvbmVAb3BlbnN0YWNrLm9yZzEUMBIGA1UEAxML
|
||||
U2VsZiBTaWduZWQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBALzI17ExCaqd
|
||||
r7xY2Q5CBZ1bW1lsrXxS8eNJRdQtskDuQVAluY03/OGZd8HQYiiY/ci2tYy7BNIC
|
||||
bh5GaO95eqTDykJR3liOYE/tHbY6puQlj2ZivmhlSd2d5d7lF0/H28RQsLu9VktM
|
||||
uw6q9DpDm35jfrr8LgSeA3MdVqcS/4OhAgMBAAGjEzARMA8GA1UdEwEB/wQFMAMB
|
||||
Af8wDQYJKoZIhvcNAQEFBQADgYEAjSQND7i1dNZtLKpWgX+JqMr3BdVlM15mFeVr
|
||||
C26ZspZjZVY5okdozO9gU3xcwRe4Cg30sKFOe6EBQKpkTZucFOXwBtD3h6dWJrdD
|
||||
c+m/CL/rs0GatDavbaIT2vv405SQUQooCdVh72LYel+4/a6xmRd7fQx3iEXN9QYj
|
||||
vmHJUcA=
|
||||
-----END CERTIFICATE-----
|
||||
|
||||
PEM format is safe for transmission in 7-bit ASCII systems
|
||||
(i.e. standard email). Since 7-bit ASCII is a proper subset of UTF-8
|
||||
and Latin-1 it is not affected by transcoding between those
|
||||
charsets. Nor is PEM format affected by the choice of line
|
||||
endings. This makes PEM format particularity attractive for transport
|
||||
and storage of binary data.
|
||||
|
||||
This module provides a number of utilities supporting the generation
|
||||
and consumption of PEM formatted data including:
|
||||
|
||||
* parse text and find all PEM blocks contained in the
|
||||
text. Information on the location of the block in the text, the
|
||||
type of PEM block, and it's base64 and binary data contents.
|
||||
|
||||
* parse text assumed to contain PEM data and return the binary
|
||||
data.
|
||||
|
||||
* test if a block of text is a PEM block
|
||||
|
||||
* convert base64 text into a formatted PEM block
|
||||
|
||||
* convert binary data into a formatted PEM block
|
||||
|
||||
* access to the valid PEM types and their headers
|
||||
|
||||
"""
|
||||
|
||||
import base64
|
||||
import io
|
||||
from keystone.common import base64utils
|
||||
import re
|
||||
|
||||
PEM_TYPE_TO_HEADER = {
|
||||
u'cms': u'CMS',
|
||||
u'dsa-private': u'DSA PRIVATE KEY',
|
||||
u'dsa-public': u'DSA PUBLIC KEY',
|
||||
u'ecdsa-public': u'ECDSA PUBLIC KEY',
|
||||
u'ec-private': u'EC PRIVATE KEY',
|
||||
u'pkcs7': u'PKCS7',
|
||||
u'pkcs7-signed': u'PKCS',
|
||||
u'pkcs8': u'ENCRYPTED PRIVATE KEY',
|
||||
u'private-key': u'PRIVATE KEY',
|
||||
u'public-key': u'PUBLIC KEY',
|
||||
u'rsa-private': u'RSA PRIVATE KEY',
|
||||
u'rsa-public': u'RSA PUBLIC KEY',
|
||||
u'cert': u'CERTIFICATE',
|
||||
u'crl': u'X509 CRL',
|
||||
u'cert-pair': u'CERTIFICATE PAIR',
|
||||
u'csr': u'CERTIFICATE REQUEST',
|
||||
}
|
||||
|
||||
# This is not a 1-to-1 reverse map of PEM_TYPE_TO_HEADER
|
||||
# because it includes deprecated headers that map to 1 pem_type.
|
||||
PEM_HEADER_TO_TYPE = {
|
||||
u'CMS': u'cms',
|
||||
u'DSA PRIVATE KEY': u'dsa-private',
|
||||
u'DSA PUBLIC KEY': u'dsa-public',
|
||||
u'ECDSA PUBLIC KEY': u'ecdsa-public',
|
||||
u'EC PRIVATE KEY': u'ec-private',
|
||||
u'PKCS7': u'pkcs7',
|
||||
u'PKCS': u'pkcs7-signed',
|
||||
u'ENCRYPTED PRIVATE KEY': u'pkcs8',
|
||||
u'PRIVATE KEY': u'private-key',
|
||||
u'PUBLIC KEY': u'public-key',
|
||||
u'RSA PRIVATE KEY': u'rsa-private',
|
||||
u'RSA PUBLIC KEY': u'rsa-public',
|
||||
u'CERTIFICATE': u'cert',
|
||||
u'X509 CERTIFICATE': u'cert',
|
||||
u'CERTIFICATE PAIR': u'cert-pair',
|
||||
u'X509 CRL': u'crl',
|
||||
u'CERTIFICATE REQUEST': u'csr',
|
||||
u'NEW CERTIFICATE REQUEST': u'csr',
|
||||
}
|
||||
|
||||
# List of valid pem_types
|
||||
pem_types = sorted(PEM_TYPE_TO_HEADER.keys())
|
||||
|
||||
# List of valid pem_headers
|
||||
pem_headers = sorted(PEM_TYPE_TO_HEADER.values())
|
||||
|
||||
_pem_begin_re = re.compile(r'^-{5}BEGIN\s+([^-]+)-{5}\s*$', re.MULTILINE)
|
||||
_pem_end_re = re.compile(r'^-{5}END\s+([^-]+)-{5}\s*$', re.MULTILINE)
|
||||
|
||||
|
||||
class PEMParseResult(object):
|
||||
"""Information returned when a PEM block is found in text.
|
||||
|
||||
PEMParseResult contains information about a PEM block discovered
|
||||
while parsing text. The following properties are defined:
|
||||
|
||||
pem_type
|
||||
A short hand name for the type of the PEM data, e.g. cert,
|
||||
csr, crl, cms, key. Valid pem_types are listed in pem_types.
|
||||
When the pem_type is set the pem_header is updated to match it.
|
||||
|
||||
pem_header
|
||||
The text following '-----BEGIN ' in the PEM header.
|
||||
Common examples are:
|
||||
|
||||
-----BEGIN CERTIFICATE-----
|
||||
-----BEGIN CMS-----
|
||||
|
||||
Thus the pem_header would be CERTIFICATE and CMS respectively.
|
||||
When the pem_header is set the pem_type is updated to match it.
|
||||
|
||||
pem_start, pem_end
|
||||
The beginning and ending positions of the PEM block
|
||||
including the PEM header and footer.
|
||||
|
||||
base64_start, base64_end
|
||||
The beginning and ending positions of the base64 data
|
||||
contained inside the PEM header and footer. Includes trailing
|
||||
new line
|
||||
|
||||
binary_data
|
||||
The decoded base64 data. None if not decoded.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, pem_type=None, pem_header=None,
|
||||
pem_start=None, pem_end=None,
|
||||
base64_start=None, base64_end=None,
|
||||
binary_data=None):
|
||||
|
||||
self._pem_type = None
|
||||
self._pem_header = None
|
||||
|
||||
if pem_type is not None:
|
||||
self.pem_type = pem_type
|
||||
|
||||
if pem_header is not None:
|
||||
self.pem_header = pem_header
|
||||
|
||||
self.pem_start = pem_start
|
||||
self.pem_end = pem_end
|
||||
self.base64_start = base64_start
|
||||
self.base64_end = base64_end
|
||||
self.binary_data = binary_data
|
||||
|
||||
@property
|
||||
def pem_type(self):
|
||||
return self._pem_type
|
||||
|
||||
@pem_type.setter
|
||||
def pem_type(self, pem_type):
|
||||
if pem_type is None:
|
||||
self._pem_type = None
|
||||
self._pem_header = None
|
||||
else:
|
||||
pem_header = PEM_TYPE_TO_HEADER.get(pem_type)
|
||||
if pem_header is None:
|
||||
raise ValueError(_('unknown pem_type "%(pem_type)s", '
|
||||
'valid types are: %(valid_pem_types)s') %
|
||||
{'pem_type': pem_type,
|
||||
'valid_pem_types': ', '.join(pem_types)})
|
||||
self._pem_type = pem_type
|
||||
self._pem_header = pem_header
|
||||
|
||||
@property
|
||||
def pem_header(self):
|
||||
return self._pem_header
|
||||
|
||||
@pem_header.setter
|
||||
def pem_header(self, pem_header):
|
||||
if pem_header is None:
|
||||
self._pem_type = None
|
||||
self._pem_header = None
|
||||
else:
|
||||
pem_type = PEM_HEADER_TO_TYPE.get(pem_header)
|
||||
if pem_type is None:
|
||||
raise ValueError(_('unknown pem header "%(pem_header)s", '
|
||||
'valid headers are: '
|
||||
'%(valid_pem_headers)s') %
|
||||
{'pem_header': pem_header,
|
||||
'valid_pem_headers':
|
||||
', '.join("'%s'" %
|
||||
[x for x in pem_headers])})
|
||||
|
||||
self._pem_type = pem_type
|
||||
self._pem_header = pem_header
|
||||
|
||||
#------------------------------------------------------------------------------
|
||||
|
||||
|
||||
def pem_search(text, start=0):
|
||||
"""Search for a block of PEM formatted data
|
||||
|
||||
Search for a PEM block in a text string. The search begins at
|
||||
start. If a PEM block is found a PEMParseResult object is
|
||||
returned, otherwise if no PEM block is found None is returned.
|
||||
|
||||
If the pem_type is not the same in both the header and footer
|
||||
a ValueError is raised.
|
||||
|
||||
The start and end positions are suitable for use as slices into
|
||||
the text. To search for multiple PEM blocks pass pem_end as the
|
||||
start position for the next iteration. Terminate the iteration
|
||||
when None is returned. Example:
|
||||
|
||||
start = 0
|
||||
while True:
|
||||
block = pem_search(text, start)
|
||||
if block is None:
|
||||
break
|
||||
base64_data = text[block.base64_start : block.base64_end]
|
||||
start = block.pem_end
|
||||
|
||||
:param text: the text to search for PEM blocks
|
||||
:type text: string
|
||||
:param start: the position in text to start searching from (default: 0)
|
||||
:type start: int
|
||||
:returns: PEMParseResult or None if not found
|
||||
:raises: ValueError
|
||||
"""
|
||||
|
||||
match = _pem_begin_re.search(text, pos=start)
|
||||
if match:
|
||||
pem_start = match.start()
|
||||
begin_text = match.group(0)
|
||||
base64_start = min(len(text), match.end() + 1)
|
||||
begin_pem_header = match.group(1).strip()
|
||||
|
||||
match = _pem_end_re.search(text, pos=base64_start)
|
||||
if match:
|
||||
pem_end = min(len(text), match.end() + 1)
|
||||
base64_end = match.start()
|
||||
end_pem_header = match.group(1).strip()
|
||||
else:
|
||||
raise ValueError(_('failed to find end matching "%s"') %
|
||||
begin_text)
|
||||
|
||||
if begin_pem_header != end_pem_header:
|
||||
raise ValueError(_('beginning & end PEM headers do not match '
|
||||
'(%(begin_pem_header)s'
|
||||
'!= '
|
||||
'%(end_pem_header)s)') %
|
||||
{'begin_pem_header': begin_pem_header,
|
||||
'end_pem_header': end_pem_header})
|
||||
else:
|
||||
return None
|
||||
|
||||
result = PEMParseResult(pem_header=begin_pem_header,
|
||||
pem_start=pem_start, pem_end=pem_end,
|
||||
base64_start=base64_start, base64_end=base64_end)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def parse_pem(text, pem_type=None, max_items=None):
|
||||
"""Scan text for PEM data, return list of PEM items
|
||||
|
||||
The input text is scanned for PEM blocks, for each one found a
|
||||
PEMParseResult is contructed and added to the return list.
|
||||
|
||||
pem_type operates as a filter on the type of PEM desired. If
|
||||
pem_type is specified only those PEM blocks which match will be
|
||||
included. The pem_type is a logical name, not the actual text in
|
||||
the pem header (e.g. 'cert'). If the pem_type is None all PEM
|
||||
blocks are returned.
|
||||
|
||||
If max_items is specified the result is limited to that number of
|
||||
items.
|
||||
|
||||
The return value is a list of PEMParseResult objects. The
|
||||
PEMParseResult provides complete information about the PEM block
|
||||
including the decoded binary data for the PEM block. The list is
|
||||
ordered in the same order as found in the text.
|
||||
|
||||
Examples:
|
||||
|
||||
# Get all certs
|
||||
certs = parse_pem(text, 'cert')
|
||||
|
||||
# Get the first cert
|
||||
try:
|
||||
binary_cert = parse_pem(text, 'cert', 1)[0].binary_data
|
||||
except IndexError:
|
||||
raise ValueError('no cert found')
|
||||
|
||||
:param text: The text to search for PEM blocks
|
||||
:type text: string
|
||||
:param pem_type: Only return data for this pem_type.
|
||||
Valid types are: csr, cert, crl, cms, key.
|
||||
If pem_type is None no filtering is performed.
|
||||
(default: None)
|
||||
:type pem_type: string or None
|
||||
:param max_items: Limit the number of blocks returned. (default: None)
|
||||
:type max_items: int or None
|
||||
:return: List of PEMParseResult, one for each PEM block found
|
||||
:raises: ValueError, InvalidBase64Error
|
||||
"""
|
||||
|
||||
pem_blocks = []
|
||||
start = 0
|
||||
|
||||
while True:
|
||||
block = pem_search(text, start)
|
||||
if block is None:
|
||||
break
|
||||
start = block.pem_end
|
||||
if pem_type is None:
|
||||
pem_blocks.append(block)
|
||||
else:
|
||||
try:
|
||||
if block.pem_type == pem_type:
|
||||
pem_blocks.append(block)
|
||||
except KeyError:
|
||||
raise ValueError(_('unknown pem_type: "%s"') % (pem_type))
|
||||
|
||||
if max_items is not None and len(pem_blocks) >= max_items:
|
||||
break
|
||||
|
||||
for block in pem_blocks:
|
||||
base64_data = text[block.base64_start:block.base64_end]
|
||||
try:
|
||||
binary_data = base64.b64decode(base64_data)
|
||||
except Exception as e:
|
||||
block.binary_data = None
|
||||
raise base64utils.InvalidBase64Error(
|
||||
_('failed to base64 decode %(pem_type)s PEM at position'
|
||||
'%(position)d: %(err_msg)s') %
|
||||
{'pem_type': block.pem_type,
|
||||
'position': block.pem_start,
|
||||
'err_msg': str(e)})
|
||||
else:
|
||||
block.binary_data = binary_data
|
||||
|
||||
return pem_blocks
|
||||
|
||||
|
||||
def get_pem_data(text, pem_type='cert'):
|
||||
"""Scan text for PEM data, return binary contents
|
||||
|
||||
The input text is scanned for a PEM block which matches the pem_type.
|
||||
If found the binary data contained in the PEM block is returned.
|
||||
If no PEM block is found or it does not match the specified pem type
|
||||
None is returned.
|
||||
|
||||
:param text: The text to search for the PEM block
|
||||
:type text: string
|
||||
:param pem_type: Only return data for this pem_type.
|
||||
Valid types are: csr, cert, crl, cms, key.
|
||||
(default: 'cert')
|
||||
:type pem_type: string
|
||||
:return: binary data or None if not found.
|
||||
"""
|
||||
|
||||
blocks = parse_pem(text, pem_type, 1)
|
||||
if not blocks:
|
||||
return None
|
||||
return blocks[0].binary_data
|
||||
|
||||
|
||||
def is_pem(text, pem_type='cert'):
|
||||
"""Does this text contain a PEM block.
|
||||
|
||||
Check for the existence of a PEM formatted block in the
|
||||
text, if one is found verify it's contents can be base64
|
||||
decoded, if so return True. Return False otherwise.
|
||||
|
||||
:param text: The text to search for PEM blocks
|
||||
:type text: string
|
||||
:param pem_type: Only return data for this pem_type.
|
||||
Valid types are: csr, cert, crl, cms, key.
|
||||
(default: 'cert')
|
||||
:type pem_type: string
|
||||
:returns: bool -- True if text contains PEM matching the pem_type,
|
||||
False otherwise.
|
||||
"""
|
||||
|
||||
try:
|
||||
pem_blocks = parse_pem(text, pem_type, max_items=1)
|
||||
except base64utils.InvalidBase64Error:
|
||||
return False
|
||||
|
||||
if pem_blocks:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def base64_to_pem(base64_text, pem_type='cert'):
|
||||
"""Format string of base64 text into PEM format
|
||||
|
||||
Input is assumed to consist only of members of the base64 alphabet
|
||||
(i.e no whitepace). Use one of the filter functions from
|
||||
base64utils to assure the input is clean
|
||||
(i.e. strip_whitespace()).
|
||||
|
||||
:param base64_text: text containing ONLY base64 alphabet
|
||||
characters to be inserted into PEM output.
|
||||
:type base64_text: string
|
||||
:param pem_type: Produce a PEM block for this type.
|
||||
Valid types are: csr, cert, crl, cms, key.
|
||||
(default: 'cert')
|
||||
:type pem_type: string
|
||||
:returns: string -- PEM formatted text
|
||||
|
||||
|
||||
"""
|
||||
pem_header = PEM_TYPE_TO_HEADER[pem_type]
|
||||
buf = io.StringIO()
|
||||
|
||||
buf.write(u'-----BEGIN %s-----' % pem_header)
|
||||
buf.write(u'\n')
|
||||
|
||||
for line in base64utils.base64_wrap_iter(base64_text, width=64):
|
||||
buf.write(line)
|
||||
buf.write(u'\n')
|
||||
|
||||
buf.write(u'-----END %s-----' % pem_header)
|
||||
buf.write(u'\n')
|
||||
|
||||
text = buf.getvalue()
|
||||
buf.close()
|
||||
return text
|
||||
|
||||
|
||||
def binary_to_pem(binary_data, pem_type='cert'):
|
||||
"""Format binary data into PEM format
|
||||
|
||||
Example:
|
||||
|
||||
# get the certificate binary data in DER format
|
||||
der_data = certificate.der
|
||||
# convert the DER binary data into a PEM
|
||||
pem = binary_to_pem(der_data, 'cert')
|
||||
|
||||
|
||||
:param binary_data: binary data to encapsulate into PEM
|
||||
:type binary_data: buffer
|
||||
:param pem_type: Produce a PEM block for this type.
|
||||
Valid types are: csr, cert, crl, cms, key.
|
||||
(default: 'cert')
|
||||
:type pem_type: string
|
||||
:returns: string -- PEM formatted text
|
||||
|
||||
"""
|
||||
base64_text = base64.b64encode(binary_data)
|
||||
return base64_to_pem(base64_text, pem_type)
|
|
@ -0,0 +1,196 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystone.common import base64utils
|
||||
from keystone import tests
|
||||
|
||||
base64_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
|
||||
'abcdefghijklmnopqrstuvwxyz'
|
||||
'0123456789'
|
||||
'+/=') # includes pad char
|
||||
|
||||
base64url_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
|
||||
'abcdefghijklmnopqrstuvwxyz'
|
||||
'0123456789'
|
||||
'-_=') # includes pad char
|
||||
|
||||
|
||||
class TestValid(tests.TestCase):
|
||||
def test_valid_base64(self):
|
||||
self.assertTrue(base64utils.is_valid_base64('+/=='))
|
||||
self.assertTrue(base64utils.is_valid_base64('+/+='))
|
||||
self.assertTrue(base64utils.is_valid_base64('+/+/'))
|
||||
|
||||
self.assertFalse(base64utils.is_valid_base64('-_=='))
|
||||
self.assertFalse(base64utils.is_valid_base64('-_-='))
|
||||
self.assertFalse(base64utils.is_valid_base64('-_-_'))
|
||||
|
||||
self.assertTrue(base64utils.is_valid_base64('abcd'))
|
||||
self.assertFalse(base64utils.is_valid_base64('abcde'))
|
||||
self.assertFalse(base64utils.is_valid_base64('abcde=='))
|
||||
self.assertFalse(base64utils.is_valid_base64('abcdef'))
|
||||
self.assertTrue(base64utils.is_valid_base64('abcdef=='))
|
||||
self.assertFalse(base64utils.is_valid_base64('abcdefg'))
|
||||
self.assertTrue(base64utils.is_valid_base64('abcdefg='))
|
||||
self.assertTrue(base64utils.is_valid_base64('abcdefgh'))
|
||||
|
||||
self.assertFalse(base64utils.is_valid_base64('-_=='))
|
||||
|
||||
def test_valid_base64url(self):
|
||||
self.assertFalse(base64utils.is_valid_base64url('+/=='))
|
||||
self.assertFalse(base64utils.is_valid_base64url('+/+='))
|
||||
self.assertFalse(base64utils.is_valid_base64url('+/+/'))
|
||||
|
||||
self.assertTrue(base64utils.is_valid_base64url('-_=='))
|
||||
self.assertTrue(base64utils.is_valid_base64url('-_-='))
|
||||
self.assertTrue(base64utils.is_valid_base64url('-_-_'))
|
||||
|
||||
self.assertTrue(base64utils.is_valid_base64url('abcd'))
|
||||
self.assertFalse(base64utils.is_valid_base64url('abcde'))
|
||||
self.assertFalse(base64utils.is_valid_base64url('abcde=='))
|
||||
self.assertFalse(base64utils.is_valid_base64url('abcdef'))
|
||||
self.assertTrue(base64utils.is_valid_base64url('abcdef=='))
|
||||
self.assertFalse(base64utils.is_valid_base64url('abcdefg'))
|
||||
self.assertTrue(base64utils.is_valid_base64url('abcdefg='))
|
||||
self.assertTrue(base64utils.is_valid_base64url('abcdefgh'))
|
||||
|
||||
self.assertTrue(base64utils.is_valid_base64url('-_=='))
|
||||
|
||||
|
||||
class TestBase64Padding(tests.TestCase):
|
||||
|
||||
def test_filter(self):
|
||||
self.assertEqual(base64utils.filter_formatting(''), '')
|
||||
self.assertEqual(base64utils.filter_formatting(' '), '')
|
||||
self.assertEqual(base64utils.filter_formatting('a'), 'a')
|
||||
self.assertEqual(base64utils.filter_formatting(' a'), 'a')
|
||||
self.assertEqual(base64utils.filter_formatting('a '), 'a')
|
||||
self.assertEqual(base64utils.filter_formatting('ab'), 'ab')
|
||||
self.assertEqual(base64utils.filter_formatting(' ab'), 'ab')
|
||||
self.assertEqual(base64utils.filter_formatting('ab '), 'ab')
|
||||
self.assertEqual(base64utils.filter_formatting('a b'), 'ab')
|
||||
self.assertEqual(base64utils.filter_formatting(' a b'), 'ab')
|
||||
self.assertEqual(base64utils.filter_formatting('a b '), 'ab')
|
||||
self.assertEqual(base64utils.filter_formatting('a\nb\n '), 'ab')
|
||||
|
||||
text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
|
||||
'abcdefghijklmnopqrstuvwxyz'
|
||||
'0123456789'
|
||||
'+/=')
|
||||
self.assertEqual(base64_alphabet,
|
||||
base64utils.filter_formatting(text))
|
||||
|
||||
text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
|
||||
' abcdefghijklmnopqrstuvwxyz\n'
|
||||
'\t\f\r'
|
||||
' 0123456789\n'
|
||||
' +/=')
|
||||
self.assertEqual(base64_alphabet,
|
||||
base64utils.filter_formatting(text))
|
||||
self.assertEqual(base64url_alphabet,
|
||||
base64utils.base64_to_base64url(base64_alphabet))
|
||||
|
||||
text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
|
||||
'abcdefghijklmnopqrstuvwxyz'
|
||||
'0123456789'
|
||||
'-_=')
|
||||
self.assertEqual(base64url_alphabet,
|
||||
base64utils.filter_formatting(text))
|
||||
|
||||
text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
|
||||
' abcdefghijklmnopqrstuvwxyz\n'
|
||||
'\t\f\r'
|
||||
' 0123456789\n'
|
||||
'-_=')
|
||||
self.assertEqual(base64url_alphabet,
|
||||
base64utils.filter_formatting(text))
|
||||
|
||||
def test_alphabet_conversion(self):
|
||||
self.assertEqual(base64url_alphabet,
|
||||
base64utils.base64_to_base64url(base64_alphabet))
|
||||
|
||||
self.assertEqual(base64_alphabet,
|
||||
base64utils.base64url_to_base64(base64url_alphabet))
|
||||
|
||||
def test_is_padded(self):
|
||||
self.assertTrue(base64utils.base64_is_padded('ABCD'))
|
||||
self.assertTrue(base64utils.base64_is_padded('ABC='))
|
||||
self.assertTrue(base64utils.base64_is_padded('AB=='))
|
||||
|
||||
self.assertTrue(base64utils.base64_is_padded('1234ABCD'))
|
||||
self.assertTrue(base64utils.base64_is_padded('1234ABC='))
|
||||
self.assertTrue(base64utils.base64_is_padded('1234AB=='))
|
||||
|
||||
self.assertFalse(base64utils.base64_is_padded('ABC'))
|
||||
self.assertFalse(base64utils.base64_is_padded('AB'))
|
||||
self.assertFalse(base64utils.base64_is_padded('A'))
|
||||
self.assertFalse(base64utils.base64_is_padded(''))
|
||||
|
||||
self.assertRaises(base64utils.InvalidBase64Error,
|
||||
base64utils.base64_is_padded, '=')
|
||||
|
||||
self.assertRaises(base64utils.InvalidBase64Error,
|
||||
base64utils.base64_is_padded, 'AB=C')
|
||||
|
||||
self.assertRaises(base64utils.InvalidBase64Error,
|
||||
base64utils.base64_is_padded, 'AB=')
|
||||
|
||||
self.assertRaises(base64utils.InvalidBase64Error,
|
||||
base64utils.base64_is_padded, 'ABCD=')
|
||||
|
||||
def test_strip_padding(self):
|
||||
self.assertEqual(base64utils.base64_strip_padding('ABCD'), 'ABCD')
|
||||
self.assertEqual(base64utils.base64_strip_padding('ABC='), 'ABC')
|
||||
self.assertEqual(base64utils.base64_strip_padding('AB=='), 'AB')
|
||||
|
||||
def test_assure_padding(self):
|
||||
self.assertEqual(base64utils.base64_assure_padding('ABCD'), 'ABCD')
|
||||
self.assertEqual(base64utils.base64_assure_padding('ABC'), 'ABC=')
|
||||
self.assertEqual(base64utils.base64_assure_padding('ABC='), 'ABC=')
|
||||
self.assertEqual(base64utils.base64_assure_padding('AB'), 'AB==')
|
||||
self.assertEqual(base64utils.base64_assure_padding('AB=='), 'AB==')
|
||||
|
||||
def test_base64_percent_encoding(self):
|
||||
self.assertEqual(base64utils.base64url_percent_encode('ABCD'), 'ABCD')
|
||||
self.assertEqual(base64utils.base64url_percent_encode('ABC='),
|
||||
'ABC%3D')
|
||||
self.assertEqual(base64utils.base64url_percent_encode('AB=='),
|
||||
'AB%3D%3D')
|
||||
|
||||
self.assertEqual(base64utils.base64url_percent_decode('ABCD'), 'ABCD')
|
||||
self.assertEqual(base64utils.base64url_percent_decode('ABC%3D'),
|
||||
'ABC=')
|
||||
self.assertEqual(base64utils.base64url_percent_decode('AB%3D%3D'),
|
||||
'AB==')
|
||||
|
||||
|
||||
class TestTextWrap(tests.TestCase):
|
||||
|
||||
def test_wrapping(self):
|
||||
raw_text = 'abcdefgh'
|
||||
wrapped_text = 'abc\ndef\ngh\n'
|
||||
|
||||
self.assertEqual(base64utils.base64_wrap(raw_text, width=3),
|
||||
wrapped_text)
|
||||
|
||||
t = '\n'.join(base64utils.base64_wrap_iter(raw_text, width=3)) + '\n'
|
||||
self.assertEqual(t, wrapped_text)
|
||||
|
||||
raw_text = 'abcdefgh'
|
||||
wrapped_text = 'abcd\nefgh\n'
|
||||
|
||||
self.assertEqual(base64utils.base64_wrap(raw_text, width=4),
|
||||
wrapped_text)
|
|
@ -0,0 +1,336 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import base64
|
||||
from keystone.common import pemutils
|
||||
from keystone import tests
|
||||
|
||||
|
||||
# List of 2-tuples, (pem_type, pem_header)
|
||||
headers = pemutils.PEM_TYPE_TO_HEADER.items()
|
||||
|
||||
|
||||
def make_data(size, offset=0):
|
||||
return ''.join([chr(x % 255) for x in xrange(offset, size + offset)])
|
||||
|
||||
|
||||
def make_base64_from_data(data):
|
||||
return base64.b64encode(data)
|
||||
|
||||
|
||||
def wrap_base64(base64_text):
|
||||
wrapped_text = '\n'.join([base64_text[x:x + 64]
|
||||
for x in xrange(0, len(base64_text), 64)])
|
||||
wrapped_text += '\n'
|
||||
return wrapped_text
|
||||
|
||||
|
||||
def make_pem(header, data):
|
||||
base64_text = make_base64_from_data(data)
|
||||
wrapped_text = wrap_base64(base64_text)
|
||||
|
||||
result = '-----BEGIN %s-----\n' % header
|
||||
result += wrapped_text
|
||||
result += '-----END %s-----\n' % header
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class PEM(object):
|
||||
"""PEM text and it's associated data broken out, used for testing.
|
||||
|
||||
"""
|
||||
def __init__(self, pem_header='CERTIFICATE', pem_type='cert',
|
||||
data_size=70, data_offset=0):
|
||||
self.pem_header = pem_header
|
||||
self.pem_type = pem_type
|
||||
self.data_size = data_size
|
||||
self.data_offset = data_offset
|
||||
self.data = make_data(self.data_size, self.data_offset)
|
||||
self.base64_text = make_base64_from_data(self.data)
|
||||
self.wrapped_base64 = wrap_base64(self.base64_text)
|
||||
self.pem_text = make_pem(self.pem_header, self.data)
|
||||
|
||||
|
||||
class TestPEMParseResult(tests.TestCase):
|
||||
|
||||
def test_pem_types(self):
|
||||
for pem_type in pemutils.pem_types:
|
||||
pem_header = pemutils.PEM_TYPE_TO_HEADER[pem_type]
|
||||
r = pemutils.PEMParseResult(pem_type=pem_type)
|
||||
self.assertEqual(pem_type, r.pem_type)
|
||||
self.assertEqual(pem_header, r.pem_header)
|
||||
|
||||
pem_type = 'xxx'
|
||||
self.assertRaises(ValueError,
|
||||
pemutils.PEMParseResult, pem_type=pem_type)
|
||||
|
||||
def test_pem_headers(self):
|
||||
for pem_header in pemutils.pem_headers:
|
||||
pem_type = pemutils.PEM_HEADER_TO_TYPE[pem_header]
|
||||
r = pemutils.PEMParseResult(pem_header=pem_header)
|
||||
self.assertEqual(pem_type, r.pem_type)
|
||||
self.assertEqual(pem_header, r.pem_header)
|
||||
|
||||
pem_header = 'xxx'
|
||||
self.assertRaises(ValueError,
|
||||
pemutils.PEMParseResult, pem_header=pem_header)
|
||||
|
||||
|
||||
class TestPEMParse(tests.TestCase):
|
||||
def test_parse_none(self):
|
||||
text = ''
|
||||
text += 'bla bla\n'
|
||||
text += 'yada yada yada\n'
|
||||
text += 'burfl blatz bingo\n'
|
||||
|
||||
parse_results = pemutils.parse_pem(text)
|
||||
self.assertEqual(len(parse_results), 0)
|
||||
|
||||
self.assertEqual(pemutils.is_pem(text), False)
|
||||
|
||||
def test_parse_invalid(self):
|
||||
p = PEM(pem_type='xxx',
|
||||
pem_header='XXX')
|
||||
text = p.pem_text
|
||||
|
||||
self.assertRaises(ValueError,
|
||||
pemutils.parse_pem, text)
|
||||
|
||||
def test_parse_one(self):
|
||||
data_size = 70
|
||||
count = len(headers)
|
||||
pems = []
|
||||
|
||||
for i in xrange(count):
|
||||
pems.append(PEM(pem_type=headers[i][0],
|
||||
pem_header=headers[i][1],
|
||||
data_size=data_size + i,
|
||||
data_offset=i))
|
||||
|
||||
for i in xrange(count):
|
||||
p = pems[i]
|
||||
text = p.pem_text
|
||||
|
||||
parse_results = pemutils.parse_pem(text)
|
||||
self.assertEqual(len(parse_results), 1)
|
||||
|
||||
r = parse_results[0]
|
||||
self.assertEqual(p.pem_type, r.pem_type)
|
||||
self.assertEqual(p.pem_header, r.pem_header)
|
||||
self.assertEqual(p.pem_text,
|
||||
text[r.pem_start:r.pem_end])
|
||||
self.assertEqual(p.wrapped_base64,
|
||||
text[r.base64_start:r.base64_end])
|
||||
self.assertEqual(p.data, r.binary_data)
|
||||
|
||||
def test_parse_one_embedded(self):
|
||||
p = PEM(data_offset=0)
|
||||
text = ''
|
||||
text += 'bla bla\n'
|
||||
text += 'yada yada yada\n'
|
||||
text += p.pem_text
|
||||
text += 'burfl blatz bingo\n'
|
||||
|
||||
parse_results = pemutils.parse_pem(text)
|
||||
self.assertEqual(len(parse_results), 1)
|
||||
|
||||
r = parse_results[0]
|
||||
self.assertEqual(p.pem_type, r.pem_type)
|
||||
self.assertEqual(p.pem_header, r.pem_header)
|
||||
self.assertEqual(p.pem_text,
|
||||
text[r.pem_start:r.pem_end])
|
||||
self.assertEqual(p.wrapped_base64,
|
||||
text[r.base64_start: r.base64_end])
|
||||
self.assertEqual(p.data, r.binary_data)
|
||||
|
||||
def test_parse_multple(self):
|
||||
data_size = 70
|
||||
count = len(headers)
|
||||
pems = []
|
||||
text = ''
|
||||
|
||||
for i in xrange(count):
|
||||
pems.append(PEM(pem_type=headers[i][0],
|
||||
pem_header=headers[i][1],
|
||||
data_size=data_size + i,
|
||||
data_offset=i))
|
||||
|
||||
for i in xrange(count):
|
||||
text += pems[i].pem_text
|
||||
|
||||
parse_results = pemutils.parse_pem(text)
|
||||
self.assertEqual(len(parse_results), count)
|
||||
|
||||
for i in xrange(count):
|
||||
r = parse_results[i]
|
||||
p = pems[i]
|
||||
|
||||
self.assertEqual(p.pem_type, r.pem_type)
|
||||
self.assertEqual(p.pem_header, r.pem_header)
|
||||
self.assertEqual(p.pem_text,
|
||||
text[r.pem_start:r.pem_end])
|
||||
self.assertEqual(p.wrapped_base64,
|
||||
text[r.base64_start: r.base64_end])
|
||||
self.assertEqual(p.data, r.binary_data)
|
||||
|
||||
def test_parse_multple_find_specific(self):
|
||||
data_size = 70
|
||||
count = len(headers)
|
||||
pems = []
|
||||
text = ''
|
||||
|
||||
for i in xrange(count):
|
||||
pems.append(PEM(pem_type=headers[i][0],
|
||||
pem_header=headers[i][1],
|
||||
data_size=data_size + i,
|
||||
data_offset=i))
|
||||
|
||||
for i in xrange(count):
|
||||
text += pems[i].pem_text
|
||||
|
||||
for i in xrange(count):
|
||||
parse_results = pemutils.parse_pem(text, pem_type=headers[i][0])
|
||||
self.assertEqual(len(parse_results), 1)
|
||||
|
||||
r = parse_results[0]
|
||||
p = pems[i]
|
||||
|
||||
self.assertEqual(p.pem_type, r.pem_type)
|
||||
self.assertEqual(p.pem_header, r.pem_header)
|
||||
self.assertEqual(p.pem_text,
|
||||
text[r.pem_start:r.pem_end])
|
||||
self.assertEqual(p.wrapped_base64,
|
||||
text[r.base64_start:r.base64_end])
|
||||
self.assertEqual(p.data, r.binary_data)
|
||||
|
||||
def test_parse_multple_embedded(self):
|
||||
data_size = 75
|
||||
count = len(headers)
|
||||
pems = []
|
||||
text = ''
|
||||
|
||||
for i in xrange(count):
|
||||
pems.append(PEM(pem_type=headers[i][0],
|
||||
pem_header=headers[i][1],
|
||||
data_size=data_size + i,
|
||||
data_offset=i))
|
||||
|
||||
for i in xrange(count):
|
||||
text += 'bla bla\n'
|
||||
text += 'yada yada yada\n'
|
||||
text += pems[i].pem_text
|
||||
text += 'burfl blatz bingo\n'
|
||||
|
||||
parse_results = pemutils.parse_pem(text)
|
||||
self.assertEqual(len(parse_results), count)
|
||||
|
||||
for i in xrange(count):
|
||||
r = parse_results[i]
|
||||
p = pems[i]
|
||||
|
||||
self.assertEqual(p.pem_type, r.pem_type)
|
||||
self.assertEqual(p.pem_header, r.pem_header)
|
||||
self.assertEqual(p.pem_text,
|
||||
text[r.pem_start:r.pem_end])
|
||||
self.assertEqual(p.wrapped_base64,
|
||||
text[r.base64_start:r.base64_end])
|
||||
self.assertEqual(p.data, r.binary_data)
|
||||
|
||||
def test_get_pem_data_none(self):
|
||||
text = ''
|
||||
text += 'bla bla\n'
|
||||
text += 'yada yada yada\n'
|
||||
text += 'burfl blatz bingo\n'
|
||||
|
||||
data = pemutils.get_pem_data(text)
|
||||
self.assertEqual(None, data)
|
||||
|
||||
def test_get_pem_data_invalid(self):
|
||||
p = PEM(pem_type='xxx',
|
||||
pem_header='XXX')
|
||||
text = p.pem_text
|
||||
|
||||
self.assertRaises(ValueError,
|
||||
pemutils.get_pem_data, text)
|
||||
|
||||
def test_get_pem_data(self):
|
||||
data_size = 70
|
||||
count = len(headers)
|
||||
pems = []
|
||||
|
||||
for i in xrange(count):
|
||||
pems.append(PEM(pem_type=headers[i][0],
|
||||
pem_header=headers[i][1],
|
||||
data_size=data_size + i,
|
||||
data_offset=i))
|
||||
|
||||
for i in xrange(count):
|
||||
p = pems[i]
|
||||
text = p.pem_text
|
||||
|
||||
data = pemutils.get_pem_data(text, p.pem_type)
|
||||
self.assertEqual(p.data, data)
|
||||
|
||||
def test_is_pem(self):
|
||||
data_size = 70
|
||||
count = len(headers)
|
||||
pems = []
|
||||
|
||||
for i in xrange(count):
|
||||
pems.append(PEM(pem_type=headers[i][0],
|
||||
pem_header=headers[i][1],
|
||||
data_size=data_size + i,
|
||||
data_offset=i))
|
||||
|
||||
for i in xrange(count):
|
||||
p = pems[i]
|
||||
text = p.pem_text
|
||||
self.assertTrue(pemutils.is_pem(text, pem_type=p.pem_type))
|
||||
self.assertFalse(pemutils.is_pem(text,
|
||||
pem_type=p.pem_type + 'xxx'))
|
||||
|
||||
def test_base64_to_pem(self):
|
||||
data_size = 70
|
||||
count = len(headers)
|
||||
pems = []
|
||||
|
||||
for i in xrange(count):
|
||||
pems.append(PEM(pem_type=headers[i][0],
|
||||
pem_header=headers[i][1],
|
||||
data_size=data_size + i,
|
||||
data_offset=i))
|
||||
|
||||
for i in xrange(count):
|
||||
p = pems[i]
|
||||
pem = pemutils.base64_to_pem(p.base64_text, p.pem_type)
|
||||
self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data)
|
||||
|
||||
def test_binary_to_pem(self):
|
||||
data_size = 70
|
||||
count = len(headers)
|
||||
pems = []
|
||||
|
||||
for i in xrange(count):
|
||||
pems.append(PEM(pem_type=headers[i][0],
|
||||
pem_header=headers[i][1],
|
||||
data_size=data_size + i,
|
||||
data_offset=i))
|
||||
|
||||
for i in xrange(count):
|
||||
p = pems[i]
|
||||
pem = pemutils.binary_to_pem(p.data, p.pem_type)
|
||||
self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data)
|
Loading…
Reference in New Issue