Files
deb-python-oauth2client/oauth2client/crypt.py
Danny Hermes 613ab465d4 Deferring OpenSSL import until usage.
This is to speed up import times. In OpenSSL 0.14 the import takes
0.5 seconds due to cffi on-demand build of extensions in the
cryptography library.

For classes and functions which are conditionally defined based on
the existence of OpenSSL.crypto, we check that the module
exists (but don't import it) using imp.find_module.
2015-04-15 15:44:30 -07:00

439 lines
12 KiB
Python

# -*- coding: utf-8 -*-
#
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Crypto-related routines for oauth2client."""
import base64
import imp
import json
import logging
import os
import sys
import time
import six
# Tolerance, in seconds, applied on both sides of a token's validity window:
# subtracted from 'iat' and added to 'exp' in verify_signed_jwt_with_certs.
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably consumed by other modules -- confirm before changing.
AUTH_TOKEN_LIFETIME_SECS = 300 # 5 minutes in seconds
# A token whose 'exp' lies this far (or further) beyond the current time is
# rejected by verify_signed_jwt_with_certs.
MAX_TOKEN_LIFETIME_SECS = 86400 # 1 day in seconds
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AppIdentityError(Exception):
    """Error raised when a signed JWT fails a verification check."""
try:
_, _package_dir, _ = imp.find_module('OpenSSL')
if not os.path.isfile(os.path.join(_package_dir, 'crypto.py')):
raise ImportError('No module named OpenSSL')
class OpenSSLVerifier(object):
    """Verifies the signature on a message using pyOpenSSL."""

    def __init__(self, pubkey):
        """Constructor.

        Args:
            pubkey: The key object to verify with, as built by
                ``from_string`` (the result of ``crypto.load_certificate``
                or ``crypto.load_privatekey``).
        """
        self._pubkey = pubkey

    def verify(self, message, signature):
        """Verifies a message against a signature.

        Args:
            message: string or bytes, The message to verify. Text is encoded
                as UTF-8 before it is checked.
            signature: string, The signature on the message.

        Returns:
            True if message was signed by the private key associated with
            the public key that this object was constructed with, False if
            the check fails for any reason.
        """
        # Imported here rather than at module level: the OpenSSL import is
        # slow, so it is deferred until a verification is actually requested.
        from OpenSSL import crypto
        try:
            if isinstance(message, six.text_type):
                message = message.encode('utf-8')
            crypto.verify(self._pubkey, signature, message, 'sha256')
            return True
        except Exception:
            # Any library error means "not verified". Catching Exception
            # (instead of a bare except) keeps that fail-closed behaviour
            # while letting KeyboardInterrupt and SystemExit propagate.
            return False

    @staticmethod
    def from_string(key_pem, is_x509_cert):
        """Construct a Verifier instance from a string.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.

        Raises:
            OpenSSL.crypto.Error if the key_pem can't be parsed.
        """
        from OpenSSL import crypto
        if is_x509_cert:
            pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
        else:
            pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
        return OpenSSLVerifier(pubkey)
class OpenSSLSigner(object):
    """Produces SHA-256 signatures with a private key via pyOpenSSL."""

    def __init__(self, pkey):
        """Store the private key used for signing.

        Args:
            pkey: OpenSSL.crypto.PKey (or equivalent), the private key to
                sign with.
        """
        self._key = pkey

    def sign(self, message):
        """Sign a message with the stored key.

        Args:
            message: bytes, the message to sign. Text is encoded as UTF-8
                first.

        Returns:
            string, the signature of the message for the stored key.
        """
        from OpenSSL import crypto
        is_text = isinstance(message, six.text_type)
        payload = message.encode('utf-8') if is_text else message
        return crypto.sign(self._key, payload, 'sha256')

    @staticmethod
    def from_string(key, password=b'notasecret'):
        """Build a Signer from the contents of a private key file.

        Args:
            key: string, private key in PKCS12 or PEM format.
            password: string, password for the private key file. Only used
                when the key is not in PEM format.

        Returns:
            Signer instance.

        Raises:
            OpenSSL.crypto.Error if the key can't be parsed.
        """
        from OpenSSL import crypto
        pem_section = _parse_pem_key(key)
        if not pem_section:
            # No PEM marker found: treat the input as a PKCS12 bundle.
            if isinstance(password, six.text_type):
                password = password.encode('utf-8')
            private_key = crypto.load_pkcs12(key, password).get_privatekey()
            return OpenSSLSigner(private_key)
        private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, pem_section)
        return OpenSSLSigner(private_key)
def pkcs12_key_as_pem(private_key_text, private_key_password):
    """Convert the contents of a PKCS12 key to PEM using OpenSSL.

    Args:
        private_key_text: String. Base64-encoded PKCS12 private key.
        private_key_password: String. Password for PKCS12. Text is encoded
            as ASCII; byte strings are passed through unchanged.

    Returns:
        String. PEM contents of ``private_key_text``.
    """
    from OpenSSL import crypto
    decoded_body = base64.b64decode(private_key_text)
    # Only text needs encoding. Checking six.string_types here would also
    # match Python 2 byte strings, whose .encode() first performs an implicit
    # ASCII decode and therefore fails on non-ASCII password bytes.
    if isinstance(private_key_password, six.text_type):
        private_key_password = private_key_password.encode('ascii')
    pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password)
    return crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                  pkcs12.get_privatekey())
except ImportError:
OpenSSLVerifier = None
OpenSSLSigner = None
def pkcs12_key_as_pem(*args, **kwargs):
    """Stand-in used when OpenSSL is unavailable; always raises.

    Raises:
        NotImplementedError: on every call, whatever the arguments.
    """
    reason = 'pkcs12_key_as_pem requires OpenSSL.'
    raise NotImplementedError(reason)
try:
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.asn1 import DerSequence
class PyCryptoVerifier(object):
    """Verifies the signature on a message using PyCrypto."""

    def __init__(self, pubkey):
        """Constructor.

        Args:
            pubkey: The public key to verify with, as built by
                ``from_string`` (the result of ``RSA.importKey``).
        """
        self._pubkey = pubkey

    def verify(self, message, signature):
        """Verifies a message against a signature.

        Args:
            message: string or bytes, The message to verify. Text is encoded
                as UTF-8 before hashing.
            signature: string, The signature on the message.

        Returns:
            True if message was signed by the private key associated with
            the public key that this object was constructed with, False if
            the check fails for any reason.
        """
        try:
            # Hashing needs bytes. Without this, text messages (such as the
            # signed portion of a JWT) fail on Python 3 and the error is
            # silently turned into a False result.
            if isinstance(message, six.text_type):
                message = message.encode('utf-8')
            return PKCS1_v1_5.new(self._pubkey).verify(
                SHA256.new(message), signature)
        except Exception:
            # Any library error means "not verified". Catching Exception
            # (instead of a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            return False

    @staticmethod
    def from_string(key_pem, is_x509_cert):
        """Construct a Verifier instance from a string.

        Args:
            key_pem: string, public key in PEM format.
            is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                is expected to be an RSA key in PEM format.

        Returns:
            Verifier instance.
        """
        if is_x509_cert:
            if isinstance(key_pem, six.text_type):
                key_pem = key_pem.encode('ascii')
            # Removing spaces collapses each BEGIN/END marker line into one
            # token, so dropping the first and last token leaves only the
            # base64 body of the certificate.
            pem_lines = key_pem.replace(b' ', b'').split()
            cert_der = _urlsafe_b64decode(b''.join(pem_lines[1:-1]))
            cert_seq = DerSequence()
            cert_seq.decode(cert_der)
            tbs_seq = DerSequence()
            tbs_seq.decode(cert_seq[0])
            # NOTE(review): element 6 is presumably the certificate's
            # SubjectPublicKeyInfo field -- confirm against the X.509 layout.
            pubkey = RSA.importKey(tbs_seq[6])
        else:
            pubkey = RSA.importKey(key_pem)
        return PyCryptoVerifier(pubkey)
class PyCryptoSigner(object):
    """Produces PKCS#1 v1.5 / SHA-256 signatures with a private key."""

    def __init__(self, pkey):
        """Store the private key used for signing.

        Args:
            pkey: OpenSSL.crypto.PKey (or equivalent), the private key to
                sign with.
        """
        self._key = pkey

    def sign(self, message):
        """Sign a message with the stored key.

        Args:
            message: string, the message to sign. Text is encoded as UTF-8
                first.

        Returns:
            string, the signature of the message for the stored key.
        """
        is_text = isinstance(message, six.text_type)
        payload = message.encode('utf-8') if is_text else message
        digest = SHA256.new(payload)
        return PKCS1_v1_5.new(self._key).sign(digest)

    @staticmethod
    def from_string(key, password='notasecret'):
        """Build a Signer from the contents of a PEM private key file.

        Args:
            key: string, private key in PEM format.
            password: string, password for private key file. Unused for PEM
                files.

        Returns:
            Signer instance.

        Raises:
            NotImplementedError if the key isn't in PEM format.
        """
        pem_section = _parse_pem_key(key)
        if not pem_section:
            raise NotImplementedError(
                'PKCS12 format is not supported by the PyCrypto library. '
                'Try converting to a "PEM" '
                '(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
                'or using PyOpenSSL if native code is an option.')
        return PyCryptoSigner(RSA.importKey(pem_section))
except ImportError:
PyCryptoVerifier = None
PyCryptoSigner = None
# Pick the default Signer/Verifier pair: the pyOpenSSL classes when they were
# defined, otherwise the PyCrypto ones. With neither available the module
# cannot be used, so importing it fails.
if not (OpenSSLSigner or PyCryptoSigner):
    raise ImportError('No encryption library found. Please install either '
                      'PyOpenSSL, or PyCrypto 2.6 or later')
if OpenSSLSigner:
    Signer, Verifier = OpenSSLSigner, OpenSSLVerifier
else:
    Signer, Verifier = PyCryptoSigner, PyCryptoVerifier
def _parse_pem_key(raw_key_input):
"""Identify and extract PEM keys.
Determines whether the given key is in the format of PEM key, and extracts
the relevant part of the key if it is.
Args:
raw_key_input: The contents of a private key file (either PEM or PKCS12).
Returns:
string, The actual key if the contents are from a PEM file, or else None.
"""
offset = raw_key_input.find(b'-----BEGIN ')
if offset != -1:
return raw_key_input[offset:]
def _urlsafe_b64encode(raw_bytes):
    """Return the URL-safe base64 text of raw_bytes without '=' padding.

    Text input is encoded as UTF-8 before being base64-encoded.
    """
    is_text = isinstance(raw_bytes, six.text_type)
    data = raw_bytes.encode('utf-8') if is_text else raw_bytes
    encoded = base64.urlsafe_b64encode(data)
    return encoded.decode('ascii').rstrip('=')
def _urlsafe_b64decode(b64string):
    """Decode URL-safe base64 data whose '=' padding may have been removed.

    Text input is converted to ASCII bytes first, then between one and four
    '=' characters are appended before decoding.
    """
    # base64 can't handle unicode strings, so convert text to bytes first.
    if isinstance(b64string, six.text_type):
        b64string = b64string.encode('ascii')
    pad_count = 4 - len(b64string) % 4
    return base64.urlsafe_b64decode(b64string + b'=' * pad_count)
def _json_encode(data):
return json.dumps(data, separators=(',', ':'))
def make_signed_jwt(signer, payload):
    """Build a signed JWT for a payload.

    See http://self-issued.info/docs/draft-jones-json-web-token.html.

    Args:
        signer: crypt.Signer, Cryptographic signer.
        payload: dict, Dictionary of data to convert to JSON and then sign.

    Returns:
        string, The JWT for the payload.
    """
    encoded_header = _urlsafe_b64encode(
        _json_encode({'typ': 'JWT', 'alg': 'RS256'}))
    encoded_payload = _urlsafe_b64encode(_json_encode(payload))
    segments = [encoded_header, encoded_payload]

    # The signature covers "<header>.<payload>" and becomes the third segment.
    signature = signer.sign('.'.join(segments))
    segments.append(_urlsafe_b64encode(signature))

    logger.debug(str(segments))
    return '.'.join(segments)
def verify_signed_jwt_with_certs(jwt, certs, audience):
    """Verify a JWT against public certs.

    See http://self-issued.info/docs/draft-jones-json-web-token.html.

    The token must have three segments, carry a signature that verifies
    against at least one of the certs, be within its 'iat'/'exp' validity
    window (with CLOCK_SKEW_SECS of tolerance on each side) and, when an
    audience is given, name exactly that audience.

    Args:
        jwt: string, A JWT.
        certs: dict, Dictionary where values of public keys in PEM format.
        audience: string, The audience, 'aud', that this JWT should contain.
            If None then the JWT's 'aud' parameter is not verified.

    Returns:
        dict, The deserialized JSON payload in the JWT.

    Raises:
        AppIdentityError if any checks are failed, including when a segment
        is not valid base64 or the payload is not valid JSON.
    """
    segments = jwt.split('.')
    if len(segments) != 3:
        raise AppIdentityError('Wrong number of segments in token: %s' % jwt)
    signed = '%s.%s' % (segments[0], segments[1])

    # The token is untrusted input: report undecodable segments as the
    # documented AppIdentityError instead of leaking a base64 library error.
    try:
        signature = _urlsafe_b64decode(segments[2])
        json_body = _urlsafe_b64decode(segments[1])
    except (TypeError, ValueError):
        raise AppIdentityError('Can\'t decode token: %s' % jwt)

    # Parse token.
    try:
        parsed = json.loads(json_body.decode('utf-8'))
    except Exception:
        # Exception rather than a bare except, so that KeyboardInterrupt and
        # SystemExit are not converted into a token error.
        raise AppIdentityError('Can\'t parse token: %s' % json_body)

    # Check signature: one matching cert is enough.
    verified = any(
        Verifier.from_string(pem, True).verify(signed, signature)
        for pem in certs.values())
    if not verified:
        raise AppIdentityError('Invalid token signature: %s' % jwt)

    # Check creation timestamp.
    iat = parsed.get('iat')
    if iat is None:
        raise AppIdentityError('No iat field in token: %s' % json_body)
    earliest = iat - CLOCK_SKEW_SECS

    # Check expiration timestamp.
    now = int(time.time())
    exp = parsed.get('exp')
    if exp is None:
        raise AppIdentityError('No exp field in token: %s' % json_body)
    if exp >= now + MAX_TOKEN_LIFETIME_SECS:
        raise AppIdentityError('exp field too far in future: %s' % json_body)
    latest = exp + CLOCK_SKEW_SECS

    if now < earliest:
        raise AppIdentityError('Token used too early, %d < %d: %s' %
                               (now, earliest, json_body))
    if now > latest:
        raise AppIdentityError('Token used too late, %d > %d: %s' %
                               (now, latest, json_body))

    # Check audience.
    if audience is not None:
        aud = parsed.get('aud')
        if aud is None:
            raise AppIdentityError('No aud field in token: %s' % json_body)
        if aud != audience:
            raise AppIdentityError('Wrong recipient, %s != %s: %s' %
                                   (aud, audience, json_body))

    return parsed