diff --git a/keystone/common/base64utils.py b/keystone/common/base64utils.py new file mode 100644 index 0000000000..8d8f705bee --- /dev/null +++ b/keystone/common/base64utils.py @@ -0,0 +1,395 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" + +Python provides the base64 module as a core module but this is mostly +limited to encoding and decoding base64 and its variants. It is often +useful to be able to perform other operations on base64 text. This +module is meant to be used in conjunction with the core base64 module. + +Standardized base64 is defined in +RFC-4648 "The Base16, Base32, and Base64 Data Encodings". 
+ +This module provides the following base64 utility functionality: + + * tests if text is valid base64 + * filter formatting from base64 + * convert base64 between different alphabets + * Handle padding issues + - test if base64 is padded + - removes padding + - restores padding + * wraps base64 text into formatted blocks + - via iterator + - return formatted string + +""" + +import io +import re +import string +import urllib + + +class InvalidBase64Error(ValueError): + pass + +base64_alphabet_re = re.compile(r'^[A-Za-z0-9+/=]+$') +base64url_alphabet_re = re.compile(r'^[A-Za-z0-9---_=]+$') + +base64_non_alphabet_re = re.compile(r'[^A-Za-z0-9+/=]+') +base64url_non_alphabet_re = re.compile(r'[^A-Za-z0-9---_=]+') + +_strip_formatting_re = re.compile(r'\s+') + +_base64_to_base64url_trans = string.maketrans('+/', '-_') +_base64url_to_base64_trans = string.maketrans('-_', '+/') + + +def is_valid_base64(text): + """Test if input text can be base64 decoded. + + :param text: input base64 text + :type text: string + :returns: bool -- True if text can be decoded as base64, False otherwise + """ + + text = filter_formatting(text) + + if base64_non_alphabet_re.search(text): + return False + + try: + return base64_is_padded(text) + except InvalidBase64Error: + return False + + +def is_valid_base64url(text): + """Test if input text can be base64url decoded. + + :param text: input base64url text + :type text: string + :returns: bool -- True if text can be decoded as base64url, + False otherwise + """ + + text = filter_formatting(text) + + if base64url_non_alphabet_re.search(text): + return False + + try: + return base64_is_padded(text) + except InvalidBase64Error: + return False + + +def filter_formatting(text): + """Return base64 text without any formatting, just the base64. + + Base64 text is often formatted with whitespace, line endings, + etc. This function strips out any formatting, the result will + contain only base64 characters. 
+ + Note, this function does not filter out all non-base64 alphabet + characters, it only removes characters used for formatting. + + :param text: input text to filter + :type text: string + :returns: string -- filtered text without formatting + """ + return _strip_formatting_re.sub('', text) + + +def base64_to_base64url(text): + """Convert base64 text to base64url text. + + base64url text is designed to be safe for use in filenames and + URLs. It is defined in RFC-4648 Section 5. + + base64url differs from base64 in the last two alphabet characters + at index 62 and 63, these are sometimes referred to as the + altchars. The '+' character at index 62 is replaced by '-' + (hyphen) and the '/' character at index 63 is replaced by '_' + (underscore). + + This function only translates the altchars, non-alphabet + characters are not filtered out. + + WARNING + ------- + + base64url continues to use the '=' pad character which is NOT URL + safe. RFC-4648 suggests two alternate methods to deal with this. + + percent-encode + percent-encode the pad character (e.g. '=' becomes + '%3D'). This makes the base64url text fully safe. But + percent-encoding has the downside of requiring + percent-decoding prior to feeding the base64url text into a + base64url decoder since most base64url decoders do not + recognize %3D as a pad character and most decoders require + correct padding. + + no-padding + padding is not strictly necessary to decode base64 or + base64url text, the pad can be computed from the input text + length. However many decoders demand padding and will consider + non-padded text to be malformed. If one wants to omit the + trailing pad character(s) for use in URLs it can be added back + using the base64_assure_padding() function. + + This function makes no decisions about which padding methodology to + use. 
One can either call base64_strip_padding() to remove any pad + characters (restoring later with base64_assure_padding()) or call + base64url_percent_encode() to percent-encode the pad characters. + + :param text: input base64 text + :type text: string + :returns: string -- base64url text + """ + return text.translate(_base64_to_base64url_trans) + + +def base64url_to_base64(text): + """Convert base64url text to base64 text. + + See base64_to_base64url() for a description of base64url text and + its issues. + + This function does NOT handle percent-encoded pad characters, they + will be left intact. If the input base64url text is + percent-encoded you should call base64url_percent_decode() first. + + :param text: text in base64url alphabet + :type text: string + :returns: string -- text in base64 alphabet + + """ + return text.translate(_base64url_to_base64_trans) + + +def base64_is_padded(text, pad='='): + """Test if the text is base64 padded. + + The input text must be in a base64 alphabet. The pad must be a + single character. If the text has been percent-encoded (e.g. pad + is the string '%3D') you must convert the text back to a base64 + alphabet (e.g. if percent-encoded use the function + base64url_percent_decode()). 
+ + :param text: text containing ONLY characters in a base64 alphabet + :type text: string + :param pad: pad character (must be single character) (default: '=') + :type pad: string + :returns: bool -- True if padded, False otherwise + :raises: ValueError, InvalidBase64Error + """ + + if len(pad) != 1: + raise ValueError(_('pad must be single character')) + + text_len = len(text) + if text_len > 0 and text_len % 4 == 0: + pad_index = text.find(pad) + if pad_index >= 0 and pad_index < text_len - 2: + raise InvalidBase64Error(_('text is multiple of 4, ' + 'but pad "%s" occurs before ' + '2nd to last char') % pad) + if pad_index == text_len - 2 and text[-1] != pad: + raise InvalidBase64Error(_('text is multiple of 4, ' + 'but pad "%s" occurs before ' + 'non-pad last char') % pad) + return True + + if text.find(pad) >= 0: + raise InvalidBase64Error(_('text is not a multiple of 4, ' + 'but contains pad "%s"') % pad) + return False + + +def base64url_percent_encode(text): + """Percent-encode base64url padding. + + The input text should only contain base64url alphabet + characters. Any non-base64url alphabet characters will also be + subject to percent-encoding. + + :param text: text containing ONLY characters in the base64url alphabet + :type text: string + :returns: string -- percent-encoded base64url text + :raises: InvalidBase64Error + """ + + if len(text) % 4 != 0: + raise InvalidBase64Error(_('padded base64url text must be ' + 'multiple of 4 characters')) + + return urllib.quote(text) + + +def base64url_percent_decode(text): + """Percent-decode base64url padding. + + The input text should only contain base64url alphabet + characters and the percent-encoded pad character. Any other + percent-encoded characters will be subject to percent-decoding. 
+ + :param text: base64url alphabet text + :type text: string + :returns: string -- percent-decoded base64url text + """ + + decoded_text = urllib.unquote(text) + + if len(decoded_text) % 4 != 0: + raise InvalidBase64Error(_('padded base64url text must be ' + 'multiple of 4 characters')) + + return decoded_text + + +def base64_strip_padding(text, pad='='): + """Remove padding from input base64 text. + + :param text: text containing ONLY characters in a base64 alphabet + :type text: string + :param pad: pad character (must be single character) (default: '=') + :type pad: string + :returns: string -- base64 text without padding + :raises: ValueError + """ + if len(pad) != 1: + raise ValueError(_('pad must be single character')) + + # Can't be padded if text is less than 4 characters. + if len(text) < 4: + return text + + if text[-1] == pad: + if text[-2] == pad: + return text[0:-2] + else: + return text[0:-1] + else: + return text + + +def base64_assure_padding(text, pad='='): + """Assure the input text ends with padding. + + Base64 text is normally expected to be a multiple of 4 + characters. Each 4 character base64 sequence produces 3 octets of + binary data. If the binary data is not a multiple of 3 the base64 + text is padded at the end with a pad character such that it is + always a multiple of 4. Padding is ignored and does not alter the + binary data nor its length. + + In some circumstances it is desirable to omit the padding + character due to transport encoding conflicts. Base64 text can + still be correctly decoded if the length of the base64 text + (consisting only of characters in the desired base64 alphabet) is + known, padding is not absolutely necessary. + + Some base64 decoders demand correct padding or one may wish to + format RFC compliant base64, this function performs this action. + + Input is assumed to consist only of members of a base64 + alphabet (i.e. no whitespace). Iteration yields a sequence of lines. 
+ The line does NOT terminate with a line ending. + + Use the filter_formatting() function to assure the input text + contains only the members of the alphabet. + + If the text ends with the pad it is assumed to already be + padded. Otherwise the binary length is computed from the input + text length and the correct number of pad characters are appended. + + :param text: text containing ONLY characters in a base64 alphabet + :type text: string + :param pad: pad character (must be single character) (default: '=') + :type pad: string + :returns: string -- input base64 text with padding + :raises: ValueError + """ + + if len(pad) != 1: + raise ValueError(_('pad must be single character')) + + if text.endswith(pad): + return text + + n = len(text) % 4 + if n == 0: + return text + + n = 4 - n + padding = pad * n + return text + padding + + +def base64_wrap_iter(text, width=64): + """Fold text into lines of text with max line length. + + Input is assumed to consist only of members of a base64 + alphabet (i.e. no whitespace). Iteration yields a sequence of lines. + The line does NOT terminate with a line ending. + + Use the filter_formatting() function to assure the input text + contains only the members of the alphabet. + + :param text: text containing ONLY characters in a base64 alphabet + :type text: string + :param width: number of characters in each wrapped line (default: 64) + :type width: int + :returns: generator -- sequence of lines of base64 text. + """ + + text = unicode(text) + for x in xrange(0, len(text), width): + yield text[x:x + width] + + +def base64_wrap(text, width=64): + """Fold text into lines of text with max line length. + + Input is assumed to consist only of members of a base64 + alphabet (i.e. no whitespace). Fold the text into lines whose + line length is width chars long, terminate each line with line + ending (default is '\n'). Return the wrapped text as a single + string. 
+ + Use the filter_formatting() function to assure the input text + contains only the members of the alphabet. + + :param text: text containing ONLY characters in a base64 alphabet + :type text: string + :param width: number of characters in each wrapped line (default: 64) + :type width: int + :returns: string -- wrapped text. + """ + + buf = io.StringIO() + + for line in base64_wrap_iter(text, width): + buf.write(line) + buf.write(u'\n') + + text = buf.getvalue() + buf.close() + return text diff --git a/keystone/common/pemutils.py b/keystone/common/pemutils.py new file mode 100755 index 0000000000..599c6bbf4d --- /dev/null +++ b/keystone/common/pemutils.py @@ -0,0 +1,509 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +""" +PEM formatted data is used frequently in conjunction with X509 PKI as +a data exchange mechanism for binary data. The acronym PEM stands for +Privacy Enhanced Mail as defined in RFC-1421. Contrary to expectation +the PEM format in common use has little to do with RFC-1421. Instead +what we know as PEM format grew out of the need for a data exchange +mechanism largely by the influence of OpenSSL. Other X509 +implementations have adopted it. + +Unfortunately PEM format has never been officially standardized. Its +basic format is as follows: + +1) A header consisting of 5 hyphens followed by the word BEGIN and a +single space. 
Then an upper case string describing the contents of the +PEM block, this is followed by 5 hyphens and a newline. + +2) Binary data (typically in DER ASN.1 format) encoded in base64. The +base64 text is line wrapped so that each line of base64 is 64 +characters long and terminated with a newline. The last line of base64 +text may be less than 64 characters. The content and format of the +binary data is entirely dependent upon the type of data announced in +the header and footer. + +3) A footer in the exact same format as the header except the word BEGIN is +replaced by END. The content name in both the header and footer should +exactly match. + +The above is called a PEM block. It is permissible for multiple PEM +blocks to appear in a single file or block of text. This is often used +when specifying multiple X509 certificates. + +An example PEM block for a certificate is: + +-----BEGIN CERTIFICATE----- +MIIC0TCCAjqgAwIBAgIJANsHKV73HYOwMA0GCSqGSIb3DQEBBQUAMIGeMQowCAYD +VQQFEwE1MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVN1bm55 +dmFsZTESMBAGA1UEChMJT3BlblN0YWNrMREwDwYDVQQLEwhLZXlzdG9uZTElMCMG +CSqGSIb3DQEJARYWa2V5c3RvbmVAb3BlbnN0YWNrLm9yZzEUMBIGA1UEAxMLU2Vs +ZiBTaWduZWQwIBcNMTIxMTA1MTgxODI0WhgPMjA3MTA0MzAxODE4MjRaMIGeMQow +CAYDVQQFEwE1MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEjAQBgNVBAcTCVN1 +bm55dmFsZTESMBAGA1UEChMJT3BlblN0YWNrMREwDwYDVQQLEwhLZXlzdG9uZTEl +MCMGCSqGSIb3DQEJARYWa2V5c3RvbmVAb3BlbnN0YWNrLm9yZzEUMBIGA1UEAxML +U2VsZiBTaWduZWQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBALzI17ExCaqd +r7xY2Q5CBZ1bW1lsrXxS8eNJRdQtskDuQVAluY03/OGZd8HQYiiY/ci2tYy7BNIC +bh5GaO95eqTDykJR3liOYE/tHbY6puQlj2ZivmhlSd2d5d7lF0/H28RQsLu9VktM +uw6q9DpDm35jfrr8LgSeA3MdVqcS/4OhAgMBAAGjEzARMA8GA1UdEwEB/wQFMAMB +Af8wDQYJKoZIhvcNAQEFBQADgYEAjSQND7i1dNZtLKpWgX+JqMr3BdVlM15mFeVr +C26ZspZjZVY5okdozO9gU3xcwRe4Cg30sKFOe6EBQKpkTZucFOXwBtD3h6dWJrdD +c+m/CL/rs0GatDavbaIT2vv405SQUQooCdVh72LYel+4/a6xmRd7fQx3iEXN9QYj +vmHJUcA= +-----END CERTIFICATE----- + +PEM format is safe for transmission in 7-bit ASCII systems 
+(i.e. standard email). Since 7-bit ASCII is a proper subset of UTF-8 +and Latin-1 it is not affected by transcoding between those +charsets. Nor is PEM format affected by the choice of line +endings. This makes PEM format particularly attractive for transport +and storage of binary data. + +This module provides a number of utilities supporting the generation +and consumption of PEM formatted data including: + + * parse text and find all PEM blocks contained in the + text. Information on the location of the block in the text, the + type of PEM block, and its base64 and binary data contents. + + * parse text assumed to contain PEM data and return the binary + data. + + * test if a block of text is a PEM block + + * convert base64 text into a formatted PEM block + + * convert binary data into a formatted PEM block + + * access to the valid PEM types and their headers + +""" + +import base64 +import io +from keystone.common import base64utils +import re + +PEM_TYPE_TO_HEADER = { + u'cms': u'CMS', + u'dsa-private': u'DSA PRIVATE KEY', + u'dsa-public': u'DSA PUBLIC KEY', + u'ecdsa-public': u'ECDSA PUBLIC KEY', + u'ec-private': u'EC PRIVATE KEY', + u'pkcs7': u'PKCS7', + u'pkcs7-signed': u'PKCS', + u'pkcs8': u'ENCRYPTED PRIVATE KEY', + u'private-key': u'PRIVATE KEY', + u'public-key': u'PUBLIC KEY', + u'rsa-private': u'RSA PRIVATE KEY', + u'rsa-public': u'RSA PUBLIC KEY', + u'cert': u'CERTIFICATE', + u'crl': u'X509 CRL', + u'cert-pair': u'CERTIFICATE PAIR', + u'csr': u'CERTIFICATE REQUEST', +} + +# This is not a 1-to-1 reverse map of PEM_TYPE_TO_HEADER +# because it includes deprecated headers that map to 1 pem_type. 
+PEM_HEADER_TO_TYPE = { + u'CMS': u'cms', + u'DSA PRIVATE KEY': u'dsa-private', + u'DSA PUBLIC KEY': u'dsa-public', + u'ECDSA PUBLIC KEY': u'ecdsa-public', + u'EC PRIVATE KEY': u'ec-private', + u'PKCS7': u'pkcs7', + u'PKCS': u'pkcs7-signed', + u'ENCRYPTED PRIVATE KEY': u'pkcs8', + u'PRIVATE KEY': u'private-key', + u'PUBLIC KEY': u'public-key', + u'RSA PRIVATE KEY': u'rsa-private', + u'RSA PUBLIC KEY': u'rsa-public', + u'CERTIFICATE': u'cert', + u'X509 CERTIFICATE': u'cert', + u'CERTIFICATE PAIR': u'cert-pair', + u'X509 CRL': u'crl', + u'CERTIFICATE REQUEST': u'csr', + u'NEW CERTIFICATE REQUEST': u'csr', +} + +# List of valid pem_types +pem_types = sorted(PEM_TYPE_TO_HEADER.keys()) + +# List of canonical pem_headers (deprecated aliases are not included) +pem_headers = sorted(PEM_TYPE_TO_HEADER.values()) + +_pem_begin_re = re.compile(r'^-{5}BEGIN\s+([^-]+)-{5}\s*$', re.MULTILINE) +_pem_end_re = re.compile(r'^-{5}END\s+([^-]+)-{5}\s*$', re.MULTILINE) + + +class PEMParseResult(object): + """Information returned when a PEM block is found in text. + + PEMParseResult contains information about a PEM block discovered + while parsing text. The following properties are defined: + + pem_type + A shorthand name for the type of the PEM data, e.g. cert, + csr, crl, cms, private-key. Valid pem_types are listed in pem_types. + When the pem_type is set the pem_header is updated to match it. + + pem_header + The text following '-----BEGIN ' in the PEM header. + Common examples are: + + -----BEGIN CERTIFICATE----- + -----BEGIN CMS----- + + Thus the pem_header would be CERTIFICATE and CMS respectively. + When the pem_header is set the pem_type is updated to match it. + + pem_start, pem_end + The beginning and ending positions of the PEM block + including the PEM header and footer. + + base64_start, base64_end + The beginning and ending positions of the base64 data + contained inside the PEM header and footer. Includes trailing + new line. + + binary_data + The decoded base64 data. None if not decoded. 
+ + """ + + def __init__(self, pem_type=None, pem_header=None, + pem_start=None, pem_end=None, + base64_start=None, base64_end=None, + binary_data=None): + + self._pem_type = None + self._pem_header = None + + if pem_type is not None: + self.pem_type = pem_type + + if pem_header is not None: + self.pem_header = pem_header + + self.pem_start = pem_start + self.pem_end = pem_end + self.base64_start = base64_start + self.base64_end = base64_end + self.binary_data = binary_data + + @property + def pem_type(self): + return self._pem_type + + @pem_type.setter + def pem_type(self, pem_type): + if pem_type is None: + self._pem_type = None + self._pem_header = None + else: + pem_header = PEM_TYPE_TO_HEADER.get(pem_type) + if pem_header is None: + raise ValueError(_('unknown pem_type "%(pem_type)s", ' + 'valid types are: %(valid_pem_types)s') % + {'pem_type': pem_type, + 'valid_pem_types': ', '.join(pem_types)}) + self._pem_type = pem_type + self._pem_header = pem_header + + @property + def pem_header(self): + return self._pem_header + + @pem_header.setter + def pem_header(self, pem_header): + if pem_header is None: + self._pem_type = None + self._pem_header = None + else: + pem_type = PEM_HEADER_TO_TYPE.get(pem_header) + if pem_type is None: + raise ValueError(_('unknown pem header "%(pem_header)s", ' + 'valid headers are: ' + '%(valid_pem_headers)s') % + {'pem_header': pem_header, + 'valid_pem_headers': + ', '.join("'%s'" % x + for x in pem_headers)}) + + self._pem_type = pem_type + self._pem_header = pem_header + +#------------------------------------------------------------------------------ + + +def pem_search(text, start=0): + """Search for a block of PEM formatted data + + Search for a PEM block in a text string. The search begins at + start. If a PEM block is found a PEMParseResult object is + returned, otherwise if no PEM block is found None is returned. + + If the pem_type is not the same in both the header and footer + a ValueError is raised. 
+ + The start and end positions are suitable for use as slices into + the text. To search for multiple PEM blocks pass pem_end as the + start position for the next iteration. Terminate the iteration + when None is returned. Example: + + start = 0 + while True: + block = pem_search(text, start) + if block is None: + break + base64_data = text[block.base64_start : block.base64_end] + start = block.pem_end + + :param text: the text to search for PEM blocks + :type text: string + :param start: the position in text to start searching from (default: 0) + :type start: int + :returns: PEMParseResult or None if not found + :raises: ValueError + """ + + match = _pem_begin_re.search(text, pos=start) + if match: + pem_start = match.start() + begin_text = match.group(0) + base64_start = min(len(text), match.end() + 1) + begin_pem_header = match.group(1).strip() + + match = _pem_end_re.search(text, pos=base64_start) + if match: + pem_end = min(len(text), match.end() + 1) + base64_end = match.start() + end_pem_header = match.group(1).strip() + else: + raise ValueError(_('failed to find end matching "%s"') % + begin_text) + + if begin_pem_header != end_pem_header: + raise ValueError(_('beginning & end PEM headers do not match ' + '(%(begin_pem_header)s' + '!= ' + '%(end_pem_header)s)') % + {'begin_pem_header': begin_pem_header, + 'end_pem_header': end_pem_header}) + else: + return None + + result = PEMParseResult(pem_header=begin_pem_header, + pem_start=pem_start, pem_end=pem_end, + base64_start=base64_start, base64_end=base64_end) + + return result + + +def parse_pem(text, pem_type=None, max_items=None): + """Scan text for PEM data, return list of PEM items + + The input text is scanned for PEM blocks, for each one found a + PEMParseResult is constructed and added to the return list. + + pem_type operates as a filter on the type of PEM desired. If + pem_type is specified only those PEM blocks which match will be + included. 
The pem_type is a logical name, not the actual text in + the pem header (e.g. 'cert'). If the pem_type is None all PEM + blocks are returned. + + If max_items is specified the result is limited to that number of + items. + + The return value is a list of PEMParseResult objects. The + PEMParseResult provides complete information about the PEM block + including the decoded binary data for the PEM block. The list is + ordered in the same order as found in the text. + + Examples: + + # Get all certs + certs = parse_pem(text, 'cert') + + # Get the first cert + try: + binary_cert = parse_pem(text, 'cert', 1)[0].binary_data + except IndexError: + raise ValueError('no cert found') + + :param text: The text to search for PEM blocks + :type text: string + :param pem_type: Only return data for this pem_type. + Valid types are listed in pem_types. + If pem_type is None no filtering is performed. + (default: None) + :type pem_type: string or None + :param max_items: Limit the number of blocks returned. 
(default: None) + :type max_items: int or None + :return: List of PEMParseResult, one for each PEM block found + :raises: ValueError, InvalidBase64Error + """ + + pem_blocks = [] + start = 0 + + while True: + block = pem_search(text, start) + if block is None: + break + start = block.pem_end + if pem_type is None: + pem_blocks.append(block) + else: + try: + if block.pem_type == pem_type: + pem_blocks.append(block) + except KeyError: + raise ValueError(_('unknown pem_type: "%s"') % (pem_type)) + + if max_items is not None and len(pem_blocks) >= max_items: + break + + for block in pem_blocks: + base64_data = text[block.base64_start:block.base64_end] + try: + binary_data = base64.b64decode(base64_data) + except Exception as e: + block.binary_data = None + raise base64utils.InvalidBase64Error( + _('failed to base64 decode %(pem_type)s PEM at position' + ' %(position)d: %(err_msg)s') % + {'pem_type': block.pem_type, + 'position': block.pem_start, + 'err_msg': str(e)}) + else: + block.binary_data = binary_data + + return pem_blocks + + +def get_pem_data(text, pem_type='cert'): + """Scan text for PEM data, return binary contents + + The input text is scanned for a PEM block which matches the pem_type. + If found the binary data contained in the PEM block is returned. + If no PEM block is found or it does not match the specified pem type + None is returned. + + :param text: The text to search for the PEM block + :type text: string + :param pem_type: Only return data for this pem_type. + Valid types are listed in pem_types. + (default: 'cert') + :type pem_type: string + :return: binary data or None if not found. + """ + + blocks = parse_pem(text, pem_type, 1) + if not blocks: + return None + return blocks[0].binary_data + + +def is_pem(text, pem_type='cert'): + """Does this text contain a PEM block. + + Check for the existence of a PEM formatted block in the + text, if one is found verify its contents can be base64 + decoded, if so return True. Return False otherwise. 
+ + :param text: The text to search for PEM blocks + :type text: string + :param pem_type: Only return data for this pem_type. + Valid types are listed in pem_types. + (default: 'cert') + :type pem_type: string + :returns: bool -- True if text contains PEM matching the pem_type, + False otherwise. + """ + + try: + pem_blocks = parse_pem(text, pem_type, max_items=1) + except base64utils.InvalidBase64Error: + return False + + if pem_blocks: + return True + else: + return False + + +def base64_to_pem(base64_text, pem_type='cert'): + """Format string of base64 text into PEM format + + Input is assumed to consist only of members of the base64 alphabet + (i.e. no whitespace). Use one of the filter functions from + base64utils to assure the input is clean + (e.g. filter_formatting()). + + :param base64_text: text containing ONLY base64 alphabet + characters to be inserted into PEM output. + :type base64_text: string + :param pem_type: Produce a PEM block for this type. + Valid types are listed in pem_types. + (default: 'cert') + :type pem_type: string + :returns: string -- PEM formatted text + + + """ + pem_header = PEM_TYPE_TO_HEADER[pem_type] + buf = io.StringIO() + + buf.write(u'-----BEGIN %s-----' % pem_header) + buf.write(u'\n') + + for line in base64utils.base64_wrap_iter(base64_text, width=64): + buf.write(line) + buf.write(u'\n') + + buf.write(u'-----END %s-----' % pem_header) + buf.write(u'\n') + + text = buf.getvalue() + buf.close() + return text + + +def binary_to_pem(binary_data, pem_type='cert'): + """Format binary data into PEM format + + Example: + + # get the certificate binary data in DER format + der_data = certificate.der + # convert the DER binary data into a PEM + pem = binary_to_pem(der_data, 'cert') + + + :param binary_data: binary data to encapsulate into PEM + :type binary_data: buffer + :param pem_type: Produce a PEM block for this type. + Valid types are listed in pem_types. 
+ (default: 'cert') + :type pem_type: string + :returns: string -- PEM formatted text + + """ + base64_text = base64.b64encode(binary_data) + return base64_to_pem(base64_text, pem_type) diff --git a/keystone/tests/test_base64utils.py b/keystone/tests/test_base64utils.py new file mode 100644 index 0000000000..27da716b71 --- /dev/null +++ b/keystone/tests/test_base64utils.py @@ -0,0 +1,196 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
from keystone.common import base64utils
from keystone import tests

# Characters of the standard base64 alphabet, in order, followed by the
# pad character.
base64_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                   'abcdefghijklmnopqrstuvwxyz'
                   '0123456789'
                   '+/=')  # includes pad char

# Same as above, but with the base64url substitutions ('-' and '_' in
# place of '+' and '/').
base64url_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                      'abcdefghijklmnopqrstuvwxyz'
                      '0123456789'
                      '-_=')  # includes pad char


class TestValid(tests.TestCase):
    """Tests for is_valid_base64() and is_valid_base64url()."""

    def test_valid_base64(self):
        """Standard-alphabet text validates; base64url-only text does not."""
        # Characters specific to the standard alphabet are expected to pass.
        self.assertTrue(base64utils.is_valid_base64('+/=='))
        self.assertTrue(base64utils.is_valid_base64('+/+='))
        self.assertTrue(base64utils.is_valid_base64('+/+/'))

        # Characters specific to the base64url alphabet are expected to fail.
        self.assertFalse(base64utils.is_valid_base64('-_=='))
        self.assertFalse(base64utils.is_valid_base64('-_-='))
        self.assertFalse(base64utils.is_valid_base64('-_-_'))

        # Length / padding combinations using characters common to both
        # alphabets.
        self.assertTrue(base64utils.is_valid_base64('abcd'))
        self.assertFalse(base64utils.is_valid_base64('abcde'))
        self.assertFalse(base64utils.is_valid_base64('abcde=='))
        self.assertFalse(base64utils.is_valid_base64('abcdef'))
        self.assertTrue(base64utils.is_valid_base64('abcdef=='))
        self.assertFalse(base64utils.is_valid_base64('abcdefg'))
        self.assertTrue(base64utils.is_valid_base64('abcdefg='))
        self.assertTrue(base64utils.is_valid_base64('abcdefgh'))

    def test_valid_base64url(self):
        """base64url-alphabet text validates; standard-only text does not."""
        # Characters specific to the standard alphabet are expected to fail.
        self.assertFalse(base64utils.is_valid_base64url('+/=='))
        self.assertFalse(base64utils.is_valid_base64url('+/+='))
        self.assertFalse(base64utils.is_valid_base64url('+/+/'))

        # Characters specific to the base64url alphabet are expected to pass.
        self.assertTrue(base64utils.is_valid_base64url('-_=='))
        self.assertTrue(base64utils.is_valid_base64url('-_-='))
        self.assertTrue(base64utils.is_valid_base64url('-_-_'))

        # Length / padding combinations using characters common to both
        # alphabets.
        self.assertTrue(base64utils.is_valid_base64url('abcd'))
        self.assertFalse(base64utils.is_valid_base64url('abcde'))
        self.assertFalse(base64utils.is_valid_base64url('abcde=='))
        self.assertFalse(base64utils.is_valid_base64url('abcdef'))
        self.assertTrue(base64utils.is_valid_base64url('abcdef=='))
        self.assertFalse(base64utils.is_valid_base64url('abcdefg'))
        self.assertTrue(base64utils.is_valid_base64url('abcdefg='))
        self.assertTrue(base64utils.is_valid_base64url('abcdefgh'))


class TestBase64Padding(tests.TestCase):
    """Tests for filtering, alphabet conversion and padding helpers."""

    def test_filter(self):
        """filter_formatting() removes whitespace and keeps the rest."""
        self.assertEqual(base64utils.filter_formatting(''), '')
        self.assertEqual(base64utils.filter_formatting(' '), '')
        self.assertEqual(base64utils.filter_formatting('a'), 'a')
        self.assertEqual(base64utils.filter_formatting(' a'), 'a')
        self.assertEqual(base64utils.filter_formatting('a '), 'a')
        self.assertEqual(base64utils.filter_formatting('ab'), 'ab')
        self.assertEqual(base64utils.filter_formatting(' ab'), 'ab')
        self.assertEqual(base64utils.filter_formatting('ab '), 'ab')
        self.assertEqual(base64utils.filter_formatting('a b'), 'ab')
        self.assertEqual(base64utils.filter_formatting(' a b'), 'ab')
        self.assertEqual(base64utils.filter_formatting('a b '), 'ab')
        self.assertEqual(base64utils.filter_formatting('a\nb\n '), 'ab')

        # Standard alphabet with no formatting: must come back unchanged.
        text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                'abcdefghijklmnopqrstuvwxyz'
                '0123456789'
                '+/=')
        self.assertEqual(base64_alphabet,
                         base64utils.filter_formatting(text))

        # Standard alphabet interleaved with spaces, newlines, tab,
        # form feed and carriage return.
        text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
                ' abcdefghijklmnopqrstuvwxyz\n'
                '\t\f\r'
                ' 0123456789\n'
                ' +/=')
        self.assertEqual(base64_alphabet,
                         base64utils.filter_formatting(text))

        # base64url alphabet with no formatting: must come back unchanged.
        text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                'abcdefghijklmnopqrstuvwxyz'
                '0123456789'
                '-_=')
        self.assertEqual(base64url_alphabet,
                         base64utils.filter_formatting(text))

        # base64url alphabet interleaved with whitespace.
        text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
                ' abcdefghijklmnopqrstuvwxyz\n'
                '\t\f\r'
                ' 0123456789\n'
                '-_=')
        self.assertEqual(base64url_alphabet,
                         base64utils.filter_formatting(text))

    def test_alphabet_conversion(self):
        """Each alphabet converts to the other, character for character."""
        self.assertEqual(base64url_alphabet,
                         base64utils.base64_to_base64url(base64_alphabet))

        self.assertEqual(base64_alphabet,
                         base64utils.base64url_to_base64(base64url_alphabet))

    def test_is_padded(self):
        """base64_is_padded() result for padded, unpadded and bad input."""
        self.assertTrue(base64utils.base64_is_padded('ABCD'))
        self.assertTrue(base64utils.base64_is_padded('ABC='))
        self.assertTrue(base64utils.base64_is_padded('AB=='))

        self.assertTrue(base64utils.base64_is_padded('1234ABCD'))
        self.assertTrue(base64utils.base64_is_padded('1234ABC='))
        self.assertTrue(base64utils.base64_is_padded('1234AB=='))

        self.assertFalse(base64utils.base64_is_padded('ABC'))
        self.assertFalse(base64utils.base64_is_padded('AB'))
        self.assertFalse(base64utils.base64_is_padded('A'))
        self.assertFalse(base64utils.base64_is_padded(''))

        # The following inputs are expected to be rejected outright.
        self.assertRaises(base64utils.InvalidBase64Error,
                          base64utils.base64_is_padded, '=')

        self.assertRaises(base64utils.InvalidBase64Error,
                          base64utils.base64_is_padded, 'AB=C')

        self.assertRaises(base64utils.InvalidBase64Error,
                          base64utils.base64_is_padded, 'AB=')

        self.assertRaises(base64utils.InvalidBase64Error,
                          base64utils.base64_is_padded, 'ABCD=')

    def test_strip_padding(self):
        """base64_strip_padding() drops trailing pad characters."""
        self.assertEqual(base64utils.base64_strip_padding('ABCD'), 'ABCD')
        self.assertEqual(base64utils.base64_strip_padding('ABC='), 'ABC')
        self.assertEqual(base64utils.base64_strip_padding('AB=='), 'AB')

    def test_assure_padding(self):
        """base64_assure_padding() pads when needed and is idempotent."""
        self.assertEqual(base64utils.base64_assure_padding('ABCD'), 'ABCD')
        self.assertEqual(base64utils.base64_assure_padding('ABC'), 'ABC=')
        self.assertEqual(base64utils.base64_assure_padding('ABC='), 'ABC=')
        self.assertEqual(base64utils.base64_assure_padding('AB'), 'AB==')
        self.assertEqual(base64utils.base64_assure_padding('AB=='), 'AB==')

    def test_base64_percent_encoding(self):
        """Pad characters round-trip through '%3D' percent-encoding."""
        self.assertEqual(base64utils.base64url_percent_encode('ABCD'), 'ABCD')
        self.assertEqual(base64utils.base64url_percent_encode('ABC='),
                         'ABC%3D')
        self.assertEqual(base64utils.base64url_percent_encode('AB=='),
                         'AB%3D%3D')

        self.assertEqual(base64utils.base64url_percent_decode('ABCD'), 'ABCD')
        self.assertEqual(base64utils.base64url_percent_decode('ABC%3D'),
                         'ABC=')
        self.assertEqual(base64utils.base64url_percent_decode('AB%3D%3D'),
                         'AB==')


class TestTextWrap(tests.TestCase):
    """Tests for base64_wrap() and base64_wrap_iter()."""

    def test_wrapping(self):
        """Text is split into lines of ``width``, each newline-terminated."""
        # Width that does not divide the input evenly: short last line.
        raw_text = 'abcdefgh'
        wrapped_text = 'abc\ndef\ngh\n'

        self.assertEqual(base64utils.base64_wrap(raw_text, width=3),
                         wrapped_text)

        # The iterator yields the same lines without line endings.
        t = '\n'.join(base64utils.base64_wrap_iter(raw_text, width=3)) + '\n'
        self.assertEqual(t, wrapped_text)

        # Width that divides the input evenly.
        raw_text = 'abcdefgh'
        wrapped_text = 'abcd\nefgh\n'

        self.assertEqual(base64utils.base64_wrap(raw_text, width=4),
                         wrapped_text)


# diff --git a/keystone/tests/test_pemutils.py b/keystone/tests/test_pemutils.py
# new file mode 100644
# index 0000000000..a1c194f133
# --- /dev/null
# +++ b/keystone/tests/test_pemutils.py
# @@ -0,0 +1,336 @@

# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64

from keystone.common import pemutils
from keystone import tests


# List of 2-tuples, (pem_type, pem_header).
# Materialized with list() because the tests below rely on a stable,
# positionally indexable sequence.
headers = list(pemutils.PEM_TYPE_TO_HEADER.items())


def make_data(size, offset=0):
    """Return a string of ``size`` characters used as fake binary data.

    Character codes start at ``offset`` and increase by one, wrapping
    with ``% 255``.

    NOTE(review): the modulus is 255, so code 255 is never produced;
    confirm whether 256 was intended. The sizes used in this module never
    reach the wrap-around point.
    """
    return ''.join([chr(x % 255) for x in xrange(offset, size + offset)])


def make_base64_from_data(data):
    """Return ``data`` base64 encoded (single line, no wrapping)."""
    return base64.b64encode(data)


def wrap_base64(base64_text):
    """Return ``base64_text`` split into 64 character, newline ended lines."""
    wrapped_text = '\n'.join([base64_text[x:x + 64]
                              for x in xrange(0, len(base64_text), 64)])
    wrapped_text += '\n'
    return wrapped_text


def make_pem(header, data):
    """Return PEM text: BEGIN line, wrapped base64 of ``data``, END line."""
    base64_text = make_base64_from_data(data)
    wrapped_text = wrap_base64(base64_text)

    result = '-----BEGIN %s-----\n' % header
    result += wrapped_text
    result += '-----END %s-----\n' % header

    return result


class PEM(object):
    """PEM text and its associated data broken out, used for testing.

    """
    def __init__(self, pem_header='CERTIFICATE', pem_type='cert',
                 data_size=70, data_offset=0):
        self.pem_header = pem_header
        self.pem_type = pem_type
        self.data_size = data_size
        self.data_offset = data_offset
        # Everything below is derived from the four parameters above.
        self.data = make_data(self.data_size, self.data_offset)
        self.base64_text = make_base64_from_data(self.data)
        self.wrapped_base64 = wrap_base64(self.base64_text)
        self.pem_text = make_pem(self.pem_header, self.data)


def _make_pems(data_size):
    """Return one PEM fixture per entry in ``headers``, in the same order.

    The i-th fixture uses ``data_size + i`` bytes of data starting at
    offset ``i``, so every fixture carries different content.
    """
    return [PEM(pem_type=pem_type,
                pem_header=pem_header,
                data_size=data_size + i,
                data_offset=i)
            for i, (pem_type, pem_header) in enumerate(headers)]


class TestPEMParseResult(tests.TestCase):
    """Tests for the pem_type / pem_header pairing in PEMParseResult."""

    def test_pem_types(self):
        """Constructing from a known pem_type yields the matching header."""
        for pem_type in pemutils.pem_types:
            pem_header = pemutils.PEM_TYPE_TO_HEADER[pem_type]
            r = pemutils.PEMParseResult(pem_type=pem_type)
            self.assertEqual(pem_type, r.pem_type)
            self.assertEqual(pem_header, r.pem_header)

        # An unknown type is expected to be rejected.
        pem_type = 'xxx'
        self.assertRaises(ValueError,
                          pemutils.PEMParseResult, pem_type=pem_type)

    def test_pem_headers(self):
        """Constructing from a known pem_header yields the matching type."""
        for pem_header in pemutils.pem_headers:
            pem_type = pemutils.PEM_HEADER_TO_TYPE[pem_header]
            r = pemutils.PEMParseResult(pem_header=pem_header)
            self.assertEqual(pem_type, r.pem_type)
            self.assertEqual(pem_header, r.pem_header)

        # An unknown header is expected to be rejected.
        pem_header = 'xxx'
        self.assertRaises(ValueError,
                          pemutils.PEMParseResult, pem_header=pem_header)


class TestPEMParse(tests.TestCase):
    """Tests for parse_pem(), get_pem_data(), is_pem() and PEM builders."""

    def _assert_parsed_pem(self, p, r, text):
        """Assert parse result ``r`` describes fixture ``p`` inside ``text``.

        Checks the type and header, that the pem_start/pem_end and
        base64_start/base64_end offsets slice the expected spans out of
        ``text``, and that the decoded binary data matches.
        """
        self.assertEqual(p.pem_type, r.pem_type)
        self.assertEqual(p.pem_header, r.pem_header)
        self.assertEqual(p.pem_text,
                         text[r.pem_start:r.pem_end])
        self.assertEqual(p.wrapped_base64,
                         text[r.base64_start:r.base64_end])
        self.assertEqual(p.data, r.binary_data)

    def test_parse_none(self):
        """Text without any PEM block parses to nothing and is not PEM."""
        text = ''
        text += 'bla bla\n'
        text += 'yada yada yada\n'
        text += 'burfl blatz bingo\n'

        parse_results = pemutils.parse_pem(text)
        self.assertEqual(len(parse_results), 0)

        self.assertEqual(pemutils.is_pem(text), False)

    def test_parse_invalid(self):
        """A PEM block with an unknown header makes parse_pem() raise."""
        p = PEM(pem_type='xxx',
                pem_header='XXX')
        text = p.pem_text

        self.assertRaises(ValueError,
                          pemutils.parse_pem, text)

    def test_parse_one(self):
        """Each PEM type parses correctly when it is the whole text."""
        for p in _make_pems(70):
            text = p.pem_text

            parse_results = pemutils.parse_pem(text)
            self.assertEqual(len(parse_results), 1)

            self._assert_parsed_pem(p, parse_results[0], text)

    def test_parse_one_embedded(self):
        """A single PEM block is found between unrelated lines of text."""
        p = PEM(data_offset=0)
        text = ''
        text += 'bla bla\n'
        text += 'yada yada yada\n'
        text += p.pem_text
        text += 'burfl blatz bingo\n'

        parse_results = pemutils.parse_pem(text)
        self.assertEqual(len(parse_results), 1)

        self._assert_parsed_pem(p, parse_results[0], text)

    def test_parse_multple(self):
        """Concatenated PEM blocks are all found, in order."""
        pems = _make_pems(70)
        text = ''.join([p.pem_text for p in pems])

        parse_results = pemutils.parse_pem(text)
        self.assertEqual(len(parse_results), len(pems))

        for p, r in zip(pems, parse_results):
            self._assert_parsed_pem(p, r, text)

    def test_parse_multple_find_specific(self):
        """Filtering by pem_type returns only the block of that type."""
        pems = _make_pems(70)
        text = ''.join([p.pem_text for p in pems])

        for p in pems:
            parse_results = pemutils.parse_pem(text, pem_type=p.pem_type)
            self.assertEqual(len(parse_results), 1)

            self._assert_parsed_pem(p, parse_results[0], text)

    def test_parse_multple_embedded(self):
        """Multiple PEM blocks separated by unrelated text are all found."""
        pems = _make_pems(75)
        text = ''

        for p in pems:
            text += 'bla bla\n'
            text += 'yada yada yada\n'
            text += p.pem_text
            text += 'burfl blatz bingo\n'

        parse_results = pemutils.parse_pem(text)
        self.assertEqual(len(parse_results), len(pems))

        for p, r in zip(pems, parse_results):
            self._assert_parsed_pem(p, r, text)

    def test_get_pem_data_none(self):
        """get_pem_data() returns None when the text has no PEM block."""
        text = ''
        text += 'bla bla\n'
        text += 'yada yada yada\n'
        text += 'burfl blatz bingo\n'

        data = pemutils.get_pem_data(text)
        self.assertEqual(None, data)

    def test_get_pem_data_invalid(self):
        """get_pem_data() raises for a PEM block with an unknown header."""
        p = PEM(pem_type='xxx',
                pem_header='XXX')
        text = p.pem_text

        self.assertRaises(ValueError,
                          pemutils.get_pem_data, text)

    def test_get_pem_data(self):
        """get_pem_data() returns the original binary data for each type."""
        for p in _make_pems(70):
            text = p.pem_text

            data = pemutils.get_pem_data(text, p.pem_type)
            self.assertEqual(p.data, data)

    def test_is_pem(self):
        """is_pem() is true for the matching type, false for another one."""
        for p in _make_pems(70):
            text = p.pem_text
            self.assertTrue(pemutils.is_pem(text, pem_type=p.pem_type))
            self.assertFalse(pemutils.is_pem(text,
                                             pem_type=p.pem_type + 'xxx'))

    def test_base64_to_pem(self):
        """PEM built from base64 text decodes back to the original data."""
        for p in _make_pems(70):
            pem = pemutils.base64_to_pem(p.base64_text, p.pem_type)
            self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data)

    def test_binary_to_pem(self):
        """PEM built from binary data decodes back to the original data."""
        for p in _make_pems(70):
            pem = pemutils.binary_to_pem(p.data, p.pem_type)
            self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data)