No longer needed since the cryptography 1.0 release has fixed the slow import (due to changes in the way cffi was used).
171 lines
4.8 KiB
Python
171 lines
4.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright 2014 Google Inc. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Crypto-related routines for oauth2client."""
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
|
|
from oauth2client._helpers import _json_encode
|
|
from oauth2client._helpers import _urlsafe_b64decode
|
|
from oauth2client._helpers import _urlsafe_b64encode
|
|
|
|
|
|
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
|
|
AUTH_TOKEN_LIFETIME_SECS = 300 # 5 minutes in seconds
|
|
MAX_TOKEN_LIFETIME_SECS = 86400 # 1 day in seconds
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AppIdentityError(Exception):
|
|
pass
|
|
|
|
|
|
try:
|
|
from oauth2client._openssl_crypt import OpenSSLVerifier
|
|
from oauth2client._openssl_crypt import OpenSSLSigner
|
|
from oauth2client._openssl_crypt import pkcs12_key_as_pem
|
|
except ImportError:
|
|
OpenSSLVerifier = None
|
|
OpenSSLSigner = None
|
|
def pkcs12_key_as_pem(*args, **kwargs):
|
|
raise NotImplementedError('pkcs12_key_as_pem requires OpenSSL.')
|
|
|
|
|
|
try:
|
|
from oauth2client._pycrypto_crypt import PyCryptoVerifier
|
|
from oauth2client._pycrypto_crypt import PyCryptoSigner
|
|
except ImportError:
|
|
PyCryptoVerifier = None
|
|
PyCryptoSigner = None
|
|
|
|
|
|
if OpenSSLSigner:
|
|
Signer = OpenSSLSigner
|
|
Verifier = OpenSSLVerifier
|
|
elif PyCryptoSigner:
|
|
Signer = PyCryptoSigner
|
|
Verifier = PyCryptoVerifier
|
|
else:
|
|
raise ImportError('No encryption library found. Please install either '
|
|
'PyOpenSSL, or PyCrypto 2.6 or later')
|
|
|
|
|
|
def make_signed_jwt(signer, payload):
|
|
"""Make a signed JWT.
|
|
|
|
See http://self-issued.info/docs/draft-jones-json-web-token.html.
|
|
|
|
Args:
|
|
signer: crypt.Signer, Cryptographic signer.
|
|
payload: dict, Dictionary of data to convert to JSON and then sign.
|
|
|
|
Returns:
|
|
string, The JWT for the payload.
|
|
"""
|
|
header = {'typ': 'JWT', 'alg': 'RS256'}
|
|
|
|
segments = [
|
|
_urlsafe_b64encode(_json_encode(header)),
|
|
_urlsafe_b64encode(_json_encode(payload)),
|
|
]
|
|
signing_input = '.'.join(segments)
|
|
|
|
signature = signer.sign(signing_input)
|
|
segments.append(_urlsafe_b64encode(signature))
|
|
|
|
logger.debug(str(segments))
|
|
|
|
return '.'.join(segments)
|
|
|
|
|
|
def verify_signed_jwt_with_certs(jwt, certs, audience):
|
|
"""Verify a JWT against public certs.
|
|
|
|
See http://self-issued.info/docs/draft-jones-json-web-token.html.
|
|
|
|
Args:
|
|
jwt: string, A JWT.
|
|
certs: dict, Dictionary where values of public keys in PEM format.
|
|
audience: string, The audience, 'aud', that this JWT should contain. If
|
|
None then the JWT's 'aud' parameter is not verified.
|
|
|
|
Returns:
|
|
dict, The deserialized JSON payload in the JWT.
|
|
|
|
Raises:
|
|
AppIdentityError if any checks are failed.
|
|
"""
|
|
segments = jwt.split('.')
|
|
|
|
if len(segments) != 3:
|
|
raise AppIdentityError('Wrong number of segments in token: %s' % jwt)
|
|
signed = '%s.%s' % (segments[0], segments[1])
|
|
|
|
signature = _urlsafe_b64decode(segments[2])
|
|
|
|
# Parse token.
|
|
json_body = _urlsafe_b64decode(segments[1])
|
|
try:
|
|
parsed = json.loads(json_body.decode('utf-8'))
|
|
except:
|
|
raise AppIdentityError('Can\'t parse token: %s' % json_body)
|
|
|
|
# Check signature.
|
|
verified = False
|
|
for pem in certs.values():
|
|
verifier = Verifier.from_string(pem, True)
|
|
if verifier.verify(signed, signature):
|
|
verified = True
|
|
break
|
|
if not verified:
|
|
raise AppIdentityError('Invalid token signature: %s' % jwt)
|
|
|
|
# Check creation timestamp.
|
|
iat = parsed.get('iat')
|
|
if iat is None:
|
|
raise AppIdentityError('No iat field in token: %s' % json_body)
|
|
earliest = iat - CLOCK_SKEW_SECS
|
|
|
|
# Check expiration timestamp.
|
|
now = int(time.time())
|
|
exp = parsed.get('exp')
|
|
if exp is None:
|
|
raise AppIdentityError('No exp field in token: %s' % json_body)
|
|
if exp >= now + MAX_TOKEN_LIFETIME_SECS:
|
|
raise AppIdentityError('exp field too far in future: %s' % json_body)
|
|
latest = exp + CLOCK_SKEW_SECS
|
|
|
|
if now < earliest:
|
|
raise AppIdentityError('Token used too early, %d < %d: %s' %
|
|
(now, earliest, json_body))
|
|
if now > latest:
|
|
raise AppIdentityError('Token used too late, %d > %d: %s' %
|
|
(now, latest, json_body))
|
|
|
|
# Check audience.
|
|
if audience is not None:
|
|
aud = parsed.get('aud')
|
|
if aud is None:
|
|
raise AppIdentityError('No aud field in token: %s' % json_body)
|
|
if aud != audience:
|
|
raise AppIdentityError('Wrong recipient, %s != %s: %s' %
|
|
(aud, audience, json_body))
|
|
|
|
return parsed
|