100% coverage for contrib.appengine

This commit is contained in:
Jon Wayne Parrott
2016-03-15 14:26:24 -07:00
committed by Jon Wayne Parrott
parent 5b280ec0a6
commit 3b1f6e596e
2 changed files with 199 additions and 17 deletions

View File

@@ -61,7 +61,7 @@ OAUTH2CLIENT_NAMESPACE = 'oauth2client#ns'
XSRF_MEMCACHE_ID = 'xsrf_secret_key'
if _appengine_ndb is None:
if _appengine_ndb is None: # pragma: NO COVER
CredentialsNDBModel = None
CredentialsNDBProperty = None
FlowNDBProperty = None

View File

@@ -18,7 +18,7 @@ import json
import os
import tempfile
import time
import unittest
import unittest2
from six.moves import urllib
@@ -39,14 +39,20 @@ from google.appengine.ext import ndb
from google.appengine.ext import testbed
from oauth2client.contrib import appengine
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client import GOOGLE_REVOKE_URI
from oauth2client.clientsecrets import _loadfile
from oauth2client.clientsecrets import TYPE_WEB
from oauth2client.clientsecrets import InvalidClientSecretsError
from oauth2client.contrib.appengine import AppAssertionCredentials
from oauth2client.contrib.appengine import CredentialsModel
from oauth2client.contrib.appengine import CredentialsNDBModel
from oauth2client.contrib.appengine import CredentialsProperty
from oauth2client.contrib.appengine import FlowProperty
from oauth2client.contrib.appengine import (
InvalidClientSecretsError as AppEngineInvalidClientSecretsError)
from oauth2client.contrib.appengine import OAuth2Decorator
from oauth2client.contrib.appengine import OAuth2DecoratorFromClientSecrets
from oauth2client.contrib.appengine import oauth2decorator_from_clientsecrets
from oauth2client.contrib.appengine import StorageByKeyName
from oauth2client.client import _CLOUDSDK_CONFIG_ENV_VAR
from oauth2client.client import AccessTokenRefreshError
@@ -104,7 +110,7 @@ class Http2Mock(object):
return self, json.dumps(self.content)
class TestAppAssertionCredentials(unittest.TestCase):
class TestAppAssertionCredentials(unittest2.TestCase):
account_name = "service_account_name@appspot.com"
signature = "signature"
@@ -287,20 +293,24 @@ class TestFlowModel(db.Model):
flow = FlowProperty()
class FlowPropertyTest(unittest.TestCase):
class FlowPropertyTest(unittest2.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_datastore_v3_stub()
self.flow = flow_from_clientsecrets(
datafile('client_secrets.json'),
'foo',
redirect_uri='oob')
def tearDown(self):
self.testbed.deactivate()
def test_flow_get_put(self):
instance = TestFlowModel(
flow=flow_from_clientsecrets(datafile('client_secrets.json'),
'foo', redirect_uri='oob'),
flow=self.flow,
key_name='foo'
)
instance.put()
@@ -308,6 +318,74 @@ class FlowPropertyTest(unittest.TestCase):
self.assertEqual('foo_client_id', retrieved.flow.client_id)
def test_make_value_from_datastore_none(self):
self.assertIsNone(FlowProperty().make_value_from_datastore(None))
def test_validate(self):
FlowProperty().validate(None)
self.assertRaises(
db.BadValueError,
FlowProperty().validate, 42)
class TestCredentialsModel(db.Model):
credentials = CredentialsProperty()
class CredentialsPropertyTest(unittest2.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_datastore_v3_stub()
access_token = 'foo'
client_id = 'some_client_id'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
user_agent = 'refresh_checker/1.0'
self.credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, GOOGLE_TOKEN_URI,
user_agent)
def tearDown(self):
self.testbed.deactivate()
def test_credentials_get_put(self):
instance = TestCredentialsModel(
credentials=self.credentials,
key_name='foo'
)
instance.put()
retrieved = TestCredentialsModel.get_by_key_name('foo')
self.assertEqual(
self.credentials.to_json(),
retrieved.credentials.to_json())
def test_make_value_from_datastore(self):
self.assertIsNone(
CredentialsProperty().make_value_from_datastore(None))
self.assertIsNone(
CredentialsProperty().make_value_from_datastore(''))
self.assertIsNone(
CredentialsProperty().make_value_from_datastore('{'))
decoded = CredentialsProperty().make_value_from_datastore(
self.credentials.to_json())
self.assertEqual(
self.credentials.to_json(),
decoded.to_json())
def test_validate(self):
CredentialsProperty().validate(self.credentials)
CredentialsProperty().validate(None)
self.assertRaises(
db.BadValueError,
CredentialsProperty().validate, 42)
def _http_request(*args, **kwargs):
resp = httplib2.Response({'status': '200'})
@@ -316,7 +394,7 @@ def _http_request(*args, **kwargs):
return resp, content
class StorageByKeyNameTest(unittest.TestCase):
class StorageByKeyNameTest(unittest2.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
@@ -339,6 +417,27 @@ class StorageByKeyNameTest(unittest.TestCase):
def tearDown(self):
self.testbed.deactivate()
def test_bad_ctor(self):
with self.assertRaises(ValueError):
StorageByKeyName(CredentialsModel, None, None)
def test__is_ndb(self):
storage = StorageByKeyName(
object(), 'foo', 'credentials')
self.assertRaises(
TypeError, storage._is_ndb)
storage._model = type(object)
self.assertRaises(
TypeError, storage._is_ndb)
storage._model = CredentialsModel
self.assertFalse(storage._is_ndb())
storage._model = CredentialsNDBModel
self.assertTrue(storage._is_ndb())
def test_get_and_put_simple(self):
storage = StorageByKeyName(
CredentialsModel, 'foo', 'credentials')
@@ -492,7 +591,7 @@ class MockRequestHandler(object):
request = MockRequest()
class DecoratorTests(unittest.TestCase):
class DecoratorTests(unittest2.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
@@ -522,7 +621,7 @@ class DecoratorTests(unittest.TestCase):
parent.had_credentials = True
parent.found_credentials = decorator.credentials
if parent.should_raise:
raise Exception('')
raise parent.should_raise
class TestAwareHandler(webapp2.RequestHandler):
@decorator.oauth_aware
@@ -534,7 +633,7 @@ class DecoratorTests(unittest.TestCase):
parent.had_credentials = True
parent.found_credentials = decorator.credentials
if parent.should_raise:
raise Exception('')
raise parent.should_raise
routes = [
('/oauth2callback', self.decorator.callback_handler()),
@@ -557,6 +656,24 @@ class DecoratorTests(unittest.TestCase):
self.testbed.deactivate()
httplib2.Http = self.httplib2_orig
def test_in_error(self):
# NOTE: This branch is never reached. _in_error is not set by any code
# path. It appears to be intended to be set during construction.
self.decorator._in_error = True
self.decorator._message = 'foobar'
response = self.app.get('http://localhost/foo_path')
self.assertIn('foobar', response.body)
response = self.app.get('http://localhost/bar_path/1234/56')
self.assertIn('foobar', response.body)
def test_callback_application(self):
app = self.decorator.callback_application()
self.assertEqual(
app.router.match_routes[0].handler.__name__,
'OAuth2Handler')
def test_required(self):
# An initial request to an oauth_required decorated path should be a
# redirect to start the OAuth dance.
@@ -610,11 +727,21 @@ class DecoratorTests(unittest.TestCase):
self.assertEqual(None, self.decorator.credentials)
# Raising an exception still clears the Credentials.
self.should_raise = True
self.should_raise = Exception('')
self.assertRaises(Exception, self.app.get, '/foo_path')
self.should_raise = False
self.assertEqual(None, self.decorator.credentials)
# Access token refresh error should start the dance again
self.should_raise = AccessTokenRefreshError()
response = self.app.get('/foo_path')
self.should_raise = False
self.assertTrue(response.status.startswith('302'))
query_params = urllib.parse.parse_qs(
response.headers['Location'].split('?', 1)[1])
self.assertEqual('http://localhost/oauth2callback',
query_params['redirect_uri'][0])
# Invalidate the stored Credentials.
self.found_credentials.invalid = True
self.found_credentials.store.put(self.found_credentials)
@@ -622,10 +749,10 @@ class DecoratorTests(unittest.TestCase):
# Invalid Credentials should start the OAuth dance again.
response = self.app.get('/foo_path')
self.assertTrue(response.status.startswith('302'))
q = urllib.parse.parse_qs(
query_params = urllib.parse.parse_qs(
response.headers['Location'].split('?', 1)[1])
self.assertEqual('http://localhost/oauth2callback',
q['redirect_uri'][0])
query_params['redirect_uri'][0])
def test_storage_delete(self):
# An initial request to an oauth_required decorated path should be a
@@ -710,7 +837,7 @@ class DecoratorTests(unittest.TestCase):
self.assertEqual(None, self.decorator.credentials)
# Raising an exception still clears the Credentials.
self.should_raise = True
self.should_raise = Exception('')
self.assertRaises(Exception, self.app.get, '/bar_path/2012/01')
self.should_raise = False
self.assertEqual(None, self.decorator.credentials)
@@ -769,6 +896,38 @@ class DecoratorTests(unittest.TestCase):
self.assertEqual(self.decorator._revoke_uri,
self.decorator.credentials.revoke_uri)
def test_decorator_from_client_secrets_toplevel(self):
decorator_patch = mock.patch(
'oauth2client.contrib.appengine.OAuth2DecoratorFromClientSecrets')
with decorator_patch as decorator_mock:
filename = datafile('client_secrets.json')
decorator = oauth2decorator_from_clientsecrets(
filename,
scope='foo_scope')
decorator_mock.assert_called_once_with(
filename,
'foo_scope',
cache=None,
message=None)
def test_decorator_from_client_secrets_bad_type(self):
# NOTE: this code path is not currently reachable, as the only types
# that oauth2client.clientsecrets can load is web and installed, so
# this test forces execution of this code path. Despite not being
# normally reachable, this should remain in case future types of
# credentials are added.
loadfile_patch = mock.patch(
'oauth2client.contrib.appengine.clientsecrets.loadfile')
with loadfile_patch as loadfile_mock:
loadfile_mock.return_value = ('badtype', None)
self.assertRaises(
AppEngineInvalidClientSecretsError,
OAuth2DecoratorFromClientSecrets,
'doesntmatter.json',
scope=['foo_scope', 'bar_scope'])
def test_decorator_from_client_secrets_kwargs(self):
decorator = OAuth2DecoratorFromClientSecrets(
datafile('client_secrets.json'),
@@ -830,8 +989,31 @@ class DecoratorTests(unittest.TestCase):
except InvalidClientSecretsError:
pass
def test_decorator_from_client_secrets_with_optional_settings(self):
# Test that the decorator works with the absense of a revoke_uri in
# the client secrets.
loadfile_patch = mock.patch(
'oauth2client.contrib.appengine.clientsecrets.loadfile')
with loadfile_patch as loadfile_mock:
loadfile_mock.return_value = (TYPE_WEB, {
"client_id": "foo_client_id",
"client_secret": "foo_client_secret",
"redirect_uris": [],
"auth_uri": "https://accounts.google.com/o/oauth2/v2/auth",
"token_uri": "https://www.googleapis.com/oauth2/v4/token",
# No revoke URI
})
class DecoratorXsrfSecretTests(unittest.TestCase):
decorator = OAuth2DecoratorFromClientSecrets(
'doesntmatter.json',
scope=['foo_scope', 'bar_scope'])
self.assertEqual(decorator._revoke_uri, GOOGLE_REVOKE_URI)
# This is never set, but it's consistent with other tests.
self.assertFalse(decorator._in_error)
class DecoratorXsrfSecretTests(unittest2.TestCase):
"""Test xsrf_secret_key."""
def setUp(self):
@@ -880,7 +1062,7 @@ class DecoratorXsrfSecretTests(unittest.TestCase):
self.assertEqual(site_key.secret, secret)
class DecoratorXsrfProtectionTests(unittest.TestCase):
class DecoratorXsrfProtectionTests(unittest2.TestCase):
"""Test _build_state_value and _parse_state_value."""
def setUp(self):
@@ -902,4 +1084,4 @@ class DecoratorXsrfProtectionTests(unittest.TestCase):
if __name__ == '__main__': # pragma: NO COVER
unittest.main()
unittest2.main()