Merge pull request #355 from jonparrott/move-multistore-tests

Move tests that only apply to multistore_file out of test_file.py
This commit is contained in:
Danny Hermes
2015-12-08 12:55:17 -08:00
2 changed files with 214 additions and 182 deletions

View File

@@ -32,9 +32,6 @@ from .http_mock import HttpMockSequence
import six import six
from oauth2client import file from oauth2client import file
from oauth2client import locked_file
from oauth2client import multistore_file
from oauth2client import util
from oauth2client.client import AccessTokenCredentials from oauth2client.client import AccessTokenCredentials
from oauth2client.client import OAuth2Credentials from oauth2client.client import OAuth2Credentials
from six.moves import http_client from six.moves import http_client
@@ -46,7 +43,8 @@ except: # pragma: NO COVER
__author__ = 'jcgregorio@google.com (Joe Gregorio)' __author__ = 'jcgregorio@google.com (Joe Gregorio)'
FILENAME = tempfile.mktemp('oauth2client_test.data') _filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data')
os.close(_filehandle)
class OAuth2ClientFileTests(unittest.TestCase): class OAuth2ClientFileTests(unittest.TestCase):
@@ -63,7 +61,7 @@ class OAuth2ClientFileTests(unittest.TestCase):
except OSError: except OSError:
pass pass
def create_test_credentials(self, client_id='some_client_id', def _create_test_credentials(self, client_id='some_client_id',
expiration=None): expiration=None):
access_token = 'foo' access_token = 'foo'
client_secret = 'cOuDdkfjxxnv+' client_secret = 'cOuDdkfjxxnv+'
@@ -98,7 +96,7 @@ class OAuth2ClientFileTests(unittest.TestCase):
def test_pickle_and_json_interop(self): def test_pickle_and_json_interop(self):
# Write a file with a pickled OAuth2Credentials. # Write a file with a pickled OAuth2Credentials.
credentials = self.create_test_credentials() credentials = self._create_test_credentials()
f = open(FILENAME, 'wb') f = open(FILENAME, 'wb')
pickle.dump(credentials, f) pickle.dump(credentials, f)
@@ -122,7 +120,7 @@ class OAuth2ClientFileTests(unittest.TestCase):
def test_token_refresh_store_expired(self): def test_token_refresh_store_expired(self):
expiration = (datetime.datetime.utcnow() - expiration = (datetime.datetime.utcnow() -
datetime.timedelta(minutes=15)) datetime.timedelta(minutes=15))
credentials = self.create_test_credentials(expiration=expiration) credentials = self._create_test_credentials(expiration=expiration)
s = file.Storage(FILENAME) s = file.Storage(FILENAME)
s.put(credentials) s.put(credentials)
@@ -145,7 +143,7 @@ class OAuth2ClientFileTests(unittest.TestCase):
# from the store expires before the original request succeeds. # from the store expires before the original request succeeds.
expiration = (datetime.datetime.utcnow() + expiration = (datetime.datetime.utcnow() +
datetime.timedelta(minutes=15)) datetime.timedelta(minutes=15))
credentials = self.create_test_credentials(expiration=expiration) credentials = self._create_test_credentials(expiration=expiration)
s = file.Storage(FILENAME) s = file.Storage(FILENAME)
s.put(credentials) s.put(credentials)
@@ -174,7 +172,7 @@ class OAuth2ClientFileTests(unittest.TestCase):
def test_token_refresh_good_store(self): def test_token_refresh_good_store(self):
expiration = (datetime.datetime.utcnow() + expiration = (datetime.datetime.utcnow() +
datetime.timedelta(minutes=15)) datetime.timedelta(minutes=15))
credentials = self.create_test_credentials(expiration=expiration) credentials = self._create_test_credentials(expiration=expiration)
s = file.Storage(FILENAME) s = file.Storage(FILENAME)
s.put(credentials) s.put(credentials)
@@ -189,7 +187,7 @@ class OAuth2ClientFileTests(unittest.TestCase):
def test_token_refresh_stream_body(self): def test_token_refresh_stream_body(self):
expiration = (datetime.datetime.utcnow() + expiration = (datetime.datetime.utcnow() +
datetime.timedelta(minutes=15)) datetime.timedelta(minutes=15))
credentials = self.create_test_credentials(expiration=expiration) credentials = self._create_test_credentials(expiration=expiration)
s = file.Storage(FILENAME) s = file.Storage(FILENAME)
s.put(credentials) s.put(credentials)
@@ -219,7 +217,7 @@ class OAuth2ClientFileTests(unittest.TestCase):
self.assertEqual(credentials.access_token, valid_access_token) self.assertEqual(credentials.access_token, valid_access_token)
def test_credentials_delete(self): def test_credentials_delete(self):
credentials = self.create_test_credentials() credentials = self._create_test_credentials()
s = file.Storage(FILENAME) s = file.Storage(FILENAME)
s.put(credentials) s.put(credentials)
@@ -241,178 +239,11 @@ class OAuth2ClientFileTests(unittest.TestCase):
self.assertNotEquals(None, credentials) self.assertNotEquals(None, credentials)
self.assertEquals('foo', credentials.access_token) self.assertEquals('foo', credentials.access_token)
self.assertTrue(os.path.exists(FILENAME))
if os.name == 'posix':
mode = os.stat(FILENAME).st_mode mode = os.stat(FILENAME).st_mode
self.assertEquals('0o600', oct(stat.S_IMODE(mode)))
if os.name == 'posix':
self.assertEquals('0o600',
oct(stat.S_IMODE(os.stat(FILENAME).st_mode)))
def test_read_only_file_fail_lock(self):
credentials = self.create_test_credentials()
open(FILENAME, 'a+b').close()
os.chmod(FILENAME, 0o400)
store = multistore_file.get_credential_storage(
FILENAME,
credentials.client_id,
credentials.user_agent,
['some-scope', 'some-other-scope'])
store.put(credentials)
if os.name == 'posix':
self.assertTrue(store._multistore._read_only)
os.chmod(FILENAME, 0o600)
def test_multistore_no_symbolic_link_files(self):
if hasattr(os, 'symlink'):
SYMFILENAME = FILENAME + 'sym'
os.symlink(FILENAME, SYMFILENAME)
store = multistore_file.get_credential_storage(
SYMFILENAME,
'some_client_id',
'user-agent/1.0',
['some-scope', 'some-other-scope'])
try:
store.get()
self.fail('Should have raised an exception.')
except locked_file.CredentialsFileSymbolicLinkError:
pass
finally:
os.unlink(SYMFILENAME)
def test_multistore_non_existent_file(self):
store = multistore_file.get_credential_storage(
FILENAME,
'some_client_id',
'user-agent/1.0',
['some-scope', 'some-other-scope'])
credentials = store.get()
self.assertEquals(None, credentials)
def test_multistore_file(self):
credentials = self.create_test_credentials()
store = multistore_file.get_credential_storage(
FILENAME,
credentials.client_id,
credentials.user_agent,
['some-scope', 'some-other-scope'])
store.put(credentials)
credentials = store.get()
self.assertNotEquals(None, credentials)
self.assertEquals('foo', credentials.access_token)
store.delete()
credentials = store.get()
self.assertEquals(None, credentials)
if os.name == 'posix':
self.assertEquals('0o600',
oct(stat.S_IMODE(os.stat(FILENAME).st_mode)))
def test_multistore_file_custom_key(self):
credentials = self.create_test_credentials()
custom_key = {'myapp': 'testing', 'clientid': 'some client'}
store = multistore_file.get_credential_storage_custom_key(
FILENAME, custom_key)
store.put(credentials)
stored_credentials = store.get()
self.assertNotEquals(None, stored_credentials)
self.assertEqual(credentials.access_token,
stored_credentials.access_token)
store.delete()
stored_credentials = store.get()
self.assertEquals(None, stored_credentials)
def test_multistore_file_custom_string_key(self):
credentials = self.create_test_credentials()
# store with string key
store = multistore_file.get_credential_storage_custom_string_key(
FILENAME, 'mykey')
store.put(credentials)
stored_credentials = store.get()
self.assertNotEquals(None, stored_credentials)
self.assertEqual(credentials.access_token,
stored_credentials.access_token)
# try retrieving with a dictionary
store_dict = multistore_file.get_credential_storage_custom_string_key(
FILENAME, {'key': 'mykey'})
stored_credentials = store.get()
self.assertNotEquals(None, stored_credentials)
self.assertEqual(credentials.access_token,
stored_credentials.access_token)
store.delete()
stored_credentials = store.get()
self.assertEquals(None, stored_credentials)
def test_multistore_file_backwards_compatibility(self):
credentials = self.create_test_credentials()
scopes = ['scope1', 'scope2']
# store the credentials using the legacy key method
store = multistore_file.get_credential_storage(
FILENAME, 'client_id', 'user_agent', scopes)
store.put(credentials)
# retrieve the credentials using a custom key that matches the
# legacy key
key = {'clientId': 'client_id', 'userAgent': 'user_agent',
'scope': util.scopes_to_string(scopes)}
store = multistore_file.get_credential_storage_custom_key(
FILENAME, key)
stored_credentials = store.get()
self.assertEqual(credentials.access_token,
stored_credentials.access_token)
def test_multistore_file_get_all_keys(self):
# start with no keys
keys = multistore_file.get_all_credential_keys(FILENAME)
self.assertEquals([], keys)
# store credentials
credentials = self.create_test_credentials(client_id='client1')
custom_key = {'myapp': 'testing', 'clientid': 'client1'}
store1 = multistore_file.get_credential_storage_custom_key(
FILENAME, custom_key)
store1.put(credentials)
keys = multistore_file.get_all_credential_keys(FILENAME)
self.assertEquals([custom_key], keys)
# store more credentials
credentials = self.create_test_credentials(client_id='client2')
string_key = 'string_key'
store2 = multistore_file.get_credential_storage_custom_string_key(
FILENAME, string_key)
store2.put(credentials)
keys = multistore_file.get_all_credential_keys(FILENAME)
self.assertEquals(2, len(keys))
self.assertTrue(custom_key in keys)
self.assertTrue({'key': string_key} in keys)
# back to no keys
store1.delete()
store2.delete()
keys = multistore_file.get_all_credential_keys(FILENAME)
self.assertEquals([], keys)
if __name__ == '__main__': # pragma: NO COVER if __name__ == '__main__': # pragma: NO COVER

