Removing SignedJwtAssertionCredentials.
This completes the consolidation of the two service account credentials implementations. In the process, also adding test coverage for some untested code paths within the crypto helpers.
This commit is contained in:
committed by
Nathaniel Manista
parent
d3391bc91d
commit
dcd20c9375
@@ -123,19 +123,17 @@ class OpenSSLSigner(object):
|
||||
return OpenSSLSigner(pkey)
|
||||
|
||||
|
||||
def pkcs12_key_as_pem(private_key_text, private_key_password):
|
||||
"""Convert the contents of a PKCS12 key to PEM using OpenSSL.
|
||||
def pkcs12_key_as_pem(private_key_bytes, private_key_password):
|
||||
"""Convert the contents of a PKCS#12 key to PEM using pyOpenSSL.
|
||||
|
||||
Args:
|
||||
private_key_text: String. Private key.
|
||||
private_key_password: String. Password for PKCS12.
|
||||
private_key_bytes: Bytes. PKCS#12 key in DER format.
|
||||
private_key_password: String. Password for PKCS#12 key.
|
||||
|
||||
Returns:
|
||||
String. PEM contents of ``private_key_text``.
|
||||
String. PEM contents of ``private_key_bytes``.
|
||||
"""
|
||||
decoded_body = base64.b64decode(private_key_text)
|
||||
private_key_password = _to_bytes(private_key_password)
|
||||
|
||||
pkcs12 = crypto.load_pkcs12(decoded_body, private_key_password)
|
||||
pkcs12 = crypto.load_pkcs12(private_key_bytes, private_key_password)
|
||||
return crypto.dump_privatekey(crypto.FILETYPE_PEM,
|
||||
pkcs12.get_privatekey())
|
||||
|
||||
@@ -298,20 +298,20 @@ class Credentials(object):
|
||||
return self._to_json(self.NON_SERIALIZED_MEMBERS)
|
||||
|
||||
@classmethod
|
||||
def new_from_json(cls, s):
|
||||
def new_from_json(cls, json_data):
|
||||
"""Utility class method to instantiate a Credentials subclass from JSON.
|
||||
|
||||
Expects the JSON string to have been produced by to_json().
|
||||
|
||||
Args:
|
||||
s: string or bytes, JSON from to_json().
|
||||
json_data: string or bytes, JSON from to_json().
|
||||
|
||||
Returns:
|
||||
An instance of the subclass of Credentials that was serialized with
|
||||
to_json().
|
||||
"""
|
||||
json_string_as_unicode = _from_bytes(s)
|
||||
data = json.loads(json_string_as_unicode)
|
||||
json_data_as_unicode = _from_bytes(json_data)
|
||||
data = json.loads(json_data_as_unicode)
|
||||
# Find and call the right classmethod from_json() to restore
|
||||
# the object.
|
||||
module_name = data['_module']
|
||||
@@ -326,8 +326,7 @@ class Credentials(object):
|
||||
module_obj = __import__(module_name,
|
||||
fromlist=module_name.split('.')[:-1])
|
||||
kls = getattr(module_obj, data['_class'])
|
||||
from_json = getattr(kls, 'from_json')
|
||||
return from_json(json_string_as_unicode)
|
||||
return kls.from_json(json_data_as_unicode)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, unused_data):
|
||||
@@ -710,19 +709,18 @@ class OAuth2Credentials(Credentials):
|
||||
return self.scopes
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, s):
|
||||
def from_json(cls, json_data):
|
||||
"""Instantiate a Credentials object from a JSON description of it.
|
||||
|
||||
The JSON should have been produced by calling .to_json() on the object.
|
||||
|
||||
Args:
|
||||
data: dict, A deserialized JSON object.
|
||||
json_data: string or bytes, JSON to deserialize.
|
||||
|
||||
Returns:
|
||||
An instance of a Credentials subclass.
|
||||
"""
|
||||
s = _from_bytes(s)
|
||||
data = json.loads(s)
|
||||
data = json.loads(_from_bytes(json_data))
|
||||
if (data.get('token_expiry') and
|
||||
not isinstance(data['token_expiry'], datetime.datetime)):
|
||||
try:
|
||||
@@ -1070,8 +1068,8 @@ class AccessTokenCredentials(OAuth2Credentials):
|
||||
revoke_uri=revoke_uri)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, s):
|
||||
data = json.loads(_from_bytes(s))
|
||||
def from_json(cls, json_data):
|
||||
data = json.loads(_from_bytes(json_data))
|
||||
retval = AccessTokenCredentials(
|
||||
data['access_token'],
|
||||
data['user_agent'])
|
||||
@@ -1190,7 +1188,7 @@ class GoogleCredentials(OAuth2Credentials):
|
||||
NON_SERIALIZED_MEMBERS = (
|
||||
frozenset(['_private_key']) |
|
||||
OAuth2Credentials.NON_SERIALIZED_MEMBERS)
|
||||
|
||||
"""Members that aren't serialized when object is converted to JSON."""
|
||||
|
||||
def __init__(self, access_token, client_id, client_secret, refresh_token,
|
||||
token_expiry, token_uri, user_agent,
|
||||
@@ -1630,101 +1628,6 @@ def _RequireCryptoOrDie():
|
||||
raise CryptoUnavailableError('No crypto library available')
|
||||
|
||||
|
||||
class SignedJwtAssertionCredentials(AssertionCredentials):
|
||||
"""Credentials object used for OAuth 2.0 Signed JWT assertion grants.
|
||||
|
||||
This credential does not require a flow to instantiate because it
|
||||
represents a two legged flow, and therefore has all of the required
|
||||
information to generate and refresh its own access tokens.
|
||||
|
||||
SignedJwtAssertionCredentials requires either PyOpenSSL, or PyCrypto
|
||||
2.6 or later. For App Engine you may also consider using
|
||||
AppAssertionCredentials.
|
||||
"""
|
||||
|
||||
MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
|
||||
|
||||
@util.positional(4)
|
||||
def __init__(self,
|
||||
service_account_name,
|
||||
private_key,
|
||||
scope,
|
||||
private_key_password='notasecret',
|
||||
user_agent=None,
|
||||
token_uri=GOOGLE_TOKEN_URI,
|
||||
revoke_uri=GOOGLE_REVOKE_URI,
|
||||
**kwargs):
|
||||
"""Constructor for SignedJwtAssertionCredentials.
|
||||
|
||||
Args:
|
||||
service_account_name: string, id for account, usually an email
|
||||
address.
|
||||
private_key: string or bytes, private key in PKCS12 or PEM format.
|
||||
scope: string or iterable of strings, scope(s) of the credentials
|
||||
being requested.
|
||||
private_key_password: string, password for private_key, unused if
|
||||
private_key is in PEM format.
|
||||
user_agent: string, HTTP User-Agent to provide for this
|
||||
application.
|
||||
token_uri: string, URI for token endpoint. For convenience defaults
|
||||
to Google's endpoints but any OAuth 2.0 provider can be
|
||||
used.
|
||||
revoke_uri: string, URI for revoke endpoint.
|
||||
kwargs: kwargs, Additional parameters to add to the JWT token, for
|
||||
example sub=joe@xample.org.
|
||||
|
||||
Raises:
|
||||
CryptoUnavailableError if no crypto library is available.
|
||||
"""
|
||||
_RequireCryptoOrDie()
|
||||
super(SignedJwtAssertionCredentials, self).__init__(
|
||||
None,
|
||||
user_agent=user_agent,
|
||||
token_uri=token_uri,
|
||||
revoke_uri=revoke_uri,
|
||||
)
|
||||
|
||||
self.scope = util.scopes_to_string(scope)
|
||||
|
||||
# Keep base64 encoded so it can be stored in JSON.
|
||||
self.private_key = base64.b64encode(_to_bytes(private_key))
|
||||
self.private_key_password = private_key_password
|
||||
self.service_account_name = service_account_name
|
||||
self.kwargs = kwargs
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, s):
|
||||
data = json.loads(_from_bytes(s))
|
||||
retval = SignedJwtAssertionCredentials(
|
||||
data['service_account_name'],
|
||||
base64.b64decode(data['private_key']),
|
||||
data['scope'],
|
||||
private_key_password=data['private_key_password'],
|
||||
user_agent=data['user_agent'],
|
||||
token_uri=data['token_uri'],
|
||||
**data['kwargs']
|
||||
)
|
||||
retval.invalid = data['invalid']
|
||||
retval.access_token = data['access_token']
|
||||
return retval
|
||||
|
||||
def _generate_assertion(self):
|
||||
"""Generate the assertion that will be used in the request."""
|
||||
now = int(time.time())
|
||||
payload = {
|
||||
'aud': self.token_uri,
|
||||
'scope': self.scope,
|
||||
'iat': now,
|
||||
'exp': now + SignedJwtAssertionCredentials.MAX_TOKEN_LIFETIME_SECS,
|
||||
'iss': self.service_account_name
|
||||
}
|
||||
payload.update(self.kwargs)
|
||||
logger.debug(str(payload))
|
||||
|
||||
private_key = base64.b64decode(self.private_key)
|
||||
return crypt.make_signed_jwt(crypt.Signer.from_string(
|
||||
private_key, self.private_key_password), payload)
|
||||
|
||||
# Only used in verify_id_token(), which is always calling to the same URI
|
||||
# for the certs.
|
||||
_cached_http = httplib2.Http(MemoryCache())
|
||||
|
||||
@@ -308,8 +308,7 @@ class ServiceAccountCredentials(AssertionCredentials):
|
||||
# state.
|
||||
pkcs12_val = base64.b64decode(pkcs12_val)
|
||||
password = json_data['_private_key_password']
|
||||
signer = crypt.Signer.from_string(private_key_pkcs12,
|
||||
private_key_password)
|
||||
signer = crypt.Signer.from_string(pkcs12_val, password)
|
||||
|
||||
credentials = cls(
|
||||
json_data['_service_account_email'],
|
||||
|
||||
@@ -174,6 +174,11 @@ class TestRsaSigner(unittest2.TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
RsaSigner.from_string(key_bytes)
|
||||
|
||||
def test_from_string_bogus_key(self):
|
||||
key_bytes = 'bogus-key'
|
||||
with self.assertRaises(ValueError):
|
||||
RsaSigner.from_string(key_bytes)
|
||||
|
||||
|
||||
if __name__ == '__main__': # pragma: NO COVER
|
||||
unittest2.main()
|
||||
|
||||
@@ -14,13 +14,13 @@
|
||||
"""Unit tests for oauth2client._pycrypto_crypt."""
|
||||
|
||||
import os
|
||||
import unittest
|
||||
import unittest2
|
||||
|
||||
from oauth2client.crypt import PyCryptoSigner
|
||||
from oauth2client.crypt import PyCryptoVerifier
|
||||
|
||||
|
||||
class TestPyCryptoVerifier(unittest.TestCase):
|
||||
class TestPyCryptoVerifier(unittest2.TestCase):
|
||||
|
||||
PUBLIC_CERT_FILENAME = os.path.join(os.path.dirname(__file__),
|
||||
'data', 'public_cert.pem')
|
||||
@@ -63,5 +63,13 @@ class TestPyCryptoVerifier(unittest.TestCase):
|
||||
self.assertTrue(isinstance(verifier, PyCryptoVerifier))
|
||||
|
||||
|
||||
class TestPyCryptoSigner(unittest2.TestCase):
|
||||
|
||||
def test_from_string_bad_key(self):
|
||||
key_bytes = 'definitely-not-pem-format'
|
||||
with self.assertRaises(NotImplementedError):
|
||||
PyCryptoSigner.from_string(key_bytes)
|
||||
|
||||
|
||||
if __name__ == '__main__': # pragma: NO COVER
|
||||
unittest.main()
|
||||
unittest2.main()
|
||||
|
||||
@@ -20,15 +20,17 @@ import mock
|
||||
|
||||
from oauth2client import _helpers
|
||||
from oauth2client.client import HAS_OPENSSL
|
||||
from oauth2client.client import SignedJwtAssertionCredentials
|
||||
from oauth2client import crypt
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
|
||||
|
||||
def data_filename(filename):
|
||||
return os.path.join(os.path.dirname(__file__), 'data', filename)
|
||||
|
||||
|
||||
def datafile(filename):
|
||||
f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb')
|
||||
data = f.read()
|
||||
f.close()
|
||||
return data
|
||||
with open(data_filename(filename), 'rb') as file_obj:
|
||||
return file_obj.read()
|
||||
|
||||
|
||||
class Test__bad_pkcs12_key_as_pem(unittest.TestCase):
|
||||
@@ -39,23 +41,23 @@ class Test__bad_pkcs12_key_as_pem(unittest.TestCase):
|
||||
|
||||
class Test_pkcs12_key_as_pem(unittest.TestCase):
|
||||
|
||||
def _make_signed_jwt_creds(self, private_key_file='privatekey.p12',
|
||||
private_key=None):
|
||||
private_key = private_key or datafile(private_key_file)
|
||||
return SignedJwtAssertionCredentials(
|
||||
def _make_svc_account_creds(self, private_key_file='privatekey.p12'):
|
||||
filename = data_filename(private_key_file)
|
||||
credentials = ServiceAccountCredentials.from_p12_keyfile(
|
||||
'some_account@example.com',
|
||||
private_key,
|
||||
scope='read+write',
|
||||
sub='joe@example.org')
|
||||
filename,
|
||||
scopes='read+write')
|
||||
credentials._kwargs['sub'] ='joe@example.org'
|
||||
return credentials
|
||||
|
||||
def _succeeds_helper(self, password=None):
|
||||
self.assertEqual(True, HAS_OPENSSL)
|
||||
|
||||
credentials = self._make_signed_jwt_creds()
|
||||
credentials = self._make_svc_account_creds()
|
||||
if password is None:
|
||||
password = credentials.private_key_password
|
||||
pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key,
|
||||
password)
|
||||
password = credentials._private_key_password
|
||||
pem_contents = crypt.pkcs12_key_as_pem(
|
||||
credentials._private_key_pkcs12, password)
|
||||
pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem')
|
||||
pkcs12_key_as_pem = _helpers._parse_pem_key(pkcs12_key_as_pem)
|
||||
alternate_pem = datafile('pem_from_pkcs12_alternate.pem')
|
||||
@@ -68,13 +70,6 @@ class Test_pkcs12_key_as_pem(unittest.TestCase):
|
||||
password = u'notasecret'
|
||||
self._succeeds_helper(password)
|
||||
|
||||
def test_with_nonsense_key(self):
|
||||
from OpenSSL import crypto
|
||||
credentials = self._make_signed_jwt_creds(private_key=b'NOT_A_KEY')
|
||||
self.assertRaises(crypto.Error, crypt.pkcs12_key_as_pem,
|
||||
credentials.private_key,
|
||||
credentials.private_key_password)
|
||||
|
||||
|
||||
class Test__verify_signature(unittest.TestCase):
|
||||
|
||||
|
||||
@@ -21,39 +21,48 @@ import unittest2
|
||||
|
||||
from .http_mock import HttpMockSequence
|
||||
from oauth2client.client import Credentials
|
||||
from oauth2client.client import SignedJwtAssertionCredentials
|
||||
from oauth2client.client import VerifyJwtTokenError
|
||||
from oauth2client.client import verify_id_token
|
||||
from oauth2client.client import HAS_OPENSSL
|
||||
from oauth2client.client import HAS_CRYPTO
|
||||
from oauth2client import crypt
|
||||
from oauth2client.file import Storage
|
||||
from oauth2client.service_account import _PASSWORD_DEFAULT
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
|
||||
|
||||
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
|
||||
|
||||
|
||||
_FORMATS_TO_CONSTRUCTOR_ARGS = {
|
||||
'p12': 'private_key_pkcs12',
|
||||
'pem': 'private_key_pkcs8_pem',
|
||||
}
|
||||
|
||||
|
||||
def data_filename(filename):
|
||||
return os.path.join(os.path.dirname(__file__), 'data', filename)
|
||||
|
||||
|
||||
def datafile(filename):
|
||||
f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb')
|
||||
data = f.read()
|
||||
f.close()
|
||||
return data
|
||||
with open(data_filename(filename), 'rb') as file_obj:
|
||||
return file_obj.read()
|
||||
|
||||
|
||||
class CryptTests(unittest2.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.format = 'p12'
|
||||
self.format_ = 'p12'
|
||||
self.signer = crypt.OpenSSLSigner
|
||||
self.verifier = crypt.OpenSSLVerifier
|
||||
|
||||
def test_sign_and_verify(self):
|
||||
self._check_sign_and_verify('privatekey.%s' % self.format)
|
||||
self._check_sign_and_verify('privatekey.' + self.format_)
|
||||
|
||||
def test_sign_and_verify_from_converted_pkcs12(self):
|
||||
# Tests that following instructions to convert from PKCS12 to
|
||||
# PEM works.
|
||||
if self.format == 'pem':
|
||||
if self.format_ == 'pem':
|
||||
self._check_sign_and_verify('pem_from_pkcs12.pem')
|
||||
|
||||
def _check_sign_and_verify(self, private_key_file):
|
||||
@@ -85,7 +94,7 @@ class CryptTests(unittest2.TestCase):
|
||||
self.assertTrue(expected_error in str(exc_manager.exception))
|
||||
|
||||
def _create_signed_jwt(self):
|
||||
private_key = datafile('privatekey.%s' % self.format)
|
||||
private_key = datafile('privatekey.' + self.format_)
|
||||
signer = self.signer.from_string(private_key)
|
||||
audience = 'some_audience_address@testing.gserviceaccount.com'
|
||||
now = int(time.time())
|
||||
@@ -132,7 +141,7 @@ class CryptTests(unittest2.TestCase):
|
||||
http=http)
|
||||
|
||||
def test_verify_id_token_bad_tokens(self):
|
||||
private_key = datafile('privatekey.%s' % self.format)
|
||||
private_key = datafile('privatekey.' + self.format_)
|
||||
|
||||
# Wrong number of segments
|
||||
self._check_jwt_failure('foo', 'Wrong number of segments')
|
||||
@@ -198,7 +207,7 @@ class CryptTests(unittest2.TestCase):
|
||||
class PEMCryptTestsPyCrypto(CryptTests):
|
||||
|
||||
def setUp(self):
|
||||
self.format = 'pem'
|
||||
self.format_ = 'pem'
|
||||
self.signer = crypt.PyCryptoSigner
|
||||
self.verifier = crypt.PyCryptoVerifier
|
||||
|
||||
@@ -206,7 +215,7 @@ class PEMCryptTestsPyCrypto(CryptTests):
|
||||
class PEMCryptTestsOpenSSL(CryptTests):
|
||||
|
||||
def setUp(self):
|
||||
self.format = 'pem'
|
||||
self.format_ = 'pem'
|
||||
self.signer = crypt.OpenSSLSigner
|
||||
self.verifier = crypt.OpenSSLVerifier
|
||||
|
||||
@@ -214,16 +223,27 @@ class PEMCryptTestsOpenSSL(CryptTests):
|
||||
class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.format = 'p12'
|
||||
self.format_ = 'p12'
|
||||
crypt.Signer = crypt.OpenSSLSigner
|
||||
|
||||
def test_credentials_good(self):
|
||||
private_key = datafile('privatekey.%s' % self.format)
|
||||
credentials = SignedJwtAssertionCredentials(
|
||||
'some_account@example.com',
|
||||
private_key,
|
||||
scope='read+write',
|
||||
def _make_credentials(self):
|
||||
private_key = datafile('privatekey.' + self.format_)
|
||||
signer = crypt.Signer.from_string(private_key)
|
||||
credentials = ServiceAccountCredentials(
|
||||
'some_account@example.com', signer,
|
||||
scopes='read+write',
|
||||
sub='joe@example.org')
|
||||
if self.format_ == 'pem':
|
||||
credentials._private_key_pkcs8_pem = private_key
|
||||
elif self.format_ == 'p12':
|
||||
credentials._private_key_pkcs12 = private_key
|
||||
credentials._private_key_password = _PASSWORD_DEFAULT
|
||||
else: # pragma: NO COVER
|
||||
raise ValueError('Unexpected format.')
|
||||
return credentials
|
||||
|
||||
def test_credentials_good(self):
|
||||
credentials = self._make_credentials()
|
||||
http = HttpMockSequence([
|
||||
({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'),
|
||||
({'status': '200'}, 'echo_request_headers'),
|
||||
@@ -233,18 +253,14 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
|
||||
self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
|
||||
|
||||
def test_credentials_to_from_json(self):
|
||||
private_key = datafile('privatekey.%s' % self.format)
|
||||
credentials = SignedJwtAssertionCredentials(
|
||||
'some_account@example.com',
|
||||
private_key,
|
||||
scope='read+write',
|
||||
sub='joe@example.org')
|
||||
credentials = self._make_credentials()
|
||||
json = credentials.to_json()
|
||||
restored = Credentials.new_from_json(json)
|
||||
self.assertEqual(credentials.private_key, restored.private_key)
|
||||
self.assertEqual(credentials.private_key_password,
|
||||
restored.private_key_password)
|
||||
self.assertEqual(credentials.kwargs, restored.kwargs)
|
||||
self.assertEqual(credentials._private_key_pkcs12,
|
||||
restored._private_key_pkcs12)
|
||||
self.assertEqual(credentials._private_key_password,
|
||||
restored._private_key_password)
|
||||
self.assertEqual(credentials._kwargs, restored._kwargs)
|
||||
|
||||
def _credentials_refresh(self, credentials):
|
||||
http = HttpMockSequence([
|
||||
@@ -258,24 +274,12 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
|
||||
return content
|
||||
|
||||
def test_credentials_refresh_without_storage(self):
|
||||
private_key = datafile('privatekey.%s' % self.format)
|
||||
credentials = SignedJwtAssertionCredentials(
|
||||
'some_account@example.com',
|
||||
private_key,
|
||||
scope='read+write',
|
||||
sub='joe@example.org')
|
||||
|
||||
credentials = self._make_credentials()
|
||||
content = self._credentials_refresh(credentials)
|
||||
|
||||
self.assertEqual(b'Bearer 3/3w', content[b'Authorization'])
|
||||
|
||||
def test_credentials_refresh_with_storage(self):
|
||||
private_key = datafile('privatekey.%s' % self.format)
|
||||
credentials = SignedJwtAssertionCredentials(
|
||||
'some_account@example.com',
|
||||
private_key,
|
||||
scope='read+write',
|
||||
sub='joe@example.org')
|
||||
credentials = self._make_credentials()
|
||||
|
||||
filehandle, filename = tempfile.mkstemp()
|
||||
os.close(filehandle)
|
||||
@@ -293,7 +297,7 @@ class PEMSignedJwtAssertionCredentialsOpenSSLTests(
|
||||
SignedJwtAssertionCredentialsTests):
|
||||
|
||||
def setUp(self):
|
||||
self.format = 'pem'
|
||||
self.format_ = 'pem'
|
||||
crypt.Signer = crypt.OpenSSLSigner
|
||||
|
||||
|
||||
@@ -301,25 +305,10 @@ class PEMSignedJwtAssertionCredentialsPyCryptoTests(
|
||||
SignedJwtAssertionCredentialsTests):
|
||||
|
||||
def setUp(self):
|
||||
self.format = 'pem'
|
||||
self.format_ = 'pem'
|
||||
crypt.Signer = crypt.PyCryptoSigner
|
||||
|
||||
|
||||
class PKCSSignedJwtAssertionCredentialsPyCryptoTests(unittest2.TestCase):
|
||||
|
||||
def test_for_failure(self):
|
||||
crypt.Signer = crypt.PyCryptoSigner
|
||||
private_key = datafile('privatekey.p12')
|
||||
credentials = SignedJwtAssertionCredentials(
|
||||
'some_account@example.com',
|
||||
private_key,
|
||||
scope='read+write',
|
||||
sub='joe@example.org')
|
||||
|
||||
self.assertRaises(NotImplementedError,
|
||||
credentials._generate_assertion)
|
||||
|
||||
|
||||
class TestHasOpenSSLFlag(unittest2.TestCase):
|
||||
|
||||
def test_true(self):
|
||||
|
||||
@@ -57,6 +57,23 @@ class ServiceAccountCredentialsTests(unittest2.TestCase):
|
||||
client_id=self.client_id,
|
||||
)
|
||||
|
||||
def test__to_json_override(self):
|
||||
signer = object()
|
||||
creds = ServiceAccountCredentials('name@email.com',
|
||||
signer)
|
||||
self.assertEqual(creds._signer, signer)
|
||||
# Serialize over-ridden data (unrelated to ``creds``).
|
||||
to_serialize = {'unrelated': 'data'}
|
||||
serialized_str = creds._to_json([], to_serialize.copy())
|
||||
serialized_data = json.loads(serialized_str)
|
||||
expected_serialized = {
|
||||
'_class': 'ServiceAccountCredentials',
|
||||
'_module': 'oauth2client.service_account',
|
||||
'token_expiry': None,
|
||||
}
|
||||
expected_serialized.update(to_serialize)
|
||||
self.assertEqual(serialized_data, expected_serialized)
|
||||
|
||||
def test_sign_blob(self):
|
||||
private_key_id, signature = self.credentials.sign_blob('Google')
|
||||
self.assertEqual(self.private_key_id, private_key_id)
|
||||
|
||||
Reference in New Issue
Block a user