diff --git a/oauth2client/_helpers.py b/oauth2client/_helpers.py new file mode 100644 index 0000000..974d210 --- /dev/null +++ b/oauth2client/_helpers.py @@ -0,0 +1,53 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Helper functions for commonly used utilities.""" + +import base64 +import json +import six + + +def _parse_pem_key(raw_key_input): + """Identify and extract PEM keys. + + Determines whether the given key is in the format of PEM key, and extracts + the relevant part of the key if it is. + + Args: + raw_key_input: The contents of a private key file (either PEM or PKCS12). + + Returns: + string, The actual key if the contents are from a PEM file, or else None. + """ + offset = raw_key_input.find(b'-----BEGIN ') + if offset != -1: + return raw_key_input[offset:] + + +def _json_encode(data): + return json.dumps(data, separators=(',', ':')) + + +def _urlsafe_b64encode(raw_bytes): + if isinstance(raw_bytes, six.text_type): + raw_bytes = raw_bytes.encode('utf-8') + return base64.urlsafe_b64encode(raw_bytes).decode('ascii').rstrip('=') + + +def _urlsafe_b64decode(b64string): + # Guard against unicode strings, which base64 can't handle. + if isinstance(b64string, six.text_type): + b64string = b64string.encode('ascii') + padded = b64string + b'=' * (4 - len(b64string) % 4) + return base64.urlsafe_b64decode(padded) diff --git a/oauth2client/_openssl_crypt.py b/oauth2client/_openssl_crypt.py new file mode 100644 index 0000000..085c620 --- /dev/null +++ b/oauth2client/_openssl_crypt.py @@ -0,0 +1,147 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""OpenSSL Crypto-related routines for oauth2client.""" + +import base64 +import six + +from oauth2client._helpers import _parse_pem_key + + +class OpenSSLVerifier(object): + """Verifies the signature on a message.""" + + def __init__(self, pubkey): + """Constructor. + + Args: + pubkey, OpenSSL.crypto.PKey, The public key to verify with. + """ + self._pubkey = pubkey + + def verify(self, message, signature): + """Verifies a message against a signature. + + Args: + message: string or bytes, The message to verify. If string, will be + encoded to bytes as utf-8. + signature: string or bytes, The signature on the message. If string, + will be encoded to bytes as utf-8. + + Returns: + True if message was signed by the private key associated with the public + key that this object was constructed with. + """ + from OpenSSL import crypto # Delay import due to 0.5s import time. + if isinstance(message, six.text_type): + message = message.encode('utf-8') + if isinstance(signature, six.text_type): + signature = signature.encode('utf-8') + try: + crypto.verify(self._pubkey, signature, message, 'sha256') + return True + except crypto.Error: + return False + + @staticmethod + def from_string(key_pem, is_x509_cert): + """Construct a Verified instance from a string. + + Args: + key_pem: string, public key in PEM format. + is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is + expected to be an RSA key in PEM format. + + Returns: + Verifier instance. + + Raises: + OpenSSL.crypto.Error if the key_pem can't be parsed. + """ + from OpenSSL import crypto # Delay import due to 0.5s import time. + if is_x509_cert: + pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem) + else: + pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) + return OpenSSLVerifier(pubkey) + + +class OpenSSLSigner(object): + """Signs messages with a private key.""" + + def __init__(self, pkey): + """Constructor. + + Args: + pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with. + """ + self._key = pkey + + def sign(self, message): + """Signs a message. + + Args: + message: bytes, Message to be signed. + + Returns: + string, The signature of the message for the given key. + """ + from OpenSSL import crypto # Delay import due to 0.5s import time. + if isinstance(message, six.text_type): + message = message.encode('utf-8') + return crypto.sign(self._key, message, 'sha256') + + @staticmethod + def from_string(key, password=b'notasecret'): + """Construct a Signer instance from a string. + + Args: + key: string, private key in PKCS12 or PEM format. + password: string, password for the private key file. + + Returns: + Signer instance. + + Raises: + OpenSSL.crypto.Error if the key can't be parsed. + """ + from OpenSSL import crypto # Delay import due to 0.5s import time. + parsed_pem_key = _parse_pem_key(key) + if parsed_pem_key: + pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key) + else: + if isinstance(password, six.text_type): + password = password.encode('utf-8') + pkey = crypto.load_pkcs12(key, password).get_privatekey() + return OpenSSLSigner(pkey) + + +def pkcs12_key_as_pem(private_key_text, private_key_password): + """Convert the contents of a PKCS12 key to PEM using OpenSSL. + + Args: + private_key_text: String. Private key. + private_key_password: String. Password for PKCS12. + + Returns: + String. PEM contents of ``private_key_text``. + """ + from OpenSSL import crypto # Delay import due to 0.5s import time. + decoded_body = base64.b64decode(private_key_text) + if isinstance(private_key_password, six.text_type): + private_key_password = private_key_password.encode('ascii') + + pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password) + return crypto.dump_privatekey(crypto.FILETYPE_PEM, + pkcs12.get_privatekey()) diff --git a/oauth2client/_pycrypto_crypt.py b/oauth2client/_pycrypto_crypt.py new file mode 100644 index 0000000..530eea2 --- /dev/null +++ b/oauth2client/_pycrypto_crypt.py @@ -0,0 +1,128 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""pyCrypto Crypto-related routines for oauth2client.""" + +from Crypto.PublicKey import RSA +from Crypto.Hash import SHA256 +from Crypto.Signature import PKCS1_v1_5 +from Crypto.Util.asn1 import DerSequence +import six + +from oauth2client._helpers import _parse_pem_key +from oauth2client._helpers import _urlsafe_b64decode + + +class PyCryptoVerifier(object): + """Verifies the signature on a message.""" + + def __init__(self, pubkey): + """Constructor. + + Args: + pubkey, OpenSSL.crypto.PKey (or equiv), The public key to verify with. + """ + self._pubkey = pubkey + + def verify(self, message, signature): + """Verifies a message against a signature. + + Args: + message: string or bytes, The message to verify. If string, will be + encoded to bytes as utf-8. + signature: string or bytes, The signature on the message. + + Returns: + True if message was signed by the private key associated with the public + key that this object was constructed with. + """ + if isinstance(message, six.text_type): + message = message.encode('utf-8') + return PKCS1_v1_5.new(self._pubkey).verify( + SHA256.new(message), signature) + + @staticmethod + def from_string(key_pem, is_x509_cert): + """Construct a Verified instance from a string. + + Args: + key_pem: string, public key in PEM format. + is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is + expected to be an RSA key in PEM format. + + Returns: + Verifier instance. + """ + if is_x509_cert: + if isinstance(key_pem, six.text_type): + key_pem = key_pem.encode('ascii') + pemLines = key_pem.replace(b' ', b'').split() + certDer = _urlsafe_b64decode(b''.join(pemLines[1:-1])) + certSeq = DerSequence() + certSeq.decode(certDer) + tbsSeq = DerSequence() + tbsSeq.decode(certSeq[0]) + pubkey = RSA.importKey(tbsSeq[6]) + else: + pubkey = RSA.importKey(key_pem) + return PyCryptoVerifier(pubkey) + + +class PyCryptoSigner(object): + """Signs messages with a private key.""" + + def __init__(self, pkey): + """Constructor. + + Args: + pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with. + """ + self._key = pkey + + def sign(self, message): + """Signs a message. + + Args: + message: string, Message to be signed. + + Returns: + string, The signature of the message for the given key. + """ + if isinstance(message, six.text_type): + message = message.encode('utf-8') + return PKCS1_v1_5.new(self._key).sign(SHA256.new(message)) + + @staticmethod + def from_string(key, password='notasecret'): + """Construct a Signer instance from a string. + + Args: + key: string, private key in PEM format. + password: string, password for private key file. Unused for PEM files. + + Returns: + Signer instance. + + Raises: + NotImplementedError if the key isn't in PEM format. + """ + parsed_pem_key = _parse_pem_key(key) + if parsed_pem_key: + pkey = RSA.importKey(parsed_pem_key) + else: + raise NotImplementedError( + 'PKCS12 format is not supported by the PyCrypto library. ' + 'Try converting to a "PEM" ' + '(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) ' + 'or using PyOpenSSL if native code is an option.') + return PyCryptoSigner(pkey) diff --git a/oauth2client/client.py b/oauth2client/client.py index 1101c6d..5956f44 100644 --- a/oauth2client/client.py +++ b/oauth2client/client.py @@ -35,11 +35,12 @@ import six from six.moves import urllib import httplib2 -from oauth2client import clientsecrets from oauth2client import GOOGLE_AUTH_URI from oauth2client import GOOGLE_DEVICE_URI from oauth2client import GOOGLE_REVOKE_URI from oauth2client import GOOGLE_TOKEN_URI +from oauth2client._helpers import _urlsafe_b64decode +from oauth2client import clientsecrets from oauth2client import util HAS_OPENSSL = False @@ -1591,14 +1592,6 @@ def verify_id_token(id_token, audience, http=None, raise VerifyJwtTokenError('Status code: %d' % resp.status) -def _urlsafe_b64decode(b64string): - # Guard against unicode strings, which base64 can't handle. - if isinstance(b64string, six.text_type): - b64string = b64string.encode('ascii') - padded = b64string + b'=' * (4 - len(b64string) % 4) - return base64.urlsafe_b64decode(padded) - - def _extract_id_token(id_token): """Extract the JSON payload from a JWT. diff --git a/oauth2client/crypt.py b/oauth2client/crypt.py index 1a25bec..cc7a6cd 100644 --- a/oauth2client/crypt.py +++ b/oauth2client/crypt.py @@ -15,15 +15,15 @@ # limitations under the License. """Crypto-related routines for oauth2client.""" -import base64 import imp import json import logging import os -import sys import time -import six +from oauth2client._helpers import _json_encode +from oauth2client._helpers import _urlsafe_b64decode +from oauth2client._helpers import _urlsafe_b64encode CLOCK_SKEW_SECS = 300 # 5 minutes in seconds @@ -69,133 +69,9 @@ def _TryOpenSslImport(): try: _TryOpenSslImport() - - class OpenSSLVerifier(object): - """Verifies the signature on a message.""" - - def __init__(self, pubkey): - """Constructor. - - Args: - pubkey, OpenSSL.crypto.PKey, The public key to verify with. - """ - self._pubkey = pubkey - - def verify(self, message, signature): - """Verifies a message against a signature. - - Args: - message: string or bytes, The message to verify. If string, will be - encoded to bytes as utf-8. - signature: string or bytes, The signature on the message. If string, - will be encoded to bytes as utf-8. - - Returns: - True if message was signed by the private key associated with the public - key that this object was constructed with. - """ - from OpenSSL import crypto - if isinstance(message, six.text_type): - message = message.encode('utf-8') - if isinstance(signature, six.text_type): - signature = signature.encode('utf-8') - try: - crypto.verify(self._pubkey, signature, message, 'sha256') - return True - except crypto.Error: - return False - - @staticmethod - def from_string(key_pem, is_x509_cert): - """Construct a Verified instance from a string. - - Args: - key_pem: string, public key in PEM format. - is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is - expected to be an RSA key in PEM format. - - Returns: - Verifier instance. - - Raises: - OpenSSL.crypto.Error if the key_pem can't be parsed. - """ - from OpenSSL import crypto - if is_x509_cert: - pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem) - else: - pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) - return OpenSSLVerifier(pubkey) - - - class OpenSSLSigner(object): - """Signs messages with a private key.""" - - def __init__(self, pkey): - """Constructor. - - Args: - pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with. - """ - self._key = pkey - - def sign(self, message): - """Signs a message. - - Args: - message: bytes, Message to be signed. - - Returns: - string, The signature of the message for the given key. - """ - from OpenSSL import crypto - if isinstance(message, six.text_type): - message = message.encode('utf-8') - return crypto.sign(self._key, message, 'sha256') - - @staticmethod - def from_string(key, password=b'notasecret'): - """Construct a Signer instance from a string. - - Args: - key: string, private key in PKCS12 or PEM format. - password: string, password for the private key file. - - Returns: - Signer instance. - - Raises: - OpenSSL.crypto.Error if the key can't be parsed. - """ - from OpenSSL import crypto - parsed_pem_key = _parse_pem_key(key) - if parsed_pem_key: - pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key) - else: - if isinstance(password, six.text_type): - password = password.encode('utf-8') - pkey = crypto.load_pkcs12(key, password).get_privatekey() - return OpenSSLSigner(pkey) - - - def pkcs12_key_as_pem(private_key_text, private_key_password): - """Convert the contents of a PKCS12 key to PEM using OpenSSL. - - Args: - private_key_text: String. Private key. - private_key_password: String. Password for PKCS12. - - Returns: - String. PEM contents of ``private_key_text``. - """ - from OpenSSL import crypto - decoded_body = base64.b64decode(private_key_text) - if isinstance(private_key_password, six.string_types): - private_key_password = private_key_password.encode('ascii') - - pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password) - return crypto.dump_privatekey(crypto.FILETYPE_PEM, - pkcs12.get_privatekey()) + from oauth2client._openssl_crypt import OpenSSLVerifier + from oauth2client._openssl_crypt import OpenSSLSigner + from oauth2client._openssl_crypt import pkcs12_key_as_pem except ImportError: OpenSSLVerifier = None OpenSSLSigner = None @@ -204,116 +80,8 @@ except ImportError: try: - from Crypto.PublicKey import RSA - from Crypto.Hash import SHA256 - from Crypto.Signature import PKCS1_v1_5 - from Crypto.Util.asn1 import DerSequence - - - class PyCryptoVerifier(object): - """Verifies the signature on a message.""" - - def __init__(self, pubkey): - """Constructor. - - Args: - pubkey, OpenSSL.crypto.PKey (or equiv), The public key to verify with. - """ - self._pubkey = pubkey - - def verify(self, message, signature): - """Verifies a message against a signature. - - Args: - message: string or bytes, The message to verify. If string, will be - encoded to bytes as utf-8. - signature: string or bytes, The signature on the message. - - Returns: - True if message was signed by the private key associated with the public - key that this object was constructed with. - """ - if isinstance(message, six.text_type): - message = message.encode('utf-8') - return PKCS1_v1_5.new(self._pubkey).verify( - SHA256.new(message), signature) - - @staticmethod - def from_string(key_pem, is_x509_cert): - """Construct a Verified instance from a string. - - Args: - key_pem: string, public key in PEM format. - is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is - expected to be an RSA key in PEM format. - - Returns: - Verifier instance. - """ - if is_x509_cert: - if isinstance(key_pem, six.text_type): - key_pem = key_pem.encode('ascii') - pemLines = key_pem.replace(b' ', b'').split() - certDer = _urlsafe_b64decode(b''.join(pemLines[1:-1])) - certSeq = DerSequence() - certSeq.decode(certDer) - tbsSeq = DerSequence() - tbsSeq.decode(certSeq[0]) - pubkey = RSA.importKey(tbsSeq[6]) - else: - pubkey = RSA.importKey(key_pem) - return PyCryptoVerifier(pubkey) - - - class PyCryptoSigner(object): - """Signs messages with a private key.""" - - def __init__(self, pkey): - """Constructor. - - Args: - pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with. - """ - self._key = pkey - - def sign(self, message): - """Signs a message. - - Args: - message: string, Message to be signed. - - Returns: - string, The signature of the message for the given key. - """ - if isinstance(message, six.text_type): - message = message.encode('utf-8') - return PKCS1_v1_5.new(self._key).sign(SHA256.new(message)) - - @staticmethod - def from_string(key, password='notasecret'): - """Construct a Signer instance from a string. - - Args: - key: string, private key in PEM format. - password: string, password for private key file. Unused for PEM files. - - Returns: - Signer instance. - - Raises: - NotImplementedError if they key isn't in PEM format. - """ - parsed_pem_key = _parse_pem_key(key) - if parsed_pem_key: - pkey = RSA.importKey(parsed_pem_key) - else: - raise NotImplementedError( - 'PKCS12 format is not supported by the PyCrypto library. ' - 'Try converting to a "PEM" ' - '(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) ' - 'or using PyOpenSSL if native code is an option.') - return PyCryptoSigner(pkey) - + from oauth2client._pycrypto_crypt import PyCryptoVerifier + from oauth2client._pycrypto_crypt import PyCryptoSigner except ImportError: PyCryptoVerifier = None PyCryptoSigner = None @@ -330,41 +98,6 @@ else: 'PyOpenSSL, or PyCrypto 2.6 or later') -def _parse_pem_key(raw_key_input): - """Identify and extract PEM keys. - - Determines whether the given key is in the format of PEM key, and extracts - the relevant part of the key if it is. - - Args: - raw_key_input: The contents of a private key file (either PEM or PKCS12). - - Returns: - string, The actual key if the contents are from a PEM file, or else None. - """ - offset = raw_key_input.find(b'-----BEGIN ') - if offset != -1: - return raw_key_input[offset:] - - -def _urlsafe_b64encode(raw_bytes): - if isinstance(raw_bytes, six.text_type): - raw_bytes = raw_bytes.encode('utf-8') - return base64.urlsafe_b64encode(raw_bytes).decode('ascii').rstrip('=') - - -def _urlsafe_b64decode(b64string): - # Guard against unicode strings, which base64 can't handle. - if isinstance(b64string, six.text_type): - b64string = b64string.encode('ascii') - padded = b64string + b'=' * (4 - len(b64string) % 4) - return base64.urlsafe_b64decode(padded) - - -def _json_encode(data): - return json.dumps(data, separators=(',', ':')) - - def make_signed_jwt(signer, payload): """Make a signed JWT. diff --git a/oauth2client/service_account.py b/oauth2client/service_account.py index d1d1d89..ef75a00 100644 --- a/oauth2client/service_account.py +++ b/oauth2client/service_account.py @@ -18,7 +18,6 @@ This credentials class is implemented on top of rsa library. """ import base64 -import json import six import time @@ -28,6 +27,8 @@ import rsa from oauth2client import GOOGLE_REVOKE_URI from oauth2client import GOOGLE_TOKEN_URI +from oauth2client._helpers import _json_encode +from oauth2client._helpers import _urlsafe_b64encode from oauth2client import util from oauth2client.client import AssertionCredentials @@ -75,8 +76,9 @@ class _ServiceAccountCredentials(AssertionCredentials): } payload.update(self._kwargs) - assertion_input = (_urlsafe_b64encode(header) + b'.' + - _urlsafe_b64encode(payload)) + first_segment = _urlsafe_b64encode(_json_encode(header)).encode('utf-8') + second_segment = _urlsafe_b64encode(_json_encode(payload)).encode('utf-8') + assertion_input = first_segment + b'.' + second_segment # Sign the assertion. rsa_bytes = rsa.pkcs1.sign(assertion_input, self._private_key, 'SHA-256') @@ -122,11 +124,6 @@ class _ServiceAccountCredentials(AssertionCredentials): **self._kwargs) -def _urlsafe_b64encode(data): - return base64.urlsafe_b64encode( - json.dumps(data, separators=(',', ':')).encode('UTF-8')).rstrip(b'=') - - def _get_private_key(private_key_pkcs8_text): """Get an RSA private key object from a pkcs8 representation.""" diff --git a/tests/test__helpers.py b/tests/test__helpers.py new file mode 100644 index 0000000..f476a13 --- /dev/null +++ b/tests/test__helpers.py @@ -0,0 +1,81 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for oauth2client._helpers.""" + +import unittest + +from oauth2client._helpers import _json_encode +from oauth2client._helpers import _parse_pem_key +from oauth2client._helpers import _urlsafe_b64decode +from oauth2client._helpers import _urlsafe_b64encode + + +class Test__parse_pem_key(unittest.TestCase): + + def test_valid_input(self): + test_string = b'1234-----BEGIN FOO BAR BAZ' + result = _parse_pem_key(test_string) + self.assertEqual(result, test_string[4:]) + + def test_bad_input(self): + test_string = b'DOES NOT HAVE DASHES' + result = _parse_pem_key(test_string) + self.assertEqual(result, None) + + +class Test__json_encode(unittest.TestCase): + + def test_dictionary_input(self): + # Use only a single key since dictionary hash order + # is non-deterministic. + data = {u'foo': 10} + result = _json_encode(data) + self.assertEqual(result, """{"foo":10}""") + + def test_list_input(self): + data = [42, 1337] + result = _json_encode(data) + self.assertEqual(result, """[42,1337]""") + + +class Test__urlsafe_b64encode(unittest.TestCase): + + def test_valid_input_bytes(self): + test_string = b'deadbeef' + result = _urlsafe_b64encode(test_string) + self.assertEqual(result, u'ZGVhZGJlZWY') + + def test_valid_input_unicode(self): + test_string = u'deadbeef' + result = _urlsafe_b64encode(test_string) + self.assertEqual(result, u'ZGVhZGJlZWY') + + +class Test__urlsafe_b64decode(unittest.TestCase): + + def test_valid_input_bytes(self): + test_string = b'ZGVhZGJlZWY' + result = _urlsafe_b64decode(test_string) + self.assertEqual(result, b'deadbeef') + + def test_valid_input_unicode(self): + test_string = b'ZGVhZGJlZWY' + result = _urlsafe_b64decode(test_string) + self.assertEqual(result, b'deadbeef') + + def test_bad_input(self): + import binascii + bad_string = b'+' + self.assertRaises((TypeError, binascii.Error), + _urlsafe_b64decode, bad_string) diff --git a/tests/test__pycrypto_crypt.py b/tests/test__pycrypto_crypt.py new file mode 100644 index 0000000..3097235 --- /dev/null +++ b/tests/test__pycrypto_crypt.py @@ -0,0 +1,63 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for oauth2client._pycrypto_crypt.""" + +import os +import unittest + +from oauth2client.crypt import PyCryptoSigner +from oauth2client.crypt import PyCryptoVerifier + + +class TestPyCryptoVerifier(unittest.TestCase): + + PUBLIC_KEY_FILENAME = os.path.join(os.path.dirname(__file__), + 'data', 'publickey.pem') + PRIVATE_KEY_FILENAME = os.path.join(os.path.dirname(__file__), + 'data', 'privatekey.pem') + + def _load_public_key_bytes(self): + with open(self.PUBLIC_KEY_FILENAME, 'rb') as fh: + return fh.read() + + def _load_private_key_bytes(self): + with open(self.PRIVATE_KEY_FILENAME, 'rb') as fh: + return fh.read() + + def test_verify_success(self): + to_sign = b'foo' + signer = PyCryptoSigner.from_string(self._load_private_key_bytes()) + actual_signature = signer.sign(to_sign) + + verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(), + is_x509_cert=True) + self.assertTrue(verifier.verify(to_sign, actual_signature)) + + def test_verify_failure(self): + verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(), + is_x509_cert=True) + bad_signature = b'' + self.assertFalse(verifier.verify(b'foo', bad_signature)) + + def test_verify_bad_key(self): + verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(), + is_x509_cert=True) + bad_signature = b'' + self.assertFalse(verifier.verify(b'foo', bad_signature)) + + def test_from_string_unicode_key(self): + public_key = self._load_public_key_bytes() + public_key = public_key.decode('utf-8') + verifier = PyCryptoVerifier.from_string(public_key, is_x509_cert=True) + self.assertTrue(isinstance(verifier, PyCryptoVerifier)) diff --git a/tests/test_crypt.py b/tests/test_crypt.py index dcc44e0..302e26e 100644 --- a/tests/test_crypt.py +++ b/tests/test_crypt.py @@ -23,6 +23,7 @@ except NameError: # For Python3 (though importlib should be used, silly 3.3). from imp import reload +from oauth2client import _helpers from oauth2client.client import HAS_OPENSSL from oauth2client.client import SignedJwtAssertionCredentials from oauth2client import crypt @@ -46,17 +47,25 @@ class Test_pkcs12_key_as_pem(unittest.TestCase): scope='read+write', sub='joe@example.org') - def test_succeeds(self): + def _succeeds_helper(self, password=None): self.assertEqual(True, HAS_OPENSSL) credentials = self._make_signed_jwt_creds() - pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key, - credentials.private_key_password) + if password is None: + password = credentials.private_key_password + pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key, password) pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem') - pkcs12_key_as_pem = crypt._parse_pem_key(pkcs12_key_as_pem) + pkcs12_key_as_pem = _helpers._parse_pem_key(pkcs12_key_as_pem) alternate_pem = datafile('pem_from_pkcs12_alternate.pem') self.assertTrue(pem_contents in [pkcs12_key_as_pem, alternate_pem]) + def test_succeeds(self): + self._succeeds_helper() + + def test_succeeds_with_unicode_password(self): + password = u'notasecret' + self._succeeds_helper(password) + def test_without_openssl(self): import imp imp_find_module = imp.find_module diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 2b9db5a..5f2cc90 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -67,7 +67,10 @@ class CryptTests(unittest.TestCase): private_key = datafile(private_key_file) public_key = datafile('publickey.pem') - signer = self.signer.from_string(private_key) + # We pass in a non-bytes password to make sure all branches + # are traversed in tests. + signer = self.signer.from_string(private_key, + password=u'notasecret') signature = signer.sign('foo') verifier = self.verifier.from_string(public_key, True) @@ -188,6 +191,13 @@ class CryptTests(unittest.TestCase): }) self._check_jwt_failure(jwt, 'Wrong recipient') + def test_from_string_non_509_cert(self): + # Use a private key instead of a certificate to test the other branch + # of from_string(). + public_key = datafile('privatekey.pem') + verifier = self.verifier.from_string(public_key, is_x509_cert=False) + self.assertTrue(isinstance(verifier, self.verifier)) + class PEMCryptTestsPyCrypto(CryptTests):