Add support for Google Default Credentials.

This commit is contained in:
Orest Bolohan
2014-05-30 11:05:24 -07:00
parent e48d95189d
commit 569f4e0d23
18 changed files with 1120 additions and 6 deletions

View File

@@ -164,6 +164,7 @@ class AppAssertionCredentials(AssertionCredentials):
unspecified, the default service account for the app is used.
"""
self.scope = util.scopes_to_string(scope)
self._kwargs = kwargs
self.service_account_id = kwargs.get('service_account_id', None)
# Assertion type is no longer used, but still in the parent class signature.
@@ -196,6 +197,12 @@ class AppAssertionCredentials(AssertionCredentials):
raise AccessTokenRefreshError(str(e))
self.access_token = token
def create_scoped_required(self):
return not self.scope
def create_scoped(self, scopes):
return AppAssertionCredentials(scopes, **self._kwargs)
class FlowProperty(db.Property):
"""App Engine datastore Property for Flow.

View File

@@ -66,6 +66,14 @@ OOB_CALLBACK_URN = 'urn:ietf:wg:oauth:2.0:oob'
# Google Data client libraries may need to set this to [401, 403].
REFRESH_STATUS_CODES = [401]
# The value representing user credentials.
AUTHORIZED_USER = 'authorized_user'
# The value representing service account credentials.
SERVICE_ACCOUNT = 'service_account'
# The environment variable pointing the file with local Default Credentials.
GOOGLE_CREDENTIALS_DEFAULT = 'GOOGLE_CREDENTIALS_DEFAULT'
class Error(Exception):
"""Base error for this module."""
@@ -99,6 +107,10 @@ class NonAsciiHeaderError(Error):
"""Header names and values must be ASCII strings."""
class DefaultCredentialsError(Error):
"""Error retrieving the Default Credentials."""
def _abstract():
raise NotImplementedError('You need to override this function')
@@ -126,7 +138,7 @@ class Credentials(object):
an HTTP transport.
Subclasses must also specify a classmethod named 'from_json' that takes a JSON
string as input and returns an instaniated Credentials object.
string as input and returns an instantiated Credentials object.
"""
NON_SERIALIZED_MEMBERS = ['store']
@@ -375,7 +387,7 @@ def _update_query_params(uri, params):
The same URI but with the new query parameters added.
"""
parts = list(urlparse.urlparse(uri))
query_params = dict(parse_qsl(parts[4])) # 4 is the index of the query part
query_params = dict(parse_qsl(parts[4])) # 4 is the index of the query part
query_params.update(params)
parts[4] = urllib.urlencode(query_params)
return urlparse.urlunparse(parts)
@@ -587,6 +599,20 @@ class OAuth2Credentials(Credentials):
return True
return False
def get_access_token(self, http=None):
"""Return the access token.
If the token does not exist, get one.
If the token expired, refresh it.
"""
if self.access_token and not self.access_token_expired:
return self.access_token
else:
if not http:
http = httplib2.Http()
self.refresh(http)
return self.access_token
def set_store(self, store):
"""Set the Storage for the credential.
@@ -820,7 +846,303 @@ class AccessTokenCredentials(OAuth2Credentials):
self._do_revoke(http_request, self.access_token)
class AssertionCredentials(OAuth2Credentials):
_env_name = None
def _get_environment(urllib2_urlopen=None):
"""Detect the environment the code is being run on."""
global _env_name
if _env_name:
return _env_name
server_software = os.environ.get('SERVER_SOFTWARE', '')
if server_software.startswith('Google App Engine/'):
_env_name = 'GAE_PRODUCTION'
elif server_software.startswith('Development/'):
_env_name = 'GAE_LOCAL'
else:
import urllib2
try:
if urllib2_urlopen is None:
urllib2_urlopen = urllib2.urlopen
response = urllib2_urlopen('http://metadata.google.internal')
if any('Metadata-Flavor: Google' in h for h in response.info().headers):
_env_name = 'GCE_PRODUCTION'
else:
_env_name = 'UNKNOWN'
except urllib2.URLError:
_env_name = 'UNKNOWN'
return _env_name
class GoogleCredentials(OAuth2Credentials):
"""Default credentials for use in calling Google APIs.
The Default Credentials are being constructed as a function of the environment
where the code is being run. More details can be found on this page:
https://developers.google.com/accounts/docs/default-credentials
Here is an example of how to use the Default Credentials for a service that
requires authentication:
<code>
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
PROJECT = 'bamboo-machine-422' # replace this with one of your projects
ZONE = 'us-central1-a' # replace this with the zone you care about
service = build('compute', 'v1', credentials=GoogleCredentials.get_default())
request = service.instances().list(project=PROJECT, zone=ZONE)
response = request.execute()
print response
</code>
A service that does not require authentication does not need credentials
to be passed in:
<code>
from googleapiclient.discovery import build
service = build('discovery', 'v1')
request = service.apis().list()
response = request.execute()
print response
</code>
"""
def __init__(self, access_token, client_id, client_secret, refresh_token,
token_expiry, token_uri, user_agent,
revoke_uri=GOOGLE_REVOKE_URI):
"""Create an instance of GoogleCredentials.
This constructor is not usually called by the user, instead
GoogleCredentials objects are instantiated by
GoogleCredentials.from_stream() or GoogleCredentials.get_default().
Args:
access_token: string, access token.
client_id: string, client identifier.
client_secret: string, client secret.
refresh_token: string, refresh token.
token_expiry: datetime, when the access_token expires.
token_uri: string, URI of token endpoint.
user_agent: string, The HTTP User-Agent to provide for this application.
revoke_uri: string, URI for revoke endpoint.
Defaults to GOOGLE_REVOKE_URI; a token can't be revoked if this is None.
"""
super(GoogleCredentials, self).__init__(
access_token, client_id, client_secret, refresh_token, token_expiry,
token_uri, user_agent, revoke_uri=revoke_uri)
def create_scoped_required(self):
"""Whether this Credentials object is scopeless.
create_scoped(scopes) method needs to be called in order to create
a Credentials object for API calls.
"""
return False
def create_scoped(self, scopes):
"""Create a Credentials object for the given scopes.
The Credentials type is preserved.
"""
return self
@staticmethod
def get_default():
"""Get the Default Credentials for the current environment.
Exceptions:
DefaultCredentialsError: raised when the credentials fail to be retrieved.
"""
_env_name = _get_environment()
if _env_name in ('GAE_PRODUCTION', 'GAE_LOCAL'):
# if we are running inside Google App Engine
# there is no need to look for credentials in local files
default_credential_filename = None
well_known_file = None
else:
default_credential_filename = _get_environment_variable_file()
well_known_file = _get_well_known_file()
if default_credential_filename:
try:
return _get_default_credential_from_file(default_credential_filename)
except (DefaultCredentialsError, ValueError) as error:
extra_help = (' (pointed to by ' + GOOGLE_CREDENTIALS_DEFAULT +
' environment variable)')
_raise_exception_for_reading_json(default_credential_filename,
extra_help, error)
elif well_known_file:
try:
return _get_default_credential_from_file(well_known_file)
except (DefaultCredentialsError, ValueError) as error:
extra_help = (' (produced automatically when running'
' "gcloud auth login" command)')
_raise_exception_for_reading_json(well_known_file, extra_help, error)
elif _env_name in ('GAE_PRODUCTION', 'GAE_LOCAL'):
return _get_default_credential_GAE()
elif _env_name == 'GCE_PRODUCTION':
return _get_default_credential_GCE()
else:
raise DefaultCredentialsError(
"The Default Credentials are not available. They are available if "
"running in Google App Engine or Google Compute Engine. They are "
"also available if using the Google Cloud SDK and running 'gcloud "
"auth login'. Otherwise, the environment variable " +
GOOGLE_CREDENTIALS_DEFAULT + " must be defined pointing to a file "
"defining the credentials. "
"See https://developers.google.com/accounts/docs/default-credentials "
"for details.")
@staticmethod
def from_stream(credential_filename):
"""Create a Credentials object by reading the information from a given file.
It returns an object of type GoogleCredentials.
Args:
credential_filename: the path to the file from where the credentials
are to be read
Exceptions:
DefaultCredentialsError: raised when the credentials fail to be retrieved.
"""
if credential_filename and os.path.isfile(credential_filename):
try:
return _get_default_credential_from_file(credential_filename)
except (DefaultCredentialsError, ValueError) as error:
extra_help = ' (provided as parameter to the from_stream() method)'
_raise_exception_for_reading_json(credential_filename,
extra_help,
error)
else:
raise DefaultCredentialsError('The parameter passed to the from_stream()'
' method should point to a file.')
def _get_environment_variable_file():
default_credential_filename = os.environ.get(GOOGLE_CREDENTIALS_DEFAULT,
None)
if default_credential_filename:
if os.path.isfile(default_credential_filename):
return default_credential_filename
else:
raise DefaultCredentialsError(
'File ' + default_credential_filename + ' (pointed by ' +
GOOGLE_CREDENTIALS_DEFAULT + ' environment variable) does not exist!')
def _get_well_known_file():
"""Get the well known file produced by command 'gcloud auth login'."""
# TODO(orestica): Revisit this method once gcloud provides a better way
# of pinpointing the exact location of the file.
WELL_KNOWN_CREDENTIALS_FILE = 'credentials_default.json'
CLOUDSDK_CONFIG_DIRECTORY = 'gcloud'
if os.name == 'nt':
try:
default_config_path = os.path.join(os.environ['APPDATA'],
CLOUDSDK_CONFIG_DIRECTORY)
except KeyError:
# This should never happen unless someone is really messing with things.
drive = os.environ.get('SystemDrive', 'C:')
default_config_path = os.path.join(drive, '\\', CLOUDSDK_CONFIG_DIRECTORY)
else:
default_config_path = os.path.join(os.path.expanduser('~'),
'.config',
CLOUDSDK_CONFIG_DIRECTORY)
default_config_path = os.path.join(default_config_path,
WELL_KNOWN_CREDENTIALS_FILE)
if os.path.isfile(default_config_path):
return default_config_path
def _get_default_credential_from_file(default_credential_filename):
"""Build the Default Credentials from file."""
import service_account
# read the credentials from the file
with open(default_credential_filename) as default_credential:
client_credentials = service_account.simplejson.load(default_credential)
credentials_type = client_credentials.get('type')
if credentials_type == AUTHORIZED_USER:
required_fields = set(['client_id', 'client_secret', 'refresh_token'])
elif credentials_type == SERVICE_ACCOUNT:
required_fields = set(['client_id', 'client_email', 'private_key_id',
'private_key'])
else:
raise DefaultCredentialsError("'type' field should be defined "
"(and have one of the '" + AUTHORIZED_USER +
"' or '" + SERVICE_ACCOUNT + "' values)")
missing_fields = required_fields.difference(client_credentials.keys())
if missing_fields:
_raise_exception_for_missing_fields(missing_fields)
if client_credentials['type'] == AUTHORIZED_USER:
return GoogleCredentials(
access_token=None,
client_id=client_credentials['client_id'],
client_secret=client_credentials['client_secret'],
refresh_token=client_credentials['refresh_token'],
token_expiry=None,
token_uri=GOOGLE_TOKEN_URI,
user_agent='Python client library')
else: # client_credentials['type'] == SERVICE_ACCOUNT
return service_account._ServiceAccountCredentials(
service_account_id=client_credentials['client_id'],
service_account_email=client_credentials['client_email'],
private_key_id=client_credentials['private_key_id'],
private_key_pkcs8_text=client_credentials['private_key'],
scopes=[])
def _raise_exception_for_missing_fields(missing_fields):
raise DefaultCredentialsError('The following field(s): ' +
', '.join(missing_fields) + ' must be defined.')
def _raise_exception_for_reading_json(credential_file,
extra_help,
error):
raise DefaultCredentialsError('An error was encountered while reading '
'json file: '+ credential_file + extra_help +
': ' + str(error))
def _get_default_credential_GAE():
from oauth2client.appengine import AppAssertionCredentials
return AppAssertionCredentials([])
def _get_default_credential_GCE():
from oauth2client.gce import AppAssertionCredentials
return AppAssertionCredentials([])
class AssertionCredentials(GoogleCredentials):
"""Abstract Credentials object used for OAuth 2.0 assertion grants.
This credential does not require a flow to instantiate because it
@@ -899,7 +1221,7 @@ if HAS_CRYPTO:
later. For App Engine you may also consider using AppAssertionCredentials.
"""
MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
@util.positional(4)
def __init__(self,

View File

@@ -57,6 +57,7 @@ class AppAssertionCredentials(AssertionCredentials):
requested.
"""
self.scope = util.scopes_to_string(scope)
self.kwargs = kwargs
# Assertion type is no longer used, but still in the parent class signature.
super(AppAssertionCredentials, self).__init__(None)
@@ -87,4 +88,14 @@ class AppAssertionCredentials(AssertionCredentials):
raise AccessTokenRefreshError(str(e))
self.access_token = d['accessToken']
else:
if response.status == 404:
content = content + (' This can occur if a VM was created'
' with no service account or scopes.')
raise AccessTokenRefreshError(content)
def create_scoped_required(self):
return not self.scope
def create_scoped(self, scopes):
return AppAssertionCredentials(scopes,
**self.kwargs)

