Factoring out conditional code from crypt.py.

Until now, code that depended on PyCrypto or OpenSSL was
defined conditionally (e.g. indented) in `crypt.py`. Rather than
grouping all these together, we factor out the library specific
behavior into standalone modules (but make the modules
private / protected).

In addition, added a `_helpers.py` module with common behavior
that was previously defined in multiple places.

Finally, beefed up some test cases so that the three newly added
modules had 100% test coverage.

Towards #212.
This commit is contained in:
Danny Hermes
2015-07-06 20:46:36 -07:00
parent 76b3c40068
commit 3b3aeb65d7
10 changed files with 511 additions and 297 deletions

53
oauth2client/_helpers.py Normal file
View File

@@ -0,0 +1,53 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for commonly used utilities."""
import base64
import json
import six
def _parse_pem_key(raw_key_input):
"""Identify and extract PEM keys.
Determines whether the given key is in the format of PEM key, and extracts
the relevant part of the key if it is.
Args:
raw_key_input: The contents of a private key file (either PEM or PKCS12).
Returns:
string, The actual key if the contents are from a PEM file, or else None.
"""
offset = raw_key_input.find(b'-----BEGIN ')
if offset != -1:
return raw_key_input[offset:]
def _json_encode(data):
return json.dumps(data, separators=(',', ':'))
def _urlsafe_b64encode(raw_bytes):
if isinstance(raw_bytes, six.text_type):
raw_bytes = raw_bytes.encode('utf-8')
return base64.urlsafe_b64encode(raw_bytes).decode('ascii').rstrip('=')
def _urlsafe_b64decode(b64string):
# Guard against unicode strings, which base64 can't handle.
if isinstance(b64string, six.text_type):
b64string = b64string.encode('ascii')
padded = b64string + b'=' * (4 - len(b64string) % 4)
return base64.urlsafe_b64decode(padded)

View File

@@ -0,0 +1,147 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OpenSSL Crypto-related routines for oauth2client."""
import base64
import six
from oauth2client._helpers import _parse_pem_key
class OpenSSLVerifier(object):
"""Verifies the signature on a message."""
def __init__(self, pubkey):
"""Constructor.
Args:
pubkey, OpenSSL.crypto.PKey, The public key to verify with.
"""
self._pubkey = pubkey
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the public
key that this object was constructed with.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
if isinstance(message, six.text_type):
message = message.encode('utf-8')
if isinstance(signature, six.text_type):
signature = signature.encode('utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
@staticmethod
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error if the key_pem can't be parsed.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
class OpenSSLSigner(object):
"""Signs messages with a private key."""
def __init__(self, pkey):
"""Constructor.
Args:
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
"""
self._key = pkey
def sign(self, message):
"""Signs a message.
Args:
message: bytes, Message to be signed.
Returns:
string, The signature of the message for the given key.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return crypto.sign(self._key, message, 'sha256')
@staticmethod
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
if isinstance(password, six.text_type):
password = password.encode('utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def pkcs12_key_as_pem(private_key_text, private_key_password):
"""Convert the contents of a PKCS12 key to PEM using OpenSSL.
Args:
private_key_text: String. Private key.
private_key_password: String. Password for PKCS12.
Returns:
String. PEM contents of ``private_key_text``.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
decoded_body = base64.b64decode(private_key_text)
if isinstance(private_key_password, six.text_type):
private_key_password = private_key_password.encode('ascii')
pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password)
return crypto.dump_privatekey(crypto.FILETYPE_PEM,
pkcs12.get_privatekey())

View File

