Merge pull request #486 from dhermes/cover-client-v7
Getting client.py to 100% coverage.
This commit is contained in:
@@ -120,6 +120,7 @@ _DESIRED_METADATA_FLAVOR = 'Google'
|
||||
# Expose utcnow() at module level to allow for
|
||||
# easier testing (by replacing with a stub).
|
||||
_UTCNOW = datetime.datetime.utcnow
|
||||
_NOW = datetime.datetime.now
|
||||
|
||||
|
||||
class SETTINGS(object):
|
||||
@@ -1862,7 +1863,7 @@ class DeviceFlowInfo(collections.namedtuple('DeviceFlowInfo', (
|
||||
})
|
||||
if 'expires_in' in response:
|
||||
kwargs['user_code_expiry'] = (
|
||||
datetime.datetime.now() +
|
||||
_NOW() +
|
||||
datetime.timedelta(seconds=int(response['expires_in'])))
|
||||
return cls(**kwargs)
|
||||
|
||||
@@ -2020,17 +2021,17 @@ class OAuth2WebServerFlow(Flow):
|
||||
if resp.status == http_client.OK:
|
||||
try:
|
||||
flow_info = json.loads(content)
|
||||
except ValueError as e:
|
||||
except ValueError as exc:
|
||||
raise OAuth2DeviceCodeError(
|
||||
'Could not parse server response as JSON: "%s", '
|
||||
'error: "%s"' % (content, e))
|
||||
'error: "%s"' % (content, exc))
|
||||
return DeviceFlowInfo.FromResponse(flow_info)
|
||||
else:
|
||||
error_msg = 'Invalid response %s.' % resp.status
|
||||
error_msg = 'Invalid response %s.' % (resp.status,)
|
||||
try:
|
||||
d = json.loads(content)
|
||||
if 'error' in d:
|
||||
error_msg += ' Error: %s' % d['error']
|
||||
error_dict = json.loads(content)
|
||||
if 'error' in error_dict:
|
||||
error_msg += ' Error: %s' % (error_dict['error'],)
|
||||
except ValueError:
|
||||
# Couldn't decode a JSON response, stick with the
|
||||
# default message.
|
||||
@@ -2201,4 +2202,4 @@ def flow_from_clientsecrets(filename, scope, redirect_uri=None,
|
||||
raise
|
||||
else:
|
||||
raise UnknownClientSecretsFlowError(
|
||||
'This OAuth 2.0 flow is unsupported: %r' % client_type)
|
||||
'This OAuth 2.0 flow is unsupported: %r' % (client_type,))
|
||||
|
||||
@@ -50,6 +50,7 @@ from oauth2client.client import AssertionCredentials
|
||||
from oauth2client.client import AUTHORIZED_USER
|
||||
from oauth2client.client import Credentials
|
||||
from oauth2client.client import DEFAULT_ENV_NAME
|
||||
from oauth2client.client import DeviceFlowInfo
|
||||
from oauth2client.client import Error
|
||||
from oauth2client.client import ApplicationDefaultCredentialsError
|
||||
from oauth2client.client import FlowExchangeError
|
||||
@@ -80,6 +81,8 @@ from oauth2client.client import credentials_from_code
|
||||
from oauth2client.client import flow_from_clientsecrets
|
||||
from oauth2client.client import save_to_well_known_file
|
||||
from oauth2client.clientsecrets import _loadfile
|
||||
from oauth2client.clientsecrets import InvalidClientSecretsError
|
||||
from oauth2client.clientsecrets import TYPE_WEB
|
||||
from oauth2client.service_account import ServiceAccountCredentials
|
||||
from oauth2client._helpers import _to_bytes
|
||||
|
||||
@@ -1695,6 +1698,153 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0])
|
||||
self.assertEqual('online', q['access_type'][0])
|
||||
|
||||
@mock.patch('oauth2client.client.logger')
|
||||
def test_step1_get_authorize_url_redirect_override(self, logger):
|
||||
flow = OAuth2WebServerFlow('client_id+1', scope='foo',
|
||||
redirect_uri=OOB_CALLBACK_URN)
|
||||
alt_redirect = 'foo:bar'
|
||||
self.assertEqual(flow.redirect_uri, OOB_CALLBACK_URN)
|
||||
result = flow.step1_get_authorize_url(redirect_uri=alt_redirect)
|
||||
# Make sure the redirect value was updated.
|
||||
self.assertEqual(flow.redirect_uri, alt_redirect)
|
||||
query_params = {
|
||||
'client_id': flow.client_id,
|
||||
'redirect_uri': alt_redirect,
|
||||
'scope': flow.scope,
|
||||
'access_type': 'offline',
|
||||
'response_type': 'code',
|
||||
}
|
||||
expected = _update_query_params(flow.auth_uri, query_params)
|
||||
assertUrisEqual(self, expected, result)
|
||||
# Check stubs.
|
||||
self.assertEqual(logger.warning.call_count, 1)
|
||||
|
||||
def test_step1_get_authorize_url_without_redirect(self):
|
||||
flow = OAuth2WebServerFlow('client_id+1', scope='foo',
|
||||
redirect_uri=None)
|
||||
with self.assertRaises(ValueError):
|
||||
flow.step1_get_authorize_url(redirect_uri=None)
|
||||
|
||||
def test_step1_get_authorize_url_without_login_hint(self):
|
||||
login_hint = 'There are wascally wabbits nearby'
|
||||
flow = OAuth2WebServerFlow('client_id+1', scope='foo',
|
||||
redirect_uri=OOB_CALLBACK_URN,
|
||||
login_hint=login_hint)
|
||||
result = flow.step1_get_authorize_url()
|
||||
query_params = {
|
||||
'client_id': flow.client_id,
|
||||
'login_hint': login_hint,
|
||||
'redirect_uri': OOB_CALLBACK_URN,
|
||||
'scope': flow.scope,
|
||||
'access_type': 'offline',
|
||||
'response_type': 'code',
|
||||
}
|
||||
expected = _update_query_params(flow.auth_uri, query_params)
|
||||
assertUrisEqual(self, expected, result)
|
||||
|
||||
def test_step1_get_device_and_user_codes_wo_device_uri(self):
|
||||
flow = OAuth2WebServerFlow('CID', scope='foo', device_uri=None)
|
||||
with self.assertRaises(ValueError):
|
||||
flow.step1_get_device_and_user_codes()
|
||||
|
||||
def _step1_get_device_and_user_codes_helper(
|
||||
self, extra_headers=None, user_agent=None, default_http=False,
|
||||
content=None):
|
||||
flow = OAuth2WebServerFlow('CID', scope='foo',
|
||||
user_agent=user_agent)
|
||||
device_code = 'bfc06756-062e-430f-9f0f-460ca44724e5'
|
||||
user_code = '5faf2780-fc83-11e5-9bc2-00c2c63e5792'
|
||||
ver_url = 'http://foo.bar'
|
||||
if content is None:
|
||||
content = json.dumps({
|
||||
'device_code': device_code,
|
||||
'user_code': user_code,
|
||||
'verification_url': ver_url,
|
||||
})
|
||||
http = HttpMockSequence([
|
||||
({'status': http_client.OK}, content),
|
||||
])
|
||||
if default_http:
|
||||
with mock.patch('httplib2.Http', return_value=http):
|
||||
result = flow.step1_get_device_and_user_codes()
|
||||
else:
|
||||
result = flow.step1_get_device_and_user_codes(http=http)
|
||||
|
||||
expected = DeviceFlowInfo(device_code, user_code,
|
||||
None, ver_url, None)
|
||||
self.assertEqual(result, expected)
|
||||
self.assertEqual(len(http.requests), 1)
|
||||
self.assertEqual(http.requests[0]['uri'], client.GOOGLE_DEVICE_URI)
|
||||
body = http.requests[0]['body']
|
||||
self.assertEqual(urllib.parse.parse_qs(body),
|
||||
{'client_id': [flow.client_id],
|
||||
'scope': [flow.scope]})
|
||||
headers = {'content-type': 'application/x-www-form-urlencoded'}
|
||||
if extra_headers is not None:
|
||||
headers.update(extra_headers)
|
||||
self.assertEqual(http.requests[0]['headers'], headers)
|
||||
|
||||
def test_step1_get_device_and_user_codes(self):
|
||||
self._step1_get_device_and_user_codes_helper()
|
||||
|
||||
def test_step1_get_device_and_user_codes_w_user_agent(self):
|
||||
user_agent = 'spiderman'
|
||||
extra_headers = {'user-agent': user_agent}
|
||||
self._step1_get_device_and_user_codes_helper(
|
||||
user_agent=user_agent, extra_headers=extra_headers)
|
||||
|
||||
def test_step1_get_device_and_user_codes_w_default_http(self):
|
||||
self._step1_get_device_and_user_codes_helper(default_http=True)
|
||||
|
||||
def test_step1_get_device_and_user_codes_bad_payload(self):
|
||||
non_json_content = b'{'
|
||||
with self.assertRaises(client.OAuth2DeviceCodeError):
|
||||
self._step1_get_device_and_user_codes_helper(
|
||||
content=non_json_content)
|
||||
|
||||
def _step1_get_device_and_user_codes_fail_helper(self, status,
|
||||
content, error_msg):
|
||||
flow = OAuth2WebServerFlow('CID', scope='foo')
|
||||
http = HttpMockSequence([
|
||||
({'status': status}, content),
|
||||
])
|
||||
with self.assertRaises(client.OAuth2DeviceCodeError) as exc_manager:
|
||||
flow.step1_get_device_and_user_codes(http=http)
|
||||
|
||||
self.assertEqual(exc_manager.exception.args, (error_msg,))
|
||||
|
||||
def test_step1_get_device_and_user_codes_non_json_failure(self):
|
||||
status = http_client.BAD_REQUEST
|
||||
content = 'Nope not JSON.'
|
||||
error_msg = 'Invalid response %s.' % (status,)
|
||||
self._step1_get_device_and_user_codes_fail_helper(status, content,
|
||||
error_msg)
|
||||
|
||||
def test_step1_get_device_and_user_codes_basic_failure(self):
|
||||
status = http_client.INTERNAL_SERVER_ERROR
|
||||
content = b'{}'
|
||||
error_msg = 'Invalid response %s.' % (status,)
|
||||
self._step1_get_device_and_user_codes_fail_helper(status, content,
|
||||
error_msg)
|
||||
|
||||
def test_step1_get_device_and_user_codes_failure_w_json_error(self):
|
||||
status = http_client.BAD_GATEWAY
|
||||
base_error = 'ZOMG user codes failure.'
|
||||
content = json.dumps({'error': base_error})
|
||||
error_msg = 'Invalid response %s. Error: %s' % (status, base_error)
|
||||
self._step1_get_device_and_user_codes_fail_helper(status, content,
|
||||
error_msg)
|
||||
|
||||
def test_step2_exchange_no_input(self):
|
||||
flow = OAuth2WebServerFlow('client_id+1', scope='foo')
|
||||
with self.assertRaises(ValueError):
|
||||
flow.step2_exchange()
|
||||
|
||||
def test_step2_exchange_code_and_device_flow(self):
|
||||
flow = OAuth2WebServerFlow('client_id+1', scope='foo')
|
||||
with self.assertRaises(ValueError):
|
||||
flow.step2_exchange(code='code', device_flow_info='dfi')
|
||||
|
||||
def test_scope_is_required(self):
|
||||
self.assertRaises(TypeError, OAuth2WebServerFlow, 'client_id+1')
|
||||
|
||||
@@ -1704,7 +1854,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
])
|
||||
|
||||
with self.assertRaises(FlowExchangeError):
|
||||
credentials = self.flow.step2_exchange('some random code',
|
||||
credentials = self.flow.step2_exchange(code='some random code',
|
||||
http=http)
|
||||
|
||||
def test_urlencoded_exchange_failure(self):
|
||||
@@ -1714,7 +1864,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
|
||||
with self.assertRaisesRegexp(FlowExchangeError,
|
||||
'invalid_request'):
|
||||
credentials = self.flow.step2_exchange('some random code',
|
||||
credentials = self.flow.step2_exchange(code='some random code',
|
||||
http=http)
|
||||
|
||||
def test_exchange_failure_with_json_error(self):
|
||||
@@ -1731,23 +1881,32 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
http = HttpMockSequence([({'status': '400'}, payload)])
|
||||
|
||||
with self.assertRaises(FlowExchangeError):
|
||||
credentials = self.flow.step2_exchange('some random code',
|
||||
credentials = self.flow.step2_exchange(code='some random code',
|
||||
http=http)
|
||||
|
||||
def test_exchange_success(self):
|
||||
def _exchange_success_test_helper(self, code=None, device_flow_info=None):
|
||||
payload = (b'{'
|
||||
b' "access_token":"SlAV32hkKG",'
|
||||
b' "expires_in":3600,'
|
||||
b' "refresh_token":"8xLOxBtZp8"'
|
||||
b'}')
|
||||
http = HttpMockSequence([({'status': '200'}, payload)])
|
||||
credentials = self.flow.step2_exchange('some random code', http=http)
|
||||
credentials = self.flow.step2_exchange(
|
||||
code=code, device_flow_info=device_flow_info, http=http)
|
||||
self.assertEqual('SlAV32hkKG', credentials.access_token)
|
||||
self.assertNotEqual(None, credentials.token_expiry)
|
||||
self.assertEqual('8xLOxBtZp8', credentials.refresh_token)
|
||||
self.assertEqual('dummy_revoke_uri', credentials.revoke_uri)
|
||||
self.assertEqual(set(['foo']), credentials.scopes)
|
||||
|
||||
def test_exchange_success(self):
|
||||
self._exchange_success_test_helper(code='some random code')
|
||||
|
||||
def test_exchange_success_with_device_flow_info(self):
|
||||
device_flow_info = DeviceFlowInfo('some random code', None,
|
||||
None, None, None)
|
||||
self._exchange_success_test_helper(device_flow_info=device_flow_info)
|
||||
|
||||
def test_exchange_success_binary_code(self):
|
||||
binary_code = b'some random code'
|
||||
access_token = 'SlAV32hkKG'
|
||||
@@ -1761,7 +1920,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
' "refresh_token":"' + refresh_token + '"'
|
||||
'}')
|
||||
http = HttpMockSequence([({'status': '200'}, _to_bytes(payload))])
|
||||
credentials = self.flow.step2_exchange(binary_code, http=http)
|
||||
credentials = self.flow.step2_exchange(code=binary_code, http=http)
|
||||
self.assertEqual(access_token, credentials.access_token)
|
||||
self.assertIsNotNone(credentials.token_expiry)
|
||||
self.assertEqual(refresh_token, credentials.refresh_token)
|
||||
@@ -1788,7 +1947,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
b'}')
|
||||
http = HttpMockSequence([({'status': '200'}, payload)])
|
||||
|
||||
credentials = self.flow.step2_exchange(not_a_dict, http=http)
|
||||
credentials = self.flow.step2_exchange(code=not_a_dict, http=http)
|
||||
self.assertEqual('SlAV32hkKG', credentials.access_token)
|
||||
self.assertNotEqual(None, credentials.token_expiry)
|
||||
self.assertEqual('8xLOxBtZp8', credentials.refresh_token)
|
||||
@@ -1812,7 +1971,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
({'status': '200'}, b'access_token=SlAV32hkKG'),
|
||||
])
|
||||
|
||||
credentials = flow.step2_exchange('some random code', http=http)
|
||||
credentials = flow.step2_exchange(code='some random code', http=http)
|
||||
self.assertEqual('SlAV32hkKG', credentials.access_token)
|
||||
|
||||
test_request = http.requests[0]
|
||||
@@ -1826,7 +1985,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'),
|
||||
])
|
||||
|
||||
credentials = self.flow.step2_exchange('some random code', http=http)
|
||||
credentials = self.flow.step2_exchange(code='some random code',
|
||||
http=http)
|
||||
self.assertEqual('SlAV32hkKG', credentials.access_token)
|
||||
self.assertNotEqual(None, credentials.token_expiry)
|
||||
|
||||
@@ -1837,7 +1997,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'),
|
||||
])
|
||||
|
||||
credentials = self.flow.step2_exchange('some random code', http=http)
|
||||
credentials = self.flow.step2_exchange(code='some random code',
|
||||
http=http)
|
||||
self.assertNotEqual(None, credentials.token_expiry)
|
||||
|
||||
def test_exchange_no_expires_in(self):
|
||||
@@ -1847,7 +2008,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
b'}')
|
||||
http = HttpMockSequence([({'status': '200'}, payload)])
|
||||
|
||||
credentials = self.flow.step2_exchange('some random code', http=http)
|
||||
credentials = self.flow.step2_exchange(code='some random code',
|
||||
http=http)
|
||||
self.assertEqual(None, credentials.token_expiry)
|
||||
|
||||
def test_urlencoded_exchange_no_expires_in(self):
|
||||
@@ -1857,7 +2019,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
({'status': '200'}, b'access_token=SlAV32hkKG'),
|
||||
])
|
||||
|
||||
credentials = self.flow.step2_exchange('some random code', http=http)
|
||||
credentials = self.flow.step2_exchange(code='some random code',
|
||||
http=http)
|
||||
self.assertEqual(None, credentials.token_expiry)
|
||||
|
||||
def test_exchange_fails_if_no_code(self):
|
||||
@@ -1870,7 +2033,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
code = {'error': 'thou shall not pass'}
|
||||
with self.assertRaisesRegexp(FlowExchangeError,
|
||||
'shall not pass'):
|
||||
credentials = self.flow.step2_exchange(code, http=http)
|
||||
credentials = self.flow.step2_exchange(code=code, http=http)
|
||||
|
||||
def test_exchange_id_token_fail(self):
|
||||
payload = (b'{'
|
||||
@@ -1881,7 +2044,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
http = HttpMockSequence([({'status': '200'}, payload)])
|
||||
|
||||
self.assertRaises(VerifyJwtTokenError, self.flow.step2_exchange,
|
||||
'some random code', http=http)
|
||||
code='some random code', http=http)
|
||||
|
||||
def test_exchange_id_token(self):
|
||||
body = {'foo': 'bar'}
|
||||
@@ -1896,7 +2059,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase):
|
||||
b' "id_token": "' + jwt + b'"'
|
||||
b'}')
|
||||
http = HttpMockSequence([({'status': '200'}, payload)])
|
||||
credentials = self.flow.step2_exchange('some random code', http=http)
|
||||
credentials = self.flow.step2_exchange(code='some random code',
|
||||
http=http)
|
||||
self.assertEqual(credentials.id_token, body)
|
||||
|
||||
|
||||
@@ -1910,6 +2074,82 @@ class FlowFromCachedClientsecrets(unittest2.TestCase):
|
||||
'some_secrets', '', redirect_uri='oob', cache=cache_mock)
|
||||
self.assertEqual('foo_client_secret', flow.client_secret)
|
||||
|
||||
@mock.patch('oauth2client.clientsecrets.loadfile')
|
||||
def _flow_from_clientsecrets_success_helper(self, loadfile_mock,
|
||||
device_uri=None,
|
||||
revoke_uri=None):
|
||||
client_type = TYPE_WEB
|
||||
client_info = {
|
||||
'auth_uri': 'auth_uri',
|
||||
'token_uri': 'token_uri',
|
||||
'client_id': 'client_id',
|
||||
'client_secret': 'client_secret',
|
||||
}
|
||||
if revoke_uri is not None:
|
||||
client_info['revoke_uri'] = revoke_uri
|
||||
loadfile_mock.return_value = client_type, client_info
|
||||
filename = object()
|
||||
scope = ['baz']
|
||||
cache = object()
|
||||
|
||||
if device_uri is not None:
|
||||
result = flow_from_clientsecrets(filename, scope, cache=cache,
|
||||
device_uri=device_uri)
|
||||
self.assertEqual(result.device_uri, device_uri)
|
||||
else:
|
||||
result = flow_from_clientsecrets(filename, scope, cache=cache)
|
||||
|
||||
self.assertIsInstance(result, OAuth2WebServerFlow)
|
||||
loadfile_mock.assert_called_once_with(filename, cache=cache)
|
||||
|
||||
def test_flow_from_clientsecrets_success(self):
|
||||
self._flow_from_clientsecrets_success_helper()
|
||||
|
||||
def test_flow_from_clientsecrets_success_w_device_uri(self):
|
||||
device_uri = 'http://device.uri'
|
||||
self._flow_from_clientsecrets_success_helper(device_uri=device_uri)
|
||||
|
||||
def test_flow_from_clientsecrets_success_w_revoke_uri(self):
|
||||
revoke_uri = 'http://revoke.uri'
|
||||
self._flow_from_clientsecrets_success_helper(revoke_uri=revoke_uri)
|
||||
|
||||
@mock.patch('oauth2client.clientsecrets.loadfile',
|
||||
side_effect=InvalidClientSecretsError)
|
||||
def test_flow_from_clientsecrets_invalid(self, loadfile_mock):
|
||||
filename = object()
|
||||
cache = object()
|
||||
with self.assertRaises(InvalidClientSecretsError):
|
||||
flow_from_clientsecrets(filename, None, cache=cache,
|
||||
message=None)
|
||||
loadfile_mock.assert_called_once_with(filename, cache=cache)
|
||||
|
||||
@mock.patch('oauth2client.clientsecrets.loadfile',
|
||||
side_effect=InvalidClientSecretsError)
|
||||
@mock.patch('sys.exit')
|
||||
def test_flow_from_clientsecrets_invalid_w_msg(self, sys_exit,
|
||||
loadfile_mock):
|
||||
filename = object()
|
||||
cache = object()
|
||||
message = 'hi mom'
|
||||
|
||||
flow_from_clientsecrets(filename, None, cache=cache, message=message)
|
||||
sys_exit.assert_called_once_with(message)
|
||||
loadfile_mock.assert_called_once_with(filename, cache=cache)
|
||||
|
||||
@mock.patch('oauth2client.clientsecrets.loadfile')
|
||||
def test_flow_from_clientsecrets_unknown_flow(self, loadfile_mock):
|
||||
client_type = 'UNKNOWN'
|
||||
loadfile_mock.return_value = client_type, None
|
||||
filename = object()
|
||||
cache = object()
|
||||
|
||||
err_msg = 'This OAuth 2.0 flow is unsupported: %r' % (client_type,)
|
||||
with self.assertRaisesRegexp(client.UnknownClientSecretsFlowError,
|
||||
err_msg):
|
||||
flow_from_clientsecrets(filename, None, cache=cache)
|
||||
|
||||
loadfile_mock.assert_called_once_with(filename, cache=cache)
|
||||
|
||||
|
||||
class CredentialsFromCodeTests(unittest2.TestCase):
|
||||
|
||||
@@ -2061,5 +2301,60 @@ class Test__require_crypto_or_die(unittest2.TestCase):
|
||||
client._require_crypto_or_die()
|
||||
|
||||
|
||||
class TestDeviceFlowInfo(unittest2.TestCase):
|
||||
|
||||
DEVICE_CODE = 'e80ff179-fd65-416c-9dbf-56a23e5d23e4'
|
||||
USER_CODE = '4bbd8b82-fc73-11e5-adf3-00c2c63e5792'
|
||||
VER_URL = 'http://foo.bar'
|
||||
|
||||
def test_FromResponse(self):
|
||||
response = {
|
||||
'device_code': self.DEVICE_CODE,
|
||||
'user_code': self.USER_CODE,
|
||||
'verification_url': self.VER_URL,
|
||||
}
|
||||
result = DeviceFlowInfo.FromResponse(response)
|
||||
expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE,
|
||||
None, self.VER_URL, None)
|
||||
self.assertEqual(result, expected_result)
|
||||
|
||||
def test_FromResponse_fallback_to_uri(self):
|
||||
response = {
|
||||
'device_code': self.DEVICE_CODE,
|
||||
'user_code': self.USER_CODE,
|
||||
'verification_uri': self.VER_URL,
|
||||
}
|
||||
result = DeviceFlowInfo.FromResponse(response)
|
||||
expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE,
|
||||
None, self.VER_URL, None)
|
||||
self.assertEqual(result, expected_result)
|
||||
|
||||
def test_FromResponse_missing_url(self):
|
||||
response = {
|
||||
'device_code': self.DEVICE_CODE,
|
||||
'user_code': self.USER_CODE,
|
||||
}
|
||||
with self.assertRaises(client.OAuth2DeviceCodeError):
|
||||
DeviceFlowInfo.FromResponse(response)
|
||||
|
||||
@mock.patch('oauth2client.client._NOW')
|
||||
def test_FromResponse_with_expires_in(self, dt_now):
|
||||
expires_in = 23
|
||||
response = {
|
||||
'device_code': self.DEVICE_CODE,
|
||||
'user_code': self.USER_CODE,
|
||||
'verification_url': self.VER_URL,
|
||||
'expires_in': expires_in,
|
||||
}
|
||||
now = datetime.datetime(1999, 1, 1, 12, 30, 27)
|
||||
expire = datetime.datetime(1999, 1, 1, 12, 30, 27 + expires_in)
|
||||
dt_now.return_value = now
|
||||
|
||||
result = DeviceFlowInfo.FromResponse(response)
|
||||
expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE,
|
||||
None, self.VER_URL, expire)
|
||||
self.assertEqual(result, expected_result)
|
||||
|
||||
|
||||
if __name__ == '__main__': # pragma: NO COVER
|
||||
unittest2.main()
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import mock
|
||||
import unittest2
|
||||
|
||||
from .http_mock import HttpMockSequence
|
||||
@@ -129,6 +131,20 @@ class CryptTests(unittest2.TestCase):
|
||||
self.assertEqual('billy bob', contents['user'])
|
||||
self.assertEqual('data', contents['metadata']['meta'])
|
||||
|
||||
def test_verify_id_token_with_certs_uri_default_http(self):
|
||||
jwt = self._create_signed_jwt()
|
||||
|
||||
http = HttpMockSequence([
|
||||
({'status': '200'}, datafile('certs.json')),
|
||||
])
|
||||
|
||||
with mock.patch('oauth2client.client._cached_http', new=http):
|
||||
contents = verify_id_token(
|
||||
jwt, 'some_audience_address@testing.gserviceaccount.com')
|
||||
|
||||
self.assertEqual('billy bob', contents['user'])
|
||||
self.assertEqual('data', contents['metadata']['meta'])
|
||||
|
||||
def test_verify_id_token_with_certs_uri_fails(self):
|
||||
jwt = self._create_signed_jwt()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user