View File

@@ -0,0 +1,121 @@
# Copyright (C) 2014 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A service account credentials class.
This credentials class is implemented on top of rsa library.
"""
import base64
import rsa
import time
import types
from oauth2client import GOOGLE_REVOKE_URI
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client import util
from oauth2client.anyjson import simplejson
from oauth2client.client import AssertionCredentials
from pyasn1.codec.ber import decoder
from pyasn1_modules.rfc5208 import PrivateKeyInfo
class _ServiceAccountCredentials(AssertionCredentials):
"""Class representing a service account (signed JWT) credential."""
MAX_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
def __init__(self, service_account_id, service_account_email, private_key_id,
private_key_pkcs8_text, scopes, user_agent=None,
token_uri=GOOGLE_TOKEN_URI, revoke_uri=GOOGLE_REVOKE_URI, **kwargs):
super(_ServiceAccountCredentials, self).__init__(
None, user_agent=user_agent, token_uri=token_uri, revoke_uri=revoke_uri)
self._service_account_id = service_account_id
self._service_account_email = service_account_email
self._private_key_id = private_key_id
self._private_key = _get_private_key(private_key_pkcs8_text)
self._private_key_pkcs8_text = private_key_pkcs8_text
self._scopes = util.scopes_to_string(scopes)
self._user_agent = user_agent
self._token_uri = token_uri
self._revoke_uri = revoke_uri
self._kwargs = kwargs
def _generate_assertion(self):
"""Generate the assertion that will be used in the request."""
header = {
'alg': 'RS256',
'typ': 'JWT',
'kid': self._private_key_id
}
now = long(time.time())
payload = {
'aud': self._token_uri,
'scope': self._scopes,
'iat': now,
'exp': now + _ServiceAccountCredentials.MAX_TOKEN_LIFETIME_SECS,
'iss': self._service_account_email
}
payload.update(self._kwargs)
assertion_input = '%s.%s' % (
_urlsafe_b64encode(header),
_urlsafe_b64encode(payload))
# Sign the assertion.
signature = base64.urlsafe_b64encode(rsa.pkcs1.sign(
assertion_input, self._private_key, 'SHA-256')).rstrip('=')
return '%s.%s' % (assertion_input, signature)
def sign_blob(self, blob):
return (self._private_key_id,
rsa.pkcs1.sign(blob, self._private_key, 'SHA-256'))
@property
def service_account_email(self):
return self._service_account_email
def create_scoped_required(self):
return not self._scopes
def create_scoped(self, scopes):
return _ServiceAccountCredentials(self._service_account_id,
self._service_account_email,
self._private_key_id,
self._private_key_pkcs8_text,
scopes,
user_agent=self._user_agent,
token_uri=self._token_uri,
revoke_uri=self._revoke_uri,
**self._kwargs)
def _urlsafe_b64encode(data):
return base64.urlsafe_b64encode(
simplejson.dumps(data, separators = (',', ':'))\
.encode('UTF-8')).rstrip('=')
def _get_private_key(private_key_pkcs8_text):
"""Get an RSA private key object from a pkcs8 representation."""
der = rsa.pem.load_pem(private_key_pkcs8_text, 'PRIVATE KEY')
asn1_private_key, _ = decoder.decode(der, asn1Spec=PrivateKeyInfo())
return rsa.PrivateKey.load_pkcs1(
asn1_private_key.getComponentByName('privateKey').asOctets(),
format='DER')

View File

@@ -0,0 +1,15 @@
# To be used to test GoogleCredential.GetDefaultCredential()
# from local machine and GCE.
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
PROJECT = "bamboo-machine-422" # Provide your own GCE project here
ZONE = "us-central1-a" # Put here a zone which has some VMs
service = build("compute", "v1", credentials=GoogleCredentials.get_default())
request = service.instances().list(project=PROJECT, zone=ZONE)
response = request.execute()
print response

View File

@@ -0,0 +1,10 @@
application: bamboo-machine-422
version: 2
runtime: python27
api_version: 1
threadsafe: true
handlers:
- url: /.*
script: call_compute_service_from_gae.app

View File

@@ -0,0 +1,21 @@
# To be used to test GoogleCredential.GetDefaultCredential()
# from devel GAE (ie, dev_appserver.py).
import webapp2
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
PROJECT = "bamboo-machine-422" # Provide your own GCE project here
ZONE = "us-central1-a" # Put here a zone which has some VMs
def get_instances():
service = build("compute", "v1", credentials=GoogleCredentials.get_default())
request = service.instances().list(project=PROJECT, zone=ZONE)
return request.execute()
class MainPage(webapp2.RequestHandler):
def get(self):
self.response.write(get_instances())
app = webapp2.WSGIApplication([('/', MainPage),], debug=True)

View File

@@ -26,6 +26,9 @@ packages = [
install_requires = [
'httplib2>=0.8',
'pyasn1==0.1.7',
'pyasn1_modules==0.0.5',
'rsa==3.1.4',
]
needs_json = False

View File

@@ -0,0 +1,9 @@
{
"type": "service_account",
"client_id": "123",
"client_secret": "secret",
"refresh_token": "alabalaportocala",
"client_email": "dummy@google.com",
"private_key_id": "ABCDEF",
"private_key": "Bag Attributes\n friendlyName: key\n localKeyID: 22 7E 04 FC 64 48 20 83 1E C1 BD E3 F5 2F 44 7D EA 99 A5 BC\nKey Attributes: <No Attributes>\n-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDh6PSnttDsv+vi\ntUZTP1E3hVBah6PUGDWZhYgNiyW8quTWCmPvBmCR2YzuhUrY5+CtKP8UJOQico+p\noJHSAPsrzSr6YsGs3c9SQOslBmm9Fkh9/f/GZVTVZ6u5AsUmOcVvZ2q7Sz8Vj/aR\naIm0EJqRe9cQ5vvN9sg25rIv4xKwIZJ1VixKWJLmpCmDINqn7xvl+ldlUmSr3aGt\nw21uSDuEJhQlzO3yf2FwJMkJ9SkCm9oVDXyl77OnKXj5bOQ/rojbyGeIxDJSUDWE\nGKyRPuqKi6rSbwg6h2G/Z9qBJkqM5NNTbGRIFz/9/LdmmwvtaqCxlLtD7RVEryAp\n+qTGDk5hAgMBAAECggEBAMYYfNDEYpf4A2SdCLne/9zrrfZ0kphdUkL48MDPj5vN\nTzTRj6f9s5ixZ/+QKn3hdwbguCx13QbH5mocP0IjUhyqoFFHYAWxyyaZfpjM8tO4\nQoEYxby3BpjLe62UXESUzChQSytJZFwIDXKcdIPNO3zvVzufEJcfG5no2b9cIvsG\nDy6J1FNILWxCtDIqBM+G1B1is9DhZnUDgn0iKzINiZmh1I1l7k/4tMnozVIKAfwo\nf1kYjG/d2IzDM02mTeTElz3IKeNriaOIYTZgI26xLJxTkiFnBV4JOWFAZw15X+yR\n+DrjGSIkTfhzbLa20Vt3AFM+LFK0ZoXT2dRnjbYPjQECgYEA+9XJFGwLcEX6pl1p\nIwXAjXKJdju9DDn4lmHTW0Pbw25h1EXONwm/NPafwsWmPll9kW9IwsxUQVUyBC9a\nc3Q7rF1e8ai/qqVFRIZof275MI82ciV2Mw8Hz7FPAUyoju5CvnjAEH4+irt1VE/7\nSgdvQ1gDBQFegS69ijdz+cOhFxkCgYEA5aVoseMy/gIlsCvNPyw9+Jz/zBpKItX0\njGzdF7lhERRO2cursujKaoHntRckHcE3P/Z4K565bvVq+VaVG0T/BcBKPmPHrLmY\niuVXidltW7Jh9/RCVwb5+BvqlwlC470PEwhqoUatY/fPJ74srztrqJHvp1L29FT5\nsdmlJW8YwokCgYAUa3dMgp5C0knKp5RY1KSSU5E11w4zKZgwiWob4lq1dAPWtHpO\nGCo63yyBHImoUJVP75gUw4Cpc4EEudo5tlkIVuHV8nroGVKOhd9/Rb5K47Hke4kk\nBrn5a0Ues9qPDF65Fw1ryPDFSwHufjXAAO5SpZZJF51UGDgiNvDedbBgMQKBgHSk\nt7DjPhtW69234eCckD2fQS5ijBV1p2lMQmCygGM0dXiawvN02puOsCqDPoz+fxm2\nDwPY80cw0M0k9UeMnBxHt25JMDrDan/iTbxu++T/jlNrdebOXFlxlI5y3c7fULDS\nLZcNVzTXwhjlt7yp6d0NgzTyJw2ju9BiREfnTiRBAoGBAOPHrTOnPyjO+bVcCPTB\nWGLsbBd77mVPGIuL0XGrvbVYPE8yIcNbZcthd8VXL/38Ygy8SIZh2ZqsrU1b5WFa\nXUMLnGEODSS8x/GmW3i3KeirW5OxBNjfUzEF4XkJP8m41iTdsQEXQf9DdUY7X+CB\nVL5h7N0VstYhGgycuPpcIUQa\n-----END PRIVATE KEY-----\n"
}

View File

@@ -0,0 +1,9 @@
{
"type": "authorized_user",
"client_id": "123",
"client_secret": "secret",
"refresh_token": "alabalaportocala",
"client_email": "dummy@google.com",
"private_key_id": "ABCDEF",
"private_key": "Bag Attributes\n friendlyName: key\n localKeyID: 22 7E 04 FC 64 48 20 83 1E C1 BD E3 F5 2F 44 7D EA 99 A5 BC\nKey Attributes: <No Attributes>\n-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDh6PSnttDsv+vi\ntUZTP1E3hVBah6PUGDWZhYgNiyW8quTWCmPvBmCR2YzuhUrY5+CtKP8UJOQico+p\noJHSAPsrzSr6YsGs3c9SQOslBmm9Fkh9/f/GZVTVZ6u5AsUmOcVvZ2q7Sz8Vj/aR\naIm0EJqRe9cQ5vvN9sg25rIv4xKwIZJ1VixKWJLmpCmDINqn7xvl+ldlUmSr3aGt\nw21uSDuEJhQlzO3yf2FwJMkJ9SkCm9oVDXyl77OnKXj5bOQ/rojbyGeIxDJSUDWE\nGKyRPuqKi6rSbwg6h2G/Z9qBJkqM5NNTbGRIFz/9/LdmmwvtaqCxlLtD7RVEryAp\n+qTGDk5hAgMBAAECggEBAMYYfNDEYpf4A2SdCLne/9zrrfZ0kphdUkL48MDPj5vN\nTzTRj6f9s5ixZ/+QKn3hdwbguCx13QbH5mocP0IjUhyqoFFHYAWxyyaZfpjM8tO4\nQoEYxby3BpjLe62UXESUzChQSytJZFwIDXKcdIPNO3zvVzufEJcfG5no2b9cIvsG\nDy6J1FNILWxCtDIqBM+G1B1is9DhZnUDgn0iKzINiZmh1I1l7k/4tMnozVIKAfwo\nf1kYjG/d2IzDM02mTeTElz3IKeNriaOIYTZgI26xLJxTkiFnBV4JOWFAZw15X+yR\n+DrjGSIkTfhzbLa20Vt3AFM+LFK0ZoXT2dRnjbYPjQECgYEA+9XJFGwLcEX6pl1p\nIwXAjXKJdju9DDn4lmHTW0Pbw25h1EXONwm/NPafwsWmPll9kW9IwsxUQVUyBC9a\nc3Q7rF1e8ai/qqVFRIZof275MI82ciV2Mw8Hz7FPAUyoju5CvnjAEH4+irt1VE/7\nSgdvQ1gDBQFegS69ijdz+cOhFxkCgYEA5aVoseMy/gIlsCvNPyw9+Jz/zBpKItX0\njGzdF7lhERRO2cursujKaoHntRckHcE3P/Z4K565bvVq+VaVG0T/BcBKPmPHrLmY\niuVXidltW7Jh9/RCVwb5+BvqlwlC470PEwhqoUatY/fPJ74srztrqJHvp1L29FT5\nsdmlJW8YwokCgYAUa3dMgp5C0knKp5RY1KSSU5E11w4zKZgwiWob4lq1dAPWtHpO\nGCo63yyBHImoUJVP75gUw4Cpc4EEudo5tlkIVuHV8nroGVKOhd9/Rb5K47Hke4kk\nBrn5a0Ues9qPDF65Fw1ryPDFSwHufjXAAO5SpZZJF51UGDgiNvDedbBgMQKBgHSk\nt7DjPhtW69234eCckD2fQS5ijBV1p2lMQmCygGM0dXiawvN02puOsCqDPoz+fxm2\nDwPY80cw0M0k9UeMnBxHt25JMDrDan/iTbxu++T/jlNrdebOXFlxlI5y3c7fULDS\nLZcNVzTXwhjlt7yp6d0NgzTyJw2ju9BiREfnTiRBAoGBAOPHrTOnPyjO+bVcCPTB\nWGLsbBd77mVPGIuL0XGrvbVYPE8yIcNbZcthd8VXL/38Ygy8SIZh2ZqsrU1b5WFa\nXUMLnGEODSS8x/GmW3i3KeirW5OxBNjfUzEF4XkJP8m41iTdsQEXQf9DdUY7X+CB\nVL5h7N0VstYhGgycuPpcIUQa\n-----END PRIVATE KEY-----\n"
}

View File

@@ -0,0 +1,9 @@
{
"type": "serviceaccount",
"client_id": "123",
"client_secret": "secret",
"refresh_token": "alabalaportocala",
"client_email": "dummy@google.com",
"private_key_id": "ABCDEF",
"private_key": "Bag Attributes\n friendlyName: key\n localKeyID: 22 7E 04 FC 64 48 20 83 1E C1 BD E3 F5 2F 44 7D EA 99 A5 BC\nKey Attributes: <No Attributes>\n-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDh6PSnttDsv+vi\ntUZTP1E3hVBah6PUGDWZhYgNiyW8quTWCmPvBmCR2YzuhUrY5+CtKP8UJOQico+p\noJHSAPsrzSr6YsGs3c9SQOslBmm9Fkh9/f/GZVTVZ6u5AsUmOcVvZ2q7Sz8Vj/aR\naIm0EJqRe9cQ5vvN9sg25rIv4xKwIZJ1VixKWJLmpCmDINqn7xvl+ldlUmSr3aGt\nw21uSDuEJhQlzO3yf2FwJMkJ9SkCm9oVDXyl77OnKXj5bOQ/rojbyGeIxDJSUDWE\nGKyRPuqKi6rSbwg6h2G/Z9qBJkqM5NNTbGRIFz/9/LdmmwvtaqCxlLtD7RVEryAp\n+qTGDk5hAgMBAAECggEBAMYYfNDEYpf4A2SdCLne/9zrrfZ0kphdUkL48MDPj5vN\nTzTRj6f9s5ixZ/+QKn3hdwbguCx13QbH5mocP0IjUhyqoFFHYAWxyyaZfpjM8tO4\nQoEYxby3BpjLe62UXESUzChQSytJZFwIDXKcdIPNO3zvVzufEJcfG5no2b9cIvsG\nDy6J1FNILWxCtDIqBM+G1B1is9DhZnUDgn0iKzINiZmh1I1l7k/4tMnozVIKAfwo\nf1kYjG/d2IzDM02mTeTElz3IKeNriaOIYTZgI26xLJxTkiFnBV4JOWFAZw15X+yR\n+DrjGSIkTfhzbLa20Vt3AFM+LFK0ZoXT2dRnjbYPjQECgYEA+9XJFGwLcEX6pl1p\nIwXAjXKJdju9DDn4lmHTW0Pbw25h1EXONwm/NPafwsWmPll9kW9IwsxUQVUyBC9a\nc3Q7rF1e8ai/qqVFRIZof275MI82ciV2Mw8Hz7FPAUyoju5CvnjAEH4+irt1VE/7\nSgdvQ1gDBQFegS69ijdz+cOhFxkCgYEA5aVoseMy/gIlsCvNPyw9+Jz/zBpKItX0\njGzdF7lhERRO2cursujKaoHntRckHcE3P/Z4K565bvVq+VaVG0T/BcBKPmPHrLmY\niuVXidltW7Jh9/RCVwb5+BvqlwlC470PEwhqoUatY/fPJ74srztrqJHvp1L29FT5\nsdmlJW8YwokCgYAUa3dMgp5C0knKp5RY1KSSU5E11w4zKZgwiWob4lq1dAPWtHpO\nGCo63yyBHImoUJVP75gUw4Cpc4EEudo5tlkIVuHV8nroGVKOhd9/Rb5K47Hke4kk\nBrn5a0Ues9qPDF65Fw1ryPDFSwHufjXAAO5SpZZJF51UGDgiNvDedbBgMQKBgHSk\nt7DjPhtW69234eCckD2fQS5ijBV1p2lMQmCygGM0dXiawvN02puOsCqDPoz+fxm2\nDwPY80cw0M0k9UeMnBxHt25JMDrDan/iTbxu++T/jlNrdebOXFlxlI5y3c7fULDS\nLZcNVzTXwhjlt7yp6d0NgzTyJw2ju9BiREfnTiRBAoGBAOPHrTOnPyjO+bVcCPTB\nWGLsbBd77mVPGIuL0XGrvbVYPE8yIcNbZcthd8VXL/38Ygy8SIZh2ZqsrU1b5WFa\nXUMLnGEODSS8x/GmW3i3KeirW5OxBNjfUzEF4XkJP8m41iTdsQEXQf9DdUY7X+CB\nVL5h7N0VstYhGgycuPpcIUQa\n-----END PRIVATE KEY-----\n"
}

View File

@@ -0,0 +1,8 @@
{
"type": "service_account",
"client_id": "123",
"client_secret": "secret",
"refresh_token": "alabalaportocala",
"client_email": "dummy@google.com",
"private_key": "Bag Attributes\n friendlyName: key\n localKeyID: 22 7E 04 FC 64 48 20 83 1E C1 BD E3 F5 2F 44 7D EA 99 A5 BC\nKey Attributes: <No Attributes>\n-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDh6PSnttDsv+vi\ntUZTP1E3hVBah6PUGDWZhYgNiyW8quTWCmPvBmCR2YzuhUrY5+CtKP8UJOQico+p\noJHSAPsrzSr6YsGs3c9SQOslBmm9Fkh9/f/GZVTVZ6u5AsUmOcVvZ2q7Sz8Vj/aR\naIm0EJqRe9cQ5vvN9sg25rIv4xKwIZJ1VixKWJLmpCmDINqn7xvl+ldlUmSr3aGt\nw21uSDuEJhQlzO3yf2FwJMkJ9SkCm9oVDXyl77OnKXj5bOQ/rojbyGeIxDJSUDWE\nGKyRPuqKi6rSbwg6h2G/Z9qBJkqM5NNTbGRIFz/9/LdmmwvtaqCxlLtD7RVEryAp\n+qTGDk5hAgMBAAECggEBAMYYfNDEYpf4A2SdCLne/9zrrfZ0kphdUkL48MDPj5vN\nTzTRj6f9s5ixZ/+QKn3hdwbguCx13QbH5mocP0IjUhyqoFFHYAWxyyaZfpjM8tO4\nQoEYxby3BpjLe62UXESUzChQSytJZFwIDXKcdIPNO3zvVzufEJcfG5no2b9cIvsG\nDy6J1FNILWxCtDIqBM+G1B1is9DhZnUDgn0iKzINiZmh1I1l7k/4tMnozVIKAfwo\nf1kYjG/d2IzDM02mTeTElz3IKeNriaOIYTZgI26xLJxTkiFnBV4JOWFAZw15X+yR\n+DrjGSIkTfhzbLa20Vt3AFM+LFK0ZoXT2dRnjbYPjQECgYEA+9XJFGwLcEX6pl1p\nIwXAjXKJdju9DDn4lmHTW0Pbw25h1EXONwm/NPafwsWmPll9kW9IwsxUQVUyBC9a\nc3Q7rF1e8ai/qqVFRIZof275MI82ciV2Mw8Hz7FPAUyoju5CvnjAEH4+irt1VE/7\nSgdvQ1gDBQFegS69ijdz+cOhFxkCgYEA5aVoseMy/gIlsCvNPyw9+Jz/zBpKItX0\njGzdF7lhERRO2cursujKaoHntRckHcE3P/Z4K565bvVq+VaVG0T/BcBKPmPHrLmY\niuVXidltW7Jh9/RCVwb5+BvqlwlC470PEwhqoUatY/fPJ74srztrqJHvp1L29FT5\nsdmlJW8YwokCgYAUa3dMgp5C0knKp5RY1KSSU5E11w4zKZgwiWob4lq1dAPWtHpO\nGCo63yyBHImoUJVP75gUw4Cpc4EEudo5tlkIVuHV8nroGVKOhd9/Rb5K47Hke4kk\nBrn5a0Ues9qPDF65Fw1ryPDFSwHufjXAAO5SpZZJF51UGDgiNvDedbBgMQKBgHSk\nt7DjPhtW69234eCckD2fQS5ijBV1p2lMQmCygGM0dXiawvN02puOsCqDPoz+fxm2\nDwPY80cw0M0k9UeMnBxHt25JMDrDan/iTbxu++T/jlNrdebOXFlxlI5y3c7fULDS\nLZcNVzTXwhjlt7yp6d0NgzTyJw2ju9BiREfnTiRBAoGBAOPHrTOnPyjO+bVcCPTB\nWGLsbBd77mVPGIuL0XGrvbVYPE8yIcNbZcthd8VXL/38Ygy8SIZh2ZqsrU1b5WFa\nXUMLnGEODSS8x/GmW3i3KeirW5OxBNjfUzEF4XkJP8m41iTdsQEXQf9DdUY7X+CB\nVL5h7N0VstYhGgycuPpcIUQa\n-----END PRIVATE KEY-----\n"
}

View File

@@ -0,0 +1,9 @@
{
"type": "service_account"
"client_id": "123",
"client_secret": "secret",
"refresh_token": "alabalaportocala",
"client_email": "dummy@google.com",
"private_key_id": "ABCDEF",
"private_key": "Bag Attributes\n friendlyName: key\n localKeyID: 22 7E 04 FC 64 48 20 83 1E C1 BD E3 F5 2F 44 7D EA 99 A5 BC\nKey Attributes: <No Attributes>\n-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDh6PSnttDsv+vi\ntUZTP1E3hVBah6PUGDWZhYgNiyW8quTWCmPvBmCR2YzuhUrY5+CtKP8UJOQico+p\noJHSAPsrzSr6YsGs3c9SQOslBmm9Fkh9/f/GZVTVZ6u5AsUmOcVvZ2q7Sz8Vj/aR\naIm0EJqRe9cQ5vvN9sg25rIv4xKwIZJ1VixKWJLmpCmDINqn7xvl+ldlUmSr3aGt\nw21uSDuEJhQlzO3yf2FwJMkJ9SkCm9oVDXyl77OnKXj5bOQ/rojbyGeIxDJSUDWE\nGKyRPuqKi6rSbwg6h2G/Z9qBJkqM5NNTbGRIFz/9/LdmmwvtaqCxlLtD7RVEryAp\n+qTGDk5hAgMBAAECggEBAMYYfNDEYpf4A2SdCLne/9zrrfZ0kphdUkL48MDPj5vN\nTzTRj6f9s5ixZ/+QKn3hdwbguCx13QbH5mocP0IjUhyqoFFHYAWxyyaZfpjM8tO4\nQoEYxby3BpjLe62UXESUzChQSytJZFwIDXKcdIPNO3zvVzufEJcfG5no2b9cIvsG\nDy6J1FNILWxCtDIqBM+G1B1is9DhZnUDgn0iKzINiZmh1I1l7k/4tMnozVIKAfwo\nf1kYjG/d2IzDM02mTeTElz3IKeNriaOIYTZgI26xLJxTkiFnBV4JOWFAZw15X+yR\n+DrjGSIkTfhzbLa20Vt3AFM+LFK0ZoXT2dRnjbYPjQECgYEA+9XJFGwLcEX6pl1p\nIwXAjXKJdju9DDn4lmHTW0Pbw25h1EXONwm/NPafwsWmPll9kW9IwsxUQVUyBC9a\nc3Q7rF1e8ai/qqVFRIZof275MI82ciV2Mw8Hz7FPAUyoju5CvnjAEH4+irt1VE/7\nSgdvQ1gDBQFegS69ijdz+cOhFxkCgYEA5aVoseMy/gIlsCvNPyw9+Jz/zBpKItX0\njGzdF7lhERRO2cursujKaoHntRckHcE3P/Z4K565bvVq+VaVG0T/BcBKPmPHrLmY\niuVXidltW7Jh9/RCVwb5+BvqlwlC470PEwhqoUatY/fPJ74srztrqJHvp1L29FT5\nsdmlJW8YwokCgYAUa3dMgp5C0knKp5RY1KSSU5E11w4zKZgwiWob4lq1dAPWtHpO\nGCo63yyBHImoUJVP75gUw4Cpc4EEudo5tlkIVuHV8nroGVKOhd9/Rb5K47Hke4kk\nBrn5a0Ues9qPDF65Fw1ryPDFSwHufjXAAO5SpZZJF51UGDgiNvDedbBgMQKBgHSk\nt7DjPhtW69234eCckD2fQS5ijBV1p2lMQmCygGM0dXiawvN02puOsCqDPoz+fxm2\nDwPY80cw0M0k9UeMnBxHt25JMDrDan/iTbxu++T/jlNrdebOXFlxlI5y3c7fULDS\nLZcNVzTXwhjlt7yp6d0NgzTyJw2ju9BiREfnTiRBAoGBAOPHrTOnPyjO+bVcCPTB\nWGLsbBd77mVPGIuL0XGrvbVYPE8yIcNbZcthd8VXL/38Ygy8SIZh2ZqsrU1b5WFa\nXUMLnGEODSS8x/GmW3i3KeirW5OxBNjfUzEF4XkJP8m41iTdsQEXQf9DdUY7X+CB\nVL5h7N0VstYhGgycuPpcIUQa\n-----END PRIVATE KEY-----\n"
}

View File

@@ -0,0 +1,9 @@
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9R
N4VQWoej1Bg1mYWIDYslvKrk1gpj7wZgkdmM7oVK2OfgrSj/FCTkInKPqaCR0gD7
K80q+mLBrN3PUkDrJQZpvRZIff3/xmVU1WeruQLFJjnFb2dqu0s/FY/2kWiJtBCa
kXvXEOb7zfbINuayL+MSsCGSdVYsSliS5qQpgyDap+8b5fpXZVJkq92hrcNtbkg7
hCYUJczt8n9hcCTJCfUpApvaFQ18pe+zpyl4+WzkP66I28hniMQyUlA1hBiskT7q
iouq0m8IOodhv2fagSZKjOTTU2xkSBc//fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5O
YQIDAQAB
-----END PUBLIC KEY-----

View File

@@ -220,6 +220,33 @@ class TestAppAssertionCredentials(unittest.TestCase):
self.assertEqual('a_token_456', credentials.access_token)
self.assertEqual(scope, credentials.scope)
def test_create_scoped_required_without_scopes(self):
credentials = AppAssertionCredentials([])
self.assertTrue(credentials.create_scoped_required())
def test_create_scoped_required_with_scopes(self):
credentials = AppAssertionCredentials(['dummy_scope'])
self.assertFalse(credentials.create_scoped_required())
def test_create_scoped(self):
credentials = AppAssertionCredentials([])
new_credentials = credentials.create_scoped(['dummy_scope'])
self.assertNotEqual(credentials, new_credentials)
self.assertTrue(isinstance(new_credentials, AppAssertionCredentials))
self.assertEqual('dummy_scope', new_credentials.scope)
def test_get_access_token(self):
app_identity_stub = self.AppIdentityStubImpl()
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
apiproxy_stub_map.apiproxy.RegisterStub("app_identity_service",
app_identity_stub)
apiproxy_stub_map.apiproxy.RegisterStub(
'memcache', memcache_stub.MemcacheServiceStub())
credentials = AppAssertionCredentials(['dummy_scope'])
token = credentials.get_access_token()
self.assertEqual('a_token_123', token)
class TestFlowModel(db.Model):
flow = FlowProperty()

View File

@@ -20,8 +20,9 @@ Unit tests for oauth2client.gce.
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
import unittest
import httplib2
import mox
import unittest
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import Credentials
@@ -55,7 +56,6 @@ class AssertionCredentialsTests(unittest.TestCase):
m.UnsetStubs()
m.VerifyAll()
def test_fail_refresh(self):
m = mox.Mox()
@@ -90,3 +90,44 @@ class AssertionCredentialsTests(unittest.TestCase):
c2 = Credentials.new_from_json(json)
self.assertEqual(c.access_token, c2.access_token)
def test_create_scoped_required_without_scopes(self):
credentials = AppAssertionCredentials([])
self.assertTrue(credentials.create_scoped_required())
def test_create_scoped_required_with_scopes(self):
credentials = AppAssertionCredentials(['dummy_scope'])
self.assertFalse(credentials.create_scoped_required())
def test_create_scoped(self):
credentials = AppAssertionCredentials([])
new_credentials = credentials.create_scoped(['dummy_scope'])
self.assertNotEqual(credentials, new_credentials)
self.assertTrue(isinstance(new_credentials, AppAssertionCredentials))
self.assertEqual('dummy_scope', new_credentials.scope)
def test_get_access_token(self):
m = mox.Mox()
httplib2_response = m.CreateMock(object)
httplib2_response.status = 200
httplib2_request = m.CreateMock(object)
httplib2_request.__call__(
('http://metadata.google.internal/0.1/meta-data/service-accounts/'
'default/acquire?scope=dummy_scope'
)).AndReturn((httplib2_response, '{"accessToken": "this-is-a-token"}'))
m.ReplayAll()
credentials = AppAssertionCredentials(['dummy_scope'])
http = httplib2.Http()
http.request = httplib2_request
self.assertEquals('this-is-a-token',
credentials.get_access_token(http=http))
m.UnsetStubs()
m.VerifyAll()

View File

@@ -24,7 +24,9 @@ __author__ = 'jcgregorio@google.com (Joe Gregorio)'
import base64
import datetime
import mox
import os
import time
import unittest
import urlparse
@@ -37,23 +39,36 @@ from oauth2client.client import AccessTokenCredentials
from oauth2client.client import AccessTokenCredentialsError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import AssertionCredentials
from oauth2client.client import AUTHORIZED_USER
from oauth2client.client import Credentials
from oauth2client.client import DefaultCredentialsError
from oauth2client.client import FlowExchangeError
from oauth2client.client import GoogleCredentials
from oauth2client.client import GOOGLE_CREDENTIALS_DEFAULT
from oauth2client.client import MemoryCache
from oauth2client.client import NonAsciiHeaderError
from oauth2client.client import OAuth2Credentials
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import OOB_CALLBACK_URN
from oauth2client.client import REFRESH_STATUS_CODES
from oauth2client.client import SERVICE_ACCOUNT
from oauth2client.client import Storage
from oauth2client.client import TokenRevokeError
from oauth2client.client import VerifyJwtTokenError
from oauth2client.client import _env_name
from oauth2client.client import _extract_id_token
from oauth2client.client import _get_default_credential_from_file
from oauth2client.client import _get_environment
from oauth2client.client import _get_environment_variable_file
from oauth2client.client import _get_well_known_file
from oauth2client.client import _raise_exception_for_missing_fields
from oauth2client.client import _raise_exception_for_reading_json
from oauth2client.client import _update_query_params
from oauth2client.client import credentials_from_clientsecrets_and_code
from oauth2client.client import credentials_from_code
from oauth2client.client import flow_from_clientsecrets
from oauth2client.clientsecrets import _loadfile
from oauth2client.service_account import _ServiceAccountCredentials
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
@@ -107,6 +122,328 @@ class CredentialsTests(unittest.TestCase):
restored = Credentials.new_from_json(json)
class MockResponse(object):
"""Mock the response of urllib2.urlopen() call."""
def __init__(self, headers):
self._headers = headers
def info(self):
class Info:
def __init__(self, headers):
self.headers = headers
return Info(self._headers)
class GoogleCredentialsTests(unittest.TestCase):
def setUp(self):
self.env_server_software = os.environ.get('SERVER_SOFTWARE', None)
self.env_google_credentials_default = (
os.environ.get(GOOGLE_CREDENTIALS_DEFAULT, None))
self.env_appdata = os.environ.get('APPDATA', None)
self.os_name = os.name
from oauth2client import client
setattr(client, '_env_name', None)
def tearDown(self):
self.reset_env('SERVER_SOFTWARE', self.env_server_software)
self.reset_env(GOOGLE_CREDENTIALS_DEFAULT,
self.env_google_credentials_default)
self.reset_env('APPDATA', self.env_appdata)
os.name = self.os_name
def reset_env(self, env, value):
"""Set the environment variable 'env' to 'value'."""
if value is not None:
os.environ[env] = value
else:
os.environ.pop(env, '')
def validate_service_account_credentials(self, credentials):
self.assertTrue(isinstance(credentials, _ServiceAccountCredentials))
self.assertEqual('123', credentials._service_account_id)
self.assertEqual('dummy@google.com', credentials._service_account_email)
self.assertEqual('ABCDEF', credentials._private_key_id)
self.assertEqual('', credentials._scopes)
def validate_google_credentials(self, credentials):
self.assertTrue(isinstance(credentials, GoogleCredentials))
self.assertEqual(None, credentials.access_token)
self.assertEqual('123', credentials.client_id)
self.assertEqual('secret', credentials.client_secret)
self.assertEqual('alabalaportocala', credentials.refresh_token)
self.assertEqual(None, credentials.token_expiry)
self.assertEqual(GOOGLE_TOKEN_URI, credentials.token_uri)
self.assertEqual('Python client library', credentials.user_agent)
def get_a_google_credentials_object(self):
return GoogleCredentials(None, None, None, None, None, None, None, None)
def test_create_scoped_required(self):
self.assertFalse(
self.get_a_google_credentials_object().create_scoped_required())
def test_create_scoped(self):
credentials = self.get_a_google_credentials_object()
self.assertEqual(credentials, credentials.create_scoped(None))
self.assertEqual(credentials,
credentials.create_scoped(['dummy_scope']))
def test_get_environment_gae_production(self):
os.environ['SERVER_SOFTWARE'] = 'Google App Engine/XYZ'
self.assertEqual('GAE_PRODUCTION', _get_environment())
def test_get_environment_gae_local(self):
os.environ['SERVER_SOFTWARE'] = 'Development/XYZ'
self.assertEqual('GAE_LOCAL', _get_environment())
def test_get_environment_gce_production(self):
os.environ['SERVER_SOFTWARE'] = ''
mockResponse = MockResponse(['Metadata-Flavor: Google\r\n'])
m = mox.Mox()
urllib2_urlopen = m.CreateMock(object)
urllib2_urlopen.__call__(('http://metadata.google.internal'
)).AndReturn(mockResponse)
m.ReplayAll()
self.assertEqual('GCE_PRODUCTION', _get_environment(urllib2_urlopen))
m.UnsetStubs()
m.VerifyAll()
def test_get_environment_unknown(self):
os.environ['SERVER_SOFTWARE'] = ''
mockResponse = MockResponse([])
m = mox.Mox()
urllib2_urlopen = m.CreateMock(object)
urllib2_urlopen.__call__(('http://metadata.google.internal'
)).AndReturn(mockResponse)
m.ReplayAll()
self.assertEqual('UNKNOWN', _get_environment(urllib2_urlopen))
m.UnsetStubs()
m.VerifyAll()
def test_get_environment_variable_file(self):
environment_variable_file = datafile(
os.path.join('gcloud', 'credentials_default.json'))
os.environ[GOOGLE_CREDENTIALS_DEFAULT] = environment_variable_file
self.assertEqual(environment_variable_file,
_get_environment_variable_file())
def test_get_environment_variable_file_error(self):
nonexistent_file = datafile('nonexistent')
os.environ[GOOGLE_CREDENTIALS_DEFAULT] = nonexistent_file
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
_get_environment_variable_file()
self.fail(nonexistent_file + ' should not exist.')
except DefaultCredentialsError as error:
self.assertEqual('File ' + nonexistent_file +
' (pointed by ' + GOOGLE_CREDENTIALS_DEFAULT +
' environment variable) does not exist!',
str(error))
def test_get_well_known_file_on_windows(self):
well_known_file = datafile(
os.path.join('gcloud', 'credentials_default.json'))
os.name = 'nt'
os.environ['APPDATA'] = DATA_DIR
self.assertEqual(well_known_file, _get_well_known_file())
def test_get_well_known_file_on_windows_no_file(self):
os.name = 'nt'
os.environ['APPDATA'] = os.path.join(DATA_DIR, 'nonexistentpath')
self.assertEqual(None, _get_well_known_file())
def test_get_default_credential_from_file_service_account(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default.json'))
credentials = _get_default_credential_from_file(credentials_file)
self.validate_service_account_credentials(credentials)
def test_get_default_credential_from_file_authorized_user(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default_authorized_user.json'))
credentials = _get_default_credential_from_file(credentials_file)
self.validate_google_credentials(credentials)
def test_get_default_credential_from_malformed_file_1(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default_malformed_1.json'))
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
_get_default_credential_from_file(credentials_file)
self.fail('An exception was expected!')
except DefaultCredentialsError as error:
self.assertEqual("'type' field should be defined "
"(and have one of the '" + AUTHORIZED_USER +
"' or '" + SERVICE_ACCOUNT + "' values)",
str(error))
def test_get_default_credential_from_malformed_file_2(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default_malformed_2.json'))
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
_get_default_credential_from_file(credentials_file)
self.fail('An exception was expected!')
except DefaultCredentialsError as error:
self.assertEqual('The following field(s): '
'private_key_id must be defined.',
str(error))
def test_get_default_credential_from_malformed_file_3(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default_malformed_3.json'))
self.assertRaises(ValueError, _get_default_credential_from_file,
credentials_file)
def test_raise_exception_for_missing_fields(self):
missing_fields = ['first', 'second', 'third']
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
_raise_exception_for_missing_fields(missing_fields)
self.fail('An exception was expected!')
except DefaultCredentialsError as error:
self.assertEqual('The following field(s): ' +
', '.join(missing_fields) + ' must be defined.',
str(error))
def test_raise_exception_for_reading_json(self):
credential_file = 'any_file'
extra_help = ' be good'
error = DefaultCredentialsError('stuff happens')
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
_raise_exception_for_reading_json(credential_file, extra_help, error)
self.fail('An exception was expected!')
except DefaultCredentialsError as ex:
self.assertEqual('An error was encountered while reading '
'json file: '+ credential_file +
extra_help + ': ' + str(error),
str(ex))
def test_get_default_from_environment_variable_service_account(self):
os.environ['SERVER_SOFTWARE'] = ''
environment_variable_file = datafile(
os.path.join('gcloud', 'credentials_default.json'))
os.environ[GOOGLE_CREDENTIALS_DEFAULT] = environment_variable_file
self.validate_service_account_credentials(GoogleCredentials.get_default())
def test_env_name(self):
from oauth2client import client
self.assertEqual(None, getattr(client, '_env_name'))
self.test_get_default_from_environment_variable_service_account()
self.assertEqual('UNKNOWN', getattr(client, '_env_name'))
def test_get_default_from_environment_variable_authorized_user(self):
os.environ['SERVER_SOFTWARE'] = ''
environment_variable_file = datafile(
os.path.join('gcloud', 'credentials_default_authorized_user.json'))
os.environ[GOOGLE_CREDENTIALS_DEFAULT] = environment_variable_file
self.validate_google_credentials(GoogleCredentials.get_default())
def test_get_default_from_environment_variable_malformed_file(self):
os.environ['SERVER_SOFTWARE'] = ''
environment_variable_file = datafile(
os.path.join('gcloud', 'credentials_default_malformed_3.json'))
os.environ[GOOGLE_CREDENTIALS_DEFAULT] = environment_variable_file
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
GoogleCredentials.get_default()
self.fail('An exception was expected!')
except DefaultCredentialsError as error:
self.assertTrue(str(error).startswith(
'An error was encountered while reading json file: ' +
environment_variable_file + ' (pointed to by ' +
GOOGLE_CREDENTIALS_DEFAULT + ' environment variable):'))
def test_get_default_environment_not_set_up(self):
# It is normal for this test to fail if run inside
# a Google Compute Engine VM or after 'gcloud auth login' command
# has been executed on a non Windows machine.
os.environ['SERVER_SOFTWARE'] = ''
os.environ[GOOGLE_CREDENTIALS_DEFAULT] = ''
os.environ['APPDATA'] = ''
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
GoogleCredentials.get_default()
self.fail('An exception was expected!')
except DefaultCredentialsError as error:
self.assertEqual("The Default Credentials are not available. They are "
"available if running in Google App Engine or Google "
"Compute Engine. They are also available if using the "
"Google Cloud SDK and running 'gcloud auth login'. "
"Otherwise, the environment variable " +
GOOGLE_CREDENTIALS_DEFAULT + " must be defined pointing "
"to a file defining the credentials. See "
"https://developers.google.com/accounts/docs/default-"
"credentials for details.",
str(error))
def test_from_stream_service_account(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default.json'))
credentials = (
self.get_a_google_credentials_object().from_stream(credentials_file))
self.validate_service_account_credentials(credentials)
def test_from_stream_authorized_user(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default_authorized_user.json'))
credentials = (
self.get_a_google_credentials_object().from_stream(credentials_file))
self.validate_google_credentials(credentials)
def test_from_stream_malformed_file_1(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default_malformed_1.json'))
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
self.get_a_google_credentials_object().from_stream(credentials_file)
self.fail('An exception was expected!')
except DefaultCredentialsError as error:
self.assertEqual("An error was encountered while reading json file: " +
credentials_file +
" (provided as parameter to the from_stream() method): "
"'type' field should be defined (and have one of the '" +
AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT +
"' values)",
str(error))
def test_from_stream_malformed_file_2(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default_malformed_2.json'))
# we can't use self.assertRaisesRegexp() because it is only in Python 2.7+
try:
self.get_a_google_credentials_object().from_stream(credentials_file)
self.fail('An exception was expected!')
except DefaultCredentialsError as error:
self.assertEqual('An error was encountered while reading json file: ' +
credentials_file +
' (provided as parameter to the from_stream() method): '
'The following field(s): private_key_id must be '
'defined.',
str(error))
def test_from_stream_malformed_file_3(self):
credentials_file = datafile(
os.path.join('gcloud', 'credentials_default_malformed_3.json'))
self.assertRaises(
DefaultCredentialsError,
self.get_a_google_credentials_object().from_stream, credentials_file)
class DummyDeleteStorage(Storage):
delete_called = False
@@ -245,6 +582,32 @@ class BasicCredentialsTests(unittest.TestCase):
instance = OAuth2Credentials.from_json(self.credentials.to_json())
self.assertEqual('foobar', instance.token_response)
def test_get_access_token(self):
token_response_first = {'access_token': 'first_token', 'expires_in': 1}
token_response_second = {'access_token': 'second_token', 'expires_in': 1}
http = HttpMockSequence([
({'status': '200'}, simplejson.dumps(token_response_first)),
({'status': '200'}, simplejson.dumps(token_response_second)),
])
self.assertEqual('first_token',
self.credentials.get_access_token(http=http))
self.assertFalse(self.credentials.access_token_expired)
self.assertEqual(token_response_first, self.credentials.token_response)
self.assertEqual('first_token',
self.credentials.get_access_token(http=http))
self.assertFalse(self.credentials.access_token_expired)
self.assertEqual(token_response_first, self.credentials.token_response)
time.sleep(1)
self.assertTrue(self.credentials.access_token_expired)
self.assertEqual('second_token',
self.credentials.get_access_token(http=http))
self.assertFalse(self.credentials.access_token_expired)
self.assertEqual(token_response_second, self.credentials.token_response)
class AccessTokenCredentialsTests(unittest.TestCase):

View File

@@ -0,0 +1,120 @@
#!/usr/bin/python2.4
#
# Copyright 2014 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Oauth2client tests.
Unit tests for service account credentials implemented using RSA.
"""
import os
import rsa
import time
import unittest
from http_mock import HttpMockSequence
from oauth2client.anyjson import simplejson
from oauth2client.service_account import _ServiceAccountCredentials
def datafile(filename):
# TODO(orestica): Refactor this using pkgutil.get_data
f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'r')
data = f.read()
f.close()
return data
class ServiceAccountCredentialsTests(unittest.TestCase):
def setUp(self):
self.service_account_id = '123'
self.service_account_email = 'dummy@google.com'
self.private_key_id = 'ABCDEF'
self.private_key = datafile('pem_from_pkcs12.pem')
self.scopes = ['dummy_scope']
self.credentials = _ServiceAccountCredentials(self.service_account_id,
self.service_account_email,
self.private_key_id,
self.private_key,
[])
def test_sign_blob(self):
private_key_id, signature = self.credentials.sign_blob('Google')
self.assertEqual( self.private_key_id, private_key_id)
pub_key = rsa.PublicKey.load_pkcs1_openssl_pem(
datafile('publickey_openssl.pem'))
self.assertTrue(rsa.pkcs1.verify('Google', signature, pub_key))
try:
rsa.pkcs1.verify('Orest', signature, pub_key)
self.fail('Verification should have failed!')
except rsa.pkcs1.VerificationError:
pass # Expected
try:
rsa.pkcs1.verify('Google', 'bad signature', pub_key)
self.fail('Verification should have failed!')
except rsa.pkcs1.VerificationError:
pass # Expected
def test_service_account_email(self):
self.assertEqual(self.service_account_email,
self.credentials.service_account_email)
def test_create_scoped_required_without_scopes(self):
self.assertTrue(self.credentials.create_scoped_required())
def test_create_scoped_required_with_scopes(self):
self.credentials = _ServiceAccountCredentials(self.service_account_id,
self.service_account_email,
self.private_key_id,
self.private_key,
self.scopes)
self.assertFalse(self.credentials.create_scoped_required())
def test_create_scoped(self):
new_credentials = self.credentials.create_scoped(self.scopes)
self.assertNotEqual(self.credentials, new_credentials)
self.assertTrue(isinstance(new_credentials, _ServiceAccountCredentials))
self.assertEqual('dummy_scope', new_credentials._scopes)
def test_access_token(self):
token_response_first = {'access_token': 'first_token', 'expires_in': 1}
token_response_second = {'access_token': 'second_token', 'expires_in': 1}
http = HttpMockSequence([
({'status': '200'}, simplejson.dumps(token_response_first)),
({'status': '200'}, simplejson.dumps(token_response_second)),
])
self.assertEqual('first_token',
self.credentials.get_access_token(http=http))
self.assertFalse(self.credentials.access_token_expired)
self.assertEqual(token_response_first, self.credentials.token_response)
self.assertEqual('first_token',
self.credentials.get_access_token(http=http))
self.assertFalse(self.credentials.access_token_expired)
self.assertEqual(token_response_first, self.credentials.token_response)
time.sleep(1)
self.assertTrue(self.credentials.access_token_expired)
self.assertEqual('second_token',
self.credentials.get_access_token(http=http))
self.assertFalse(self.credentials.access_token_expired)
self.assertEqual(token_response_second, self.credentials.token_response)