From 28445b98bb81c60189dc73119904b0125ca73cd3 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 20 Aug 2015 14:05:56 -0700 Subject: [PATCH] Making tests/ files pass PEP8. This is all errors except E402: module level import not at top of file. This is because in most (all?) files the __author__ global comes before imports begin. Ref: http://stackoverflow.com/a/24859703/1068170 --- tests/http_mock.py | 29 +- tests/test__helpers.py | 6 +- tests/test__pycrypto_crypt.py | 10 +- tests/test_appengine.py | 284 ++++++++------- tests/test_clientsecrets.py | 49 ++- tests/test_crypt.py | 16 +- tests/test_devshell.py | 12 +- tests/test_django_orm.py | 9 +- tests/test_file.py | 129 ++++--- tests/test_gce.py | 21 +- tests/test_jwt.py | 121 ++++--- tests/test_keyring.py | 28 +- tests/test_oauth2client.py | 656 ++++++++++++++++++---------------- tests/test_service_account.py | 42 ++- tests/test_tools.py | 3 +- tests/test_util.py | 41 +-- tests/test_xsrfutil.py | 77 ++-- 17 files changed, 816 insertions(+), 717 deletions(-) diff --git a/tests/http_mock.py b/tests/http_mock.py index 2f6913c..5ae3d07 100644 --- a/tests/http_mock.py +++ b/tests/http_mock.py @@ -20,6 +20,7 @@ import httplib2 # TODO(craigcitro): Find a cleaner way to share this code with googleapiclient. + class HttpMock(object): """Mock of httplib2.Http""" @@ -46,11 +47,11 @@ class HttpMock(object): self.headers = None def request(self, uri, - method='GET', - body=None, - headers=None, - redirections=1, - connection_type=None): + method='GET', + body=None, + headers=None, + redirections=1, + connection_type=None): self.uri = uri self.method = method self.body = body @@ -94,23 +95,25 @@ class HttpMockSequence(object): self.requests = [] def request(self, uri, - method='GET', - body=None, - headers=None, - redirections=1, - connection_type=None): + method='GET', + body=None, + headers=None, + redirections=1, + connection_type=None): resp, content = self._iterable.pop(0) self.requests.append({'uri': uri, 'body': body, 'headers': headers}) # Read any underlying stream before sending the request. - body_stream_content = body.read() if getattr(body, 'read', None) else None + body_stream_content = (body.read() + if getattr(body, 'read', None) else None) if content == 'echo_request_headers': content = headers elif content == 'echo_request_headers_as_json': content = json.dumps(headers) elif content == 'echo_request_body': - content = body if body_stream_content is None else body_stream_content + content = (body + if body_stream_content is None else body_stream_content) elif content == 'echo_request_uri': content = uri elif not isinstance(content, bytes): - raise TypeError('http content should be bytes: %r' % (content, )) + raise TypeError('http content should be bytes: %r' % (content,)) return httplib2.Response(resp), content diff --git a/tests/test__helpers.py b/tests/test__helpers.py index eb75d35..0a7fecf 100644 --- a/tests/test__helpers.py +++ b/tests/test__helpers.py @@ -43,12 +43,12 @@ class Test__json_encode(unittest.TestCase): # is non-deterministic. data = {u'foo': 10} result = _json_encode(data) - self.assertEqual(result, """{"foo":10}""") + self.assertEqual(result, '{"foo":10}') def test_list_input(self): data = [42, 1337] result = _json_encode(data) - self.assertEqual(result, """[42,1337]""") + self.assertEqual(result, '[42,1337]') class Test__to_bytes(unittest.TestCase): @@ -114,4 +114,4 @@ class Test__urlsafe_b64decode(unittest.TestCase): import binascii bad_string = b'+' self.assertRaises((TypeError, binascii.Error), - _urlsafe_b64decode, bad_string) + _urlsafe_b64decode, bad_string) diff --git a/tests/test__pycrypto_crypt.py b/tests/test__pycrypto_crypt.py index 9895a62..0f26b03 100644 --- a/tests/test__pycrypto_crypt.py +++ b/tests/test__pycrypto_crypt.py @@ -23,9 +23,9 @@ from oauth2client.crypt import PyCryptoVerifier class TestPyCryptoVerifier(unittest.TestCase): PUBLIC_KEY_FILENAME = os.path.join(os.path.dirname(__file__), - 'data', 'publickey.pem') + 'data', 'publickey.pem') PRIVATE_KEY_FILENAME = os.path.join(os.path.dirname(__file__), - 'data', 'privatekey.pem') + 'data', 'privatekey.pem') def _load_public_key_bytes(self): with open(self.PUBLIC_KEY_FILENAME, 'rb') as fh: @@ -41,18 +41,18 @@ class TestPyCryptoVerifier(unittest.TestCase): actual_signature = signer.sign(to_sign) verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(), - is_x509_cert=True) + is_x509_cert=True) self.assertTrue(verifier.verify(to_sign, actual_signature)) def test_verify_failure(self): verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(), - is_x509_cert=True) + is_x509_cert=True) bad_signature = b'' self.assertFalse(verifier.verify(b'foo', bad_signature)) def test_verify_bad_key(self): verifier = PyCryptoVerifier.from_string(self._load_public_key_bytes(), - is_x509_cert=True) + is_x509_cert=True) bad_signature = b'' self.assertFalse(verifier.verify(b'foo', bad_signature)) diff --git a/tests/test_appengine.py b/tests/test_appengine.py index dfa6831..f6c632a 100644 --- a/tests/test_appengine.py +++ b/tests/test_appengine.py @@ -113,16 +113,16 @@ class Http2Mock(object): """Mock httplib2.Http""" status = 200 content = { - 'access_token': 'foo_access_token', - 'refresh_token': 'foo_refresh_token', - 'expires_in': 3600, - 'extra': 'value', + 'access_token': 'foo_access_token', + 'refresh_token': 'foo_refresh_token', + 'expires_in': 3600, + 'extra': 'value', } def request(self, token_uri, method, body, headers, *args, **kwargs): self.body = body self.headers = headers - return (self, json.dumps(self.content)) + return self, json.dumps(self.content) class TestAppAssertionCredentials(unittest.TestCase): @@ -132,8 +132,8 @@ class TestAppAssertionCredentials(unittest.TestCase): class AppIdentityStubImpl(apiproxy_stub.APIProxyStub): def __init__(self): - super(TestAppAssertionCredentials.AppIdentityStubImpl, self).__init__( - 'app_identity_service') + super(TestAppAssertionCredentials.AppIdentityStubImpl, + self).__init__('app_identity_service') def _Dynamic_GetAccessToken(self, request, response): response.set_access_token('a_token_123') @@ -142,8 +142,8 @@ class TestAppAssertionCredentials(unittest.TestCase): class ErroringAppIdentityStubImpl(apiproxy_stub.APIProxyStub): def __init__(self): - super(TestAppAssertionCredentials.ErroringAppIdentityStubImpl, self).__init__( - 'app_identity_service') + super(TestAppAssertionCredentials.ErroringAppIdentityStubImpl, + self).__init__('app_identity_service') def _Dynamic_GetAccessToken(self, request, response): raise app_identity.BackendDeadlineExceeded() @@ -152,9 +152,9 @@ class TestAppAssertionCredentials(unittest.TestCase): app_identity_stub = self.ErroringAppIdentityStubImpl() apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service', - app_identity_stub) + app_identity_stub) apiproxy_stub_map.apiproxy.RegisterStub( - 'memcache', memcache_stub.MemcacheServiceStub()) + 'memcache', memcache_stub.MemcacheServiceStub()) scope = 'http://www.googleapis.com/scope' credentials = AppAssertionCredentials(scope) @@ -165,13 +165,13 @@ class TestAppAssertionCredentials(unittest.TestCase): app_identity_stub = self.AppIdentityStubImpl() apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() apiproxy_stub_map.apiproxy.RegisterStub("app_identity_service", - app_identity_stub) + app_identity_stub) apiproxy_stub_map.apiproxy.RegisterStub( - 'memcache', memcache_stub.MemcacheServiceStub()) + 'memcache', memcache_stub.MemcacheServiceStub()) scope = [ - "http://www.googleapis.com/scope", - "http://www.googleapis.com/scope2"] + "http://www.googleapis.com/scope", + "http://www.googleapis.com/scope2"] credentials = AppAssertionCredentials(scope) http = httplib2.Http() credentials.refresh(http) @@ -180,34 +180,35 @@ class TestAppAssertionCredentials(unittest.TestCase): json = credentials.to_json() credentials = Credentials.new_from_json(json) self.assertEqual( - 'http://www.googleapis.com/scope http://www.googleapis.com/scope2', - credentials.scope) + 'http://www.googleapis.com/scope http://www.googleapis.com/scope2', + credentials.scope) - scope = "http://www.googleapis.com/scope http://www.googleapis.com/scope2" + scope = ('http://www.googleapis.com/scope ' + 'http://www.googleapis.com/scope2') credentials = AppAssertionCredentials(scope) http = httplib2.Http() credentials.refresh(http) self.assertEqual('a_token_123', credentials.access_token) self.assertEqual( - 'http://www.googleapis.com/scope http://www.googleapis.com/scope2', - credentials.scope) + 'http://www.googleapis.com/scope http://www.googleapis.com/scope2', + credentials.scope) def test_custom_service_account(self): scope = "http://www.googleapis.com/scope" account_id = "service_account_name_2@appspot.com" with mock.patch.object(app_identity, 'get_access_token', - return_value=('a_token_456', None), - autospec=True) as get_access_token: + return_value=('a_token_456', None), + autospec=True) as get_access_token: credentials = AppAssertionCredentials( - scope, service_account_id=account_id) + scope, service_account_id=account_id) http = httplib2.Http() credentials.refresh(http) self.assertEqual('a_token_456', credentials.access_token) self.assertEqual(scope, credentials.scope) get_access_token.assert_called_once_with( - [scope], service_account_id=account_id) + [scope], service_account_id=account_id) def test_create_scoped_required_without_scopes(self): credentials = AppAssertionCredentials([]) @@ -228,9 +229,9 @@ class TestAppAssertionCredentials(unittest.TestCase): app_identity_stub = self.AppIdentityStubImpl() apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() apiproxy_stub_map.apiproxy.RegisterStub("app_identity_service", - app_identity_stub) + app_identity_stub) apiproxy_stub_map.apiproxy.RegisterStub( - 'memcache', memcache_stub.MemcacheServiceStub()) + 'memcache', memcache_stub.MemcacheServiceStub()) credentials = AppAssertionCredentials(['dummy_scope']) token = credentials.get_access_token() @@ -240,7 +241,8 @@ class TestAppAssertionCredentials(unittest.TestCase): def test_save_to_well_known_file(self): os.environ[_CLOUDSDK_CONFIG_ENV_VAR] = tempfile.mkdtemp() credentials = AppAssertionCredentials([]) - self.assertRaises(NotImplementedError, save_to_well_known_file, credentials) + self.assertRaises(NotImplementedError, + save_to_well_known_file, credentials) del os.environ[_CLOUDSDK_CONFIG_ENV_VAR] @@ -260,9 +262,9 @@ class FlowPropertyTest(unittest.TestCase): def test_flow_get_put(self): instance = TestFlowModel( - flow=flow_from_clientsecrets(datafile('client_secrets.json'), 'foo', - redirect_uri='oob'), - key_name='foo' + flow=flow_from_clientsecrets(datafile('client_secrets.json'), + 'foo', redirect_uri='oob'), + key_name='foo' ) instance.put() retrieved = TestFlowModel.get_by_key_name('foo') @@ -287,9 +289,9 @@ class FlowNDBPropertyTest(unittest.TestCase): def test_flow_get_put(self): instance = TestFlowNDBModel( - flow=flow_from_clientsecrets(datafile('client_secrets.json'), 'foo', - redirect_uri='oob'), - id='foo' + flow=flow_from_clientsecrets(datafile('client_secrets.json'), + 'foo', redirect_uri='oob'), + id='foo' ) instance.put() retrieved = TestFlowNDBModel.get_by_id('foo') @@ -320,16 +322,16 @@ class StorageByKeyNameTest(unittest.TestCase): token_expiry = datetime.datetime.utcnow() user_agent = 'refresh_checker/1.0' self.credentials = OAuth2Credentials( - access_token, client_id, client_secret, - refresh_token, token_expiry, GOOGLE_TOKEN_URI, - user_agent) + access_token, client_id, client_secret, + refresh_token, token_expiry, GOOGLE_TOKEN_URI, + user_agent) def tearDown(self): self.testbed.deactivate() def test_get_and_put_simple(self): storage = StorageByKeyName( - CredentialsModel, 'foo', 'credentials') + CredentialsModel, 'foo', 'credentials') self.assertEqual(None, storage.get()) self.credentials.set_store(storage) @@ -340,7 +342,7 @@ class StorageByKeyNameTest(unittest.TestCase): def test_get_and_put_cached(self): storage = StorageByKeyName( - CredentialsModel, 'foo', 'credentials', cache=memcache) + CredentialsModel, 'foo', 'credentials', cache=memcache) self.assertEqual(None, storage.get()) self.credentials.set_store(storage) @@ -365,13 +367,13 @@ class StorageByKeyNameTest(unittest.TestCase): def test_get_and_put_set_store_on_cache_retrieval(self): storage = StorageByKeyName( - CredentialsModel, 'foo', 'credentials', cache=memcache) + CredentialsModel, 'foo', 'credentials', cache=memcache) self.assertEqual(None, storage.get()) self.credentials.set_store(storage) storage.put(self.credentials) - # Pre-bug 292 old_creds wouldn't have storage, and the _refresh wouldn't - # be able to store the updated cred back into the storage. + # Pre-bug 292 old_creds wouldn't have storage, and the _refresh + # wouldn't be able to store the updated cred back into the storage. old_creds = storage.get() self.assertEqual(old_creds.access_token, 'foo') old_creds.invalid = True @@ -382,7 +384,7 @@ class StorageByKeyNameTest(unittest.TestCase): def test_get_and_put_ndb(self): # Start empty storage = StorageByKeyName( - CredentialsNDBModel, 'foo', 'credentials') + CredentialsNDBModel, 'foo', 'credentials') self.assertEqual(None, storage.get()) # Refresh storage and retrieve without using storage @@ -391,19 +393,20 @@ class StorageByKeyNameTest(unittest.TestCase): credmodel = CredentialsNDBModel.get_by_id('foo') self.assertEqual('bar', credmodel.credentials.access_token) self.assertEqual(credmodel.credentials.to_json(), - self.credentials.to_json()) + self.credentials.to_json()) def test_delete_ndb(self): # Start empty storage = StorageByKeyName( - CredentialsNDBModel, 'foo', 'credentials') + CredentialsNDBModel, 'foo', 'credentials') self.assertEqual(None, storage.get()) - # Add credentials to model with storage, and check equivalent w/o storage + # Add credentials to model with storage, and check equivalent + # w/o storage storage.put(self.credentials) credmodel = CredentialsNDBModel.get_by_id('foo') self.assertEqual(credmodel.credentials.to_json(), - self.credentials.to_json()) + self.credentials.to_json()) # Delete and make sure empty storage.delete() @@ -412,7 +415,7 @@ class StorageByKeyNameTest(unittest.TestCase): def test_get_and_put_mixed_ndb_storage_db_get(self): # Start empty storage = StorageByKeyName( - CredentialsNDBModel, 'foo', 'credentials') + CredentialsNDBModel, 'foo', 'credentials') self.assertEqual(None, storage.get()) # Set NDB store and refresh to add to storage @@ -423,12 +426,12 @@ class StorageByKeyNameTest(unittest.TestCase): credmodel = CredentialsModel.get_by_key_name('foo') self.assertEqual('bar', credmodel.credentials.access_token) self.assertEqual(self.credentials.to_json(), - credmodel.credentials.to_json()) + credmodel.credentials.to_json()) def test_get_and_put_mixed_db_storage_ndb_get(self): # Start empty storage = StorageByKeyName( - CredentialsModel, 'foo', 'credentials') + CredentialsModel, 'foo', 'credentials') self.assertEqual(None, storage.get()) # Set DB store and refresh to add to storage @@ -439,14 +442,14 @@ class StorageByKeyNameTest(unittest.TestCase): credmodel = CredentialsNDBModel.get_by_id('foo') self.assertEqual('bar', credmodel.credentials.access_token) self.assertEqual(self.credentials.to_json(), - credmodel.credentials.to_json()) + credmodel.credentials.to_json()) def test_delete_db_ndb_mixed(self): # Start empty storage_ndb = StorageByKeyName( - CredentialsNDBModel, 'foo', 'credentials') + CredentialsNDBModel, 'foo', 'credentials') storage = StorageByKeyName( - CredentialsModel, 'foo', 'credentials') + CredentialsModel, 'foo', 'credentials') # First DB, then NDB self.assertEqual(None, storage.get()) @@ -489,9 +492,9 @@ class DecoratorTests(unittest.TestCase): self.testbed.init_user_stub() decorator = OAuth2Decorator(client_id='foo_client_id', - client_secret='foo_client_secret', - scope=['foo_scope', 'bar_scope'], - user_agent='foo') + client_secret='foo_client_secret', + scope=['foo_scope', 'bar_scope'], + user_agent='foo') self._finish_setup(decorator, user_mock=UserMock) @@ -523,20 +526,22 @@ class DecoratorTests(unittest.TestCase): if parent.should_raise: raise Exception('') - application = webapp2.WSGIApplication([ - ('/oauth2callback', self.decorator.callback_handler()), - ('/foo_path', TestRequiredHandler), - webapp2.Route(r'/bar_path//', - handler=TestAwareHandler, name='bar')], - debug=True) - self.app = TestApp(application, extra_environ={ - 'wsgi.url_scheme': 'http', - 'HTTP_HOST': 'localhost', + routes = [ + ('/oauth2callback', self.decorator.callback_handler()), + ('/foo_path', TestRequiredHandler), + webapp2.Route(r'/bar_path//', + handler=TestAwareHandler, name='bar'), + ] + application = webapp2.WSGIApplication(routes, debug=True) + + self.app = TestApp(application, extra_environ={ + 'wsgi.url_scheme': 'http', + 'HTTP_HOST': 'localhost', }) - self.current_user = user_mock() - users.get_current_user = self.current_user - self.httplib2_orig = httplib2.Http - httplib2.Http = Http2Mock + self.current_user = user_mock() + users.get_current_user = self.current_user + self.httplib2_orig = httplib2.Http + httplib2.Http = Http2Mock def tearDown(self): self.testbed.deactivate() @@ -549,46 +554,49 @@ class DecoratorTests(unittest.TestCase): self.assertEqual(self.decorator.credentials, None) response = self.app.get('http://localhost/foo_path') self.assertTrue(response.status.startswith('302')) - q = urllib.parse.parse_qs(response.headers['Location'].split('?', 1)[1]) - self.assertEqual('http://localhost/oauth2callback', q['redirect_uri'][0]) + q = urllib.parse.parse_qs( + response.headers['Location'].split('?', 1)[1]) + self.assertEqual('http://localhost/oauth2callback', + q['redirect_uri'][0]) self.assertEqual('foo_client_id', q['client_id'][0]) self.assertEqual('foo_scope bar_scope', q['scope'][0]) self.assertEqual('http://localhost/foo_path', - q['state'][0].rsplit(':', 1)[0]) + q['state'][0].rsplit(':', 1)[0]) self.assertEqual('code', q['response_type'][0]) self.assertEqual(False, self.decorator.has_credentials()) with mock.patch.object(appengine, '_parse_state_value', - return_value='foo_path', - autospec=True) as parse_state_value: + return_value='foo_path', + autospec=True) as parse_state_value: # Now simulate the callback to /oauth2callback. response = self.app.get('/oauth2callback', { - 'code': 'foo_access_code', - 'state': 'foo_path:xsrfkey123', - }) + 'code': 'foo_access_code', + 'state': 'foo_path:xsrfkey123', + }) parts = response.headers['Location'].split('?', 1) self.assertEqual('http://localhost/foo_path', parts[0]) self.assertEqual(None, self.decorator.credentials) if self.decorator._token_response_param: response_query = urllib.parse.parse_qs(parts[1]) - response = response_query[self.decorator._token_response_param][0] + response = response_query[ + self.decorator._token_response_param][0] self.assertEqual(Http2Mock.content, - json.loads(urllib.parse.unquote(response))) + json.loads(urllib.parse.unquote(response))) self.assertEqual(self.decorator.flow, self.decorator._tls.flow) self.assertEqual(self.decorator.credentials, - self.decorator._tls.credentials) + self.decorator._tls.credentials) parse_state_value.assert_called_once_with( - 'foo_path:xsrfkey123', self.current_user) + 'foo_path:xsrfkey123', self.current_user) # Now requesting the decorated path should work. response = self.app.get('/foo_path') self.assertEqual('200 OK', response.status) self.assertEqual(True, self.had_credentials) self.assertEqual('foo_refresh_token', - self.found_credentials.refresh_token) + self.found_credentials.refresh_token) self.assertEqual('foo_access_token', - self.found_credentials.access_token) + self.found_credentials.access_token) self.assertEqual(None, self.decorator.credentials) # Raising an exception still clears the Credentials. @@ -604,8 +612,10 @@ class DecoratorTests(unittest.TestCase): # Invalid Credentials should start the OAuth dance again. response = self.app.get('/foo_path') self.assertTrue(response.status.startswith('302')) - q = urllib.parse.parse_qs(response.headers['Location'].split('?', 1)[1]) - self.assertEqual('http://localhost/oauth2callback', q['redirect_uri'][0]) + q = urllib.parse.parse_qs( + response.headers['Location'].split('?', 1)[1]) + self.assertEqual('http://localhost/oauth2callback', + q['redirect_uri'][0]) def test_storage_delete(self): # An initial request to an oauth_required decorated path should be a @@ -614,14 +624,15 @@ class DecoratorTests(unittest.TestCase): self.assertTrue(response.status.startswith('302')) with mock.patch.object(appengine, '_parse_state_value', - return_value='foo_path', - autospec=True) as parse_state_value: + return_value='foo_path', + autospec=True) as parse_state_value: # Now simulate the callback to /oauth2callback. response = self.app.get('/oauth2callback', { - 'code': 'foo_access_code', - 'state': 'foo_path:xsrfkey123', + 'code': 'foo_access_code', + 'state': 'foo_path:xsrfkey123', }) - self.assertEqual('http://localhost/foo_path', response.headers['Location']) + self.assertEqual('http://localhost/foo_path', + response.headers['Location']) self.assertEqual(None, self.decorator.credentials) # Now requesting the decorated path should work. @@ -640,37 +651,40 @@ class DecoratorTests(unittest.TestCase): self.assertTrue(response.status.startswith('302')) parse_state_value.assert_called_once_with( - 'foo_path:xsrfkey123', self.current_user) + 'foo_path:xsrfkey123', self.current_user) def test_aware(self): - # An initial request to an oauth_aware decorated path should not redirect. + # An initial request to an oauth_aware decorated path should + # not redirect. response = self.app.get('http://localhost/bar_path/2012/01') self.assertEqual('Hello World!', response.body) self.assertEqual('200 OK', response.status) self.assertEqual(False, self.decorator.has_credentials()) url = self.decorator.authorize_url() q = urllib.parse.parse_qs(url.split('?', 1)[1]) - self.assertEqual('http://localhost/oauth2callback', q['redirect_uri'][0]) + self.assertEqual('http://localhost/oauth2callback', + q['redirect_uri'][0]) self.assertEqual('foo_client_id', q['client_id'][0]) self.assertEqual('foo_scope bar_scope', q['scope'][0]) self.assertEqual('http://localhost/bar_path/2012/01', - q['state'][0].rsplit(':', 1)[0]) + q['state'][0].rsplit(':', 1)[0]) self.assertEqual('code', q['response_type'][0]) with mock.patch.object(appengine, '_parse_state_value', - return_value='bar_path', - autospec=True) as parse_state_value: + return_value='bar_path', + autospec=True) as parse_state_value: # Now simulate the callback to /oauth2callback. url = self.decorator.authorize_url() response = self.app.get('/oauth2callback', { - 'code': 'foo_access_code', - 'state': 'bar_path:xsrfkey456', - }) + 'code': 'foo_access_code', + 'state': 'bar_path:xsrfkey456', + }) - self.assertEqual('http://localhost/bar_path', response.headers['Location']) + self.assertEqual('http://localhost/bar_path', + response.headers['Location']) self.assertEqual(False, self.decorator.has_credentials()) parse_state_value.assert_called_once_with( - 'bar_path:xsrfkey456', self.current_user) + 'bar_path:xsrfkey456', self.current_user) # Now requesting the decorated path will have credentials. response = self.app.get('/bar_path/2012/01') @@ -678,9 +692,9 @@ class DecoratorTests(unittest.TestCase): self.assertEqual('Hello World!', response.body) self.assertEqual(True, self.had_credentials) self.assertEqual('foo_refresh_token', - self.found_credentials.refresh_token) + self.found_credentials.refresh_token) self.assertEqual('foo_access_token', - self.found_credentials.access_token) + self.found_credentials.access_token) # Credentials should be cleared after each call. self.assertEqual(None, self.decorator.credentials) @@ -692,28 +706,29 @@ class DecoratorTests(unittest.TestCase): self.assertEqual(None, self.decorator.credentials) def test_error_in_step2(self): - # An initial request to an oauth_aware decorated path should not redirect. + # An initial request to an oauth_aware decorated path should + # not redirect. response = self.app.get('/bar_path/2012/01') url = self.decorator.authorize_url() response = self.app.get('/oauth2callback', { - 'error': 'BadHappened\'' + 'error': 'BadHappened\'' }) self.assertEqual('200 OK', response.status) self.assertTrue('Bad<Stuff>Happened'' in response.body) def test_kwargs_are_passed_to_underlying_flow(self): decorator = OAuth2Decorator(client_id='foo_client_id', - client_secret='foo_client_secret', - user_agent='foo_user_agent', - scope=['foo_scope', 'bar_scope'], - access_type='offline', - approval_prompt='force', - revoke_uri='dummy_revoke_uri') + client_secret='foo_client_secret', + user_agent='foo_user_agent', + scope=['foo_scope', 'bar_scope'], + access_type='offline', + approval_prompt='force', + revoke_uri='dummy_revoke_uri') request_handler = MockRequestHandler() decorator._create_flow(request_handler) self.assertEqual('https://example.org/oauth2callback', - decorator.flow.redirect_uri) + decorator.flow.redirect_uri) self.assertEqual('offline', decorator.flow.params['access_type']) self.assertEqual('force', decorator.flow.params['approval_prompt']) self.assertEqual('foo_user_agent', decorator.flow.user_agent) @@ -727,41 +742,42 @@ class DecoratorTests(unittest.TestCase): def test_decorator_from_client_secrets(self): decorator = OAuth2DecoratorFromClientSecrets( - datafile('client_secrets.json'), - scope=['foo_scope', 'bar_scope']) + datafile('client_secrets.json'), + scope=['foo_scope', 'bar_scope']) self._finish_setup(decorator, user_mock=UserMock) self.assertFalse(decorator._in_error) self.decorator = decorator self.test_required() http = self.decorator.http() - self.assertEquals('foo_access_token', http.request.credentials.access_token) + self.assertEquals('foo_access_token', + http.request.credentials.access_token) # revoke_uri is not required self.assertEqual(self.decorator._revoke_uri, - 'https://accounts.google.com/o/oauth2/revoke') + 'https://accounts.google.com/o/oauth2/revoke') self.assertEqual(self.decorator._revoke_uri, - self.decorator.credentials.revoke_uri) + self.decorator.credentials.revoke_uri) def test_decorator_from_client_secrets_kwargs(self): decorator = OAuth2DecoratorFromClientSecrets( - datafile('client_secrets.json'), - scope=['foo_scope', 'bar_scope'], - approval_prompt='force') + datafile('client_secrets.json'), + scope=['foo_scope', 'bar_scope'], + approval_prompt='force') self.assertTrue('approval_prompt' in decorator._kwargs) def test_decorator_from_cached_client_secrets(self): cache_mock = CacheMock() load_and_cache('client_secrets.json', 'secret', cache_mock) decorator = OAuth2DecoratorFromClientSecrets( - # filename, scope, message=None, cache=None - 'secret', '', cache=cache_mock) + # filename, scope, message=None, cache=None + 'secret', '', cache=cache_mock) self.assertFalse(decorator._in_error) def test_decorator_from_client_secrets_not_logged_in_required(self): decorator = OAuth2DecoratorFromClientSecrets( - datafile('client_secrets.json'), - scope=['foo_scope', 'bar_scope'], message='NotLoggedInMessage') + datafile('client_secrets.json'), + scope=['foo_scope', 'bar_scope'], message='NotLoggedInMessage') self.decorator = decorator self._finish_setup(decorator, user_mock=UserNotLoggedInMock) @@ -775,8 +791,8 @@ class DecoratorTests(unittest.TestCase): def test_decorator_from_client_secrets_not_logged_in_aware(self): decorator = OAuth2DecoratorFromClientSecrets( - datafile('client_secrets.json'), - scope=['foo_scope', 'bar_scope'], message='NotLoggedInMessage') + datafile('client_secrets.json'), + scope=['foo_scope', 'bar_scope'], message='NotLoggedInMessage') self.decorator = decorator self._finish_setup(decorator, user_mock=UserNotLoggedInMock) @@ -790,8 +806,8 @@ class DecoratorTests(unittest.TestCase): MESSAGE = 'File is missing' try: decorator = OAuth2DecoratorFromClientSecrets( - datafile('unfilled_client_secrets.json'), - scope=['foo_scope', 'bar_scope'], message=MESSAGE) + datafile('unfilled_client_secrets.json'), + scope=['foo_scope', 'bar_scope'], message=MESSAGE) except InvalidClientSecretsError: pass @@ -799,8 +815,8 @@ class DecoratorTests(unittest.TestCase): MESSAGE = 'File is missing' try: decorator = OAuth2DecoratorFromClientSecrets( - datafile('unfilled_client_secrets.json'), - scope=['foo_scope', 'bar_scope'], message=MESSAGE) + datafile('unfilled_client_secrets.json'), + scope=['foo_scope', 'bar_scope'], message=MESSAGE) except InvalidClientSecretsError: pass @@ -826,13 +842,13 @@ class DecoratorXsrfSecretTests(unittest.TestCase): # Secret shouldn't change if memcache goes away. memcache.delete(appengine.XSRF_MEMCACHE_ID, - namespace=appengine.OAUTH2CLIENT_NAMESPACE) + namespace=appengine.OAUTH2CLIENT_NAMESPACE) secret3 = appengine.xsrf_secret_key() self.assertEqual(secret2, secret3) # Secret should change if both memcache and the model goes away. memcache.delete(appengine.XSRF_MEMCACHE_ID, - namespace=appengine.OAUTH2CLIENT_NAMESPACE) + namespace=appengine.OAUTH2CLIENT_NAMESPACE) model = appengine.SiteXsrfSecretKey.get_or_insert('site') model.delete() @@ -869,10 +885,10 @@ class DecoratorXsrfProtectionTests(unittest.TestCase): def test_build_and_parse_state(self): state = appengine._build_state_value(MockRequestHandler(), UserMock()) self.assertEqual( - 'https://example.org', - appengine._parse_state_value(state, UserMock())) + 'https://example.org', + appengine._parse_state_value(state, UserMock())) self.assertRaises(appengine.InvalidXsrfTokenError, - appengine._parse_state_value, state[1:], UserMock()) + appengine._parse_state_value, state[1:], UserMock()) if __name__ == '__main__': diff --git a/tests/test_clientsecrets.py b/tests/test_clientsecrets.py index 0fc5794..edf3243 100644 --- a/tests/test_clientsecrets.py +++ b/tests/test_clientsecrets.py @@ -14,8 +14,6 @@ """Unit tests for oauth2client.clientsecrets.""" -__author__ = 'jcgregorio@google.com (Joe Gregorio)' - import os import unittest from io import StringIO @@ -24,6 +22,10 @@ import httplib2 from oauth2client import clientsecrets + +__author__ = 'jcgregorio@google.com (Joe Gregorio)' + + DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') VALID_FILE = os.path.join(DATA_DIR, 'client_secrets.json') INVALID_FILE = os.path.join(DATA_DIR, 'unfilled_client_secrets.json') @@ -32,29 +34,24 @@ NONEXISTENT_FILE = os.path.join(__file__, '..', 'afilethatisntthere.json') class OAuth2CredentialsTests(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - def test_validate_error(self): + payload = ( + b'{' + b' "web": {' + b' "client_id": "[[CLIENT ID REQUIRED]]",' + b' "client_secret": "[[CLIENT SECRET REQUIRED]]",' + b' "redirect_uris": ["http://localhost:8080/oauth2callback"],' + b' "auth_uri": "",' + b' "token_uri": ""' + b' }' + b'}') ERRORS = [ - ('{}', 'Invalid'), - ('{"foo": {}}', 'Unknown'), - ('{"web": {}}', 'Missing'), - ('{"web": {"client_id": "dkkd"}}', 'Missing'), - ("""{ - "web": { - "client_id": "[[CLIENT ID REQUIRED]]", - "client_secret": "[[CLIENT SECRET REQUIRED]]", - "redirect_uris": ["http://localhost:8080/oauth2callback"], - "auth_uri": "", - "token_uri": "" - } - } - """, 'Property'), - ] + ('{}', 'Invalid'), + ('{"foo": {}}', 'Unknown'), + ('{"web": {}}', 'Missing'), + ('{"web": {"client_id": "dkkd"}}', 'Missing'), + (payload, 'Property'), + ] for src, match in ERRORS: # Ensure that it is unicode try: @@ -107,7 +104,7 @@ class CachedClientsecretsTests(unittest.TestCase): def test_cache_miss(self): client_type, client_info = clientsecrets.loadfile( - VALID_FILE, cache=self.cache_mock) + VALID_FILE, cache=self.cache_mock) self.assertEqual('web', client_type) self.assertEqual('foo_client_secret', client_info['client_secret']) @@ -124,7 +121,7 @@ class CachedClientsecretsTests(unittest.TestCase): self.cache_mock.cache[NONEXISTENT_FILE] = {'web': 'secret info'} client_type, client_info = clientsecrets.loadfile( - NONEXISTENT_FILE, cache=self.cache_mock) + NONEXISTENT_FILE, cache=self.cache_mock) self.assertEqual('web', client_type) self.assertEqual('secret info', client_info) # make sure we didn't do any set() RPCs @@ -134,7 +131,7 @@ class CachedClientsecretsTests(unittest.TestCase): try: clientsecrets.loadfile(INVALID_FILE, cache=self.cache_mock) self.fail('Expected InvalidClientSecretsError to be raised ' - 'while loading %s' % INVALID_FILE) + 'while loading %s' % INVALID_FILE) except clientsecrets.InvalidClientSecretsError: pass diff --git a/tests/test_crypt.py b/tests/test_crypt.py index bd2c105..da8797a 100644 --- a/tests/test_crypt.py +++ b/tests/test_crypt.py @@ -39,13 +39,13 @@ def datafile(filename): class Test_pkcs12_key_as_pem(unittest.TestCase): def _make_signed_jwt_creds(self, private_key_file='privatekey.p12', - private_key=None): + private_key=None): private_key = private_key or datafile(private_key_file) return SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + 'some_account@example.com', + private_key, + scope='read+write', + sub='joe@example.org') def _succeeds_helper(self, password=None): self.assertEqual(True, HAS_OPENSSL) @@ -53,7 +53,8 @@ class Test_pkcs12_key_as_pem(unittest.TestCase): credentials = self._make_signed_jwt_creds() if password is None: password = credentials.private_key_password - pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key, password) + pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key, + password) pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem') pkcs12_key_as_pem = _helpers._parse_pem_key(pkcs12_key_as_pem) alternate_pem = datafile('pem_from_pkcs12_alternate.pem') @@ -70,4 +71,5 @@ class Test_pkcs12_key_as_pem(unittest.TestCase): from OpenSSL import crypto credentials = self._make_signed_jwt_creds(private_key=b'NOT_A_KEY') self.assertRaises(crypto.Error, crypt.pkcs12_key_as_pem, - credentials.private_key, credentials.private_key_password) + credentials.private_key, + credentials.private_key_password) diff --git a/tests/test_devshell.py b/tests/test_devshell.py index 2f052f9..8b0816b 100644 --- a/tests/test_devshell.py +++ b/tests/test_devshell.py @@ -32,7 +32,7 @@ class _AuthReferenceServer(threading.Thread): def __init__(self, response=None): super(_AuthReferenceServer, self).__init__(None) self.response = (response or - '["joe@example.com", "fooproj", "sometoken"]') + '["joe@example.com", "fooproj", "sometoken"]') def __enter__(self): self.start_server() @@ -57,8 +57,9 @@ class _AuthReferenceServer(threading.Thread): def run(self): s = None try: - # Do not set the timeout on the socket, leave it in the blocking mode as - # setting the timeout seems to cause spurious EAGAIN errors on OSX. + # Do not set the timeout on the socket, leave it in the blocking + # mode as setting the timeout seems to cause spurious EAGAIN + # errors on OSX. self._socket.settimeout(None) s, unused_addr = self._socket.accept() @@ -121,7 +122,7 @@ class DevshellCredentialsTests(unittest.TestCase): def test_handles_ignores_extra_fields(self): with _AuthReferenceServer( - '["joe@example.com", "fooproj", "sometoken", "extra"]'): + '["joe@example.com", "fooproj", "sometoken", "extra"]'): creds = DevshellCredentials() self.assertEqual('joe@example.com', creds.user_email) self.assertEqual('fooproj', creds.project_id) @@ -133,6 +134,7 @@ class DevshellCredentialsTests(unittest.TestCase): os.path.isdir = lambda path: True with _AuthReferenceServer(): creds = DevshellCredentials() - self.assertRaises(NotImplementedError, save_to_well_known_file, creds) + self.assertRaises(NotImplementedError, + save_to_well_known_file, creds) finally: os.path.isdir = ORIGINAL_ISDIR diff --git a/tests/test_django_orm.py b/tests/test_django_orm.py index ebe4400..88894d1 100644 --- a/tests/test_django_orm.py +++ b/tests/test_django_orm.py @@ -42,7 +42,8 @@ from oauth2client.client import Flow from django.conf import global_settings global_settings.SECRET_KEY = 'NotASecret' os.environ['DJANGO_SETTINGS_MODULE'] = 'django_settings' -sys.modules['django_settings'] = django_settings = imp.new_module('django_settings') +sys.modules['django_settings'] = django_settings = imp.new_module( + 'django_settings') django_settings.SECRET_KEY = 'xyzzy' from oauth2client.django_orm import CredentialsField @@ -50,6 +51,7 @@ from oauth2client.django_orm import FlowField class TestCredentialsField(unittest.TestCase): + def setUp(self): self.field = CredentialsField() self.credentials = Credentials() @@ -59,11 +61,12 @@ class TestCredentialsField(unittest.TestCase): self.assertEquals(self.field.get_internal_type(), 'TextField') def test_field_unpickled(self): - self.assertTrue(isinstance(self.field.to_python(self.pickle), Credentials)) + self.assertTrue(isinstance(self.field.to_python(self.pickle), + Credentials)) def test_field_pickled(self): prep_value = self.field.get_db_prep_value(self.credentials, - connection=None) + connection=None) self.assertEqual(prep_value, self.pickle) diff --git a/tests/test_file.py b/tests/test_file.py index c92c68f..5162df4 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -64,7 +64,7 @@ class OAuth2ClientFileTests(unittest.TestCase): pass def create_test_credentials(self, client_id='some_client_id', - expiration=None): + expiration=None): access_token = 'foo' client_secret = 'cOuDdkfjxxnv+' refresh_token = '1/0/a.df219fjls0' @@ -73,9 +73,9 @@ class OAuth2ClientFileTests(unittest.TestCase): user_agent = 'refresh_checker/1.0' credentials = OAuth2Credentials( - access_token, client_id, client_secret, - refresh_token, token_expiry, token_uri, - user_agent) + access_token, client_id, client_secret, + refresh_token, token_expiry, token_uri, + user_agent) return credentials def test_non_existent_file_storage(self): @@ -104,8 +104,8 @@ class OAuth2ClientFileTests(unittest.TestCase): pickle.dump(credentials, f) f.close() - # Storage should be not be able to read that object, as the capability to - # read and write credentials as pickled objects has been removed. + # Storage should be not be able to read that object, as the capability + # to read and write credentials as pickled objects has been removed. s = file.Storage(FILENAME) read_credentials = s.get() self.assertEquals(None, read_credentials) @@ -120,7 +120,8 @@ class OAuth2ClientFileTests(unittest.TestCase): self.assertEquals(data['_module'], OAuth2Credentials.__module__) def test_token_refresh_store_expired(self): - expiration = datetime.datetime.utcnow() - datetime.timedelta(minutes=15) + expiration = (datetime.datetime.utcnow() - + datetime.timedelta(minutes=15)) credentials = self.create_test_credentials(expiration=expiration) s = file.Storage(FILENAME) @@ -133,16 +134,17 @@ class OAuth2ClientFileTests(unittest.TestCase): access_token = '1/3w' token_response = {'access_token': access_token, 'expires_in': 3600} http = HttpMockSequence([ - ({'status': '200'}, json.dumps(token_response).encode('utf-8')), + ({'status': '200'}, json.dumps(token_response).encode('utf-8')), ]) credentials._refresh(http.request) self.assertEquals(credentials.access_token, access_token) def test_token_refresh_store_expires_soon(self): - # Tests the case where an access token that is valid when it is read from - # the store expires before the original request succeeds. - expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) + # Tests the case where an access token that is valid when it is read + # from the store expires before the original request succeeds. + expiration = (datetime.datetime.utcnow() + + datetime.timedelta(minutes=15)) credentials = self.create_test_credentials(expiration=expiration) s = file.Storage(FILENAME) @@ -155,12 +157,14 @@ class OAuth2ClientFileTests(unittest.TestCase): access_token = '1/3w' token_response = {'access_token': access_token, 'expires_in': 3600} http = HttpMockSequence([ - ({'status': str(http_client.UNAUTHORIZED)}, b'Initial token expired'), - ({'status': str(http_client.UNAUTHORIZED)}, b'Store token expired'), - ({'status': str(http_client.OK)}, - json.dumps(token_response).encode('utf-8')), - ({'status': str(http_client.OK)}, - b'Valid response to original request') + ({'status': str(http_client.UNAUTHORIZED)}, + b'Initial token expired'), + ({'status': str(http_client.UNAUTHORIZED)}, + b'Store token expired'), + ({'status': str(http_client.OK)}, + json.dumps(token_response).encode('utf-8')), + ({'status': str(http_client.OK)}, + b'Valid response to original request') ]) credentials.authorize(http) @@ -168,7 +172,8 @@ class OAuth2ClientFileTests(unittest.TestCase): self.assertEqual(credentials.access_token, access_token) def test_token_refresh_good_store(self): - expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) + expiration = (datetime.datetime.utcnow() + + datetime.timedelta(minutes=15)) credentials = self.create_test_credentials(expiration=expiration) s = file.Storage(FILENAME) @@ -182,7 +187,8 @@ class OAuth2ClientFileTests(unittest.TestCase): self.assertEquals(credentials.access_token, 'bar') def test_token_refresh_stream_body(self): - expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) + expiration = (datetime.datetime.utcnow() + + datetime.timedelta(minutes=15)) credentials = self.create_test_credentials(expiration=expiration) s = file.Storage(FILENAME) @@ -193,13 +199,16 @@ class OAuth2ClientFileTests(unittest.TestCase): s.put(new_cred) valid_access_token = '1/3w' - token_response = {'access_token': valid_access_token, 'expires_in': 3600} + token_response = {'access_token': valid_access_token, + 'expires_in': 3600} http = HttpMockSequence([ - ({'status': str(http_client.UNAUTHORIZED)}, b'Initial token expired'), - ({'status': str(http_client.UNAUTHORIZED)}, b'Store token expired'), - ({'status': str(http_client.OK)}, - json.dumps(token_response).encode('utf-8')), - ({'status': str(http_client.OK)}, 'echo_request_body') + ({'status': str(http_client.UNAUTHORIZED)}, + b'Initial token expired'), + ({'status': str(http_client.UNAUTHORIZED)}, + b'Store token expired'), + ({'status': str(http_client.OK)}, + json.dumps(token_response).encode('utf-8')), + ({'status': str(http_client.OK)}, 'echo_request_body') ]) body = six.StringIO('streaming body') @@ -235,7 +244,8 @@ class OAuth2ClientFileTests(unittest.TestCase): mode = os.stat(FILENAME).st_mode if os.name == 'posix': - self.assertEquals('0o600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode))) + self.assertEquals('0o600', + oct(stat.S_IMODE(os.stat(FILENAME).st_mode))) def test_read_only_file_fail_lock(self): credentials = self.create_test_credentials() @@ -244,10 +254,10 @@ class OAuth2ClientFileTests(unittest.TestCase): os.chmod(FILENAME, 0o400) store = multistore_file.get_credential_storage( - FILENAME, - credentials.client_id, - credentials.user_agent, - ['some-scope', 'some-other-scope']) + FILENAME, + credentials.client_id, + credentials.user_agent, + ['some-scope', 'some-other-scope']) store.put(credentials) if os.name == 'posix': @@ -259,10 +269,10 @@ class OAuth2ClientFileTests(unittest.TestCase): SYMFILENAME = FILENAME + 'sym' os.symlink(FILENAME, SYMFILENAME) store = multistore_file.get_credential_storage( - SYMFILENAME, - 'some_client_id', - 'user-agent/1.0', - ['some-scope', 'some-other-scope']) + SYMFILENAME, + 'some_client_id', + 'user-agent/1.0', + ['some-scope', 'some-other-scope']) try: store.get() self.fail('Should have raised an exception.') @@ -273,10 +283,10 @@ class OAuth2ClientFileTests(unittest.TestCase): def test_multistore_non_existent_file(self): store = multistore_file.get_credential_storage( - FILENAME, - 'some_client_id', - 'user-agent/1.0', - ['some-scope', 'some-other-scope']) + FILENAME, + 'some_client_id', + 'user-agent/1.0', + ['some-scope', 'some-other-scope']) credentials = store.get() self.assertEquals(None, credentials) @@ -285,10 +295,10 @@ class OAuth2ClientFileTests(unittest.TestCase): credentials = self.create_test_credentials() store = multistore_file.get_credential_storage( - FILENAME, - credentials.client_id, - credentials.user_agent, - ['some-scope', 'some-other-scope']) + FILENAME, + credentials.client_id, + credentials.user_agent, + ['some-scope', 'some-other-scope']) store.put(credentials) credentials = store.get() @@ -302,20 +312,22 @@ class OAuth2ClientFileTests(unittest.TestCase): self.assertEquals(None, credentials) if os.name == 'posix': - self.assertEquals('0o600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode))) + self.assertEquals('0o600', + oct(stat.S_IMODE(os.stat(FILENAME).st_mode))) def test_multistore_file_custom_key(self): credentials = self.create_test_credentials() custom_key = {'myapp': 'testing', 'clientid': 'some client'} store = multistore_file.get_credential_storage_custom_key( - FILENAME, custom_key) + FILENAME, custom_key) store.put(credentials) stored_credentials = store.get() self.assertNotEquals(None, stored_credentials) - self.assertEqual(credentials.access_token, stored_credentials.access_token) + self.assertEqual(credentials.access_token, + stored_credentials.access_token) store.delete() stored_credentials = store.get() @@ -327,20 +339,22 @@ class OAuth2ClientFileTests(unittest.TestCase): # store with string key store = multistore_file.get_credential_storage_custom_string_key( - FILENAME, 'mykey') + FILENAME, 'mykey') store.put(credentials) stored_credentials = store.get() self.assertNotEquals(None, stored_credentials) - self.assertEqual(credentials.access_token, stored_credentials.access_token) + self.assertEqual(credentials.access_token, + stored_credentials.access_token) # try retrieving with a dictionary store_dict = multistore_file.get_credential_storage_custom_string_key( - FILENAME, {'key': 'mykey'}) + FILENAME, {'key': 'mykey'}) stored_credentials = store.get() self.assertNotEquals(None, stored_credentials) - self.assertEqual(credentials.access_token, stored_credentials.access_token) + self.assertEqual(credentials.access_token, + stored_credentials.access_token) store.delete() stored_credentials = store.get() @@ -353,16 +367,19 @@ class OAuth2ClientFileTests(unittest.TestCase): # store the credentials using the legacy key method store = multistore_file.get_credential_storage( - FILENAME, 'client_id', 'user_agent', scopes) + FILENAME, 'client_id', 'user_agent', scopes) store.put(credentials) - # retrieve the credentials using a custom key that matches the legacy key + # retrieve the credentials using a custom key that matches the + # legacy key key = {'clientId': 'client_id', 'userAgent': 'user_agent', - 'scope': util.scopes_to_string(scopes)} - store = multistore_file.get_credential_storage_custom_key(FILENAME, key) + 'scope': util.scopes_to_string(scopes)} + store = multistore_file.get_credential_storage_custom_key( + FILENAME, key) stored_credentials = store.get() - self.assertEqual(credentials.access_token, stored_credentials.access_token) + self.assertEqual(credentials.access_token, + stored_credentials.access_token) def test_multistore_file_get_all_keys(self): # start with no keys @@ -373,7 +390,7 @@ class OAuth2ClientFileTests(unittest.TestCase): credentials = self.create_test_credentials(client_id='client1') custom_key = {'myapp': 'testing', 'clientid': 'client1'} store1 = multistore_file.get_credential_storage_custom_key( - FILENAME, custom_key) + FILENAME, custom_key) store1.put(credentials) keys = multistore_file.get_all_credential_keys(FILENAME) @@ -383,7 +400,7 @@ class OAuth2ClientFileTests(unittest.TestCase): credentials = self.create_test_credentials(client_id='client2') string_key = 'string_key' store2 = multistore_file.get_credential_storage_custom_string_key( - FILENAME, string_key) + FILENAME, string_key) store2.put(credentials) keys = multistore_file.get_all_credential_keys(FILENAME) diff --git a/tests/test_gce.py b/tests/test_gce.py index 5683122..5fab9fc 100644 --- a/tests/test_gce.py +++ b/tests/test_gce.py @@ -42,7 +42,7 @@ class AssertionCredentialsTests(unittest.TestCase): return_val = _to_bytes(return_val) http = mock.MagicMock() http.request = mock.MagicMock( - return_value=(mock.Mock(status=200), return_val)) + return_value=(mock.Mock(status=200), return_val)) scopes = ['http://example.com/a', 'http://example.com/b'] credentials = AppAssertionCredentials(scope=scopes) @@ -51,7 +51,7 @@ class AssertionCredentialsTests(unittest.TestCase): self.assertEquals(access_token, credentials.access_token) base_metadata_uri = ('http://metadata.google.internal/0.1/meta-data/' - 'service-accounts/default/acquire') + 'service-accounts/default/acquire') escaped_scopes = urllib.parse.quote(' '.join(scopes), safe='') request_uri = base_metadata_uri + '?scope=' + escaped_scopes http.request.assert_called_once_with(request_uri) @@ -64,15 +64,16 @@ class AssertionCredentialsTests(unittest.TestCase): def test_fail_refresh(self): http = mock.MagicMock() - http.request = mock.MagicMock(return_value=(mock.Mock(status=400), '{}')) + http.request = mock.MagicMock( + return_value=(mock.Mock(status=400), '{}')) c = AppAssertionCredentials(scope=['http://example.com/a', - 'http://example.com/b']) + 'http://example.com/b']) self.assertRaises(AccessTokenRefreshError, c.refresh, http) def test_to_from_json(self): c = AppAssertionCredentials(scope=['http://example.com/a', - 'http://example.com/b']) + 'http://example.com/b']) json = c.to_json() c2 = Credentials.new_from_json(json) @@ -96,8 +97,8 @@ class AssertionCredentialsTests(unittest.TestCase): def test_get_access_token(self): http = mock.MagicMock() http.request = mock.MagicMock( - return_value=(mock.Mock(status=200), - '{"accessToken": "this-is-a-token"}')) + return_value=(mock.Mock(status=200), + '{"accessToken": "this-is-a-token"}')) credentials = AppAssertionCredentials(['dummy_scope']) token = credentials.get_access_token(http=http) @@ -105,8 +106,8 @@ class AssertionCredentialsTests(unittest.TestCase): self.assertEqual(None, token.expires_in) http.request.assert_called_once_with( - 'http://metadata.google.internal/0.1/meta-data/service-accounts/' - 'default/acquire?scope=dummy_scope') + 'http://metadata.google.internal/0.1/meta-data/service-accounts/' + 'default/acquire?scope=dummy_scope') def test_save_to_well_known_file(self): import os @@ -115,6 +116,6 @@ class AssertionCredentialsTests(unittest.TestCase): os.path.isdir = lambda path: True credentials = AppAssertionCredentials([]) self.assertRaises(NotImplementedError, save_to_well_known_file, - credentials) + credentials) finally: os.path.isdir = ORIGINAL_ISDIR diff --git a/tests/test_jwt.py b/tests/test_jwt.py index a56f24e..53aa656 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -58,7 +58,8 @@ class CryptTests(unittest.TestCase): self._check_sign_and_verify('privatekey.%s' % self.format) def test_sign_and_verify_from_converted_pkcs12(self): - # Tests that following instructions to convert from PKCS12 to PEM works. + # Tests that following instructions to convert from PKCS12 to + # PEM works. if self.format == 'pem': self._check_sign_and_verify('pem_from_pkcs12.pem') @@ -69,7 +70,7 @@ class CryptTests(unittest.TestCase): # We pass in a non-bytes password to make sure all branches # are traversed in tests. signer = self.signer.from_string(private_key, - password=u'notasecret') + password=u'notasecret') signature = signer.sign('foo') verifier = self.verifier.from_string(public_key, True) @@ -83,7 +84,7 @@ class CryptTests(unittest.TestCase): public_key = datafile('publickey.pem') certs = {'foo': public_key} audience = ('https://www.googleapis.com/auth/id?client_id=' - 'external_public_key@testing.gserviceaccount.com') + 'external_public_key@testing.gserviceaccount.com') try: crypt.verify_signed_jwt_with_certs(jwt, certs, audience) self.fail() @@ -97,11 +98,11 @@ class CryptTests(unittest.TestCase): now = int(time.time()) return crypt.make_signed_jwt(signer, { - 'aud': audience, - 'iat': now, - 'exp': now + 300, - 'user': 'billy bob', - 'metadata': {'meta': 'data'}, + 'aud': audience, + 'iat': now, + 'exp': now + 300, + 'user': 'billy bob', + 'metadata': {'meta': 'data'}, }) def test_verify_id_token(self): @@ -117,11 +118,12 @@ class CryptTests(unittest.TestCase): jwt = self._create_signed_jwt() http = HttpMockSequence([ - ({'status': '200'}, datafile('certs.json')), + ({'status': '200'}, datafile('certs.json')), ]) contents = verify_id_token( - jwt, 'some_audience_address@testing.gserviceaccount.com', http=http) + jwt, 'some_audience_address@testing.gserviceaccount.com', + http=http) self.assertEqual('billy bob', contents['user']) self.assertEqual('data', contents['metadata']['meta']) @@ -129,12 +131,12 @@ class CryptTests(unittest.TestCase): jwt = self._create_signed_jwt() http = HttpMockSequence([ - ({'status': '404'}, datafile('certs.json')), + ({'status': '404'}, datafile('certs.json')), ]) self.assertRaises(VerifyJwtTokenError, verify_id_token, jwt, - 'some_audience_address@testing.gserviceaccount.com', - http=http) + 'some_audience_address@testing.gserviceaccount.com', + http=http) def test_verify_id_token_bad_tokens(self): private_key = datafile('privatekey.%s' % self.format) @@ -146,47 +148,49 @@ class CryptTests(unittest.TestCase): self._check_jwt_failure('foo.bar.baz', 'Can\'t parse token') # Bad signature - jwt = b'.'.join([b'foo', crypt._urlsafe_b64encode('{"a":"b"}'), b'baz']) + jwt = b'.'.join([b'foo', + crypt._urlsafe_b64encode('{"a":"b"}'), + b'baz']) self._check_jwt_failure(jwt, 'Invalid token signature') # No expiration signer = self.signer.from_string(private_key) audience = ('https:#www.googleapis.com/auth/id?client_id=' - 'external_public_key@testing.gserviceaccount.com') + 'external_public_key@testing.gserviceaccount.com') jwt = crypt.make_signed_jwt(signer, { - 'aud': audience, - 'iat': time.time(), + 'aud': audience, + 'iat': time.time(), }) self._check_jwt_failure(jwt, 'No exp field in token') # No issued at jwt = crypt.make_signed_jwt(signer, { - 'aud': 'audience', - 'exp': time.time() + 400, + 'aud': 'audience', + 'exp': time.time() + 400, }) self._check_jwt_failure(jwt, 'No iat field in token') # Too early jwt = crypt.make_signed_jwt(signer, { - 'aud': 'audience', - 'iat': time.time() + 301, - 'exp': time.time() + 400, + 'aud': 'audience', + 'iat': time.time() + 301, + 'exp': time.time() + 400, }) self._check_jwt_failure(jwt, 'Token used too early') # Too late jwt = crypt.make_signed_jwt(signer, { - 'aud': 'audience', - 'iat': time.time() - 500, - 'exp': time.time() - 301, + 'aud': 'audience', + 'iat': time.time() - 500, + 'exp': time.time() - 301, }) self._check_jwt_failure(jwt, 'Token used too late') # Wrong target jwt = crypt.make_signed_jwt(signer, { - 'aud': 'somebody else', - 'iat': time.time(), - 'exp': time.time() + 300, + 'aud': 'somebody else', + 'iat': time.time(), + 'exp': time.time() + 300, }) self._check_jwt_failure(jwt, 'Wrong recipient') @@ -223,13 +227,13 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase): def test_credentials_good(self): private_key = datafile('privatekey.%s' % self.format) credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + 'some_account@example.com', + private_key, + scope='read+write', + sub='joe@example.org') http = HttpMockSequence([ - ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'), - ({'status': '200'}, 'echo_request_headers'), + ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'), + ({'status': '200'}, 'echo_request_headers'), ]) http = credentials.authorize(http) resp, content = http.request('http://example.org') @@ -238,23 +242,23 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase): def test_credentials_to_from_json(self): private_key = datafile('privatekey.%s' % self.format) credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + 'some_account@example.com', + private_key, + scope='read+write', + sub='joe@example.org') json = credentials.to_json() restored = Credentials.new_from_json(json) self.assertEqual(credentials.private_key, restored.private_key) self.assertEqual(credentials.private_key_password, - restored.private_key_password) + restored.private_key_password) self.assertEqual(credentials.kwargs, restored.kwargs) def _credentials_refresh(self, credentials): http = HttpMockSequence([ - ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'), - ({'status': '401'}, b''), - ({'status': '200'}, b'{"access_token":"3/3w","expires_in":3600}'), - ({'status': '200'}, 'echo_request_headers'), + ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'), + ({'status': '401'}, b''), + ({'status': '200'}, b'{"access_token":"3/3w","expires_in":3600}'), + ({'status': '200'}, 'echo_request_headers'), ]) http = credentials.authorize(http) _, content = http.request('http://example.org') @@ -263,10 +267,10 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase): def test_credentials_refresh_without_storage(self): private_key = datafile('privatekey.%s' % self.format) credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + 'some_account@example.com', + private_key, + scope='read+write', + sub='joe@example.org') content = self._credentials_refresh(credentials) @@ -275,12 +279,12 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase): def test_credentials_refresh_with_storage(self): private_key = datafile('privatekey.%s' % self.format) credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + 'some_account@example.com', + private_key, + scope='read+write', + sub='joe@example.org') - (filehandle, filename) = tempfile.mkstemp() + filehandle, filename = tempfile.mkstemp() os.close(filehandle) store = Storage(filename) store.put(credentials) @@ -293,7 +297,7 @@ class SignedJwtAssertionCredentialsTests(unittest.TestCase): class PEMSignedJwtAssertionCredentialsOpenSSLTests( - SignedJwtAssertionCredentialsTests): + SignedJwtAssertionCredentialsTests): def setUp(self): self.format = 'pem' @@ -301,7 +305,7 @@ class PEMSignedJwtAssertionCredentialsOpenSSLTests( class PEMSignedJwtAssertionCredentialsPyCryptoTests( - SignedJwtAssertionCredentialsTests): + SignedJwtAssertionCredentialsTests): def setUp(self): self.format = 'pem' @@ -314,10 +318,10 @@ class PKCSSignedJwtAssertionCredentialsPyCryptoTests(unittest.TestCase): crypt.Signer = crypt.PyCryptoSigner private_key = datafile('privatekey.p12') credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + 'some_account@example.com', + private_key, + scope='read+write', + sub='joe@example.org') try: credentials._generate_assertion() self.fail() @@ -326,6 +330,7 @@ class PKCSSignedJwtAssertionCredentialsPyCryptoTests(unittest.TestCase): class TestHasOpenSSLFlag(unittest.TestCase): + def test_true(self): self.assertEqual(True, HAS_OPENSSL) self.assertEqual(True, HAS_CRYPTO) diff --git a/tests/test_keyring.py b/tests/test_keyring.py index 8b0ea0d..63d2a43 100644 --- a/tests/test_keyring.py +++ b/tests/test_keyring.py @@ -34,8 +34,8 @@ class OAuth2ClientKeyringTests(unittest.TestCase): def test_non_existent_credentials_storage(self): with mock.patch.object(keyring, 'get_password', - return_value=None, - autospec=True) as get_password: + return_value=None, + autospec=True) as get_password: s = Storage('my_unit_test', 'me') credentials = s.get() self.assertEquals(None, credentials) @@ -43,8 +43,8 @@ class OAuth2ClientKeyringTests(unittest.TestCase): def test_malformed_credentials_in_storage(self): with mock.patch.object(keyring, 'get_password', - return_value='{', - autospec=True) as get_password: + return_value='{', + autospec=True) as get_password: s = Storage('my_unit_test', 'me') credentials = s.get() self.assertEquals(None, credentials) @@ -59,31 +59,31 @@ class OAuth2ClientKeyringTests(unittest.TestCase): user_agent = 'refresh_checker/1.0' credentials = OAuth2Credentials( - access_token, client_id, client_secret, - refresh_token, token_expiry, GOOGLE_TOKEN_URI, - user_agent) + access_token, client_id, client_secret, + refresh_token, token_expiry, GOOGLE_TOKEN_URI, + user_agent) # Setting autospec on a mock with an iterable side_effect is # currently broken (http://bugs.python.org/issue17826), so instead # we patch twice. with mock.patch.object(keyring, 'get_password', - return_value=None, - autospec=True) as get_password: + return_value=None, + autospec=True) as get_password: with mock.patch.object(keyring, 'set_password', - return_value=None, - autospec=True) as set_password: + return_value=None, + autospec=True) as set_password: s = Storage('my_unit_test', 'me') self.assertEquals(None, s.get()) s.put(credentials) set_password.assert_called_once_with( - 'my_unit_test', 'me', credentials.to_json()) + 'my_unit_test', 'me', credentials.to_json()) get_password.assert_called_once_with('my_unit_test', 'me') with mock.patch.object(keyring, 'get_password', - return_value=credentials.to_json(), - autospec=True) as get_password: + return_value=credentials.to_json(), + autospec=True) as get_password: restored = s.get() self.assertEqual('foo', restored.access_token) self.assertEqual('some_client_id', restored.client_id) diff --git a/tests/test_oauth2client.py b/tests/test_oauth2client.py index 4c3bc46..050f623 100644 --- a/tests/test_oauth2client.py +++ b/tests/test_oauth2client.py @@ -171,7 +171,7 @@ class GoogleCredentialsTests(unittest.TestCase): def setUp(self): self.env_server_software = os.environ.get('SERVER_SOFTWARE', None) self.env_google_application_credentials = ( - os.environ.get(GOOGLE_APPLICATION_CREDENTIALS, None)) + os.environ.get(GOOGLE_APPLICATION_CREDENTIALS, None)) self.env_appdata = os.environ.get('APPDATA', None) self.os_name = os.name from oauth2client import client @@ -180,7 +180,7 @@ class GoogleCredentialsTests(unittest.TestCase): def tearDown(self): self.reset_env('SERVER_SOFTWARE', self.env_server_software) self.reset_env(GOOGLE_APPLICATION_CREDENTIALS, - self.env_google_application_credentials) + self.env_google_application_credentials) self.reset_env('APPDATA', self.env_appdata) os.name = self.os_name @@ -194,7 +194,8 @@ class GoogleCredentialsTests(unittest.TestCase): def validate_service_account_credentials(self, credentials): self.assertTrue(isinstance(credentials, _ServiceAccountCredentials)) self.assertEqual('123', credentials._service_account_id) - self.assertEqual('dummy@google.com', credentials._service_account_email) + self.assertEqual('dummy@google.com', + credentials._service_account_email) self.assertEqual('ABCDEF', credentials._private_key_id) self.assertEqual('', credentials._scopes) @@ -209,17 +210,18 @@ class GoogleCredentialsTests(unittest.TestCase): self.assertEqual('Python client library', credentials.user_agent) def get_a_google_credentials_object(self): - return GoogleCredentials(None, None, None, None, None, None, None, None) + return GoogleCredentials(None, None, None, None, + None, None, None, None) def test_create_scoped_required(self): self.assertFalse( - self.get_a_google_credentials_object().create_scoped_required()) + self.get_a_google_credentials_object().create_scoped_required()) def test_create_scoped(self): credentials = self.get_a_google_credentials_object() self.assertEqual(credentials, credentials.create_scoped(None)) self.assertEqual(credentials, - credentials.create_scoped(['dummy_scope'])) + credentials.create_scoped(['dummy_scope'])) def test_environment_check_gae_production(self): with mock_module_import('google.appengine'): @@ -237,11 +239,12 @@ class GoogleCredentialsTests(unittest.TestCase): os.environ['SERVER_SOFTWARE'] = 'Development/XYZ' with mock_module_import('google.appengine'): with mock.patch.object(urllib.request, 'urlopen', - return_value=MockResponse({}), - autospec=True) as urlopen: + return_value=MockResponse({}), + autospec=True) as urlopen: self.assertTrue(_in_gae_environment()) self.assertFalse(_in_gce_environment()) - # We already know are in GAE, so we shouldn't actually do the urlopen. + # We already know are in GAE, so we shouldn't actually do + # the urlopen. self.assertFalse(urlopen.called) def test_environment_caching(self): @@ -249,7 +252,8 @@ class GoogleCredentialsTests(unittest.TestCase): with mock_module_import('google.appengine'): self.assertTrue(_in_gae_environment()) os.environ['SERVER_SOFTWARE'] = '' - # Even though we no longer pass the environment check, it is cached. + # Even though we no longer pass the environment check, it + # is cached. self.assertTrue(_in_gae_environment()) def test_environment_check_gae_module_on_gce(self): @@ -257,91 +261,92 @@ class GoogleCredentialsTests(unittest.TestCase): os.environ['SERVER_SOFTWARE'] = '' response = MockResponse({'Metadata-Flavor': 'Google'}) with mock.patch.object(urllib.request, 'urlopen', - return_value=response, - autospec=True) as urlopen: + return_value=response, + autospec=True) as urlopen: self.assertFalse(_in_gae_environment()) self.assertTrue(_in_gce_environment()) urlopen.assert_called_once_with( - 'http://169.254.169.254/', timeout=1) + 'http://169.254.169.254/', timeout=1) def test_environment_check_gae_module_unknown(self): with mock_module_import('google.appengine'): os.environ['SERVER_SOFTWARE'] = '' with mock.patch.object(urllib.request, 'urlopen', - return_value=MockResponse({}), - autospec=True) as urlopen: + return_value=MockResponse({}), + autospec=True) as urlopen: self.assertFalse(_in_gae_environment()) self.assertFalse(_in_gce_environment()) urlopen.assert_called_once_with( - 'http://169.254.169.254/', timeout=1) + 'http://169.254.169.254/', timeout=1) def test_environment_check_gce_production(self): os.environ['SERVER_SOFTWARE'] = '' response = MockResponse({'Metadata-Flavor': 'Google'}) with mock.patch.object(urllib.request, 'urlopen', - return_value=response, - autospec=True) as urlopen: + return_value=response, + autospec=True) as urlopen: self.assertFalse(_in_gae_environment()) self.assertTrue(_in_gce_environment()) urlopen.assert_called_once_with( - 'http://169.254.169.254/', timeout=1) + 'http://169.254.169.254/', timeout=1) def test_environment_check_gce_timeout(self): os.environ['SERVER_SOFTWARE'] = '' response = MockResponse({'Metadata-Flavor': 'Google'}) with mock.patch.object(urllib.request, 'urlopen', - return_value=response, - autospec=True) as urlopen: + return_value=response, + autospec=True) as urlopen: urlopen.side_effect = socket.timeout() self.assertFalse(_in_gce_environment()) urlopen.assert_called_once_with( - 'http://169.254.169.254/', timeout=1) + 'http://169.254.169.254/', timeout=1) with mock.patch.object(urllib.request, 'urlopen', - return_value=response, - autospec=True) as urlopen: + return_value=response, + autospec=True) as urlopen: urlopen.side_effect = urllib.error.URLError(socket.timeout()) self.assertFalse(_in_gce_environment()) urlopen.assert_called_once_with( - 'http://169.254.169.254/', timeout=1) + 'http://169.254.169.254/', timeout=1) def test_environment_check_unknown(self): os.environ['SERVER_SOFTWARE'] = '' with mock.patch.object(urllib.request, 'urlopen', - return_value=MockResponse({}), - autospec=True) as urlopen: + return_value=MockResponse({}), + autospec=True) as urlopen: self.assertFalse(_in_gce_environment()) self.assertFalse(_in_gae_environment()) urlopen.assert_called_once_with( - 'http://169.254.169.254/', timeout=1) + 'http://169.254.169.254/', timeout=1) def test_get_environment_variable_file(self): environment_variable_file = datafile( - os.path.join('gcloud', 'application_default_credentials.json')) + os.path.join('gcloud', 'application_default_credentials.json')) os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file self.assertEqual(environment_variable_file, - _get_environment_variable_file()) + _get_environment_variable_file()) def test_get_environment_variable_file_error(self): nonexistent_file = datafile('nonexistent') os.environ[GOOGLE_APPLICATION_CREDENTIALS] = nonexistent_file - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ try: _get_environment_variable_file() self.fail(nonexistent_file + ' should not exist.') except ApplicationDefaultCredentialsError as error: self.assertEqual('File ' + nonexistent_file + - ' (pointed by ' + GOOGLE_APPLICATION_CREDENTIALS + - ' environment variable) does not exist!', - str(error)) + ' (pointed by ' + GOOGLE_APPLICATION_CREDENTIALS + + ' environment variable) does not exist!', + str(error)) def test_get_well_known_file_on_windows(self): ORIGINAL_ISDIR = os.path.isdir try: os.path.isdir = lambda path: True well_known_file = datafile( - os.path.join(client._CLOUDSDK_CONFIG_DIRECTORY, - 'application_default_credentials.json')) + os.path.join(client._CLOUDSDK_CONFIG_DIRECTORY, + 'application_default_credentials.json')) os.name = 'nt' os.environ['APPDATA'] = DATA_DIR self.assertEqual(well_known_file, _get_well_known_file()) @@ -353,7 +358,7 @@ class GoogleCredentialsTests(unittest.TestCase): ORIGINAL_ISDIR = os.path.isdir CUSTOM_DIR = 'CUSTOM_DIR' EXPECTED_FILE = os.path.join(CUSTOM_DIR, - 'application_default_credentials.json') + 'application_default_credentials.json') try: os.environ = {client._CLOUDSDK_CONFIG_ENV_VAR: CUSTOM_DIR} os.path.isdir = lambda path: True @@ -363,20 +368,21 @@ class GoogleCredentialsTests(unittest.TestCase): os.environ = ORIGINAL_ENVIRON os.path.isdir = ORIGINAL_ISDIR - def test_get_application_default_credential_from_file_service_account(self): + def test_get_adc_from_file_service_account(self): credentials_file = datafile( - os.path.join('gcloud', 'application_default_credentials.json')) + os.path.join('gcloud', 'application_default_credentials.json')) credentials = _get_application_default_credential_from_file( - credentials_file) + credentials_file) self.validate_service_account_credentials(credentials) def test_save_to_well_known_file_service_account(self): credential_file = datafile( - os.path.join('gcloud', 'application_default_credentials.json')) + os.path.join('gcloud', 'application_default_credentials.json')) credentials = _get_application_default_credential_from_file( - credential_file) + credential_file) temp_credential_file = datafile( - os.path.join('gcloud', 'temp_well_known_file_service_account.json')) + os.path.join('gcloud', + 'temp_well_known_file_service_account.json')) save_to_well_known_file(credentials, temp_credential_file) with open(temp_credential_file) as f: d = json.load(f) @@ -388,9 +394,9 @@ class GoogleCredentialsTests(unittest.TestCase): def test_save_well_known_file_with_non_existent_config_dir(self): credential_file = datafile( - os.path.join('gcloud', 'application_default_credentials.json')) + os.path.join('gcloud', 'application_default_credentials.json')) credentials = _get_application_default_credential_from_file( - credential_file) + credential_file) ORIGINAL_ISDIR = os.path.isdir try: os.path.isdir = lambda path: False @@ -398,22 +404,23 @@ class GoogleCredentialsTests(unittest.TestCase): finally: os.path.isdir = ORIGINAL_ISDIR - def test_get_application_default_credential_from_file_authorized_user(self): - credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_authorized_user.json')) + def test_get_adc_from_file_authorized_user(self): + credentials_file = datafile(os.path.join( + 'gcloud', + 'application_default_credentials_authorized_user.json')) credentials = _get_application_default_credential_from_file( - credentials_file) + credentials_file) self.validate_google_credentials(credentials) def test_save_to_well_known_file_authorized_user(self): - credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_authorized_user.json')) + credentials_file = datafile(os.path.join( + 'gcloud', + 'application_default_credentials_authorized_user.json')) credentials = _get_application_default_credential_from_file( - credentials_file) + credentials_file) temp_credential_file = datafile( - os.path.join('gcloud', 'temp_well_known_file_authorized_user.json')) + os.path.join('gcloud', + 'temp_well_known_file_authorized_user.json')) save_to_well_known_file(credentials, temp_credential_file) with open(temp_credential_file) as f: d = json.load(f) @@ -425,103 +432,108 @@ class GoogleCredentialsTests(unittest.TestCase): def test_get_application_default_credential_from_malformed_file_1(self): credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_malformed_1.json')) - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + os.path.join('gcloud', + 'application_default_credentials_malformed_1.json')) + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ try: _get_application_default_credential_from_file(credentials_file) self.fail('An exception was expected!') except ApplicationDefaultCredentialsError as error: self.assertEqual("'type' field should be defined " - "(and have one of the '" + AUTHORIZED_USER + - "' or '" + SERVICE_ACCOUNT + "' values)", - str(error)) + "(and have one of the '" + AUTHORIZED_USER + + "' or '" + SERVICE_ACCOUNT + "' values)", + str(error)) def test_get_application_default_credential_from_malformed_file_2(self): credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_malformed_2.json')) - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + os.path.join('gcloud', + 'application_default_credentials_malformed_2.json')) + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ try: _get_application_default_credential_from_file(credentials_file) self.fail('An exception was expected!') except ApplicationDefaultCredentialsError as error: - self.assertEqual('The following field(s) must be defined: private_key_id', - str(error)) + self.assertEqual( + 'The following field(s) must be defined: private_key_id', + str(error)) def test_get_application_default_credential_from_malformed_file_3(self): credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_malformed_3.json')) - self.assertRaises(ValueError, _get_application_default_credential_from_file, - credentials_file) + os.path.join('gcloud', + 'application_default_credentials_malformed_3.json')) + self.assertRaises(ValueError, + _get_application_default_credential_from_file, + credentials_file) def test_raise_exception_for_missing_fields(self): missing_fields = ['first', 'second', 'third'] - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ try: _raise_exception_for_missing_fields(missing_fields) self.fail('An exception was expected!') except ApplicationDefaultCredentialsError as error: self.assertEqual('The following field(s) must be defined: ' + - ', '.join(missing_fields), - str(error)) + ', '.join(missing_fields), + str(error)) def test_raise_exception_for_reading_json(self): credential_file = 'any_file' extra_help = ' be good' error = ApplicationDefaultCredentialsError('stuff happens') - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ try: - _raise_exception_for_reading_json(credential_file, extra_help, error) + _raise_exception_for_reading_json(credential_file, + extra_help, error) self.fail('An exception was expected!') except ApplicationDefaultCredentialsError as ex: self.assertEqual('An error was encountered while reading ' - 'json file: ' + credential_file + - extra_help + ': ' + str(error), - str(ex)) + 'json file: ' + credential_file + + extra_help + ': ' + str(error), + str(ex)) - def test_get_application_default_from_environment_variable_service_account( - self): + def test_get_adc_from_environment_variable_service_account(self): os.environ['SERVER_SOFTWARE'] = '' environment_variable_file = datafile( - os.path.join('gcloud', 'application_default_credentials.json')) + os.path.join('gcloud', 'application_default_credentials.json')) os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file self.validate_service_account_credentials( - GoogleCredentials.get_application_default()) + GoogleCredentials.get_application_default()) def test_env_name(self): from oauth2client import client self.assertEqual(None, client.SETTINGS.env_name) - self.test_get_application_default_from_environment_variable_service_account() + self.test_get_adc_from_environment_variable_service_account() self.assertEqual(DEFAULT_ENV_NAME, client.SETTINGS.env_name) - def test_get_application_default_from_environment_variable_authorized_user( - self): + def test_get_adc_from_environment_variable_authorized_user(self): os.environ['SERVER_SOFTWARE'] = '' - environment_variable_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_authorized_user.json')) + environment_variable_file = datafile(os.path.join( + 'gcloud', + 'application_default_credentials_authorized_user.json')) os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file self.validate_google_credentials( - GoogleCredentials.get_application_default()) + GoogleCredentials.get_application_default()) - def test_get_application_default_from_environment_variable_malformed_file( - self): + def test_get_adc_from_environment_variable_malformed_file(self): os.environ['SERVER_SOFTWARE'] = '' environment_variable_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_malformed_3.json')) + os.path.join('gcloud', + 'application_default_credentials_malformed_3.json')) os.environ[GOOGLE_APPLICATION_CREDENTIALS] = environment_variable_file - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ try: GoogleCredentials.get_application_default() self.fail('An exception was expected!') except ApplicationDefaultCredentialsError as error: self.assertTrue(str(error).startswith( - 'An error was encountered while reading json file: ' + - environment_variable_file + ' (pointed to by ' + - GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):')) + 'An error was encountered while reading json file: ' + + environment_variable_file + ' (pointed to by ' + + GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):')) def test_get_application_default_environment_not_set_up(self): # It is normal for this test to fail if run inside @@ -530,7 +542,8 @@ class GoogleCredentialsTests(unittest.TestCase): os.environ['SERVER_SOFTWARE'] = '' os.environ[GOOGLE_APPLICATION_CREDENTIALS] = '' os.environ['APPDATA'] = '' - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ VALID_CONFIG_DIR = client._CLOUDSDK_CONFIG_DIRECTORY ORIGINAL_ISDIR = os.path.isdir try: @@ -546,59 +559,66 @@ class GoogleCredentialsTests(unittest.TestCase): def test_from_stream_service_account(self): credentials_file = datafile( - os.path.join('gcloud', 'application_default_credentials.json')) - credentials = ( - self.get_a_google_credentials_object().from_stream(credentials_file)) + os.path.join('gcloud', 'application_default_credentials.json')) + credentials = self.get_a_google_credentials_object().from_stream( + credentials_file) self.validate_service_account_credentials(credentials) def test_from_stream_authorized_user(self): - credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_authorized_user.json')) - credentials = ( - self.get_a_google_credentials_object().from_stream(credentials_file)) + credentials_file = datafile(os.path.join( + 'gcloud', + 'application_default_credentials_authorized_user.json')) + credentials = self.get_a_google_credentials_object().from_stream( + credentials_file) self.validate_google_credentials(credentials) def test_from_stream_malformed_file_1(self): credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_malformed_1.json')) - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + os.path.join('gcloud', + 'application_default_credentials_malformed_1.json')) + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ try: - self.get_a_google_credentials_object().from_stream(credentials_file) + self.get_a_google_credentials_object().from_stream( + credentials_file) self.fail('An exception was expected!') except ApplicationDefaultCredentialsError as error: - self.assertEqual("An error was encountered while reading json file: " + - credentials_file + - " (provided as parameter to the from_stream() method): " - "'type' field should be defined (and have one of the '" + - AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT + - "' values)", - str(error)) + self.assertEqual( + "An error was encountered while reading json file: " + + credentials_file + + " (provided as parameter to the from_stream() method): " + "'type' field should be defined (and have one of the '" + + AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT + + "' values)", + str(error)) def test_from_stream_malformed_file_2(self): credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_malformed_2.json')) - # we can't use self.assertRaisesRegexp() because it is only in Python 2.7+ + os.path.join('gcloud', + 'application_default_credentials_malformed_2.json')) + # we can't use self.assertRaisesRegexp() because it is only in + # Python 2.7+ try: - self.get_a_google_credentials_object().from_stream(credentials_file) + self.get_a_google_credentials_object().from_stream( + credentials_file) self.fail('An exception was expected!') except ApplicationDefaultCredentialsError as error: - self.assertEqual('An error was encountered while reading json file: ' + - credentials_file + - ' (provided as parameter to the from_stream() method): ' - 'The following field(s) must be defined: ' - 'private_key_id', - str(error)) + self.assertEqual( + 'An error was encountered while reading json file: ' + + credentials_file + + ' (provided as parameter to the from_stream() method): ' + 'The following field(s) must be defined: ' + 'private_key_id', + str(error)) def test_from_stream_malformed_file_3(self): credentials_file = datafile( - os.path.join('gcloud', - 'application_default_credentials_malformed_3.json')) + os.path.join('gcloud', + 'application_default_credentials_malformed_3.json')) self.assertRaises( - ApplicationDefaultCredentialsError, - self.get_a_google_credentials_object().from_stream, credentials_file) + ApplicationDefaultCredentialsError, + self.get_a_google_credentials_object().from_stream, + credentials_file) class DummyDeleteStorage(Storage): @@ -625,12 +645,13 @@ def _token_revoke_test_helper(testcase, status, revoke_raise, http = HttpMock(headers={'status': status}) if revoke_raise: - testcase.assertRaises(TokenRevokeError, testcase.credentials.revoke, http) + testcase.assertRaises(TokenRevokeError, + testcase.credentials.revoke, http) else: testcase.credentials.revoke(http) testcase.assertEqual(getattr(testcase.credentials, token_attr), - testcase.token_from_revoke) + testcase.token_from_revoke) testcase.assertEqual(valid_bool_value, testcase.credentials.invalid) testcase.assertEqual(valid_bool_value, dummy_store.delete_called) @@ -647,28 +668,29 @@ class BasicCredentialsTests(unittest.TestCase): token_expiry = datetime.datetime.utcnow() user_agent = 'refresh_checker/1.0' self.credentials = OAuth2Credentials( - access_token, client_id, client_secret, - refresh_token, token_expiry, GOOGLE_TOKEN_URI, - user_agent, revoke_uri=GOOGLE_REVOKE_URI, scopes='foo', - token_info_uri=GOOGLE_TOKEN_INFO_URI) + access_token, client_id, client_secret, + refresh_token, token_expiry, GOOGLE_TOKEN_URI, + user_agent, revoke_uri=GOOGLE_REVOKE_URI, scopes='foo', + token_info_uri=GOOGLE_TOKEN_INFO_URI) # Provoke a failure if @util.positional is not respected. self.old_positional_enforcement = ( - oauth2client_util.positional_parameters_enforcement) + oauth2client_util.positional_parameters_enforcement) oauth2client_util.positional_parameters_enforcement = ( - oauth2client_util.POSITIONAL_EXCEPTION) + oauth2client_util.POSITIONAL_EXCEPTION) def tearDown(self): oauth2client_util.positional_parameters_enforcement = ( - self.old_positional_enforcement) + self.old_positional_enforcement) def test_token_refresh_success(self): for status_code in REFRESH_STATUS_CODES: token_response = {'access_token': '1/3w', 'expires_in': 3600} http = HttpMockSequence([ - ({'status': status_code}, b''), - ({'status': '200'}, json.dumps(token_response).encode('utf-8')), - ({'status': '200'}, 'echo_request_headers'), + ({'status': status_code}, b''), + ({'status': '200'}, json.dumps(token_response).encode( + 'utf-8')), + ({'status': '200'}, 'echo_request_headers'), ]) http = self.credentials.authorize(http) resp, content = http.request('http://example.com') @@ -690,7 +712,7 @@ class BasicCredentialsTests(unittest.TestCase): token_response = {'access_token': '1/3w', 'expires_in': 3600} encoded_response = json.dumps(token_response).encode('utf-8') http = HttpMockSequence([ - ({'status': '200'}, encoded_response), + ({'status': '200'}, encoded_response), ]) http = self.credentials.authorize(http) http = self.credentials.authorize(http) @@ -699,9 +721,9 @@ class BasicCredentialsTests(unittest.TestCase): def test_token_refresh_failure(self): for status_code in REFRESH_STATUS_CODES: http = HttpMockSequence([ - ({'status': status_code}, b''), - ({'status': '400'}, b'{"error":"access_denied"}'), - ]) + ({'status': status_code}, b''), + ({'status': '400'}, b'{"error":"access_denied"}'), + ]) http = self.credentials.authorize(http) try: http.request('http://example.com') @@ -713,26 +735,26 @@ class BasicCredentialsTests(unittest.TestCase): def test_token_revoke_success(self): _token_revoke_test_helper( - self, '200', revoke_raise=False, - valid_bool_value=True, token_attr='refresh_token') + self, '200', revoke_raise=False, + valid_bool_value=True, token_attr='refresh_token') def test_token_revoke_failure(self): _token_revoke_test_helper( - self, '400', revoke_raise=True, - valid_bool_value=False, token_attr='refresh_token') + self, '400', revoke_raise=True, + valid_bool_value=False, token_attr='refresh_token') def test_token_revoke_fallback(self): original_credentials = self.credentials.to_json() self.credentials.refresh_token = None _token_revoke_test_helper( - self, '200', revoke_raise=False, - valid_bool_value=True, token_attr='access_token') + self, '200', revoke_raise=False, + valid_bool_value=True, token_attr='access_token') self.credentials = self.credentials.from_json(original_credentials) def test_non_401_error_response(self): http = HttpMockSequence([ - ({'status': '400'}, b''), - ]) + ({'status': '400'}, b''), + ]) http = self.credentials.authorize(http) resp, content = http.request('http://example.com') self.assertEqual(400, resp.status) @@ -763,8 +785,8 @@ class BasicCredentialsTests(unittest.TestCase): revoke_uri = str(GOOGLE_REVOKE_URI) user_agent = u'refresh_checker/1.0' credentials = OAuth2Credentials(access_token, client_id, client_secret, - refresh_token, token_expiry, token_uri, - user_agent, revoke_uri=revoke_uri) + refresh_token, token_expiry, token_uri, + user_agent, revoke_uri=revoke_uri) # First, test that we correctly encode basic objects, making sure # to include a bytes object. Note that oauth2client will normalize @@ -780,9 +802,10 @@ class BasicCredentialsTests(unittest.TestCase): # Next, test that we do fail on unicode. unicode_str = six.unichr(40960) + 'abcd' self.assertRaises( - NonAsciiHeaderError, - http.request, - u'http://example.com', method=u'GET', headers={u'foo': unicode_str}) + NonAsciiHeaderError, + http.request, + u'http://example.com', method=u'GET', + headers={u'foo': unicode_str}) def test_no_unicode_in_request_params(self): access_token = u'foo' @@ -794,20 +817,23 @@ class BasicCredentialsTests(unittest.TestCase): revoke_uri = str(GOOGLE_REVOKE_URI) user_agent = u'refresh_checker/1.0' credentials = OAuth2Credentials(access_token, client_id, client_secret, - refresh_token, token_expiry, token_uri, - user_agent, revoke_uri=revoke_uri) + refresh_token, token_expiry, token_uri, + user_agent, revoke_uri=revoke_uri) http = HttpMock(headers={'status': '200'}) http = credentials.authorize(http) - http.request(u'http://example.com', method=u'GET', headers={u'foo': u'bar'}) + http.request(u'http://example.com', method=u'GET', + headers={u'foo': u'bar'}) for k, v in six.iteritems(http.headers): self.assertTrue(isinstance(k, six.binary_type)) self.assertTrue(isinstance(v, six.binary_type)) - # Test again with unicode strings that can't simply be converted to ASCII. + # Test again with unicode strings that can't simply be converted + # to ASCII. try: http.request( - u'http://example.com', method=u'GET', headers={u'foo': u'\N{COMET}'}) + u'http://example.com', method=u'GET', + headers={u'foo': u'\N{COMET}'}) self.fail('Expected exception to be raised.') except NonAsciiHeaderError: pass @@ -819,10 +845,13 @@ class BasicCredentialsTests(unittest.TestCase): def test_get_access_token(self): S = 2 # number of seconds in which the token expires token_response_first = {'access_token': 'first_token', 'expires_in': S} - token_response_second = {'access_token': 'second_token', 'expires_in': S} + token_response_second = {'access_token': 'second_token', + 'expires_in': S} http = HttpMockSequence([ - ({'status': '200'}, json.dumps(token_response_first).encode('utf-8')), - ({'status': '200'}, json.dumps(token_response_second).encode('utf-8')), + ({'status': '200'}, json.dumps(token_response_first).encode( + 'utf-8')), + ({'status': '200'}, json.dumps(token_response_second).encode( + 'utf-8')), ]) token = self.credentials.get_access_token(http=http) @@ -844,7 +873,8 @@ class BasicCredentialsTests(unittest.TestCase): self.assertEqual('second_token', token.access_token) self.assertEqual(S - 1, token.expires_in) self.assertFalse(self.credentials.access_token_expired) - self.assertEqual(token_response_second, self.credentials.token_response) + self.assertEqual(token_response_second, + self.credentials.token_response) def test_has_scopes(self): self.assertTrue(self.credentials.has_scopes('foo')) @@ -866,23 +896,25 @@ class BasicCredentialsTests(unittest.TestCase): info_response_first = {'scope': 'foo bar'} info_response_second = {'error_description': 'abcdef'} http = HttpMockSequence([ - ({'status': '200'}, json.dumps(info_response_first).encode('utf-8')), - ({'status': '400'}, json.dumps(info_response_second).encode('utf-8')), - ({'status': '500'}, b''), + ({'status': '200'}, json.dumps(info_response_first).encode( + 'utf-8')), + ({'status': '400'}, json.dumps(info_response_second).encode( + 'utf-8')), + ({'status': '500'}, b''), ]) self.credentials.retrieve_scopes(http) self.assertEqual(set(['foo', 'bar']), self.credentials.scopes) self.assertRaises( - Error, - self.credentials.retrieve_scopes, - http) + Error, + self.credentials.retrieve_scopes, + http) self.assertRaises( - Error, - self.credentials.retrieve_scopes, - http) + Error, + self.credentials.retrieve_scopes, + http) class AccessTokenCredentialsTests(unittest.TestCase): @@ -891,13 +923,13 @@ class AccessTokenCredentialsTests(unittest.TestCase): access_token = 'foo' user_agent = 'refresh_checker/1.0' self.credentials = AccessTokenCredentials(access_token, user_agent, - revoke_uri=GOOGLE_REVOKE_URI) + revoke_uri=GOOGLE_REVOKE_URI) def test_token_refresh_success(self): for status_code in REFRESH_STATUS_CODES: http = HttpMockSequence([ - ({'status': status_code}, b''), - ]) + ({'status': status_code}, b''), + ]) http = self.credentials.authorize(http) try: resp, content = http.request('http://example.com') @@ -909,26 +941,26 @@ class AccessTokenCredentialsTests(unittest.TestCase): def test_token_revoke_success(self): _token_revoke_test_helper( - self, '200', revoke_raise=False, - valid_bool_value=True, token_attr='access_token') + self, '200', revoke_raise=False, + valid_bool_value=True, token_attr='access_token') def test_token_revoke_failure(self): _token_revoke_test_helper( - self, '400', revoke_raise=True, - valid_bool_value=False, token_attr='access_token') + self, '400', revoke_raise=True, + valid_bool_value=False, token_attr='access_token') def test_non_401_error_response(self): http = HttpMockSequence([ - ({'status': '400'}, b''), - ]) + ({'status': '400'}, b''), + ]) http = self.credentials.authorize(http) resp, content = http.request('http://example.com') self.assertEqual(400, resp.status) def test_auth_header_sent(self): http = HttpMockSequence([ - ({'status': '200'}, 'echo_request_headers'), - ]) + ({'status': '200'}, 'echo_request_headers'), + ]) http = self.credentials.authorize(http) resp, content = http.request('http://example.com') self.assertEqual(b'Bearer foo', content[b'Authorization']) @@ -945,34 +977,34 @@ class TestAssertionCredentials(unittest.TestCase): def setUp(self): user_agent = 'fun/2.0' - self.credentials = self.AssertionCredentialsTestImpl(self.assertion_type, - user_agent=user_agent) + self.credentials = self.AssertionCredentialsTestImpl( + self.assertion_type, user_agent=user_agent) def test_assertion_body(self): body = urllib.parse.parse_qs( - self.credentials._generate_refresh_request_body()) + self.credentials._generate_refresh_request_body()) self.assertEqual(self.assertion_text, body['assertion'][0]) self.assertEqual('urn:ietf:params:oauth:grant-type:jwt-bearer', - body['grant_type'][0]) + body['grant_type'][0]) def test_assertion_refresh(self): http = HttpMockSequence([ - ({'status': '200'}, b'{"access_token":"1/3w"}'), - ({'status': '200'}, 'echo_request_headers'), - ]) + ({'status': '200'}, b'{"access_token":"1/3w"}'), + ({'status': '200'}, 'echo_request_headers'), + ]) http = self.credentials.authorize(http) resp, content = http.request('http://example.com') self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) def test_token_revoke_success(self): _token_revoke_test_helper( - self, '200', revoke_raise=False, - valid_bool_value=True, token_attr='access_token') + self, '200', revoke_raise=False, + valid_bool_value=True, token_attr='access_token') def test_token_revoke_failure(self): _token_revoke_test_helper( - self, '400', revoke_raise=True, - valid_bool_value=False, token_attr='access_token') + self, '400', revoke_raise=True, + valid_bool_value=False, token_attr='access_token') class UpdateQueryParamsTest(unittest.TestCase): @@ -1005,7 +1037,6 @@ class ExtractIdTokenTest(unittest.TestCase): body_json = json.dumps(body).encode('ascii') payload = base64.urlsafe_b64encode(body_json).strip(b'=') jwt = b'stuff.' + payload - self.assertRaises(VerifyJwtTokenError, _extract_id_token, jwt) @@ -1013,12 +1044,12 @@ class OAuth2WebServerFlowTest(unittest.TestCase): def setUp(self): self.flow = OAuth2WebServerFlow( - client_id='client_id+1', - client_secret='secret+1', - scope='foo', - redirect_uri=OOB_CALLBACK_URN, - user_agent='unittest-sample/1.0', - revoke_uri='dummy_revoke_uri', + client_id='client_id+1', + client_secret='secret+1', + scope='foo', + redirect_uri=OOB_CALLBACK_URN, + user_agent='unittest-sample/1.0', + revoke_uri='dummy_revoke_uri', ) def test_construct_authorize_url(self): @@ -1036,13 +1067,13 @@ class OAuth2WebServerFlowTest(unittest.TestCase): def test_override_flow_via_kwargs(self): """Passing kwargs to override defaults.""" flow = OAuth2WebServerFlow( - client_id='client_id+1', - client_secret='secret+1', - scope='foo', - redirect_uri=OOB_CALLBACK_URN, - user_agent='unittest-sample/1.0', - access_type='online', - response_type='token' + client_id='client_id+1', + client_secret='secret+1', + scope='foo', + redirect_uri=OOB_CALLBACK_URN, + user_agent='unittest-sample/1.0', + access_type='online', + response_type='token' ) authorize_url = flow.step1_get_authorize_url() @@ -1059,22 +1090,24 @@ class OAuth2WebServerFlowTest(unittest.TestCase): def test_exchange_failure(self): http = HttpMockSequence([ - ({'status': '400'}, b'{"error":"invalid_request"}'), - ]) + ({'status': '400'}, b'{"error":"invalid_request"}'), + ]) try: - credentials = self.flow.step2_exchange('some random code', http=http) - self.fail('should raise exception if exchange doesn\'t get 200') + credentials = self.flow.step2_exchange('some random code', + http=http) + self.fail('should raise exception if exchange doesn\'t get 200') except FlowExchangeError: pass def test_urlencoded_exchange_failure(self): http = HttpMockSequence([ - ({'status': '400'}, b'error=invalid_request'), + ({'status': '400'}, b'error=invalid_request'), ]) try: - credentials = self.flow.step2_exchange('some random code', http=http) + credentials = self.flow.step2_exchange('some random code', + http=http) self.fail('should raise exception if exchange doesn\'t get 200') except FlowExchangeError as e: self.assertEqual('invalid_request', str(e)) @@ -1084,27 +1117,28 @@ class OAuth2WebServerFlowTest(unittest.TestCase): # in place of regular string. # This test makes sure no strange object-to-string coversion # exceptions are being raised instead of FlowExchangeError. - http = HttpMockSequence([ - ({'status': '400'}, - b""" {"error": { - "type": "OAuthException", - "message": "Error validating verification code."} }"""), - ]) + payload = (b'{' + b' "error": {' + b' "message": "Error validating verification code.",' + b' "type": "OAuthException"' + b' }' + b'}') + http = HttpMockSequence([({'status': '400'}, payload)]) try: - credentials = self.flow.step2_exchange('some random code', http=http) + credentials = self.flow.step2_exchange('some random code', + http=http) self.fail('should raise exception if exchange doesn\'t get 200') except FlowExchangeError as e: pass def test_exchange_success(self): - http = HttpMockSequence([ - ({'status': '200'}, - b"""{ "access_token":"SlAV32hkKG", - "expires_in":3600, - "refresh_token":"8xLOxBtZp8" }"""), - ]) - + payload = (b'{' + b' "access_token":"SlAV32hkKG",' + b' "expires_in":3600,' + b' "refresh_token":"8xLOxBtZp8"' + b'}') + http = HttpMockSequence([({'status': '200'}, payload)]) credentials = self.flow.step2_exchange('some random code', http=http) self.assertEqual('SlAV32hkKG', credentials.access_token) self.assertNotEqual(None, credentials.token_expiry) @@ -1124,35 +1158,36 @@ class OAuth2WebServerFlowTest(unittest.TestCase): return name in self.d code = 'some random code' - not_a_dict = FakeDict({'code': code}) - payload = (b'{' - b' "access_token":"SlAV32hkKG",' - b' "expires_in":3600,' - b' "refresh_token":"8xLOxBtZp8"' - b'}') - http = HttpMockSequence([({'status': '200'}, payload), ]) + not_a_dict = FakeDict({'code': code}) + payload = (b'{' + b' "access_token":"SlAV32hkKG",' + b' "expires_in":3600,' + b' "refresh_token":"8xLOxBtZp8"' + b'}') + http = HttpMockSequence([({'status': '200'}, payload)]) credentials = self.flow.step2_exchange(not_a_dict, http=http) - self.assertEqual('SlAV32hkKG', credentials.access_token) - self.assertNotEqual(None, credentials.token_expiry) - self.assertEqual('8xLOxBtZp8', credentials.refresh_token) - self.assertEqual('dummy_revoke_uri', credentials.revoke_uri) - self.assertEqual(set(['foo']), credentials.scopes) - request_code = urllib.parse.parse_qs(http.requests[0]['body'])['code'][0] - self.assertEqual(code, request_code) + self.assertEqual('SlAV32hkKG', credentials.access_token) + self.assertNotEqual(None, credentials.token_expiry) + self.assertEqual('8xLOxBtZp8', credentials.refresh_token) + self.assertEqual('dummy_revoke_uri', credentials.revoke_uri) + self.assertEqual(set(['foo']), credentials.scopes) + request_code = urllib.parse.parse_qs( + http.requests[0]['body'])['code'][0] + self.assertEqual(code, request_code) def test_exchange_using_authorization_header(self): auth_header = 'Basic Y2xpZW50X2lkKzE6c2VjcmV0KzE=', flow = OAuth2WebServerFlow( - client_id='client_id+1', - authorization_header=auth_header, - scope='foo', - redirect_uri=OOB_CALLBACK_URN, - user_agent='unittest-sample/1.0', - revoke_uri='dummy_revoke_uri', + client_id='client_id+1', + authorization_header=auth_header, + scope='foo', + redirect_uri=OOB_CALLBACK_URN, + user_agent='unittest-sample/1.0', + revoke_uri='dummy_revoke_uri', ) http = HttpMockSequence([ - ({'status': '200'}, b'access_token=SlAV32hkKG'), + ({'status': '200'}, b'access_token=SlAV32hkKG'), ]) credentials = flow.step2_exchange('some random code', http=http) @@ -1166,7 +1201,7 @@ class OAuth2WebServerFlowTest(unittest.TestCase): def test_urlencoded_exchange_success(self): http = HttpMockSequence([ - ({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'), + ({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'), ]) credentials = self.flow.step2_exchange('some random code', http=http) @@ -1175,38 +1210,40 @@ class OAuth2WebServerFlowTest(unittest.TestCase): def test_urlencoded_expires_param(self): http = HttpMockSequence([ - # Note the 'expires=3600' where you'd normally - # have if named 'expires_in' - ({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'), + # Note the 'expires=3600' where you'd normally + # have if named 'expires_in' + ({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'), ]) credentials = self.flow.step2_exchange('some random code', http=http) self.assertNotEqual(None, credentials.token_expiry) def test_exchange_no_expires_in(self): - http = HttpMockSequence([ - ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG", - "refresh_token":"8xLOxBtZp8" }"""), - ]) + payload = (b'{' + b' "access_token":"SlAV32hkKG",' + b' "refresh_token":"8xLOxBtZp8"' + b'}') + http = HttpMockSequence([({'status': '200'}, payload)]) credentials = self.flow.step2_exchange('some random code', http=http) self.assertEqual(None, credentials.token_expiry) def test_urlencoded_exchange_no_expires_in(self): http = HttpMockSequence([ - # This might be redundant but just to make sure - # urlencoded access_token gets parsed correctly - ({'status': '200'}, b'access_token=SlAV32hkKG'), + # This might be redundant but just to make sure + # urlencoded access_token gets parsed correctly + ({'status': '200'}, b'access_token=SlAV32hkKG'), ]) credentials = self.flow.step2_exchange('some random code', http=http) self.assertEqual(None, credentials.token_expiry) def test_exchange_fails_if_no_code(self): - http = HttpMockSequence([ - ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG", - "refresh_token":"8xLOxBtZp8" }"""), - ]) + payload = (b'{' + b' "access_token":"SlAV32hkKG",' + b' "refresh_token":"8xLOxBtZp8"' + b'}') + http = HttpMockSequence([({'status': '200'}, payload)]) code = {'error': 'thou shall not pass'} try: @@ -1216,28 +1253,29 @@ class OAuth2WebServerFlowTest(unittest.TestCase): self.assertTrue('shall not pass' in str(e)) def test_exchange_id_token_fail(self): - http = HttpMockSequence([ - ({'status': '200'}, b"""{ "access_token":"SlAV32hkKG", - "refresh_token":"8xLOxBtZp8", - "id_token": "stuff.payload"}"""), - ]) + payload = (b'{' + b' "access_token":"SlAV32hkKG",' + b' "refresh_token":"8xLOxBtZp8",' + b' "id_token": "stuff.payload"' + b'}') + http = HttpMockSequence([({'status': '200'}, payload)]) self.assertRaises(VerifyJwtTokenError, self.flow.step2_exchange, - 'some random code', http=http) + 'some random code', http=http) def test_exchange_id_token(self): body = {'foo': 'bar'} body_json = json.dumps(body).encode('ascii') payload = base64.urlsafe_b64encode(body_json).strip(b'=') jwt = (base64.urlsafe_b64encode(b'stuff') + b'.' + payload + b'.' + - base64.urlsafe_b64encode(b'signature')) - - http = HttpMockSequence([ - ({'status': '200'}, ("""{ "access_token":"SlAV32hkKG", - "refresh_token":"8xLOxBtZp8", - "id_token": "%s"}""" % jwt).encode('utf-8')), - ]) + base64.urlsafe_b64encode(b'signature')) + payload = (b'{' + b' "access_token":"SlAV32hkKG",' + b' "refresh_token":"8xLOxBtZp8",' + b' "id_token": "' + jwt + '"' + b'}') + http = HttpMockSequence([({'status': '200'}, payload)]) credentials = self.flow.step2_exchange('some random code', http=http) self.assertEqual(credentials.id_token, body) @@ -1249,11 +1287,12 @@ class FlowFromCachedClientsecrets(unittest.TestCase): load_and_cache('client_secrets.json', 'some_secrets', cache_mock) flow = flow_from_clientsecrets( - 'some_secrets', '', redirect_uri='oob', cache=cache_mock) + 'some_secrets', '', redirect_uri='oob', cache=cache_mock) self.assertEqual('foo_client_secret', flow.client_secret) class CredentialsFromCodeTests(unittest.TestCase): + def setUp(self): self.client_id = 'client_id_abc' self.client_secret = 'secret_use_code' @@ -1265,63 +1304,66 @@ class CredentialsFromCodeTests(unittest.TestCase): token = 'asdfghjkl' payload = json.dumps({'access_token': token, 'expires_in': 3600}) http = HttpMockSequence([ - ({'status': '200'}, payload.encode('utf-8')), + ({'status': '200'}, payload.encode('utf-8')), ]) credentials = credentials_from_code(self.client_id, self.client_secret, - self.scope, self.code, redirect_uri=self.redirect_uri, - http=http) + self.scope, self.code, + redirect_uri=self.redirect_uri, + http=http) self.assertEqual(credentials.access_token, token) self.assertNotEqual(None, credentials.token_expiry) self.assertEqual(set(['foo']), credentials.scopes) def test_exchange_code_for_token_fail(self): http = HttpMockSequence([ - ({'status': '400'}, b'{"error":"invalid_request"}'), - ]) + ({'status': '400'}, b'{"error":"invalid_request"}'), + ]) try: - credentials = credentials_from_code(self.client_id, self.client_secret, - self.scope, self.code, redirect_uri=self.redirect_uri, - http=http) + credentials = credentials_from_code(self.client_id, + self.client_secret, + self.scope, self.code, + redirect_uri=self.redirect_uri, + http=http) self.fail('should raise exception if exchange doesn\'t get 200') except FlowExchangeError: pass def test_exchange_code_and_file_for_token(self): - http = HttpMockSequence([ - ({'status': '200'}, - b"""{ "access_token":"asdfghjkl", - "expires_in":3600 }"""), - ]) + payload = (b'{' + b' "access_token":"asdfghjkl",' + b' "expires_in":3600' + b'}') + http = HttpMockSequence([({'status': '200'}, payload)]) credentials = credentials_from_clientsecrets_and_code( - datafile('client_secrets.json'), self.scope, - self.code, http=http) + datafile('client_secrets.json'), self.scope, + self.code, http=http) self.assertEqual(credentials.access_token, 'asdfghjkl') self.assertNotEqual(None, credentials.token_expiry) self.assertEqual(set(['foo']), credentials.scopes) def test_exchange_code_and_cached_file_for_token(self): http = HttpMockSequence([ - ({'status': '200'}, b'{ "access_token":"asdfghjkl"}'), - ]) + ({'status': '200'}, b'{ "access_token":"asdfghjkl"}'), + ]) cache_mock = CacheMock() load_and_cache('client_secrets.json', 'some_secrets', cache_mock) credentials = credentials_from_clientsecrets_and_code( - 'some_secrets', self.scope, - self.code, http=http, cache=cache_mock) + 'some_secrets', self.scope, + self.code, http=http, cache=cache_mock) self.assertEqual(credentials.access_token, 'asdfghjkl') self.assertEqual(set(['foo']), credentials.scopes) def test_exchange_code_and_file_for_token_fail(self): http = HttpMockSequence([ - ({'status': '400'}, b'{"error":"invalid_request"}'), - ]) + ({'status': '400'}, b'{"error":"invalid_request"}'), + ]) try: credentials = credentials_from_clientsecrets_and_code( - datafile('client_secrets.json'), self.scope, - self.code, http=http) + datafile('client_secrets.json'), self.scope, + self.code, http=http) self.fail('should raise exception if exchange doesn\'t get 200') except FlowExchangeError: pass diff --git a/tests/test_service_account.py b/tests/test_service_account.py index 3306341..7ed0a6d 100644 --- a/tests/test_service_account.py +++ b/tests/test_service_account.py @@ -38,24 +38,26 @@ def datafile(filename): class ServiceAccountCredentialsTests(unittest.TestCase): + def setUp(self): self.service_account_id = '123' self.service_account_email = 'dummy@google.com' self.private_key_id = 'ABCDEF' self.private_key = datafile('pem_from_pkcs12.pem') self.scopes = ['dummy_scope'] - self.credentials = _ServiceAccountCredentials(self.service_account_id, - self.service_account_email, - self.private_key_id, - self.private_key, - []) + self.credentials = _ServiceAccountCredentials( + self.service_account_id, + self.service_account_email, + self.private_key_id, + self.private_key, + []) def test_sign_blob(self): private_key_id, signature = self.credentials.sign_blob('Google') self.assertEqual(self.private_key_id, private_key_id) pub_key = rsa.PublicKey.load_pkcs1_openssl_pem( - datafile('publickey_openssl.pem')) + datafile('publickey_openssl.pem')) self.assertTrue(rsa.pkcs1.verify(b'Google', signature, pub_key)) @@ -73,32 +75,37 @@ class ServiceAccountCredentialsTests(unittest.TestCase): def test_service_account_email(self): self.assertEqual(self.service_account_email, - self.credentials.service_account_email) + self.credentials.service_account_email) def test_create_scoped_required_without_scopes(self): self.assertTrue(self.credentials.create_scoped_required()) def test_create_scoped_required_with_scopes(self): - self.credentials = _ServiceAccountCredentials(self.service_account_id, - self.service_account_email, - self.private_key_id, - self.private_key, - self.scopes) + self.credentials = _ServiceAccountCredentials( + self.service_account_id, + self.service_account_email, + self.private_key_id, + self.private_key, + self.scopes) self.assertFalse(self.credentials.create_scoped_required()) def test_create_scoped(self): new_credentials = self.credentials.create_scoped(self.scopes) self.assertNotEqual(self.credentials, new_credentials) - self.assertTrue(isinstance(new_credentials, _ServiceAccountCredentials)) + self.assertTrue(isinstance(new_credentials, + _ServiceAccountCredentials)) self.assertEqual('dummy_scope', new_credentials._scopes) def test_access_token(self): S = 2 # number of seconds in which the token expires token_response_first = {'access_token': 'first_token', 'expires_in': S} - token_response_second = {'access_token': 'second_token', 'expires_in': S} + token_response_second = {'access_token': 'second_token', + 'expires_in': S} http = HttpMockSequence([ - ({'status': '200'}, json.dumps(token_response_first).encode('utf-8')), - ({'status': '200'}, json.dumps(token_response_second).encode('utf-8')), + ({'status': '200'}, + json.dumps(token_response_first).encode('utf-8')), + ({'status': '200'}, + json.dumps(token_response_second).encode('utf-8')), ]) token = self.credentials.get_access_token(http=http) @@ -120,4 +127,5 @@ class ServiceAccountCredentialsTests(unittest.TestCase): self.assertEqual('second_token', token.access_token) self.assertEqual(S - 1, token.expires_in) self.assertFalse(self.credentials.access_token_expired) - self.assertEqual(token_response_second, self.credentials.token_response) + self.assertEqual(token_response_second, + self.credentials.token_response) diff --git a/tests/test_tools.py b/tests/test_tools.py index 6b32e39..0a23216 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -13,7 +13,8 @@ class TestClientRedirectServer(unittest.TestCase): # create a ClientRedirectServer and run it in a thread to listen # for a mock GET request with the access token # the server should return a 200 message and store the token - httpd = tools.ClientRedirectServer(('localhost', 0), tools.ClientRedirectHandler) + httpd = tools.ClientRedirectServer(('localhost', 0), + tools.ClientRedirectHandler) code = 'foo' url = 'http://localhost:%i?code=%s' % (httpd.server_address[1], code) t = threading.Thread(target=httpd.handle_request) diff --git a/tests/test_util.py b/tests/test_util.py index 7c781dd..f2e67f0 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -11,17 +11,17 @@ class ScopeToStringTests(unittest.TestCase): def test_iterables(self): cases = [ - ('', ''), - ('', ()), - ('', []), - ('', ('', )), - ('', ['', ]), - ('a', ('a', )), - ('b', ['b', ]), - ('a b', ['a', 'b']), - ('a b', ('a', 'b')), - ('a b', 'a b'), - ('a b', (s for s in ['a', 'b'])), + ('', ''), + ('', ()), + ('', []), + ('', ('',)), + ('', ['', ]), + ('a', ('a',)), + ('b', ['b', ]), + ('a b', ['a', 'b']), + ('a b', ('a', 'b')), + ('a b', 'a b'), + ('a b', (s for s in ['a', 'b'])), ] for expected, case in cases: self.assertEqual(expected, util.scopes_to_string(case)) @@ -31,10 +31,10 @@ class StringToScopeTests(unittest.TestCase): def test_conversion(self): cases = [ - (['a', 'b'], ['a', 'b']), - ('', []), - ('a', ['a']), - ('a b c d e f', ['a', 'b', 'c', 'd', 'e', 'f']), + (['a', 'b'], ['a', 'b']), + ('', []), + ('a', ['a']), + ('a b c d e f', ['a', 'b', 'c', 'd', 'e', 'f']), ] for case, expected in cases: @@ -44,15 +44,16 @@ class StringToScopeTests(unittest.TestCase): class KeyConversionTests(unittest.TestCase): def test_key_conversions(self): - d = {'somekey': 'some value', 'another': 'something else', 'onemore': 'foo'} + d = {'somekey': 'some value', 'another': 'something else', + 'onemore': 'foo'} tuple_key = util.dict_to_tuple_key(d) # the resulting key should be naturally sorted self.assertEqual( - (('another', 'something else'), - ('onemore', 'foo'), - ('somekey', 'some value')), - tuple_key) + (('another', 'something else'), + ('onemore', 'foo'), + ('somekey', 'some value')), + tuple_key) # check we get the original dictionary back self.assertEqual(d, dict(tuple_key)) diff --git a/tests/test_xsrfutil.py b/tests/test_xsrfutil.py index 6f36bbc..f2c61aa 100644 --- a/tests/test_xsrfutil.py +++ b/tests/test_xsrfutil.py @@ -39,73 +39,74 @@ class XsrfUtilTests(unittest.TestCase): def testGenerateAndValidateToken(self): """Test generating and validating a token.""" token = xsrfutil.generate_token(TEST_KEY, - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_1, - when=TEST_TIME) + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_1, + when=TEST_TIME) # Check that the token is considered valid when it should be. self.assertTrue(xsrfutil.validate_token(TEST_KEY, - token, - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_1, - current_time=TEST_TIME)) + token, + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_1, + current_time=TEST_TIME)) # Should still be valid 15 minutes later. later15mins = TEST_TIME + 15 * 60 self.assertTrue(xsrfutil.validate_token(TEST_KEY, - token, - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_1, - current_time=later15mins)) + token, + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_1, + current_time=later15mins)) # But not if beyond the timeout. later2hours = TEST_TIME + 2 * 60 * 60 self.assertFalse(xsrfutil.validate_token(TEST_KEY, - token, - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_1, - current_time=later2hours)) + token, + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_1, + current_time=later2hours)) # Or if the key is different. self.assertFalse(xsrfutil.validate_token('another key', - token, - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_1, - current_time=later15mins)) + token, + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_1, + current_time=later15mins)) # Or the user ID.... self.assertFalse(xsrfutil.validate_token(TEST_KEY, - token, - TEST_USER_ID_2, - action_id=TEST_ACTION_ID_1, - current_time=later15mins)) + token, + TEST_USER_ID_2, + action_id=TEST_ACTION_ID_1, + current_time=later15mins)) # Or the action ID... self.assertFalse(xsrfutil.validate_token(TEST_KEY, - token, - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_2, - current_time=later15mins)) + token, + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_2, + current_time=later15mins)) # Invalid when truncated self.assertFalse(xsrfutil.validate_token(TEST_KEY, - token[:-1], - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_1, - current_time=later15mins)) + token[:-1], + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_1, + current_time=later15mins)) # Invalid with extra garbage self.assertFalse(xsrfutil.validate_token(TEST_KEY, - token + b'x', - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_1, - current_time=later15mins)) + token + b'x', + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_1, + current_time=later15mins)) # Invalid with token of None self.assertFalse(xsrfutil.validate_token(TEST_KEY, - None, - TEST_USER_ID_1, - action_id=TEST_ACTION_ID_1)) + None, + TEST_USER_ID_1, + action_id=TEST_ACTION_ID_1)) + if __name__ == '__main__': unittest.main()