@@ -0,0 +1,128 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""pyCrypto Crypto-related routines for oauth2client."""
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.asn1 import DerSequence
import six
from oauth2client._helpers import _parse_pem_key
from oauth2client._helpers import _urlsafe_b64decode
class PyCryptoVerifier(object):
"""Verifies the signature on a message."""
def __init__(self, pubkey):
"""Constructor.
Args:
pubkey, OpenSSL.crypto.PKey (or equiv), The public key to verify with.
"""
self._pubkey = pubkey
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message.
Returns:
True if message was signed by the private key associated with the public
key that this object was constructed with.
"""
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return PKCS1_v1_5.new(self._pubkey).verify(
SHA256.new(message), signature)
@staticmethod
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
"""
if is_x509_cert:
if isinstance(key_pem, six.text_type):
key_pem = key_pem.encode('ascii')
pemLines = key_pem.replace(b' ', b'').split()
certDer = _urlsafe_b64decode(b''.join(pemLines[1:-1]))
certSeq = DerSequence()
certSeq.decode(certDer)
tbsSeq = DerSequence()
tbsSeq.decode(certSeq[0])
pubkey = RSA.importKey(tbsSeq[6])
else:
pubkey = RSA.importKey(key_pem)
return PyCryptoVerifier(pubkey)
class PyCryptoSigner(object):
"""Signs messages with a private key."""
def __init__(self, pkey):
"""Constructor.
Args:
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
"""
self._key = pkey
def sign(self, message):
"""Signs a message.
Args:
message: string, Message to be signed.
Returns:
string, The signature of the message for the given key.
"""
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return PKCS1_v1_5.new(self._key).sign(SHA256.new(message))
@staticmethod
def from_string(key, password='notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PEM format.
password: string, password for private key file. Unused for PEM files.
Returns:
Signer instance.
Raises:
NotImplementedError if the key isn't in PEM format.
"""
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = RSA.importKey(parsed_pem_key)
else:
raise NotImplementedError(
'PKCS12 format is not supported by the PyCrypto library. '
'Try converting to a "PEM" '
'(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
'or using PyOpenSSL if native code is an option.')
return PyCryptoSigner(pkey)

View File

