Adding test coverage for client.py.
- `DeviceFlowInfo` (and using datetime now patch). - Corner cases for `step1_get_authorize_url` and `step2_exchange` - Failure cases for `flow_from_clientsecrets` - `verify_id_token` when the cached HTTP is used (i.e. the default argument)
This commit is contained in:
@@ -120,6 +120,7 @@ _DESIRED_METADATA_FLAVOR = 'Google'
|
|||||||
# Expose utcnow() at module level to allow for
|
# Expose utcnow() at module level to allow for
|
||||||
# easier testing (by replacing with a stub).
|
# easier testing (by replacing with a stub).
|
||||||
_UTCNOW = datetime.datetime.utcnow
|
_UTCNOW = datetime.datetime.utcnow
|
||||||
|
_NOW = datetime.datetime.now
|
||||||
|
|
||||||
|
|
||||||
class SETTINGS(object):
|
class SETTINGS(object):
|
||||||
@@ -1862,7 +1863,7 @@ class DeviceFlowInfo(collections.namedtuple('DeviceFlowInfo', (
|
|||||||
})
|
})
|
||||||
if 'expires_in' in response:
|
if 'expires_in' in response:
|
||||||
kwargs['user_code_expiry'] = (
|
kwargs['user_code_expiry'] = (
|
||||||
datetime.datetime.now() +
|
_NOW() +
|
||||||
datetime.timedelta(seconds=int(response['expires_in'])))
|
datetime.timedelta(seconds=int(response['expires_in'])))
|
||||||
return cls(**kwargs)
|
return cls(**kwargs)
|
||||||
|
|
||||||
@@ -2201,4 +2202,4 @@ def flow_from_clientsecrets(filename, scope, redirect_uri=None,
|
|||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
raise UnknownClientSecretsFlowError(
|
raise UnknownClientSecretsFlowError(
|
||||||
'This OAuth 2.0 flow is unsupported: %r' % client_type)
|
'This OAuth 2.0 flow is unsupported: %r' % (client_type,))
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ from oauth2client.client import AssertionCredentials
|
|||||||
from oauth2client.client import AUTHORIZED_USER
|
from oauth2client.client import AUTHORIZED_USER
|
||||||
from oauth2client.client import Credentials
|
from oauth2client.client import Credentials
|
||||||
from oauth2client.client import DEFAULT_ENV_NAME
|
from oauth2client.client import DEFAULT_ENV_NAME
|
||||||
|
from oauth2client.client import DeviceFlowInfo
|
||||||
from oauth2client.client import Error
|
from oauth2client.client import Error
|
||||||
from oauth2client.client import ApplicationDefaultCredentialsError
|
from oauth2client.client import ApplicationDefaultCredentialsError
|
||||||
from oauth2client.client import FlowExchangeError
|
from oauth2client.client import FlowExchangeError
|
||||||
@@ -80,6 +81,7 @@ from oauth2client.client import credentials_from_code
|
|||||||
from oauth2client.client import flow_from_clientsecrets
|
from oauth2client.client import flow_from_clientsecrets
|
||||||
from oauth2client.client import save_to_well_known_file
|
from oauth2client.client import save_to_well_known_file
|
||||||
from oauth2client.clientsecrets import _loadfile
|
from oauth2client.clientsecrets import _loadfile
|
||||||
|
from oauth2client.clientsecrets import InvalidClientSecretsError
|
||||||
from oauth2client.service_account import ServiceAccountCredentials
|
from oauth2client.service_account import ServiceAccountCredentials
|
||||||
from oauth2client._helpers import _to_bytes
|
from oauth2client._helpers import _to_bytes
|
||||||
|
|
||||||
@@ -1695,6 +1697,60 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
|||||||
self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0])
|
self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0])
|
||||||
self.assertEqual('online', q['access_type'][0])
|
self.assertEqual('online', q['access_type'][0])
|
||||||
|
|
||||||
|
@mock.patch('oauth2client.client.logger')
|
||||||
|
def test_step1_get_authorize_url_redirect_override(self, logger):
|
||||||
|
flow = OAuth2WebServerFlow('client_id+1', scope='foo',
|
||||||
|
redirect_uri=OOB_CALLBACK_URN)
|
||||||
|
alt_redirect = 'foo:bar'
|
||||||
|
self.assertEqual(flow.redirect_uri, OOB_CALLBACK_URN)
|
||||||
|
result = flow.step1_get_authorize_url(redirect_uri=alt_redirect)
|
||||||
|
# Make sure the redirect value was updated.
|
||||||
|
self.assertEqual(flow.redirect_uri, alt_redirect)
|
||||||
|
query_params = {
|
||||||
|
'client_id': flow.client_id,
|
||||||
|
'redirect_uri': alt_redirect,
|
||||||
|
'scope': flow.scope,
|
||||||
|
'access_type': 'offline',
|
||||||
|
'response_type': 'code',
|
||||||
|
}
|
||||||
|
expected = _update_query_params(flow.auth_uri, query_params)
|
||||||
|
assertUrisEqual(self, expected, result)
|
||||||
|
# Check stubs.
|
||||||
|
self.assertEqual(logger.warning.call_count, 1)
|
||||||
|
|
||||||
|
def test_step1_get_authorize_url_without_redirect(self):
|
||||||
|
flow = OAuth2WebServerFlow('client_id+1', scope='foo',
|
||||||
|
redirect_uri=None)
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
flow.step1_get_authorize_url(redirect_uri=None)
|
||||||
|
|
||||||
|
def test_step1_get_authorize_url_without_login_hint(self):
|
||||||
|
login_hint = 'There are wascally wabbits nearby'
|
||||||
|
flow = OAuth2WebServerFlow('client_id+1', scope='foo',
|
||||||
|
redirect_uri=OOB_CALLBACK_URN,
|
||||||
|
login_hint=login_hint)
|
||||||
|
result = flow.step1_get_authorize_url()
|
||||||
|
query_params = {
|
||||||
|
'client_id': flow.client_id,
|
||||||
|
'login_hint': login_hint,
|
||||||
|
'redirect_uri': OOB_CALLBACK_URN,
|
||||||
|
'scope': flow.scope,
|
||||||
|
'access_type': 'offline',
|
||||||
|
'response_type': 'code',
|
||||||
|
}
|
||||||
|
expected = _update_query_params(flow.auth_uri, query_params)
|
||||||
|
assertUrisEqual(self, expected, result)
|
||||||
|
|
||||||
|
def test_step2_exchange_no_input(self):
|
||||||
|
flow = OAuth2WebServerFlow('client_id+1', scope='foo')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
flow.step2_exchange()
|
||||||
|
|
||||||
|
def test_step2_exchange_code_and_device_flow(self):
|
||||||
|
flow = OAuth2WebServerFlow('client_id+1', scope='foo')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
flow.step2_exchange(code='code', device_flow_info='dfi')
|
||||||
|
|
||||||
def test_scope_is_required(self):
|
def test_scope_is_required(self):
|
||||||
self.assertRaises(TypeError, OAuth2WebServerFlow, 'client_id+1')
|
self.assertRaises(TypeError, OAuth2WebServerFlow, 'client_id+1')
|
||||||
|
|
||||||
@@ -1910,6 +1966,43 @@ class FlowFromCachedClientsecrets(unittest2.TestCase):
|
|||||||
'some_secrets', '', redirect_uri='oob', cache=cache_mock)
|
'some_secrets', '', redirect_uri='oob', cache=cache_mock)
|
||||||
self.assertEqual('foo_client_secret', flow.client_secret)
|
self.assertEqual('foo_client_secret', flow.client_secret)
|
||||||
|
|
||||||
|
@mock.patch('oauth2client.clientsecrets.loadfile',
|
||||||
|
side_effect=InvalidClientSecretsError)
|
||||||
|
def test_flow_from_clientsecrets_invalid(self, loadfile_mock):
|
||||||
|
filename = object()
|
||||||
|
cache = object()
|
||||||
|
with self.assertRaises(InvalidClientSecretsError):
|
||||||
|
flow_from_clientsecrets(filename, None, cache=cache,
|
||||||
|
message=None)
|
||||||
|
loadfile_mock.assert_called_once_with(filename, cache=cache)
|
||||||
|
|
||||||
|
@mock.patch('oauth2client.clientsecrets.loadfile',
|
||||||
|
side_effect=InvalidClientSecretsError)
|
||||||
|
@mock.patch('sys.exit')
|
||||||
|
def test_flow_from_clientsecrets_invalid_w_msg(self, sys_exit,
|
||||||
|
loadfile_mock):
|
||||||
|
filename = object()
|
||||||
|
cache = object()
|
||||||
|
message = 'hi mom'
|
||||||
|
|
||||||
|
flow_from_clientsecrets(filename, None, cache=cache, message=message)
|
||||||
|
sys_exit.assert_called_once_with(message)
|
||||||
|
loadfile_mock.assert_called_once_with(filename, cache=cache)
|
||||||
|
|
||||||
|
@mock.patch('oauth2client.clientsecrets.loadfile')
|
||||||
|
def test_flow_from_clientsecrets_unknown_flow(self, loadfile_mock):
|
||||||
|
client_type = 'UNKNOWN'
|
||||||
|
loadfile_mock.return_value = client_type, None
|
||||||
|
filename = object()
|
||||||
|
cache = object()
|
||||||
|
|
||||||
|
err_msg = 'This OAuth 2.0 flow is unsupported: %r' % (client_type,)
|
||||||
|
with self.assertRaisesRegexp(client.UnknownClientSecretsFlowError,
|
||||||
|
err_msg):
|
||||||
|
flow_from_clientsecrets(filename, None, cache=cache)
|
||||||
|
|
||||||
|
loadfile_mock.assert_called_once_with(filename, cache=cache)
|
||||||
|
|
||||||
|
|
||||||
class CredentialsFromCodeTests(unittest2.TestCase):
|
class CredentialsFromCodeTests(unittest2.TestCase):
|
||||||
|
|
||||||
@@ -2061,5 +2154,60 @@ class Test__require_crypto_or_die(unittest2.TestCase):
|
|||||||
client._require_crypto_or_die()
|
client._require_crypto_or_die()
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeviceFlowInfo(unittest2.TestCase):
|
||||||
|
|
||||||
|
DEVICE_CODE = 'e80ff179-fd65-416c-9dbf-56a23e5d23e4'
|
||||||
|
USER_CODE = '4bbd8b82-fc73-11e5-adf3-00c2c63e5792'
|
||||||
|
VER_URL = 'http://foo.bar'
|
||||||
|
|
||||||
|
def test_FromResponse(self):
|
||||||
|
response = {
|
||||||
|
'device_code': self.DEVICE_CODE,
|
||||||
|
'user_code': self.USER_CODE,
|
||||||
|
'verification_url': self.VER_URL,
|
||||||
|
}
|
||||||
|
result = DeviceFlowInfo.FromResponse(response)
|
||||||
|
expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE,
|
||||||
|
None, self.VER_URL, None)
|
||||||
|
self.assertEqual(result, expected_result)
|
||||||
|
|
||||||
|
def test_FromResponse_fallback_to_uri(self):
|
||||||
|
response = {
|
||||||
|
'device_code': self.DEVICE_CODE,
|
||||||
|
'user_code': self.USER_CODE,
|
||||||
|
'verification_uri': self.VER_URL,
|
||||||
|
}
|
||||||
|
result = DeviceFlowInfo.FromResponse(response)
|
||||||
|
expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE,
|
||||||
|
None, self.VER_URL, None)
|
||||||
|
self.assertEqual(result, expected_result)
|
||||||
|
|
||||||
|
def test_FromResponse_missing_url(self):
|
||||||
|
response = {
|
||||||
|
'device_code': self.DEVICE_CODE,
|
||||||
|
'user_code': self.USER_CODE,
|
||||||
|
}
|
||||||
|
with self.assertRaises(client.OAuth2DeviceCodeError):
|
||||||
|
DeviceFlowInfo.FromResponse(response)
|
||||||
|
|
||||||
|
@mock.patch('oauth2client.client._NOW')
|
||||||
|
def test_FromResponse_with_expires_in(self, dt_now):
|
||||||
|
expires_in = 23
|
||||||
|
response = {
|
||||||
|
'device_code': self.DEVICE_CODE,
|
||||||
|
'user_code': self.USER_CODE,
|
||||||
|
'verification_url': self.VER_URL,
|
||||||
|
'expires_in': expires_in,
|
||||||
|
}
|
||||||
|
now = datetime.datetime(1999, 1, 1, 12, 30, 27)
|
||||||
|
expire = datetime.datetime(1999, 1, 1, 12, 30, 27 + expires_in)
|
||||||
|
dt_now.return_value = now
|
||||||
|
|
||||||
|
result = DeviceFlowInfo.FromResponse(response)
|
||||||
|
expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE,
|
||||||
|
None, self.VER_URL, expire)
|
||||||
|
self.assertEqual(result, expected_result)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__': # pragma: NO COVER
|
if __name__ == '__main__': # pragma: NO COVER
|
||||||
unittest2.main()
|
unittest2.main()
|
||||||
|
|||||||
@@ -17,6 +17,8 @@
|
|||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import mock
|
||||||
import unittest2
|
import unittest2
|
||||||
|
|
||||||
from .http_mock import HttpMockSequence
|
from .http_mock import HttpMockSequence
|
||||||
@@ -129,6 +131,20 @@ class CryptTests(unittest2.TestCase):
|
|||||||
self.assertEqual('billy bob', contents['user'])
|
self.assertEqual('billy bob', contents['user'])
|
||||||
self.assertEqual('data', contents['metadata']['meta'])
|
self.assertEqual('data', contents['metadata']['meta'])
|
||||||
|
|
||||||
|
def test_verify_id_token_with_certs_uri_default_http(self):
|
||||||
|
jwt = self._create_signed_jwt()
|
||||||
|
|
||||||
|
http = HttpMockSequence([
|
||||||
|
({'status': '200'}, datafile('certs.json')),
|
||||||
|
])
|
||||||
|
|
||||||
|
with mock.patch('oauth2client.client._cached_http', new=http):
|
||||||
|
contents = verify_id_token(
|
||||||
|
jwt, 'some_audience_address@testing.gserviceaccount.com')
|
||||||
|
|
||||||
|
self.assertEqual('billy bob', contents['user'])
|
||||||
|
self.assertEqual('data', contents['metadata']['meta'])
|
||||||
|
|
||||||
def test_verify_id_token_with_certs_uri_fails(self):
|
def test_verify_id_token_with_certs_uri_fails(self):
|
||||||
jwt = self._create_signed_jwt()
|
jwt = self._create_signed_jwt()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user