Merge pull request #215 from dhermes/separate-crypt

Factoring out conditional code from `crypt.py`.
This commit is contained in:
Nathaniel Manista
2015-07-17 09:08:50 -07:00
10 changed files with 511 additions and 297 deletions

53
oauth2client/_helpers.py Normal file
View File

@@ -0,0 +1,53 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for commonly used utilities."""
import base64
import json
import six
def _parse_pem_key(raw_key_input):
"""Identify and extract PEM keys.
Determines whether the given key is in the format of PEM key, and extracts
the relevant part of the key if it is.
Args:
raw_key_input: The contents of a private key file (either PEM or PKCS12).
Returns:
string, The actual key if the contents are from a PEM file, or else None.
"""
offset = raw_key_input.find(b'-----BEGIN ')
if offset != -1:
return raw_key_input[offset:]
def _json_encode(data):
return json.dumps(data, separators=(',', ':'))
def _urlsafe_b64encode(raw_bytes):
if isinstance(raw_bytes, six.text_type):
raw_bytes = raw_bytes.encode('utf-8')
return base64.urlsafe_b64encode(raw_bytes).decode('ascii').rstrip('=')
def _urlsafe_b64decode(b64string):
# Guard against unicode strings, which base64 can't handle.
if isinstance(b64string, six.text_type):
b64string = b64string.encode('ascii')
padded = b64string + b'=' * (4 - len(b64string) % 4)
return base64.urlsafe_b64decode(padded)

View File

@@ -0,0 +1,147 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OpenSSL Crypto-related routines for oauth2client."""
import base64
import six
from oauth2client._helpers import _parse_pem_key
class OpenSSLVerifier(object):
"""Verifies the signature on a message."""
def __init__(self, pubkey):
"""Constructor.
Args:
pubkey, OpenSSL.crypto.PKey, The public key to verify with.
"""
self._pubkey = pubkey
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the public
key that this object was constructed with.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
if isinstance(message, six.text_type):
message = message.encode('utf-8')
if isinstance(signature, six.text_type):
signature = signature.encode('utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
@staticmethod
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error if the key_pem can't be parsed.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
class OpenSSLSigner(object):
"""Signs messages with a private key."""
def __init__(self, pkey):
"""Constructor.
Args:
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
"""
self._key = pkey
def sign(self, message):
"""Signs a message.
Args:
message: bytes, Message to be signed.
Returns:
string, The signature of the message for the given key.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return crypto.sign(self._key, message, 'sha256')
@staticmethod
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
if isinstance(password, six.text_type):
password = password.encode('utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def pkcs12_key_as_pem(private_key_text, private_key_password):
"""Convert the contents of a PKCS12 key to PEM using OpenSSL.
Args:
private_key_text: String. Private key.
private_key_password: String. Password for PKCS12.
Returns:
String. PEM contents of ``private_key_text``.
"""
from OpenSSL import crypto # Delay import due to 0.5s import time.
decoded_body = base64.b64decode(private_key_text)
if isinstance(private_key_password, six.text_type):
private_key_password = private_key_password.encode('ascii')
pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password)
return crypto.dump_privatekey(crypto.FILETYPE_PEM,
pkcs12.get_privatekey())

View File

@@ -0,0 +1,128 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""pyCrypto Crypto-related routines for oauth2client."""
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.asn1 import DerSequence
import six
from oauth2client._helpers import _parse_pem_key
from oauth2client._helpers import _urlsafe_b64decode
class PyCryptoVerifier(object):
"""Verifies the signature on a message."""
def __init__(self, pubkey):
"""Constructor.
Args:
pubkey, OpenSSL.crypto.PKey (or equiv), The public key to verify with.
"""
self._pubkey = pubkey
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message.
Returns:
True if message was signed by the private key associated with the public
key that this object was constructed with.
"""
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return PKCS1_v1_5.new(self._pubkey).verify(
SHA256.new(message), signature)
@staticmethod
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
"""
if is_x509_cert:
if isinstance(key_pem, six.text_type):
key_pem = key_pem.encode('ascii')
pemLines = key_pem.replace(b' ', b'').split()
certDer = _urlsafe_b64decode(b''.join(pemLines[1:-1]))
certSeq = DerSequence()
certSeq.decode(certDer)
tbsSeq = DerSequence()
tbsSeq.decode(certSeq[0])
pubkey = RSA.importKey(tbsSeq[6])
else:
pubkey = RSA.importKey(key_pem)
return PyCryptoVerifier(pubkey)
class PyCryptoSigner(object):
"""Signs messages with a private key."""
def __init__(self, pkey):
"""Constructor.
Args:
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
"""
self._key = pkey
def sign(self, message):
"""Signs a message.
Args:
message: string, Message to be signed.
Returns:
string, The signature of the message for the given key.
"""
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return PKCS1_v1_5.new(self._key).sign(SHA256.new(message))
@staticmethod
def from_string(key, password='notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PEM format.
password: string, password for private key file. Unused for PEM files.
Returns:
Signer instance.
Raises:
NotImplementedError if the key isn't in PEM format.
"""
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = RSA.importKey(parsed_pem_key)
else:
raise NotImplementedError(
'PKCS12 format is not supported by the PyCrypto library. '
'Try converting to a "PEM" '
'(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
'or using PyOpenSSL if native code is an option.')
return PyCryptoSigner(pkey)

View File

@@ -35,11 +35,12 @@ import six
from six.moves import urllib
import httplib2
from oauth2client import clientsecrets
from oauth2client import GOOGLE_AUTH_URI
from oauth2client import GOOGLE_DEVICE_URI
from oauth2client import GOOGLE_REVOKE_URI
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client import clientsecrets
from oauth2client import util
HAS_OPENSSL = False
@@ -1591,14 +1592,6 @@ def verify_id_token(id_token, audience, http=None,
raise VerifyJwtTokenError('Status code: %d' % resp.status)
def _urlsafe_b64decode(b64string):
# Guard against unicode strings, which base64 can't handle.
if isinstance(b64string, six.text_type):
b64string = b64string.encode('ascii')
padded = b64string + b'=' * (4 - len(b64string) % 4)
return base64.urlsafe_b64decode(padded)
def _extract_id_token(id_token):
"""Extract the JSON payload from a JWT.

View File

@@ -15,15 +15,15 @@
# limitations under the License.
"""Crypto-related routines for oauth2client."""
import base64
import imp
import json
import logging
import os
import sys
import time
import six
from oauth2client._helpers import _json_encode
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client._helpers import _urlsafe_b64encode
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
@@ -69,133 +69,9 @@ def _TryOpenSslImport():
try:
_TryOpenSslImport()
class OpenSSLVerifier(object):
"""Verifies the signature on a message."""
def __init__(self, pubkey):
"""Constructor.
Args:
pubkey, OpenSSL.crypto.PKey, The public key to verify with.
"""
self._pubkey = pubkey
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the public
key that this object was constructed with.
"""
from OpenSSL import crypto
if isinstance(message, six.text_type):
message = message.encode('utf-8')
if isinstance(signature, six.text_type):
signature = signature.encode('utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
@staticmethod
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error if the key_pem can't be parsed.
"""
from OpenSSL import crypto
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
class OpenSSLSigner(object):
"""Signs messages with a private key."""
def __init__(self, pkey):
"""Constructor.
Args:
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
"""
self._key = pkey
def sign(self, message):
"""Signs a message.
Args:
message: bytes, Message to be signed.
Returns:
string, The signature of the message for the given key.
"""
from OpenSSL import crypto
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return crypto.sign(self._key, message, 'sha256')
@staticmethod
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
from OpenSSL import crypto
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
if isinstance(password, six.text_type):
password = password.encode('utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def pkcs12_key_as_pem(private_key_text, private_key_password):
"""Convert the contents of a PKCS12 key to PEM using OpenSSL.
Args:
private_key_text: String. Private key.
private_key_password: String. Password for PKCS12.
Returns:
String. PEM contents of ``private_key_text``.
"""
from OpenSSL import crypto
decoded_body = base64.b64decode(private_key_text)
if isinstance(private_key_password, six.string_types):
private_key_password = private_key_password.encode('ascii')
pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password)
return crypto.dump_privatekey(crypto.FILETYPE_PEM,
pkcs12.get_privatekey())
from oauth2client._openssl_crypt import OpenSSLVerifier
from oauth2client._openssl_crypt import OpenSSLSigner
from oauth2client._openssl_crypt import pkcs12_key_as_pem
except ImportError:
OpenSSLVerifier = None
OpenSSLSigner = None
@@ -204,116 +80,8 @@ except ImportError:
try:
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.asn1 import DerSequence
class PyCryptoVerifier(object):
"""Verifies the signature on a message."""
def __init__(self, pubkey):
"""Constructor.
Args:
pubkey, OpenSSL.crypto.PKey (or equiv), The public key to verify with.
"""
self._pubkey = pubkey
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message.
Returns:
True if message was signed by the private key associated with the public
key that this object was constructed with.
"""
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return PKCS1_v1_5.new(self._pubkey).verify(
SHA256.new(message), signature)
@staticmethod
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
"""
if is_x509_cert:
if isinstance(key_pem, six.text_type):
key_pem = key_pem.encode('ascii')
pemLines = key_pem.replace(b' ', b'').split()
certDer = _urlsafe_b64decode(b''.join(pemLines[1:-1]))
certSeq = DerSequence()
certSeq.decode(certDer)
tbsSeq = DerSequence()
tbsSeq.decode(certSeq[0])
pubkey = RSA.importKey(tbsSeq[6])
else:
pubkey = RSA.importKey(key_pem)
return PyCryptoVerifier(pubkey)
class PyCryptoSigner(object):
"""Signs messages with a private key."""
def __init__(self, pkey):
"""Constructor.
Args:
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
"""
self._key = pkey
def sign(self, message):
"""Signs a message.
Args:
message: string, Message to be signed.
Returns:
string, The signature of the message for the given key.
"""
if isinstance(message, six.text_type):
message = message.encode('utf-8')
return PKCS1_v1_5.new(self._key).sign(SHA256.new(message))
@staticmethod
def from_string(key, password='notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PEM format.
password: string, password for private key file. Unused for PEM files.
Returns:
Signer instance.
Raises:
NotImplementedError if they key isn't in PEM format.
"""
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = RSA.importKey(parsed_pem_key)
else:
raise NotImplementedError(
'PKCS12 format is not supported by the PyCrypto library. '
'Try converting to a "PEM" '
'(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
'or using PyOpenSSL if native code is an option.')
return PyCryptoSigner(pkey)
from oauth2client._pycrypto_crypt import PyCryptoVerifier
from oauth2client._pycrypto_crypt import PyCryptoSigner
except ImportError:
PyCryptoVerifier = None
PyCryptoSigner = None
@@ -330,41 +98,6 @@ else:
'PyOpenSSL, or PyCrypto 2.6 or later')
def _parse_pem_key(raw_key_input):
"""Identify and extract PEM keys.
Determines whether the given key is in the format of PEM key, and extracts
the relevant part of the key if it is.
Args:
raw_key_input: The contents of a private key file (either PEM or PKCS12).
Returns:
string, The actual key if the contents are from a PEM file, or else None.
"""
offset = raw_key_input.find(b'-----BEGIN ')
if offset != -1:
return raw_key_input[offset:]
def _urlsafe_b64encode(raw_bytes):
if isinstance(raw_bytes, six.text_type):
raw_bytes = raw_bytes.encode('utf-8')
return base64.urlsafe_b64encode(raw_bytes).decode('ascii').rstrip('=')
def _urlsafe_b64decode(b64string):
# Guard against unicode strings, which base64 can't handle.
if isinstance(b64string, six.text_type):
b64string = b64string.encode('ascii')
padded = b64string + b'=' * (4 - len(b64string) % 4)
return base64.urlsafe_b64decode(padded)
def _json_encode(data):
return json.dumps(data, separators=(',', ':'))
def make_signed_jwt(signer, payload):
"""Make a signed JWT.

View File

@@ -18,7 +18,6 @@ This credentials class is implemented on top of rsa library.
"""
import base64
import json
import six
import time
@@ -28,6 +27,8 @@ import rsa
from oauth2client import GOOGLE_REVOKE_URI
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client._helpers import _json_encode
from oauth2client._helpers import _urlsafe_b64encode
from oauth2client import util
from oauth2client.client import AssertionCredentials
@@ -75,8 +76,9 @@ class _ServiceAccountCredentials(AssertionCredentials):
}
payload.update(self._kwargs)
assertion_input = (_urlsafe_b64encode(header) + b'.' +
_urlsafe_b64encode(payload))
first_segment = _urlsafe_b64encode(_json_encode(header)).encode('utf-8')
second_segment = _urlsafe_b64encode(_json_encode(payload)).encode('utf-8')
assertion_input = first_segment + b'.' + second_segment
# Sign the assertion.
rsa_bytes = rsa.pkcs1.sign(assertion_input, self._private_key, 'SHA-256')
@@ -122,11 +124,6 @@ class _ServiceAccountCredentials(AssertionCredentials):
**self._kwargs)
def _urlsafe_b64encode(data):
return base64.urlsafe_b64encode(
json.dumps(data, separators=(',', ':')).encode('UTF-8')).rstrip(b'=')
def _get_private_key(private_key_pkcs8_text):
"""Get an RSA private key object from a pkcs8 representation."""

81
tests/test__helpers.py Normal file
View File

@@ -0,0 +1,81 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for oauth2client._helpers."""
import unittest
from oauth2client._helpers import _json_encode
from oauth2client._helpers import _parse_pem_key
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client._helpers import _urlsafe_b64encode
class Test__parse_pem_key(unittest.TestCase):
def test_valid_input(self):
test_string = b'1234-----BEGIN FOO BAR BAZ'
result = _parse_pem_key(test_string)
self.assertEqual(result, test_string[4:])
def test_bad_input(self):
test_string = b'DOES NOT HAVE DASHES'
result = _parse_pem_key(test_string)
self.assertEqual(result, None)
class Test__json_encode(unittest.TestCase):
def test_dictionary_input(self):
# Use only a single key since dictionary hash order
# is non-deterministic.
data = {u'foo': 10}
result = _json_encode(data)
self.assertEqual(result, """{"foo":10}""")
def test_list_input(self):
data = [42, 1337]
result = _json_encode(data)
self.assertEqual(result, """[42,1337]""")
class Test__urlsafe_b64encode(unittest.TestCase):
def test_valid_input_bytes(self):
test_string = b'deadbeef'
result = _urlsafe_b64encode(test_string)
self.assertEqual(result, u'ZGVhZGJlZWY')
def test_valid_input_unicode(self):
test_string = u'deadbeef'
result = _urlsafe_b64encode(test_string)
self.assertEqual(result, u'ZGVhZGJlZWY')
class Test__urlsafe_b64decode(unittest.TestCase):
def test_valid_input_bytes(self):
test_string = b'ZGVhZGJlZWY'
result = _urlsafe_b64decode(test_string)
self.assertEqual(result, b'deadbeef')
def test_valid_input_unicode(self):
test_string = b'ZGVhZGJlZWY'
result = _urlsafe_b64decode(test_string)
self.assertEqual(result, b'deadbeef')
def test_bad_input(self):
import binascii
bad_string = b'+'
self.assertRaises((TypeError, binascii.Error),
_urlsafe_b64decode, bad_string)

View File

@@ -0,0 +1,63 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for oauth2client._pycrypto_crypt."""
import os
import unittest
from oauth2client.crypt import PyCryptoSigner
from oauth2client.crypt import PyCryptoVerifier
class TestPyCryptoVerifier(unittest.TestCase):
PUBLIC_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
'data', 'publickey.pem')
PRIVATE_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
'data', 'privatekey.pem')
def _load_public_key_bytes(self):
with open(self.PUBLIC_KEY_FILENAME, 'rb') as fh:
return fh.read()
def _load_private_key_bytes(self):
with open(self.PRIVATE_KEY_FILENAME, 'rb') as fh:
return fh.read()
def test_verify_success(self):
to_sign = b'foo'
signer = PyCryptoSigner.from_string(self._load_private_key_bytes())
actual_signature = signer.sign(to_sign)
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
is_x509_cert=True)
self.assertTrue(verifier.verify(to_sign, actual_signature))
def test_verify_failure(self):
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
is_x509_cert=True)
bad_signature = b''
self.assertFalse(verifier.verify(b'foo', bad_signature))
def test_verify_bad_key(self):
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
is_x509_cert=True)
bad_signature = b''
self.assertFalse(verifier.verify(b'foo', bad_signature))
def test_from_string_unicode_key(self):
public_key = self._load_public_key_bytes()
public_key = public_key.decode('utf-8')
verifier = PyCryptoVerifier.from_string(public_key, is_x509_cert=True)
self.assertTrue(isinstance(verifier, PyCryptoVerifier))

View File

@@ -23,6 +23,7 @@ except NameError:
# For Python3 (though importlib should be used, silly 3.3).
from imp import reload
from oauth2client import _helpers
from oauth2client.client import HAS_OPENSSL
from oauth2client.client import SignedJwtAssertionCredentials
from oauth2client import crypt
@@ -46,17 +47,25 @@ class Test_pkcs12_key_as_pem(unittest.TestCase):
scope='read+write',
sub='joe@example.org')
def test_succeeds(self):
def _succeeds_helper(self, password=None):
self.assertEqual(True, HAS_OPENSSL)
credentials = self._make_signed_jwt_creds()
pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key,
credentials.private_key_password)
if password is None:
password = credentials.private_key_password
pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key, password)
pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem')
pkcs12_key_as_pem = crypt._parse_pem_key(pkcs12_key_as_pem)
pkcs12_key_as_pem = _helpers._parse_pem_key(pkcs12_key_as_pem)
alternate_pem = datafile('pem_from_pkcs12_alternate.pem')
self.assertTrue(pem_contents in [pkcs12_key_as_pem, alternate_pem])
def test_succeeds(self):
self._succeeds_helper()
def test_succeeds_with_unicode_password(self):
password = u'notasecret'
self._succeeds_helper(password)
def test_without_openssl(self):
import imp
imp_find_module = imp.find_module

View File

@@ -67,7 +67,10 @@ class CryptTests(unittest.TestCase):
private_key = datafile(private_key_file)
public_key = datafile('publickey.pem')
signer = self.signer.from_string(private_key)
# We pass in a non-bytes password to make sure all branches
# are traversed in tests.
signer = self.signer.from_string(private_key,
password=u'notasecret')
signature = signer.sign('foo')
verifier = self.verifier.from_string(public_key, True)
@@ -188,6 +191,13 @@ class CryptTests(unittest.TestCase):
})
self._check_jwt_failure(jwt, 'Wrong recipient')
def test_from_string_non_509_cert(self):
# Use a private key instead of a certificate to test the other branch
# of from_string().
public_key = datafile('privatekey.pem')
verifier = self.verifier.from_string(public_key, is_x509_cert=False)
self.assertTrue(isinstance(verifier, self.verifier))
class PEMCryptTestsPyCrypto(CryptTests):