Py3 oauth tests
Remove the last exclusion. Revert py3 tests to use same case as py2 tests. Change-Id: I651a7163ac8a20258e55d8a118245d658a5af3ed
This commit is contained in:
parent
68473b26ee
commit
ee0a294ec3
|
@ -34,6 +34,11 @@ from keystone.tests.unit.ksfixtures import temporaryfile
|
||||||
from keystone.tests.unit import test_v3
|
from keystone.tests.unit import test_v3
|
||||||
|
|
||||||
|
|
||||||
|
def _urllib_parse_qs_text_keys(content):
|
||||||
|
results = urllib.parse.parse_qs(content)
|
||||||
|
return {key.decode('utf-8'): value for key, value in results.items()}
|
||||||
|
|
||||||
|
|
||||||
class OAuth1ContribTests(test_v3.RestfulTestCase):
|
class OAuth1ContribTests(test_v3.RestfulTestCase):
|
||||||
|
|
||||||
@mock.patch.object(versionutils, 'report_deprecated_feature')
|
@mock.patch.object(versionutils, 'report_deprecated_feature')
|
||||||
|
@ -101,6 +106,8 @@ class OAuth1Tests(test_v3.RestfulTestCase):
|
||||||
return endpoint, headers, ref
|
return endpoint, headers, ref
|
||||||
|
|
||||||
def _authorize_request_token(self, request_id):
|
def _authorize_request_token(self, request_id):
|
||||||
|
if isinstance(request_id, bytes):
|
||||||
|
request_id = request_id.decode()
|
||||||
return '/OS-OAUTH1/authorize/%s' % (request_id)
|
return '/OS-OAUTH1/authorize/%s' % (request_id)
|
||||||
|
|
||||||
|
|
||||||
|
@ -262,7 +269,7 @@ class OAuthFlowTests(OAuth1Tests):
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
request_key = credentials['oauth_token'][0]
|
request_key = credentials['oauth_token'][0]
|
||||||
request_secret = credentials['oauth_token_secret'][0]
|
request_secret = credentials['oauth_token_secret'][0]
|
||||||
self.request_token = oauth1.Token(request_key, request_secret)
|
self.request_token = oauth1.Token(request_key, request_secret)
|
||||||
|
@ -281,7 +288,7 @@ class OAuthFlowTests(OAuth1Tests):
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
access_key = credentials['oauth_token'][0]
|
access_key = credentials['oauth_token'][0]
|
||||||
access_secret = credentials['oauth_token_secret'][0]
|
access_secret = credentials['oauth_token_secret'][0]
|
||||||
self.access_token = oauth1.Token(access_key, access_secret)
|
self.access_token = oauth1.Token(access_key, access_secret)
|
||||||
|
@ -311,13 +318,15 @@ class AccessTokenCRUDTests(OAuthFlowTests):
|
||||||
|
|
||||||
def test_get_single_access_token(self):
|
def test_get_single_access_token(self):
|
||||||
self.test_oauth_flow()
|
self.test_oauth_flow()
|
||||||
|
access_token_key_string = self.access_token.key.decode()
|
||||||
|
|
||||||
url = '/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' % {
|
url = '/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' % {
|
||||||
'user_id': self.user_id,
|
'user_id': self.user_id,
|
||||||
'key': self.access_token.key
|
'key': access_token_key_string
|
||||||
}
|
}
|
||||||
resp = self.get(url)
|
resp = self.get(url)
|
||||||
entity = resp.result['access_token']
|
entity = resp.result['access_token']
|
||||||
self.assertEqual(self.access_token.key, entity['id'])
|
self.assertEqual(access_token_key_string, entity['id'])
|
||||||
self.assertEqual(self.consumer['key'], entity['consumer_id'])
|
self.assertEqual(self.consumer['key'], entity['consumer_id'])
|
||||||
self.assertEqual('http://localhost/v3' + url, entity['links']['self'])
|
self.assertEqual('http://localhost/v3' + url, entity['links']['self'])
|
||||||
|
|
||||||
|
@ -331,15 +340,17 @@ class AccessTokenCRUDTests(OAuthFlowTests):
|
||||||
self.test_oauth_flow()
|
self.test_oauth_flow()
|
||||||
resp = self.get('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles'
|
resp = self.get('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles'
|
||||||
% {'id': self.user_id,
|
% {'id': self.user_id,
|
||||||
'key': self.access_token.key})
|
'key': self.access_token.key.decode()})
|
||||||
entities = resp.result['roles']
|
entities = resp.result['roles']
|
||||||
self.assertTrue(entities)
|
self.assertTrue(entities)
|
||||||
self.assertValidListLinks(resp.result['links'])
|
self.assertValidListLinks(resp.result['links'])
|
||||||
|
|
||||||
def test_get_role_in_access_token(self):
|
def test_get_role_in_access_token(self):
|
||||||
self.test_oauth_flow()
|
self.test_oauth_flow()
|
||||||
|
|
||||||
|
access_token_key = self.access_token.key.decode()
|
||||||
url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
|
url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
|
||||||
% {'id': self.user_id, 'key': self.access_token.key,
|
% {'id': self.user_id, 'key': access_token_key,
|
||||||
'role': self.role_id})
|
'role': self.role_id})
|
||||||
resp = self.get(url)
|
resp = self.get(url)
|
||||||
entity = resp.result['role']
|
entity = resp.result['role']
|
||||||
|
@ -347,8 +358,10 @@ class AccessTokenCRUDTests(OAuthFlowTests):
|
||||||
|
|
||||||
def test_get_role_in_access_token_dne(self):
|
def test_get_role_in_access_token_dne(self):
|
||||||
self.test_oauth_flow()
|
self.test_oauth_flow()
|
||||||
|
|
||||||
|
access_token_key = self.access_token.key.decode()
|
||||||
url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
|
url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
|
||||||
% {'id': self.user_id, 'key': self.access_token.key,
|
% {'id': self.user_id, 'key': access_token_key,
|
||||||
'role': uuid.uuid4().hex})
|
'role': uuid.uuid4().hex})
|
||||||
self.get(url, expected_status=http_client.NOT_FOUND)
|
self.get(url, expected_status=http_client.NOT_FOUND)
|
||||||
|
|
||||||
|
@ -361,10 +374,11 @@ class AccessTokenCRUDTests(OAuthFlowTests):
|
||||||
self.assertTrue(entities)
|
self.assertTrue(entities)
|
||||||
self.assertValidListLinks(resp.result['links'])
|
self.assertValidListLinks(resp.result['links'])
|
||||||
|
|
||||||
|
access_token_key = self.access_token.key.decode()
|
||||||
# Delete access_token
|
# Delete access_token
|
||||||
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
|
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
|
||||||
% {'user': self.user_id,
|
% {'user': self.user_id,
|
||||||
'auth': self.access_token.key})
|
'auth': access_token_key})
|
||||||
self.assertResponseStatus(resp, http_client.NO_CONTENT)
|
self.assertResponseStatus(resp, http_client.NO_CONTENT)
|
||||||
|
|
||||||
# List access_token should be 0
|
# List access_token should be 0
|
||||||
|
@ -386,7 +400,7 @@ class AuthTokenTests(OAuthFlowTests):
|
||||||
|
|
||||||
# now verify the oauth section
|
# now verify the oauth section
|
||||||
oauth_section = r.result['token']['OS-OAUTH1']
|
oauth_section = r.result['token']['OS-OAUTH1']
|
||||||
self.assertEqual(self.access_token.key,
|
self.assertEqual(self.access_token.key.decode(),
|
||||||
oauth_section['access_token_id'])
|
oauth_section['access_token_id'])
|
||||||
self.assertEqual(self.consumer['key'], oauth_section['consumer_id'])
|
self.assertEqual(self.consumer['key'], oauth_section['consumer_id'])
|
||||||
|
|
||||||
|
@ -404,10 +418,11 @@ class AuthTokenTests(OAuthFlowTests):
|
||||||
def test_delete_access_token_also_revokes_token(self):
|
def test_delete_access_token_also_revokes_token(self):
|
||||||
self.test_oauth_flow()
|
self.test_oauth_flow()
|
||||||
|
|
||||||
|
access_token_key = self.access_token.key.decode()
|
||||||
# Delete access token
|
# Delete access token
|
||||||
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
|
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
|
||||||
% {'user': self.user_id,
|
% {'user': self.user_id,
|
||||||
'auth': self.access_token.key})
|
'auth': access_token_key})
|
||||||
self.assertResponseStatus(resp, http_client.NO_CONTENT)
|
self.assertResponseStatus(resp, http_client.NO_CONTENT)
|
||||||
|
|
||||||
# Check Keystone Token no longer exists
|
# Check Keystone Token no longer exists
|
||||||
|
@ -530,7 +545,7 @@ class AuthTokenTests(OAuthFlowTests):
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
request_key = credentials['oauth_token'][0]
|
request_key = credentials['oauth_token'][0]
|
||||||
request_secret = credentials['oauth_token_secret'][0]
|
request_secret = credentials['oauth_token_secret'][0]
|
||||||
self.request_token = oauth1.Token(request_key, request_secret)
|
self.request_token = oauth1.Token(request_key, request_secret)
|
||||||
|
@ -658,7 +673,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
request_key = credentials['oauth_token'][0]
|
request_key = credentials['oauth_token'][0]
|
||||||
request_secret = credentials['oauth_token_secret'][0]
|
request_secret = credentials['oauth_token_secret'][0]
|
||||||
request_token = oauth1.Token(request_key, request_secret)
|
request_token = oauth1.Token(request_key, request_secret)
|
||||||
|
@ -684,7 +699,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
request_key = credentials['oauth_token'][0]
|
request_key = credentials['oauth_token'][0]
|
||||||
|
|
||||||
self.assignment_api.remove_role_from_user_and_project(
|
self.assignment_api.remove_role_from_user_and_project(
|
||||||
|
@ -708,7 +723,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
request_key = credentials['oauth_token'][0]
|
request_key = credentials['oauth_token'][0]
|
||||||
request_secret = credentials['oauth_token_secret'][0]
|
request_secret = credentials['oauth_token_secret'][0]
|
||||||
self.request_token = oauth1.Token(request_key, request_secret)
|
self.request_token = oauth1.Token(request_key, request_secret)
|
||||||
|
@ -731,7 +746,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
request_key = credentials['oauth_token'][0]
|
request_key = credentials['oauth_token'][0]
|
||||||
request_secret = credentials['oauth_token_secret'][0]
|
request_secret = credentials['oauth_token_secret'][0]
|
||||||
self.request_token = oauth1.Token(request_key, request_secret)
|
self.request_token = oauth1.Token(request_key, request_secret)
|
||||||
|
@ -748,7 +763,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
access_key = credentials['oauth_token'][0]
|
access_key = credentials['oauth_token'][0]
|
||||||
access_secret = credentials['oauth_token_secret'][0]
|
access_secret = credentials['oauth_token_secret'][0]
|
||||||
self.access_token = oauth1.Token(access_key, access_secret)
|
self.access_token = oauth1.Token(access_key, access_secret)
|
||||||
|
@ -832,17 +847,18 @@ class OAuthNotificationTests(OAuth1Tests,
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
request_key = credentials['oauth_token'][0]
|
request_key = credentials['oauth_token'][0]
|
||||||
request_secret = credentials['oauth_token_secret'][0]
|
request_secret = credentials['oauth_token_secret'][0]
|
||||||
self.request_token = oauth1.Token(request_key, request_secret)
|
self.request_token = oauth1.Token(request_key, request_secret)
|
||||||
self.assertIsNotNone(self.request_token.key)
|
self.assertIsNotNone(self.request_token.key)
|
||||||
|
|
||||||
|
request_key_string = request_key.decode()
|
||||||
# Test to ensure the create request token notification is sent
|
# Test to ensure the create request token notification is sent
|
||||||
self._assert_notify_sent(request_key,
|
self._assert_notify_sent(request_key_string,
|
||||||
test_notifications.CREATED_OPERATION,
|
test_notifications.CREATED_OPERATION,
|
||||||
'OS-OAUTH1:request_token')
|
'OS-OAUTH1:request_token')
|
||||||
self._assert_last_audit(request_key,
|
self._assert_last_audit(request_key_string,
|
||||||
test_notifications.CREATED_OPERATION,
|
test_notifications.CREATED_OPERATION,
|
||||||
'OS-OAUTH1:request_token',
|
'OS-OAUTH1:request_token',
|
||||||
cadftaxonomy.SECURITY_CREDENTIAL)
|
cadftaxonomy.SECURITY_CREDENTIAL)
|
||||||
|
@ -860,31 +876,32 @@ class OAuthNotificationTests(OAuth1Tests,
|
||||||
content = self.post(
|
content = self.post(
|
||||||
url, headers=headers,
|
url, headers=headers,
|
||||||
response_content_type='application/x-www-urlformencoded')
|
response_content_type='application/x-www-urlformencoded')
|
||||||
credentials = urllib.parse.parse_qs(content.result)
|
credentials = _urllib_parse_qs_text_keys(content.result)
|
||||||
access_key = credentials['oauth_token'][0]
|
access_key = credentials['oauth_token'][0]
|
||||||
access_secret = credentials['oauth_token_secret'][0]
|
access_secret = credentials['oauth_token_secret'][0]
|
||||||
self.access_token = oauth1.Token(access_key, access_secret)
|
self.access_token = oauth1.Token(access_key, access_secret)
|
||||||
self.assertIsNotNone(self.access_token.key)
|
self.assertIsNotNone(self.access_token.key)
|
||||||
|
|
||||||
|
access_key_string = access_key.decode()
|
||||||
# Test to ensure the create access token notification is sent
|
# Test to ensure the create access token notification is sent
|
||||||
self._assert_notify_sent(access_key,
|
self._assert_notify_sent(access_key_string,
|
||||||
test_notifications.CREATED_OPERATION,
|
test_notifications.CREATED_OPERATION,
|
||||||
'OS-OAUTH1:access_token')
|
'OS-OAUTH1:access_token')
|
||||||
self._assert_last_audit(access_key,
|
self._assert_last_audit(access_key_string,
|
||||||
test_notifications.CREATED_OPERATION,
|
test_notifications.CREATED_OPERATION,
|
||||||
'OS-OAUTH1:access_token',
|
'OS-OAUTH1:access_token',
|
||||||
cadftaxonomy.SECURITY_CREDENTIAL)
|
cadftaxonomy.SECURITY_CREDENTIAL)
|
||||||
|
|
||||||
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
|
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
|
||||||
% {'user': self.user_id,
|
% {'user': self.user_id,
|
||||||
'auth': self.access_token.key})
|
'auth': self.access_token.key.decode()})
|
||||||
self.assertResponseStatus(resp, http_client.NO_CONTENT)
|
self.assertResponseStatus(resp, http_client.NO_CONTENT)
|
||||||
|
|
||||||
# Test to ensure the delete access token notification is sent
|
# Test to ensure the delete access token notification is sent
|
||||||
self._assert_notify_sent(access_key,
|
self._assert_notify_sent(access_key_string,
|
||||||
test_notifications.DELETED_OPERATION,
|
test_notifications.DELETED_OPERATION,
|
||||||
'OS-OAUTH1:access_token')
|
'OS-OAUTH1:access_token')
|
||||||
self._assert_last_audit(access_key,
|
self._assert_last_audit(access_key_string,
|
||||||
test_notifications.DELETED_OPERATION,
|
test_notifications.DELETED_OPERATION,
|
||||||
'OS-OAUTH1:access_token',
|
'OS-OAUTH1:access_token',
|
||||||
cadftaxonomy.SECURITY_CREDENTIAL)
|
cadftaxonomy.SECURITY_CREDENTIAL)
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
keystone.tests.unit.test_v3_oauth1
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
set -o pipefail
|
set -o pipefail
|
||||||
|
|
||||||
TESTRARGS=`python -c 'print ("^((?!%s).)*$" % "|".join(f.strip() for f in open("tests-py3-blacklist.txt")))'`
|
TESTRARGS=$1
|
||||||
python setup.py testr --testr-args="--subunit $TESTRARGS" | subunit-trace -f
|
python setup.py testr --testr-args="--subunit $TESTRARGS" | subunit-trace -f
|
||||||
retval=$?
|
retval=$?
|
||||||
# NOTE(mtreinish) The pipe above would eat the slowest display from pbr's testr
|
# NOTE(mtreinish) The pipe above would eat the slowest display from pbr's testr
|
||||||
|
|
Loading…
Reference in New Issue