@@ -34,12 +34,10 @@ import urlparse
|
|||||||
from oauth2client import util
|
from oauth2client import util
|
||||||
from oauth2client.anyjson import simplejson
|
from oauth2client.anyjson import simplejson
|
||||||
|
|
||||||
HAS_OPENSSL = False
|
HAS_CRYPTO = False
|
||||||
try:
|
try:
|
||||||
from oauth2client.crypt import Signer
|
from oauth2client import crypt
|
||||||
from oauth2client.crypt import make_signed_jwt
|
HAS_CRYPTO = True
|
||||||
from oauth2client.crypt import verify_signed_jwt_with_certs
|
|
||||||
HAS_OPENSSL = True
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -769,10 +767,10 @@ class AssertionCredentials(OAuth2Credentials):
|
|||||||
"""
|
"""
|
||||||
_abstract()
|
_abstract()
|
||||||
|
|
||||||
if HAS_OPENSSL:
|
if HAS_CRYPTO:
|
||||||
# PyOpenSSL is not a prerequisite for oauth2client, so if it is missing then
|
# PyOpenSSL and PyCrypto are not prerequisites for oauth2client, so if it is
|
||||||
# don't create the SignedJwtAssertionCredentials or the verify_id_token()
|
# missing then don't create the SignedJwtAssertionCredentials or the
|
||||||
# method.
|
# verify_id_token() method.
|
||||||
|
|
||||||
class SignedJwtAssertionCredentials(AssertionCredentials):
|
class SignedJwtAssertionCredentials(AssertionCredentials):
|
||||||
"""Credentials object used for OAuth 2.0 Signed JWT assertion grants.
|
"""Credentials object used for OAuth 2.0 Signed JWT assertion grants.
|
||||||
@@ -781,9 +779,8 @@ if HAS_OPENSSL:
|
|||||||
a two legged flow, and therefore has all of the required information to
|
a two legged flow, and therefore has all of the required information to
|
||||||
generate and refresh its own access tokens.
|
generate and refresh its own access tokens.
|
||||||
|
|
||||||
SignedJwtAssertionCredentials requires PyOpenSSL and because of that it does
|
SignedJwtAssertionCredentials requires either PyOpenSSL, or PyCrypto 2.6 or
|
||||||
not work on App Engine. For App Engine you may consider using
|
later. For App Engine you may also consider using AppAssertionCredentials.
|
||||||
AppAssertionCredentials.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
|
MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
|
||||||
@@ -801,10 +798,11 @@ if HAS_OPENSSL:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
service_account_name: string, id for account, usually an email address.
|
service_account_name: string, id for account, usually an email address.
|
||||||
private_key: string, private key in P12 format.
|
private_key: string, private key in PKCS12 or PEM format.
|
||||||
scope: string or iterable of strings, scope(s) of the credentials being
|
scope: string or iterable of strings, scope(s) of the credentials being
|
||||||
requested.
|
requested.
|
||||||
private_key_password: string, password for private_key.
|
private_key_password: string, password for private_key, unused if
|
||||||
|
private_key is in PEM format.
|
||||||
user_agent: string, HTTP User-Agent to provide for this application.
|
user_agent: string, HTTP User-Agent to provide for this application.
|
||||||
token_uri: string, URI for token endpoint. For convenience
|
token_uri: string, URI for token endpoint. For convenience
|
||||||
defaults to Google's endpoints but any OAuth 2.0 provider can be used.
|
defaults to Google's endpoints but any OAuth 2.0 provider can be used.
|
||||||
@@ -856,8 +854,8 @@ if HAS_OPENSSL:
|
|||||||
logger.debug(str(payload))
|
logger.debug(str(payload))
|
||||||
|
|
||||||
private_key = base64.b64decode(self.private_key)
|
private_key = base64.b64decode(self.private_key)
|
||||||
return make_signed_jwt(
|
return crypt.make_signed_jwt(crypt.Signer.from_string(
|
||||||
Signer.from_string(private_key, self.private_key_password), payload)
|
private_key, self.private_key_password), payload)
|
||||||
|
|
||||||
# Only used in verify_id_token(), which is always calling to the same URI
|
# Only used in verify_id_token(), which is always calling to the same URI
|
||||||
# for the certs.
|
# for the certs.
|
||||||
@@ -869,7 +867,7 @@ if HAS_OPENSSL:
|
|||||||
"""Verifies a signed JWT id_token.
|
"""Verifies a signed JWT id_token.
|
||||||
|
|
||||||
This function requires PyOpenSSL and because of that it does not work on
|
This function requires PyOpenSSL and because of that it does not work on
|
||||||
App Engine. For App Engine you may consider using AppAssertionCredentials.
|
App Engine.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
id_token: string, A Signed JWT.
|
id_token: string, A Signed JWT.
|
||||||
@@ -892,7 +890,7 @@ if HAS_OPENSSL:
|
|||||||
|
|
||||||
if resp.status == 200:
|
if resp.status == 200:
|
||||||
certs = simplejson.loads(content)
|
certs = simplejson.loads(content)
|
||||||
return verify_signed_jwt_with_certs(id_token, certs, audience)
|
return crypt.verify_signed_jwt_with_certs(id_token, certs, audience)
|
||||||
else:
|
else:
|
||||||
raise VerifyJwtTokenError('Status code: %d' % resp.status)
|
raise VerifyJwtTokenError('Status code: %d' % resp.status)
|
||||||
|
|
||||||
|
|||||||
@@ -20,109 +20,240 @@ import hashlib
|
|||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from OpenSSL import crypto
|
|
||||||
from anyjson import simplejson
|
from anyjson import simplejson
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
|
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
|
||||||
AUTH_TOKEN_LIFETIME_SECS = 300 # 5 minutes in seconds
|
AUTH_TOKEN_LIFETIME_SECS = 300 # 5 minutes in seconds
|
||||||
MAX_TOKEN_LIFETIME_SECS = 86400 # 1 day in seconds
|
MAX_TOKEN_LIFETIME_SECS = 86400 # 1 day in seconds
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class AppIdentityError(Exception):
|
class AppIdentityError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class Verifier(object):
|
try:
|
||||||
"""Verifies the signature on a message."""
|
from OpenSSL import crypto
|
||||||
|
|
||||||
def __init__(self, pubkey):
|
|
||||||
"""Constructor.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
pubkey, OpenSSL.crypto.PKey, The public key to verify with.
|
|
||||||
"""
|
|
||||||
self._pubkey = pubkey
|
|
||||||
|
|
||||||
def verify(self, message, signature):
|
|
||||||
"""Verifies a message against a signature.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
message: string, The message to verify.
|
|
||||||
signature: string, The signature on the message.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if message was singed by the private key associated with the public
|
|
||||||
key that this object was constructed with.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
crypto.verify(self._pubkey, signature, message, 'sha256')
|
|
||||||
return True
|
|
||||||
except:
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_string(key_pem, is_x509_cert):
|
|
||||||
"""Construct a Verified instance from a string.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
key_pem: string, public key in PEM format.
|
|
||||||
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
|
|
||||||
expected to be an RSA key in PEM format.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Verifier instance.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
OpenSSL.crypto.Error if the key_pem can't be parsed.
|
|
||||||
"""
|
|
||||||
if is_x509_cert:
|
|
||||||
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
|
|
||||||
else:
|
|
||||||
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
|
|
||||||
return Verifier(pubkey)
|
|
||||||
|
|
||||||
|
|
||||||
class Signer(object):
|
class OpenSSLVerifier(object):
|
||||||
"""Signs messages with a private key."""
|
"""Verifies the signature on a message."""
|
||||||
|
|
||||||
def __init__(self, pkey):
|
def __init__(self, pubkey):
|
||||||
"""Constructor.
|
"""Constructor.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
pkey, OpenSSL.crypto.PKey, The private key to sign with.
|
pubkey, OpenSSL.crypto.PKey, The public key to verify with.
|
||||||
"""
|
"""
|
||||||
self._key = pkey
|
self._pubkey = pubkey
|
||||||
|
|
||||||
def sign(self, message):
|
def verify(self, message, signature):
|
||||||
"""Signs a message.
|
"""Verifies a message against a signature.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
message: string, Message to be signed.
|
message: string, The message to verify.
|
||||||
|
signature: string, The signature on the message.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
string, The signature of the message for the given key.
|
True if message was signed by the private key associated with the public
|
||||||
"""
|
key that this object was constructed with.
|
||||||
return crypto.sign(self._key, message, 'sha256')
|
"""
|
||||||
|
try:
|
||||||
|
crypto.verify(self._pubkey, signature, message, 'sha256')
|
||||||
|
return True
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_string(key, password='notasecret'):
|
def from_string(key_pem, is_x509_cert):
|
||||||
"""Construct a Signer instance from a string.
|
"""Construct a Verified instance from a string.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
key: string, private key in P12 format.
|
key_pem: string, public key in PEM format.
|
||||||
password: string, password for the private key file.
|
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
|
||||||
|
expected to be an RSA key in PEM format.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Signer instance.
|
Verifier instance.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
OpenSSL.crypto.Error if the key can't be parsed.
|
OpenSSL.crypto.Error if the key_pem can't be parsed.
|
||||||
"""
|
"""
|
||||||
pkey = crypto.load_pkcs12(key, password).get_privatekey()
|
if is_x509_cert:
|
||||||
return Signer(pkey)
|
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
|
||||||
|
else:
|
||||||
|
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
|
||||||
|
return OpenSSLVerifier(pubkey)
|
||||||
|
|
||||||
|
|
||||||
|
class OpenSSLSigner(object):
|
||||||
|
"""Signs messages with a private key."""
|
||||||
|
|
||||||
|
def __init__(self, pkey):
|
||||||
|
"""Constructor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
|
||||||
|
"""
|
||||||
|
self._key = pkey
|
||||||
|
|
||||||
|
def sign(self, message):
|
||||||
|
"""Signs a message.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: string, Message to be signed.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
string, The signature of the message for the given key.
|
||||||
|
"""
|
||||||
|
return crypto.sign(self._key, message, 'sha256')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_string(key, password='notasecret'):
|
||||||
|
"""Construct a Signer instance from a string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: string, private key in PKCS12 or PEM format.
|
||||||
|
password: string, password for the private key file.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Signer instance.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
OpenSSL.crypto.Error if the key can't be parsed.
|
||||||
|
"""
|
||||||
|
if key.startswith('-----BEGIN '):
|
||||||
|
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
|
||||||
|
else:
|
||||||
|
pkey = crypto.load_pkcs12(key, password).get_privatekey()
|
||||||
|
return OpenSSLSigner(pkey)
|
||||||
|
|
||||||
|
except ImportError:
|
||||||
|
OpenSSLVerifier = None
|
||||||
|
OpenSSLSigner = None
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from Crypto.PublicKey import RSA
|
||||||
|
from Crypto.Hash import SHA256
|
||||||
|
from Crypto.Signature import PKCS1_v1_5
|
||||||
|
|
||||||
|
|
||||||
|
class PyCryptoVerifier(object):
|
||||||
|
"""Verifies the signature on a message."""
|
||||||
|
|
||||||
|
def __init__(self, pubkey):
|
||||||
|
"""Constructor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pubkey, OpenSSL.crypto.PKey (or equiv), The public key to verify with.
|
||||||
|
"""
|
||||||
|
self._pubkey = pubkey
|
||||||
|
|
||||||
|
def verify(self, message, signature):
|
||||||
|
"""Verifies a message against a signature.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: string, The message to verify.
|
||||||
|
signature: string, The signature on the message.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if message was signed by the private key associated with the public
|
||||||
|
key that this object was constructed with.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return PKCS1_v1_5.new(self._pubkey).verify(
|
||||||
|
SHA256.new(message), signature)
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_string(key_pem, is_x509_cert):
|
||||||
|
"""Construct a Verified instance from a string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key_pem: string, public key in PEM format.
|
||||||
|
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
|
||||||
|
expected to be an RSA key in PEM format.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Verifier instance.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
NotImplementedError if is_x509_cert is true.
|
||||||
|
"""
|
||||||
|
if is_x509_cert:
|
||||||
|
raise NotImplementedError(
|
||||||
|
'X509 certs are not supported by the PyCrypto library. '
|
||||||
|
'Try using PyOpenSSL if native code is an option.')
|
||||||
|
else:
|
||||||
|
pubkey = RSA.importKey(key_pem)
|
||||||
|
return PyCryptoVerifier(pubkey)
|
||||||
|
|
||||||
|
|
||||||
|
class PyCryptoSigner(object):
|
||||||
|
"""Signs messages with a private key."""
|
||||||
|
|
||||||
|
def __init__(self, pkey):
|
||||||
|
"""Constructor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pkey, OpenSSL.crypto.PKey (or equiv), The private key to sign with.
|
||||||
|
"""
|
||||||
|
self._key = pkey
|
||||||
|
|
||||||
|
def sign(self, message):
|
||||||
|
"""Signs a message.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: string, Message to be signed.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
string, The signature of the message for the given key.
|
||||||
|
"""
|
||||||
|
return PKCS1_v1_5.new(self._key).sign(SHA256.new(message))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_string(key, password='notasecret'):
|
||||||
|
"""Construct a Signer instance from a string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: string, private key in PEM format.
|
||||||
|
password: string, password for private key file. Unused for PEM files.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Signer instance.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
NotImplementedError if they key isn't in PEM format.
|
||||||
|
"""
|
||||||
|
if key.startswith('-----BEGIN '):
|
||||||
|
pkey = RSA.importKey(key)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(
|
||||||
|
'PKCS12 format is not supported by the PyCrpto library. '
|
||||||
|
'Try converting to a "PEM" '
|
||||||
|
'(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
|
||||||
|
'or using PyOpenSSL if native code is an option.')
|
||||||
|
return PyCryptoSigner(pkey)
|
||||||
|
|
||||||
|
except ImportError:
|
||||||
|
PyCryptoVerifier = None
|
||||||
|
PyCryptoSigner = None
|
||||||
|
|
||||||
|
|
||||||
|
if OpenSSLSigner:
|
||||||
|
Signer = OpenSSLSigner
|
||||||
|
Verifier = OpenSSLVerifier
|
||||||
|
elif PyCryptoSigner:
|
||||||
|
Signer = PyCryptoSigner
|
||||||
|
Verifier = PyCryptoVerifier
|
||||||
|
else:
|
||||||
|
raise ImportError('No encryption library found. Please install either '
|
||||||
|
'PyOpenSSL, or PyCrypto 2.6 or later')
|
||||||
|
|
||||||
|
|
||||||
def _urlsafe_b64encode(raw_bytes):
|
def _urlsafe_b64encode(raw_bytes):
|
||||||
|
|||||||
@@ -53,15 +53,19 @@ def datafile(filename):
|
|||||||
|
|
||||||
|
|
||||||
class CryptTests(unittest.TestCase):
|
class CryptTests(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.format = 'p12'
|
||||||
|
self.signer = crypt.OpenSSLSigner
|
||||||
|
self.verifier = crypt.OpenSSLVerifier
|
||||||
|
|
||||||
def test_sign_and_verify(self):
|
def test_sign_and_verify(self):
|
||||||
private_key = datafile('privatekey.p12')
|
private_key = datafile('privatekey.%s' % self.format)
|
||||||
public_key = datafile('publickey.pem')
|
public_key = datafile('publickey.pem')
|
||||||
|
|
||||||
signer = crypt.Signer.from_string(private_key)
|
signer = self.signer.from_string(private_key)
|
||||||
signature = signer.sign('foo')
|
signature = signer.sign('foo')
|
||||||
|
|
||||||
verifier = crypt.Verifier.from_string(public_key, True)
|
verifier = self.verifier.from_string(public_key, True)
|
||||||
|
|
||||||
self.assertTrue(verifier.verify('foo', signature))
|
self.assertTrue(verifier.verify('foo', signature))
|
||||||
|
|
||||||
@@ -82,8 +86,8 @@ class CryptTests(unittest.TestCase):
|
|||||||
self.assertTrue(expected_error in msg)
|
self.assertTrue(expected_error in msg)
|
||||||
|
|
||||||
def _create_signed_jwt(self):
|
def _create_signed_jwt(self):
|
||||||
private_key = datafile('privatekey.p12')
|
private_key = datafile('privatekey.%s' % self.format)
|
||||||
signer = crypt.Signer.from_string(private_key)
|
signer = self.signer.from_string(private_key)
|
||||||
audience = 'some_audience_address@testing.gserviceaccount.com'
|
audience = 'some_audience_address@testing.gserviceaccount.com'
|
||||||
now = long(time.time())
|
now = long(time.time())
|
||||||
|
|
||||||
@@ -129,7 +133,7 @@ class CryptTests(unittest.TestCase):
|
|||||||
'some_audience_address@testing.gserviceaccount.com', http=http)
|
'some_audience_address@testing.gserviceaccount.com', http=http)
|
||||||
|
|
||||||
def test_verify_id_token_bad_tokens(self):
|
def test_verify_id_token_bad_tokens(self):
|
||||||
private_key = datafile('privatekey.p12')
|
private_key = datafile('privatekey.%s' % self.format)
|
||||||
|
|
||||||
# Wrong number of segments
|
# Wrong number of segments
|
||||||
self._check_jwt_failure('foo', 'Wrong number of segments')
|
self._check_jwt_failure('foo', 'Wrong number of segments')
|
||||||
@@ -143,7 +147,7 @@ class CryptTests(unittest.TestCase):
|
|||||||
self._check_jwt_failure(jwt, 'Invalid token signature')
|
self._check_jwt_failure(jwt, 'Invalid token signature')
|
||||||
|
|
||||||
# No expiration
|
# No expiration
|
||||||
signer = crypt.Signer.from_string(private_key)
|
signer = self.signer.from_string(private_key)
|
||||||
audience = 'https:#www.googleapis.com/auth/id?client_id=' + \
|
audience = 'https:#www.googleapis.com/auth/id?client_id=' + \
|
||||||
'external_public_key@testing.gserviceaccount.com'
|
'external_public_key@testing.gserviceaccount.com'
|
||||||
jwt = crypt.make_signed_jwt(signer, {
|
jwt = crypt.make_signed_jwt(signer, {
|
||||||
@@ -186,10 +190,27 @@ class CryptTests(unittest.TestCase):
|
|||||||
self._check_jwt_failure(jwt, 'Wrong recipient')
|
self._check_jwt_failure(jwt, 'Wrong recipient')
|
||||||
|
|
||||||
|
|
||||||
|
class PEMCryptTestsPyCrypto(CryptTests):
|
||||||
|
def setUp(self):
|
||||||
|
self.format = 'pem'
|
||||||
|
self.signer = crypt.PyCryptoSigner
|
||||||
|
self.verifier = crypt.OpenSSLVerifier
|
||||||
|
|
||||||
|
|
||||||
|
class PEMCryptTestsOpenSSL(CryptTests):
|
||||||
|
def setUp(self):
|
||||||
|
self.format = 'pem'
|
||||||
|
self.signer = crypt.OpenSSLSigner
|
||||||
|
self.verifier = crypt.OpenSSLVerifier
|
||||||
|
|
||||||
|
|
||||||
class SignedJwtAssertionCredentialsTests(unittest.TestCase):
|
class SignedJwtAssertionCredentialsTests(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.format = 'p12'
|
||||||
|
crypt.Signer = crypt.OpenSSLSigner
|
||||||
|
|
||||||
def test_credentials_good(self):
|
def test_credentials_good(self):
|
||||||
private_key = datafile('privatekey.p12')
|
private_key = datafile('privatekey.%s' % self.format)
|
||||||
credentials = SignedJwtAssertionCredentials(
|
credentials = SignedJwtAssertionCredentials(
|
||||||
'some_account@example.com',
|
'some_account@example.com',
|
||||||
private_key,
|
private_key,
|
||||||
@@ -204,7 +225,7 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase):
|
|||||||
self.assertEqual('Bearer 1/3w', content['Authorization'])
|
self.assertEqual('Bearer 1/3w', content['Authorization'])
|
||||||
|
|
||||||
def test_credentials_to_from_json(self):
|
def test_credentials_to_from_json(self):
|
||||||
private_key = datafile('privatekey.p12')
|
private_key = datafile('privatekey.%s' % self.format)
|
||||||
credentials = SignedJwtAssertionCredentials(
|
credentials = SignedJwtAssertionCredentials(
|
||||||
'some_account@example.com',
|
'some_account@example.com',
|
||||||
private_key,
|
private_key,
|
||||||
@@ -229,7 +250,7 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase):
|
|||||||
return content
|
return content
|
||||||
|
|
||||||
def test_credentials_refresh_without_storage(self):
|
def test_credentials_refresh_without_storage(self):
|
||||||
private_key = datafile('privatekey.p12')
|
private_key = datafile('privatekey.%s' % self.format)
|
||||||
credentials = SignedJwtAssertionCredentials(
|
credentials = SignedJwtAssertionCredentials(
|
||||||
'some_account@example.com',
|
'some_account@example.com',
|
||||||
private_key,
|
private_key,
|
||||||
@@ -241,7 +262,7 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase):
|
|||||||
self.assertEqual('Bearer 3/3w', content['Authorization'])
|
self.assertEqual('Bearer 3/3w', content['Authorization'])
|
||||||
|
|
||||||
def test_credentials_refresh_with_storage(self):
|
def test_credentials_refresh_with_storage(self):
|
||||||
private_key = datafile('privatekey.p12')
|
private_key = datafile('privatekey.%s' % self.format)
|
||||||
credentials = SignedJwtAssertionCredentials(
|
credentials = SignedJwtAssertionCredentials(
|
||||||
'some_account@example.com',
|
'some_account@example.com',
|
||||||
private_key,
|
private_key,
|
||||||
@@ -260,5 +281,35 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase):
|
|||||||
os.unlink(filename)
|
os.unlink(filename)
|
||||||
|
|
||||||
|
|
||||||
|
class PEMSignedJwtAssertionCredentialsOpenSSLTests(
|
||||||
|
SignedJwtAssertionCredentialsTests):
|
||||||
|
def setUp(self):
|
||||||
|
self.format = 'pem'
|
||||||
|
crypt.Signer = crypt.OpenSSLSigner
|
||||||
|
|
||||||
|
|
||||||
|
class PEMSignedJwtAssertionCredentialsPyCryptoTests(
|
||||||
|
SignedJwtAssertionCredentialsTests):
|
||||||
|
def setUp(self):
|
||||||
|
self.format = 'pem'
|
||||||
|
crypt.Signer = crypt.PyCryptoSigner
|
||||||
|
|
||||||
|
|
||||||
|
class PKCSSignedJwtAssertionCredentialsPyCryptoTests(unittest.TestCase):
|
||||||
|
def test_for_failure(self):
|
||||||
|
crypt.Signer = crypt.PyCryptoSigner
|
||||||
|
private_key = datafile('privatekey.p12')
|
||||||
|
credentials = SignedJwtAssertionCredentials(
|
||||||
|
'some_account@example.com',
|
||||||
|
private_key,
|
||||||
|
scope='read+write',
|
||||||
|
prn='joe@example.org')
|
||||||
|
try:
|
||||||
|
credentials._generate_assertion()
|
||||||
|
self.fail()
|
||||||
|
except NotImplementedError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user