# -*- coding: utf-8 -*-
#
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Crypto-related routines for oauth2client."""

import json
import logging
import os
import time

try:
    import imp
except ImportError:
    # ``imp`` is deprecated and absent from newer Python releases. Without it
    # the fast-path probe in ``_TryOpenSslImport`` is skipped and the regular
    # (slower) import is used instead.
    imp = None

from oauth2client._helpers import _json_encode
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client._helpers import _urlsafe_b64encode


CLOCK_SKEW_SECS = 300  # 5 minutes in seconds
AUTH_TOKEN_LIFETIME_SECS = 300  # 5 minutes in seconds
MAX_TOKEN_LIFETIME_SECS = 86400  # 1 day in seconds


logger = logging.getLogger(__name__)


class AppIdentityError(Exception):
    """Raised when a JWT fails one of the verification checks."""


def _TryOpenSslImport():
    """Import OpenSSL, avoiding the explicit import where possible.

    Importing OpenSSL 0.14 can take up to 0.5s, which is a large price
    to pay at module import time. However, it's also possible for
    ``imp.find_module`` to fail to find the module, even when it's
    installed. (This is the case in various exotic environments,
    including some relevant for Google.) So we first try a fast-path,
    and fall back to the slow import as needed.

    Args:
        None

    Returns:
        None

    Raises:
        ImportError if OpenSSL is unavailable.
    """
    try:
        if imp is None:
            # No ``imp`` module on this interpreter: skip the probe and let
            # the handler below perform the real import.
            raise ImportError('imp module is unavailable')
        _, _package_dir, _ = imp.find_module('OpenSSL')
        # The package directory must contain a ``crypto`` submodule in one of
        # the supported layouts (source file, extension module or package).
        has_crypto = (
            os.path.isfile(os.path.join(_package_dir, 'crypto.py')) or
            os.path.isfile(os.path.join(_package_dir, 'crypto.so')) or
            os.path.isdir(os.path.join(_package_dir, 'crypto')))
        if not has_crypto:
            raise ImportError('No module named OpenSSL.crypto')
        return
    except ImportError:
        # Slow path: a failure here propagates to the caller as ImportError.
        import OpenSSL.crypto


try:
    _TryOpenSslImport()
    from oauth2client._openssl_crypt import OpenSSLVerifier
    from oauth2client._openssl_crypt import OpenSSLSigner
    from oauth2client._openssl_crypt import pkcs12_key_as_pem
except ImportError:
    OpenSSLVerifier = None
    OpenSSLSigner = None

    def pkcs12_key_as_pem(*args, **kwargs):
        """Placeholder used when OpenSSL is missing; always raises."""
        raise NotImplementedError('pkcs12_key_as_pem requires OpenSSL.')


try:
    from oauth2client._pycrypto_crypt import PyCryptoVerifier
    from oauth2client._pycrypto_crypt import PyCryptoSigner
except ImportError:
    PyCryptoVerifier = None
    PyCryptoSigner = None


# Pick the signing backend: OpenSSL is preferred, PyCrypto is the fallback.
if OpenSSLSigner:
    Signer = OpenSSLSigner
    Verifier = OpenSSLVerifier
elif PyCryptoSigner:
    Signer = PyCryptoSigner
    Verifier = PyCryptoVerifier
else:
    raise ImportError('No encryption library found. Please install either '
                      'PyOpenSSL, or PyCrypto 2.6 or later')


def make_signed_jwt(signer, payload):
    """Make a signed JWT.

    See http://self-issued.info/docs/draft-jones-json-web-token.html.

    Args:
        signer: crypt.Signer, Cryptographic signer.
        payload: dict, Dictionary of data to convert to JSON and then sign.

    Returns:
        string, The JWT for the payload.
    """
    header = {'typ': 'JWT', 'alg': 'RS256'}

    segments = [
        _urlsafe_b64encode(_json_encode(header)),
        _urlsafe_b64encode(_json_encode(payload)),
    ]
    # The signature covers "<header>.<payload>".
    signing_input = '.'.join(segments)

    signature = signer.sign(signing_input)
    segments.append(_urlsafe_b64encode(signature))

    # NOTE(review): this writes the complete signed token to the debug log.
    # Lazy %-formatting avoids building the string when debug is disabled.
    logger.debug('%s', segments)

    return '.'.join(segments)


def verify_signed_jwt_with_certs(jwt, certs, audience):
    """Verify a JWT against public certs.

    See http://self-issued.info/docs/draft-jones-json-web-token.html.

    The checks run in this order: segment count, payload parsing, signature,
    issue/expiry times (with ``CLOCK_SKEW_SECS`` of tolerance on both ends)
    and finally the audience.

    Args:
        jwt: string, A JWT.
        certs: dict, Dictionary whose values are public keys in PEM format.
        audience: string, The audience, 'aud', that this JWT should contain.
                  If None then the JWT's 'aud' parameter is not verified.

    Returns:
        dict, The deserialized JSON payload in the JWT.

    Raises:
        AppIdentityError if any checks are failed.
    """
    segments = jwt.split('.')

    if len(segments) != 3:
        raise AppIdentityError('Wrong number of segments in token: %s' % jwt)
    # The signed portion is "<header>.<payload>"; the third segment is the
    # signature itself.
    signed = '%s.%s' % (segments[0], segments[1])

    signature = _urlsafe_b64decode(segments[2])

    # Parse token.
    json_body = _urlsafe_b64decode(segments[1])
    try:
        parsed = json.loads(json_body.decode('utf-8'))
    except Exception:
        # Any decoding or parsing failure means the token is unusable.
        # ``Exception`` (rather than a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate instead of being reported as a bad token.
        raise AppIdentityError('Can\'t parse token: %s' % json_body)

    # Check signature. A match against any one of the certs is sufficient.
    verified = False
    for pem in certs.values():
        verifier = Verifier.from_string(pem, True)
        if verifier.verify(signed, signature):
            verified = True
            break
    if not verified:
        raise AppIdentityError('Invalid token signature: %s' % jwt)

    # Check creation timestamp.
    iat = parsed.get('iat')
    if iat is None:
        raise AppIdentityError('No iat field in token: %s' % json_body)
    earliest = iat - CLOCK_SKEW_SECS

    # Check expiration timestamp.
    now = int(time.time())
    exp = parsed.get('exp')
    if exp is None:
        raise AppIdentityError('No exp field in token: %s' % json_body)
    # Reject tokens that claim to stay valid for MAX_TOKEN_LIFETIME_SECS or
    # longer from now.
    if exp >= now + MAX_TOKEN_LIFETIME_SECS:
        raise AppIdentityError('exp field too far in future: %s' % json_body)
    latest = exp + CLOCK_SKEW_SECS

    if now < earliest:
        raise AppIdentityError('Token used too early, %d < %d: %s' %
                               (now, earliest, json_body))
    if now > latest:
        raise AppIdentityError('Token used too late, %d > %d: %s' %
                               (now, latest, json_body))

    # Check audience.
    if audience is not None:
        aud = parsed.get('aud')
        if aud is None:
            raise AppIdentityError('No aud field in token: %s' % json_body)
        if aud != audience:
            raise AppIdentityError('Wrong recipient, %s != %s: %s' %
                                   (aud, audience, json_body))

    return parsed