@@ -35,11 +35,12 @@ import six
from six.moves import urllib from six.moves import urllib
import httplib2 import httplib2
from oauth2client import clientsecrets
from oauth2client import GOOGLE_AUTH_URI from oauth2client import GOOGLE_AUTH_URI
from oauth2client import GOOGLE_DEVICE_URI from oauth2client import GOOGLE_DEVICE_URI
from oauth2client import GOOGLE_REVOKE_URI from oauth2client import GOOGLE_REVOKE_URI
from oauth2client import GOOGLE_TOKEN_URI from oauth2client import GOOGLE_TOKEN_URI
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client import clientsecrets
from oauth2client import util from oauth2client import util
HAS_OPENSSL = False HAS_OPENSSL = False
@@ -1591,14 +1592,6 @@ def verify_id_token(id_token, audience, http=None,
raise VerifyJwtTokenError('Status code: %d' % resp.status) raise VerifyJwtTokenError('Status code: %d' % resp.status)
def _urlsafe_b64decode(b64string):
# Guard against unicode strings, which base64 can't handle.
if isinstance(b64string, six.text_type):
b64string = b64string.encode('ascii')
padded = b64string + b'=' * (4 - len(b64string) % 4)
return base64.urlsafe_b64decode(padded)
def _extract_id_token(id_token): def _extract_id_token(id_token):
"""Extract the JSON payload from a JWT. """Extract the JSON payload from a JWT.

View File

@@ -15,15 +15,15 @@
# limitations under the License. # limitations under the License.
"""Crypto-related routines for oauth2client.""" """Crypto-related routines for oauth2client."""
import base64
import imp import imp
import json import json
import logging import logging
import os import os
import sys
import time import time
import six from oauth2client._helpers import _json_encode
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client._helpers import _urlsafe_b64encode
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
@@ -69,133 +69,9 @@ def _TryOpenSslImport():
try: try:
_TryOpenSslImport() _TryOpenSslImport()
from oauth2client._openssl_crypt import OpenSSLVerifier
class OpenSSLVerifier(object): from oauth2client._openssl_crypt import OpenSSLSigner
"""Verifies the signature on a message.""" from oauth2client._openssl_crypt import pkcs12_key_as_pem
def __init__(self, pubkey):
"""Constructor.
Args:
pubkey, OpenSSL.crypto.PKey, The public key to verify with.
"""
self._pubkey = pubkey
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the public
key that this object was constructed with.
"""
from OpenSSL import crypto
if isinstance(message, six.text_type):
message = message.encode('utf-8')
if isinstance(signature, six.text_type):
signature = signature.encode('utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
@staticmethod
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error if the key_pem can't be parsed.
"""
from OpenSSL import crypto
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
class OpenSSLSigner(object):
"""Signs messages with a private key."""
def __init__(self, pkey):
"""Constructor.
Args:
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
"""
self._key = pkey
def sign(self, message):
"""Signs a message.
Args:
message: bytes, Message to be signed.
Returns:
string, The signature of the message for the given key.
"""
from OpenSSL import crypto
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return crypto.sign(self._key, message, 'sha256')
@staticmethod
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
from OpenSSL import crypto
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
if isinstance(password, six.text_type):
password = password.encode('utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def pkcs12_key_as_pem(private_key_text, private_key_password):
"""Convert the contents of a PKCS12 key to PEM using OpenSSL.
Args:
private_key_text: String. Private key.
private_key_password: String. Password for PKCS12.
Returns:
String. PEM contents of ``private_key_text``.
"""
from OpenSSL import crypto
decoded_body = base64.b64decode(private_key_text)
if isinstance(private_key_password, six.string_types):
private_key_password = private_key_password.encode('ascii')
pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password)
return crypto.dump_privatekey(crypto.FILETYPE_PEM,
pkcs12.get_privatekey())
except ImportError: except ImportError:
OpenSSLVerifier = None OpenSSLVerifier = None
OpenSSLSigner = None OpenSSLSigner = None
@@ -204,116 +80,8 @@ except ImportError:
try: try:
from Crypto.PublicKey import RSA from oauth2client._pycrypto_crypt import PyCryptoVerifier
from Crypto.Hash import SHA256 from oauth2client._pycrypto_crypt import PyCryptoSigner
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.asn1 import DerSequence
class PyCryptoVerifier(object):
"""Verifies the signature on a message."""
def __init__(self, pubkey):
"""Constructor.
Args:
pubkey, OpenSSL.crypto.PKey (or equiv), The public key to verify with.
"""
self._pubkey = pubkey
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message.
Returns:
True if message was signed by the private key associated with the public
key that this object was constructed with.
"""
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return PKCS1_v1_5.new(self._pubkey).verify(
SHA256.new(message), signature)
@staticmethod
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
"""
if is_x509_cert:
if isinstance(key_pem, six.text_type):
key_pem = key_pem.encode('ascii')
pemLines = key_pem.replace(b' ', b'').split()
certDer = _urlsafe_b64decode(b''.join(pemLines[1:-1]))
certSeq = DerSequence()
certSeq.decode(certDer)
tbsSeq = DerSequence()
tbsSeq.decode(certSeq[0])
pubkey = RSA.importKey(tbsSeq[6])
else:
pubkey = RSA.importKey(key_pem)
return PyCryptoVerifier(pubkey)
class PyCryptoSigner(object):
"""Signs messages with a private key."""
def __init__(self, pkey):
"""Constructor.
Args:
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
"""
self._key = pkey
def sign(self, message):
"""Signs a message.
Args:
message: string, Message to be signed.
Returns:
string, The signature of the message for the given key.
"""
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return PKCS1_v1_5.new(self._key).sign(SHA256.new(message))
@staticmethod
def from_string(key, password='notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PEM format.
password: string, password for private key file. Unused for PEM files.
Returns:
Signer instance.
Raises:
NotImplementedError if they key isn't in PEM format.
"""
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = RSA.importKey(parsed_pem_key)
else:
raise NotImplementedError(
'PKCS12 format is not supported by the PyCrypto library. '
'Try converting to a "PEM" '
'(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
'or using PyOpenSSL if native code is an option.')
return PyCryptoSigner(pkey)
except ImportError: except ImportError:
PyCryptoVerifier = None PyCryptoVerifier = None
PyCryptoSigner = None PyCryptoSigner = None
@@ -330,41 +98,6 @@ else:
'PyOpenSSL, or PyCrypto 2.6 or later') 'PyOpenSSL, or PyCrypto 2.6 or later')
def _parse_pem_key(raw_key_input):
"""Identify and extract PEM keys.
Determines whether the given key is in the format of PEM key, and extracts
the relevant part of the key if it is.
Args:
raw_key_input: The contents of a private key file (either PEM or PKCS12).
Returns:
string, The actual key if the contents are from a PEM file, or else None.
"""
offset = raw_key_input.find(b'-----BEGIN ')
if offset != -1:
return raw_key_input[offset:]
def _urlsafe_b64encode(raw_bytes):
if isinstance(raw_bytes, six.text_type):
raw_bytes = raw_bytes.encode('utf-8')
return base64.urlsafe_b64encode(raw_bytes).decode('ascii').rstrip('=')
def _urlsafe_b64decode(b64string):
# Guard against unicode strings, which base64 can't handle.
if isinstance(b64string, six.text_type):
b64string = b64string.encode('ascii')
padded = b64string + b'=' * (4 - len(b64string) % 4)
return base64.urlsafe_b64decode(padded)
def _json_encode(data):
return json.dumps(data, separators=(',', ':'))
def make_signed_jwt(signer, payload): def make_signed_jwt(signer, payload):
"""Make a signed JWT. """Make a signed JWT.

View File

@@ -18,7 +18,6 @@ This credentials class is implemented on top of rsa library.
""" """
import base64 import base64
import json
import six import six
import time import time
@@ -28,6 +27,8 @@ import rsa
from oauth2client import GOOGLE_REVOKE_URI from oauth2client import GOOGLE_REVOKE_URI
from oauth2client import GOOGLE_TOKEN_URI from oauth2client import GOOGLE_TOKEN_URI
from oauth2client._helpers import _json_encode
from oauth2client._helpers import _urlsafe_b64encode
from oauth2client import util from oauth2client import util
from oauth2client.client import AssertionCredentials from oauth2client.client import AssertionCredentials
@@ -75,8 +76,9 @@ class _ServiceAccountCredentials(AssertionCredentials):
} }
payload.update(self._kwargs) payload.update(self._kwargs)
assertion_input = (_urlsafe_b64encode(header) + b'.' + first_segment = _urlsafe_b64encode(_json_encode(header)).encode('utf-8')
_urlsafe_b64encode(payload)) second_segment = _urlsafe_b64encode(_json_encode(payload)).encode('utf-8')
assertion_input = first_segment + b'.' + second_segment
# Sign the assertion. # Sign the assertion.
rsa_bytes = rsa.pkcs1.sign(assertion_input, self._private_key, 'SHA-256') rsa_bytes = rsa.pkcs1.sign(assertion_input, self._private_key, 'SHA-256')
@@ -122,11 +124,6 @@ class _ServiceAccountCredentials(AssertionCredentials):
**self._kwargs) **self._kwargs)
def _urlsafe_b64encode(data):
return base64.urlsafe_b64encode(
json.dumps(data, separators=(',', ':')).encode('UTF-8')).rstrip(b'=')
def _get_private_key(private_key_pkcs8_text): def _get_private_key(private_key_pkcs8_text):
"""Get an RSA private key object from a pkcs8 representation.""" """Get an RSA private key object from a pkcs8 representation."""

81
tests/test__helpers.py Normal file
View File

@@ -0,0 +1,81 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for oauth2client._helpers."""
import unittest
from oauth2client._helpers import _json_encode
from oauth2client._helpers import _parse_pem_key
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client._helpers import _urlsafe_b64encode
class Test__parse_pem_key(unittest.TestCase):
def test_valid_input(self):
test_string = b'1234-----BEGIN FOO BAR BAZ'
result = _parse_pem_key(test_string)
self.assertEqual(result, test_string[4:])
def test_bad_input(self):
test_string = b'DOES NOT HAVE DASHES'
result = _parse_pem_key(test_string)
self.assertEqual(result, None)
class Test__json_encode(unittest.TestCase):
def test_dictionary_input(self):
# Use only a single key since dictionary hash order
# is non-deterministic.
data = {u'foo': 10}
result = _json_encode(data)
self.assertEqual(result, """{"foo":10}""")
def test_list_input(self):
data = [42, 1337]
result = _json_encode(data)
self.assertEqual(result, """[42,1337]""")
class Test__urlsafe_b64encode(unittest.TestCase):
def test_valid_input_bytes(self):
test_string = b'deadbeef'
result = _urlsafe_b64encode(test_string)
self.assertEqual(result, u'ZGVhZGJlZWY')
def test_valid_input_unicode(self):
test_string = u'deadbeef'
result = _urlsafe_b64encode(test_string)
self.assertEqual(result, u'ZGVhZGJlZWY')
class Test__urlsafe_b64decode(unittest.TestCase):
def test_valid_input_bytes(self):
test_string = b'ZGVhZGJlZWY'
result = _urlsafe_b64decode(test_string)
self.assertEqual(result, b'deadbeef')
def test_valid_input_unicode(self):
test_string = b'ZGVhZGJlZWY'
result = _urlsafe_b64decode(test_string)
self.assertEqual(result, b'deadbeef')
def test_bad_input(self):
import binascii
bad_string = b'+'
self.assertRaises((TypeError, binascii.Error),
_urlsafe_b64decode, bad_string)

View File

@@ -0,0 +1,63 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for oauth2client._pycrypto_crypt."""
import os
import unittest
from oauth2client.crypt import PyCryptoSigner
from oauth2client.crypt import PyCryptoVerifier
class TestPyCryptoVerifier(unittest.TestCase):
PUBLIC_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
'data', 'publickey.pem')
PRIVATE_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
'data', 'privatekey.pem')
def _load_public_key_bytes(self):
with open(self.PUBLIC_KEY_FILENAME, 'rb') as fh:
return fh.read()
def _load_private_key_bytes(self):
with open(self.PRIVATE_KEY_FILENAME, 'rb') as fh:
return fh.read()
def test_verify_success(self):
to_sign = b'foo'
signer = PyCryptoSigner.from_string(self._load_private_key_bytes())
actual_signature = signer.sign(to_sign)
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
is_x509_cert=True)
self.assertTrue(verifier.verify(to_sign, actual_signature))
def test_verify_failure(self):
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
is_x509_cert=True)
bad_signature = b''
self.assertFalse(verifier.verify(b'foo', bad_signature))
def test_verify_bad_key(self):
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
is_x509_cert=True)
bad_signature = b''
self.assertFalse(verifier.verify(b'foo', bad_signature))
def test_from_string_unicode_key(self):
public_key = self._load_public_key_bytes()
public_key = public_key.decode('utf-8')
verifier = PyCryptoVerifier.from_string(public_key, is_x509_cert=True)
self.assertTrue(isinstance(verifier, PyCryptoVerifier))

View File

@@ -23,6 +23,7 @@ except NameError:
# For Python3 (though importlib should be used, silly 3.3). # For Python3 (though importlib should be used, silly 3.3).
from imp import reload from imp import reload
from oauth2client import _helpers
from oauth2client.client import HAS_OPENSSL from oauth2client.client import HAS_OPENSSL
from oauth2client.client import SignedJwtAssertionCredentials from oauth2client.client import SignedJwtAssertionCredentials
from oauth2client import crypt from oauth2client import crypt
@@ -46,17 +47,25 @@ class Test_pkcs12_key_as_pem(unittest.TestCase):
scope='read+write', scope='read+write',
sub='joe@example.org') sub='joe@example.org')
def test_succeeds(self): def _succeeds_helper(self, password=None):
self.assertEqual(True, HAS_OPENSSL) self.assertEqual(True, HAS_OPENSSL)
credentials = self._make_signed_jwt_creds() credentials = self._make_signed_jwt_creds()
pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key, if password is None:
credentials.private_key_password) password = credentials.private_key_password
pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key, password)
pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem') pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem')
pkcs12_key_as_pem = crypt._parse_pem_key(pkcs12_key_as_pem) pkcs12_key_as_pem = _helpers._parse_pem_key(pkcs12_key_as_pem)
alternate_pem = datafile('pem_from_pkcs12_alternate.pem') alternate_pem = datafile('pem_from_pkcs12_alternate.pem')
self.assertTrue(pem_contents in [pkcs12_key_as_pem, alternate_pem]) self.assertTrue(pem_contents in [pkcs12_key_as_pem, alternate_pem])
def test_succeeds(self):
self._succeeds_helper()
def test_succeeds_with_unicode_password(self):
password = u'notasecret'
self._succeeds_helper(password)
def test_without_openssl(self): def test_without_openssl(self):
import imp import imp
imp_find_module = imp.find_module imp_find_module = imp.find_module

View File

@@ -67,7 +67,10 @@ class CryptTests(unittest.TestCase):
private_key = datafile(private_key_file) private_key = datafile(private_key_file)
public_key = datafile('publickey.pem') public_key = datafile('publickey.pem')
signer = self.signer.from_string(private_key) # We pass in a non-bytes password to make sure all branches
# are traversed in tests.
signer = self.signer.from_string(private_key,
password=u'notasecret')
signature = signer.sign('foo') signature = signer.sign('foo')
verifier = self.verifier.from_string(public_key, True) verifier = self.verifier.from_string(public_key, True)
@@ -188,6 +191,13 @@ class CryptTests(unittest.TestCase):
}) })
self._check_jwt_failure(jwt, 'Wrong recipient') self._check_jwt_failure(jwt, 'Wrong recipient')
def test_from_string_non_509_cert(self):
# Use a private key instead of a certificate to test the other branch
# of from_string().
public_key = datafile('privatekey.pem')
verifier = self.verifier.from_string(public_key, is_x509_cert=False)
self.assertTrue(isinstance(verifier, self.verifier))
class PEMCryptTestsPyCrypto(CryptTests): class PEMCryptTestsPyCrypto(CryptTests):