View File

@@ -1,12 +1,20 @@
"""Unit tests for oauth2client.multistore_file.""" """Unit tests for oauth2client.multistore_file."""
import datetime
import errno import errno
import os import os
import stat
import tempfile import tempfile
import unittest import unittest
from oauth2client import util
from oauth2client.client import OAuth2Credentials
from oauth2client import locked_file
from oauth2client import multistore_file from oauth2client import multistore_file
_filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data')
os.close(_filehandle)
class _MockLockedFile(object): class _MockLockedFile(object):
@@ -28,6 +36,33 @@ class _MockLockedFile(object):
class MultistoreFileTests(unittest.TestCase): class MultistoreFileTests(unittest.TestCase):
def tearDown(self):
try:
os.unlink(FILENAME)
except OSError:
pass
def setUp(self):
try:
os.unlink(FILENAME)
except OSError:
pass
def _create_test_credentials(self, client_id='some_client_id',
expiration=None):
access_token = 'foo'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = expiration or datetime.datetime.utcnow()
token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
user_agent = 'refresh_checker/1.0'
credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, token_uri,
user_agent)
return credentials
def test_lock_file_raises_ioerror(self): def test_lock_file_raises_ioerror(self):
filehandle, filename = tempfile.mkstemp() filehandle, filename = tempfile.mkstemp()
os.close(filehandle) os.close(filehandle)
@@ -41,3 +76,169 @@ class MultistoreFileTests(unittest.TestCase):
self.assertTrue(multistore._file.open_and_lock_called) self.assertTrue(multistore._file.open_and_lock_called)
finally: finally:
os.unlink(filename) os.unlink(filename)
def test_read_only_file_fail_lock(self):
credentials = self._create_test_credentials()
open(FILENAME, 'a+b').close()
os.chmod(FILENAME, 0o400)
store = multistore_file.get_credential_storage(
FILENAME,
credentials.client_id,
credentials.user_agent,
['some-scope', 'some-other-scope'])
store.put(credentials)
if os.name == 'posix':
self.assertTrue(store._multistore._read_only)
os.chmod(FILENAME, 0o600)
def test_multistore_no_symbolic_link_files(self):
if hasattr(os, 'symlink'):
SYMFILENAME = FILENAME + 'sym'
os.symlink(FILENAME, SYMFILENAME)
store = multistore_file.get_credential_storage(
SYMFILENAME,
'some_client_id',
'user-agent/1.0',
['some-scope', 'some-other-scope'])
try:
self.assertRaises(
locked_file.CredentialsFileSymbolicLinkError,
store.get)
finally:
os.unlink(SYMFILENAME)
def test_multistore_non_existent_file(self):
store = multistore_file.get_credential_storage(
FILENAME,
'some_client_id',
'user-agent/1.0',
['some-scope', 'some-other-scope'])
credentials = store.get()
self.assertEquals(None, credentials)
def test_multistore_file(self):
credentials = self._create_test_credentials()
store = multistore_file.get_credential_storage(
FILENAME,
credentials.client_id,
credentials.user_agent,
['some-scope', 'some-other-scope'])
store.put(credentials)
credentials = store.get()
self.assertNotEquals(None, credentials)
self.assertEquals('foo', credentials.access_token)
store.delete()
credentials = store.get()
self.assertEquals(None, credentials)
if os.name == 'posix':
self.assertEquals(
0o600, stat.S_IMODE(os.stat(FILENAME).st_mode))
def test_multistore_file_custom_key(self):
credentials = self._create_test_credentials()
custom_key = {'myapp': 'testing', 'clientid': 'some client'}
store = multistore_file.get_credential_storage_custom_key(
FILENAME, custom_key)
store.put(credentials)
stored_credentials = store.get()
self.assertNotEquals(None, stored_credentials)
self.assertEqual(credentials.access_token,
stored_credentials.access_token)
store.delete()
stored_credentials = store.get()
self.assertEquals(None, stored_credentials)
def test_multistore_file_custom_string_key(self):
credentials = self._create_test_credentials()
# store with string key
store = multistore_file.get_credential_storage_custom_string_key(
FILENAME, 'mykey')
store.put(credentials)
stored_credentials = store.get()
self.assertNotEquals(None, stored_credentials)
self.assertEqual(credentials.access_token,
stored_credentials.access_token)
# try retrieving with a dictionary
multistore_file.get_credential_storage_custom_string_key(
FILENAME, {'key': 'mykey'})
stored_credentials = store.get()
self.assertNotEquals(None, stored_credentials)
self.assertEqual(credentials.access_token,
stored_credentials.access_token)
store.delete()
stored_credentials = store.get()
self.assertEquals(None, stored_credentials)
def test_multistore_file_backwards_compatibility(self):
credentials = self._create_test_credentials()
scopes = ['scope1', 'scope2']
# store the credentials using the legacy key method
store = multistore_file.get_credential_storage(
FILENAME, 'client_id', 'user_agent', scopes)
store.put(credentials)
# retrieve the credentials using a custom key that matches the
# legacy key
key = {'clientId': 'client_id', 'userAgent': 'user_agent',
'scope': util.scopes_to_string(scopes)}
store = multistore_file.get_credential_storage_custom_key(
FILENAME, key)
stored_credentials = store.get()
self.assertEqual(credentials.access_token,
stored_credentials.access_token)
def test_multistore_file_get_all_keys(self):
# start with no keys
keys = multistore_file.get_all_credential_keys(FILENAME)
self.assertEquals([], keys)
# store credentials
credentials = self._create_test_credentials(client_id='client1')
custom_key = {'myapp': 'testing', 'clientid': 'client1'}
store1 = multistore_file.get_credential_storage_custom_key(
FILENAME, custom_key)
store1.put(credentials)
keys = multistore_file.get_all_credential_keys(FILENAME)
self.assertEquals([custom_key], keys)
# store more credentials
credentials = self._create_test_credentials(client_id='client2')
string_key = 'string_key'
store2 = multistore_file.get_credential_storage_custom_string_key(
FILENAME, string_key)
store2.put(credentials)
keys = multistore_file.get_all_credential_keys(FILENAME)
self.assertEquals(2, len(keys))
self.assertTrue(custom_key in keys)
self.assertTrue({'key': string_key} in keys)
# back to no keys
store1.delete()
store2.delete()
keys = multistore_file.get_all_credential_keys(FILENAME)
self.assertEquals([], keys)