Make _ServiceAccountCredentials public.
Also - changing the svc. acct. creds constructor to take a signer - adding two factory constructors to build from a JSON keyfile (either by filename or already parsed) - adding helpers to avoid re-loading file contents or re-parsing JSON when constructing svc. acct. creds from the main `client` module
This commit is contained in:
@@ -68,6 +68,7 @@ class OpenSSLVerifier(object):
|
||||
Raises:
|
||||
OpenSSL.crypto.Error: if the key_pem can't be parsed.
|
||||
"""
|
||||
key_pem = _to_bytes(key_pem)
|
||||
if is_x509_cert:
|
||||
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
|
||||
else:
|
||||
@@ -112,7 +113,8 @@ class OpenSSLSigner(object):
|
||||
Raises:
|
||||
OpenSSL.crypto.Error if the key can't be parsed.
|
||||
"""
|
||||
parsed_pem_key = _parse_pem_key(_to_bytes(key))
|
||||
key = _to_bytes(key)
|
||||
parsed_pem_key = _parse_pem_key(key)
|
||||
if parsed_pem_key:
|
||||
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
|
||||
else:
|
||||
|
||||
@@ -115,7 +115,7 @@ class PyCryptoSigner(object):
|
||||
Raises:
|
||||
NotImplementedError if the key isn't in PEM format.
|
||||
"""
|
||||
parsed_pem_key = _parse_pem_key(key)
|
||||
parsed_pem_key = _parse_pem_key(_to_bytes(key))
|
||||
if parsed_pem_key:
|
||||
pkey = RSA.importKey(parsed_pem_key)
|
||||
else:
|
||||
|
||||
@@ -1230,17 +1230,17 @@ class GoogleCredentials(OAuth2Credentials):
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, s):
|
||||
def from_json(cls, json_data):
|
||||
# TODO(issue 388): eliminate the circularity that is the reason for
|
||||
# this non-top-level import.
|
||||
from oauth2client.service_account import _ServiceAccountCredentials
|
||||
data = json.loads(_from_bytes(s))
|
||||
# this non-top-level import.
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
data = json.loads(_from_bytes(json_data))
|
||||
|
||||
# We handle service_account._ServiceAccountCredentials since it is a
|
||||
# We handle service_account.ServiceAccountCredentials since it is a
|
||||
# possible return type of GoogleCredentials.get_application_default()
|
||||
if (data['_module'] == 'oauth2client.service_account' and
|
||||
data['_class'] == '_ServiceAccountCredentials'):
|
||||
return _ServiceAccountCredentials.from_json(s)
|
||||
data['_class'] == 'ServiceAccountCredentials'):
|
||||
return ServiceAccountCredentials.from_json(data)
|
||||
|
||||
token_expiry = _parse_expiry(data.get('token_expiry'))
|
||||
google_credentials = cls(
|
||||
@@ -1490,9 +1490,6 @@ def _get_well_known_file():
|
||||
|
||||
def _get_application_default_credential_from_file(filename):
|
||||
"""Build the Application Default Credentials from file."""
|
||||
|
||||
from oauth2client import service_account
|
||||
|
||||
# read the credentials from the file
|
||||
with open(filename) as file_obj:
|
||||
client_credentials = json.load(file_obj)
|
||||
@@ -1523,12 +1520,9 @@ def _get_application_default_credential_from_file(filename):
|
||||
token_uri=GOOGLE_TOKEN_URI,
|
||||
user_agent='Python client library')
|
||||
else: # client_credentials['type'] == SERVICE_ACCOUNT
|
||||
return service_account._ServiceAccountCredentials(
|
||||
service_account_id=client_credentials['client_id'],
|
||||
service_account_email=client_credentials['client_email'],
|
||||
private_key_id=client_credentials['private_key_id'],
|
||||
private_key_pkcs8_text=client_credentials['private_key'],
|
||||
scopes=[])
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
return ServiceAccountCredentials.from_json_keyfile_dict(
|
||||
client_credentials)
|
||||
|
||||
|
||||
def _raise_exception_for_missing_fields(missing_fields):
|
||||
|
||||
@@ -23,48 +23,162 @@ from oauth2client import GOOGLE_REVOKE_URI
|
||||
from oauth2client import GOOGLE_TOKEN_URI
|
||||
from oauth2client._helpers import _json_encode
|
||||
from oauth2client._helpers import _from_bytes
|
||||
from oauth2client._helpers import _to_bytes
|
||||
from oauth2client._helpers import _urlsafe_b64encode
|
||||
from oauth2client import util
|
||||
from oauth2client.client import AssertionCredentials
|
||||
from oauth2client.client import EXPIRY_FORMAT
|
||||
from oauth2client.client import SERVICE_ACCOUNT
|
||||
from oauth2client import crypt
|
||||
|
||||
|
||||
class _ServiceAccountCredentials(AssertionCredentials):
|
||||
"""Class representing a service account (signed JWT) credential."""
|
||||
class ServiceAccountCredentials(AssertionCredentials):
|
||||
"""Service Account credential for OAuth 2.0 signed JWT grants.
|
||||
|
||||
MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
|
||||
Supports
|
||||
|
||||
* JSON keyfile (typically contains a PKCS8 key stored as
|
||||
PEM text)
|
||||
|
||||
Makes an assertion to server using a signed JWT assertion in exchange
|
||||
for an access token.
|
||||
|
||||
This credential does not require a flow to instantiate because it
|
||||
represents a two legged flow, and therefore has all of the required
|
||||
information to generate and refresh its own access tokens.
|
||||
|
||||
Args:
|
||||
service_account_email: string, The email associated with the
|
||||
service account.
|
||||
signer: ``crypt.Signer``, A signer which can be used to sign content.
|
||||
scopes: List or string, (Optional) Scopes to use when acquiring
|
||||
an access token.
|
||||
private_key_id: string, (Optional) Private key identifier. Typically
|
||||
only used with a JSON keyfile. Can be sent in the
|
||||
header of a JWT token assertion.
|
||||
client_id: string, (Optional) Client ID for the project that owns the
|
||||
service account.
|
||||
user_agent: string, (Optional) User agent to use when sending
|
||||
request.
|
||||
kwargs: dict, Extra key-value pairs (both strings) to send in the
|
||||
payload body when making an assertion.
|
||||
"""
|
||||
|
||||
MAX_TOKEN_LIFETIME_SECS = 3600
|
||||
"""Max lifetime of the token (one hour, in seconds)."""
|
||||
|
||||
NON_SERIALIZED_MEMBERS = (
|
||||
frozenset(['_signer']) |
|
||||
AssertionCredentials.NON_SERIALIZED_MEMBERS)
|
||||
"""Members that aren't serialized when object is converted to JSON."""
|
||||
|
||||
def __init__(self, service_account_id, service_account_email,
|
||||
private_key_id, private_key_pkcs8_text, scopes,
|
||||
user_agent=None, token_uri=GOOGLE_TOKEN_URI,
|
||||
revoke_uri=GOOGLE_REVOKE_URI, **kwargs):
|
||||
# Can be over-ridden by factory constructors. Used for
|
||||
# serialization/deserialization purposes.
|
||||
_private_key_pkcs8_pem = None
|
||||
|
||||
super(_ServiceAccountCredentials, self).__init__(
|
||||
None, user_agent=user_agent, token_uri=token_uri,
|
||||
revoke_uri=revoke_uri)
|
||||
def __init__(self,
|
||||
service_account_email,
|
||||
signer,
|
||||
scopes='',
|
||||
private_key_id=None,
|
||||
client_id=None,
|
||||
user_agent=None,
|
||||
**kwargs):
|
||||
|
||||
super(ServiceAccountCredentials, self).__init__(
|
||||
None, user_agent=user_agent)
|
||||
|
||||
self._service_account_id = service_account_id
|
||||
self._service_account_email = service_account_email
|
||||
self._private_key_id = private_key_id
|
||||
self._private_key_pkcs8_text = private_key_pkcs8_text
|
||||
self._signer = crypt.Signer.from_string(self._private_key_pkcs8_text)
|
||||
self._signer = signer
|
||||
self._scopes = util.scopes_to_string(scopes)
|
||||
self._private_key_id = private_key_id
|
||||
self.client_id = client_id
|
||||
self._user_agent = user_agent
|
||||
self._token_uri = token_uri
|
||||
self._revoke_uri = revoke_uri
|
||||
self._kwargs = kwargs
|
||||
|
||||
@classmethod
|
||||
def _from_parsed_json_keyfile(cls, keyfile_dict, scopes):
|
||||
"""Helper for factory constructors from JSON keyfile.
|
||||
|
||||
Args:
|
||||
keyfile_dict: dict-like object, The parsed dictionary-like object
|
||||
containing the contents of the JSON keyfile.
|
||||
scopes: List or string, Scopes to use when acquiring an
|
||||
access token.
|
||||
|
||||
Returns:
|
||||
ServiceAccountCredentials, a credentials object created from
|
||||
the keyfile contents.
|
||||
|
||||
Raises:
|
||||
ValueError, if the credential type is not :data:`SERVICE_ACCOUNT`.
|
||||
KeyError, if one of the expected keys is not present in
|
||||
the keyfile.
|
||||
"""
|
||||
creds_type = keyfile_dict.get('type')
|
||||
if creds_type != SERVICE_ACCOUNT:
|
||||
raise ValueError('Unexpected credentials type', creds_type,
|
||||
'Expected', SERVICE_ACCOUNT)
|
||||
|
||||
service_account_email = keyfile_dict['client_email']
|
||||
private_key_pkcs8_pem = keyfile_dict['private_key']
|
||||
private_key_id = keyfile_dict['private_key_id']
|
||||
client_id = keyfile_dict['client_id']
|
||||
|
||||
signer = crypt.Signer.from_string(private_key_pkcs8_pem)
|
||||
credentials = cls(service_account_email, signer, scopes=scopes,
|
||||
private_key_id=private_key_id,
|
||||
client_id=client_id)
|
||||
credentials._private_key_pkcs8_pem = private_key_pkcs8_pem
|
||||
return credentials
|
||||
|
||||
@classmethod
|
||||
def from_json_keyfile_name(cls, filename, scopes=''):
|
||||
"""Factory constructor from JSON keyfile by name.
|
||||
|
||||
Args:
|
||||
filename: string, The location of the keyfile.
|
||||
scopes: List or string, (Optional) Scopes to use when acquiring an
|
||||
access token.
|
||||
|
||||
Returns:
|
||||
ServiceAccountCredentials, a credentials object created from
|
||||
the keyfile.
|
||||
|
||||
Raises:
|
||||
ValueError, if the credential type is not :data:`SERVICE_ACCOUNT`.
|
||||
KeyError, if one of the expected keys is not present in
|
||||
the keyfile.
|
||||
"""
|
||||
with open(filename, 'r') as file_obj:
|
||||
client_credentials = json.load(file_obj)
|
||||
return cls._from_parsed_json_keyfile(client_credentials, scopes)
|
||||
|
||||
@classmethod
|
||||
def from_json_keyfile_dict(cls, keyfile_dict, scopes=''):
|
||||
"""Factory constructor from parsed JSON keyfile.
|
||||
|
||||
Args:
|
||||
keyfile_dict: dict-like object, The parsed dictionary-like object
|
||||
containing the contents of the JSON keyfile.
|
||||
scopes: List or string, (Optional) Scopes to use when acquiring an
|
||||
access token.
|
||||
|
||||
Returns:
|
||||
ServiceAccountCredentials, a credentials object created from
|
||||
the keyfile.
|
||||
|
||||
Raises:
|
||||
ValueError, if the credential type is not :data:`SERVICE_ACCOUNT`.
|
||||
KeyError, if one of the expected keys is not present in
|
||||
the keyfile.
|
||||
"""
|
||||
return cls._from_parsed_json_keyfile(keyfile_dict, scopes)
|
||||
|
||||
def _generate_assertion(self):
|
||||
"""Generate the assertion that will be used in the request."""
|
||||
now = int(time.time())
|
||||
payload = {
|
||||
'aud': self._token_uri,
|
||||
'aud': self.token_uri,
|
||||
'scope': self._scopes,
|
||||
'iat': now,
|
||||
'exp': now + self.MAX_TOKEN_LIFETIME_SECS,
|
||||
@@ -85,26 +199,45 @@ class _ServiceAccountCredentials(AssertionCredentials):
|
||||
def serialization_data(self):
|
||||
return {
|
||||
'type': 'service_account',
|
||||
'client_id': self._service_account_id,
|
||||
'client_email': self._service_account_email,
|
||||
'private_key_id': self._private_key_id,
|
||||
'private_key': self._private_key_pkcs8_text
|
||||
'private_key': self._private_key_pkcs8_pem,
|
||||
'client_id': self.client_id,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, s):
|
||||
data = json.loads(_from_bytes(s))
|
||||
def from_json(cls, json_data):
|
||||
"""Deserialize a JSON-serialized instance.
|
||||
|
||||
Inverse to :meth:`to_json`.
|
||||
|
||||
Args:
|
||||
json_data: dict or string, Serialized JSON (as a string or an
|
||||
already parsed dictionary) representing a credential.
|
||||
|
||||
Returns:
|
||||
ServiceAccountCredentials from the serialized data.
|
||||
"""
|
||||
if not isinstance(json_data, dict):
|
||||
json_data = json.loads(_from_bytes(json_data))
|
||||
|
||||
private_key_pkcs8_pem = json_data['_private_key_pkcs8_pem']
|
||||
signer = crypt.Signer.from_string(private_key_pkcs8_pem)
|
||||
credentials = cls(
|
||||
service_account_id=data['_service_account_id'],
|
||||
service_account_email=data['_service_account_email'],
|
||||
private_key_id=data['_private_key_id'],
|
||||
private_key_pkcs8_text=data['_private_key_pkcs8_text'],
|
||||
scopes=[],
|
||||
user_agent=data['_user_agent'])
|
||||
credentials.invalid = data['invalid']
|
||||
credentials.access_token = data['access_token']
|
||||
token_expiry = data.get('token_expiry', None)
|
||||
json_data['_service_account_email'],
|
||||
signer,
|
||||
scopes=json_data['_scopes'],
|
||||
private_key_id=json_data['_private_key_id'],
|
||||
client_id=json_data['client_id'],
|
||||
user_agent=json_data['_user_agent'],
|
||||
**json_data['_kwargs']
|
||||
)
|
||||
credentials._private_key_pkcs8_pem = private_key_pkcs8_pem
|
||||
credentials.invalid = json_data['invalid']
|
||||
credentials.access_token = json_data['access_token']
|
||||
credentials.token_uri = json_data['token_uri']
|
||||
credentials.revoke_uri = json_data['revoke_uri']
|
||||
token_expiry = json_data.get('token_expiry', None)
|
||||
if token_expiry is not None:
|
||||
credentials.token_expiry = datetime.datetime.strptime(
|
||||
token_expiry, EXPIRY_FORMAT)
|
||||
@@ -114,12 +247,13 @@ class _ServiceAccountCredentials(AssertionCredentials):
|
||||
return not self._scopes
|
||||
|
||||
def create_scoped(self, scopes):
|
||||
return _ServiceAccountCredentials(self._service_account_id,
|
||||
self._service_account_email,
|
||||
self._private_key_id,
|
||||
self._private_key_pkcs8_text,
|
||||
scopes,
|
||||
user_agent=self._user_agent,
|
||||
token_uri=self._token_uri,
|
||||
revoke_uri=self._revoke_uri,
|
||||
**self._kwargs)
|
||||
result = self.__class__(self._service_account_email,
|
||||
self._signer,
|
||||
scopes=scopes,
|
||||
private_key_id=self._private_key_id,
|
||||
client_id=self.client_id,
|
||||
user_agent=self._user_agent,
|
||||
**self._kwargs)
|
||||
result.token_uri = self.token_uri
|
||||
result.revoke_uri = self.revoke_uri
|
||||
return result
|
||||
|
||||
@@ -3,7 +3,7 @@ import os
|
||||
|
||||
import httplib2
|
||||
from oauth2client import client
|
||||
from oauth2client import service_account
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
|
||||
|
||||
JSON_KEY_PATH = os.getenv('OAUTH2CLIENT_TEST_JSON_KEY_PATH')
|
||||
@@ -51,18 +51,10 @@ def _check_user_info(credentials, expected_email):
|
||||
|
||||
|
||||
def run_json():
|
||||
with open(JSON_KEY_PATH, 'r') as file_object:
|
||||
client_credentials = json.load(file_object)
|
||||
|
||||
credentials = service_account._ServiceAccountCredentials(
|
||||
service_account_id=client_credentials['client_id'],
|
||||
service_account_email=client_credentials['client_email'],
|
||||
private_key_id=client_credentials['private_key_id'],
|
||||
private_key_pkcs8_text=client_credentials['private_key'],
|
||||
scopes=SCOPE,
|
||||
)
|
||||
|
||||
_check_user_info(credentials, client_credentials['client_email'])
|
||||
credentials = ServiceAccountCredentials.from_json_keyfile_name(
|
||||
JSON_KEY_PATH, scopes=SCOPE)
|
||||
service_account_email = credentials._service_account_email
|
||||
_check_user_info(credentials, service_account_email)
|
||||
|
||||
|
||||
def run_p12():
|
||||
|
||||
@@ -27,7 +27,7 @@ class TestPyCryptoVerifier(unittest.TestCase):
|
||||
PRIVATE_KEY_FILENAME = os.path.join(os.path.dirname(__file__),
|
||||
'data', 'privatekey.pem')
|
||||
|
||||
def _load_public_key_bytes(self):
|
||||
def _load_public_cert_bytes(self):
|
||||
with open(self.PUBLIC_CERT_FILENAME, 'rb') as fh:
|
||||
return fh.read()
|
||||
|
||||
@@ -40,24 +40,24 @@ class TestPyCryptoVerifier(unittest.TestCase):
|
||||
signer = PyCryptoSigner.from_string(self._load_private_key_bytes())
|
||||
actual_signature = signer.sign(to_sign)
|
||||
|
||||
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
|
||||
verifier = PyCryptoVerifier.from_string(self._load_public_cert_bytes(),
|
||||
is_x509_cert=True)
|
||||
self.assertTrue(verifier.verify(to_sign, actual_signature))
|
||||
|
||||
def test_verify_failure(self):
|
||||
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
|
||||
verifier = PyCryptoVerifier.from_string(self._load_public_cert_bytes(),
|
||||
is_x509_cert=True)
|
||||
bad_signature = b''
|
||||
self.assertFalse(verifier.verify(b'foo', bad_signature))
|
||||
|
||||
def test_verify_bad_key(self):
|
||||
verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(),
|
||||
verifier = PyCryptoVerifier.from_string(self._load_public_cert_bytes(),
|
||||
is_x509_cert=True)
|
||||
bad_signature = b''
|
||||
self.assertFalse(verifier.verify(b'foo', bad_signature))
|
||||
|
||||
def test_from_string_unicode_key(self):
|
||||
public_key = self._load_public_key_bytes()
|
||||
public_key = self._load_public_cert_bytes()
|
||||
public_key = public_key.decode('utf-8')
|
||||
verifier = PyCryptoVerifier.from_string(public_key, is_x509_cert=True)
|
||||
self.assertTrue(isinstance(verifier, PyCryptoVerifier))
|
||||
|
||||
@@ -80,7 +80,7 @@ from oauth2client.client import credentials_from_code
|
||||
from oauth2client.client import flow_from_clientsecrets
|
||||
from oauth2client.client import save_to_well_known_file
|
||||
from oauth2client.clientsecrets import _loadfile
|
||||
from oauth2client.service_account import _ServiceAccountCredentials
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
|
||||
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
|
||||
|
||||
@@ -157,8 +157,8 @@ class GoogleCredentialsTests(unittest2.TestCase):
|
||||
os.environ.pop(env, None)
|
||||
|
||||
def validate_service_account_credentials(self, credentials):
|
||||
self.assertTrue(isinstance(credentials, _ServiceAccountCredentials))
|
||||
self.assertEqual('123', credentials._service_account_id)
|
||||
self.assertTrue(isinstance(credentials, ServiceAccountCredentials))
|
||||
self.assertEqual('123', credentials.client_id)
|
||||
self.assertEqual('dummy@google.com',
|
||||
credentials._service_account_email)
|
||||
self.assertEqual('ABCDEF', credentials._private_key_id)
|
||||
@@ -619,8 +619,8 @@ class GoogleCredentialsTests(unittest2.TestCase):
|
||||
credentials_file)
|
||||
|
||||
def test_to_from_json_authorized_user(self):
|
||||
credentials_file = datafile(
|
||||
os.path.join('gcloud', 'application_default_credentials_authorized_user.json'))
|
||||
filename = 'application_default_credentials_authorized_user.json'
|
||||
credentials_file = datafile(os.path.join('gcloud', filename))
|
||||
creds = GoogleCredentials.from_stream(credentials_file)
|
||||
json = creds.to_json()
|
||||
creds2 = GoogleCredentials.from_json(json)
|
||||
|
||||
@@ -21,36 +21,41 @@ import datetime
|
||||
import json
|
||||
import os
|
||||
import rsa
|
||||
import unittest
|
||||
import tempfile
|
||||
|
||||
import mock
|
||||
import unittest2
|
||||
|
||||
from .http_mock import HttpMockSequence
|
||||
from oauth2client.service_account import _ServiceAccountCredentials
|
||||
from oauth2client import crypt
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
from oauth2client.service_account import SERVICE_ACCOUNT
|
||||
|
||||
|
||||
def data_filename(filename):
|
||||
return os.path.join(os.path.dirname(__file__), 'data', filename)
|
||||
|
||||
|
||||
def datafile(filename):
|
||||
# TODO(orestica): Refactor this using pkgutil.get_data
|
||||
f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb')
|
||||
data = f.read()
|
||||
f.close()
|
||||
return data
|
||||
with open(data_filename(filename), 'rb') as file_obj:
|
||||
return file_obj.read()
|
||||
|
||||
|
||||
class ServiceAccountCredentialsTests(unittest.TestCase):
|
||||
class ServiceAccountCredentialsTests(unittest2.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.service_account_id = '123'
|
||||
self.client_id = '123'
|
||||
self.service_account_email = 'dummy@google.com'
|
||||
self.private_key_id = 'ABCDEF'
|
||||
self.private_key = datafile('pem_from_pkcs12.pem')
|
||||
self.scopes = ['dummy_scope']
|
||||
self.credentials = _ServiceAccountCredentials(
|
||||
self.service_account_id,
|
||||
self.signer = crypt.Signer.from_string(self.private_key)
|
||||
self.credentials = ServiceAccountCredentials(
|
||||
self.service_account_email,
|
||||
self.private_key_id,
|
||||
self.private_key,
|
||||
[])
|
||||
self.signer,
|
||||
private_key_id=self.private_key_id,
|
||||
client_id=self.client_id,
|
||||
)
|
||||
|
||||
def test_sign_blob(self):
|
||||
private_key_id, signature = self.credentials.sign_blob('Google')
|
||||
@@ -71,23 +76,78 @@ class ServiceAccountCredentialsTests(unittest.TestCase):
|
||||
self.assertEqual(self.service_account_email,
|
||||
self.credentials.service_account_email)
|
||||
|
||||
@staticmethod
|
||||
def _from_json_keyfile_name_helper(payload, scopes=None):
|
||||
filehandle, filename = tempfile.mkstemp()
|
||||
os.close(filehandle)
|
||||
try:
|
||||
with open(filename, 'w') as file_obj:
|
||||
json.dump(payload, file_obj)
|
||||
return ServiceAccountCredentials.from_json_keyfile_name(
|
||||
filename, scopes=scopes)
|
||||
finally:
|
||||
os.remove(filename)
|
||||
|
||||
@mock.patch('oauth2client.crypt.Signer.from_string',
|
||||
return_value=object())
|
||||
def test_from_json_keyfile_name_factory(self, signer_factory):
|
||||
client_id = 'id123'
|
||||
client_email= 'foo@bar.com'
|
||||
private_key_id = 'pkid456'
|
||||
private_key = 's3kr3tz'
|
||||
payload = {
|
||||
'type': SERVICE_ACCOUNT,
|
||||
'client_id': client_id,
|
||||
'client_email': client_email,
|
||||
'private_key_id': private_key_id,
|
||||
'private_key': private_key,
|
||||
}
|
||||
scopes = ['foo', 'bar']
|
||||
creds = self._from_json_keyfile_name_helper(payload, scopes=scopes)
|
||||
self.assertIsInstance(creds, ServiceAccountCredentials)
|
||||
self.assertEqual(creds.client_id, client_id)
|
||||
self.assertEqual(creds._service_account_email, client_email)
|
||||
self.assertEqual(creds._private_key_id, private_key_id)
|
||||
self.assertEqual(creds._private_key_pkcs8_pem, private_key)
|
||||
self.assertEqual(creds._scopes, ' '.join(scopes))
|
||||
# Check stub.
|
||||
self.assertEqual(creds._signer, signer_factory.return_value)
|
||||
signer_factory.assert_called_once_with(private_key)
|
||||
|
||||
def test_from_json_keyfile_name_factory_bad_type(self):
|
||||
type_ = 'bad-type'
|
||||
self.assertNotEqual(type_, SERVICE_ACCOUNT)
|
||||
payload = {'type': type_}
|
||||
with self.assertRaises(ValueError):
|
||||
self._from_json_keyfile_name_helper(payload)
|
||||
|
||||
def test_from_json_keyfile_name_factory_missing_field(self):
|
||||
payload = {
|
||||
'type': SERVICE_ACCOUNT,
|
||||
'client_id': 'my-client',
|
||||
}
|
||||
with self.assertRaises(KeyError):
|
||||
self._from_json_keyfile_name_helper(payload)
|
||||
|
||||
def test_create_scoped_required_without_scopes(self):
|
||||
self.assertTrue(self.credentials.create_scoped_required())
|
||||
|
||||
def test_create_scoped_required_with_scopes(self):
|
||||
self.credentials = _ServiceAccountCredentials(
|
||||
self.service_account_id,
|
||||
signer = object()
|
||||
self.credentials = ServiceAccountCredentials(
|
||||
self.service_account_email,
|
||||
self.private_key_id,
|
||||
self.private_key,
|
||||
self.scopes)
|
||||
signer,
|
||||
scopes=self.scopes,
|
||||
private_key_id=self.private_key_id,
|
||||
client_id=self.client_id,
|
||||
)
|
||||
self.assertFalse(self.credentials.create_scoped_required())
|
||||
|
||||
def test_create_scoped(self):
|
||||
new_credentials = self.credentials.create_scoped(self.scopes)
|
||||
self.assertNotEqual(self.credentials, new_credentials)
|
||||
self.assertTrue(isinstance(new_credentials,
|
||||
_ServiceAccountCredentials))
|
||||
self.assertIsInstance(new_credentials,
|
||||
ServiceAccountCredentials)
|
||||
self.assertEqual('dummy_scope', new_credentials._scopes)
|
||||
|
||||
@mock.patch('oauth2client.client._UTCNOW')
|
||||
@@ -102,16 +162,12 @@ class ServiceAccountCredentialsTests(unittest.TestCase):
|
||||
signed_value = b'signed-content'
|
||||
signer.sign = mock.MagicMock(name='sign',
|
||||
return_value=signed_value)
|
||||
signer_patch = mock.patch('oauth2client.crypt.Signer.from_string',
|
||||
return_value=signer)
|
||||
with signer_patch as signer_factory:
|
||||
credentials = _ServiceAccountCredentials(
|
||||
self.service_account_id,
|
||||
self.service_account_email,
|
||||
self.private_key_id,
|
||||
self.private_key,
|
||||
'',
|
||||
)
|
||||
credentials = ServiceAccountCredentials(
|
||||
self.service_account_email,
|
||||
signer,
|
||||
private_key_id=self.private_key_id,
|
||||
client_id=self.client_id,
|
||||
)
|
||||
|
||||
# Begin testing.
|
||||
lifetime = 2 # number of seconds in which the token expires
|
||||
@@ -196,4 +252,4 @@ class ServiceAccountCredentialsTests(unittest.TestCase):
|
||||
|
||||
|
||||
if __name__ == '__main__': # pragma: NO COVER
|
||||
unittest.main()
|
||||
unittest2.main()
|
||||
|
||||
Reference in New Issue
Block a user