Add token related API to Keystone v3-ext/OS-OAUTH1 client

Enhance oauth_consumers_client with the following API
/v3/OS-OAUTH1/request_token
/v3/OS-OAUTH1/authorize/{request_token_id}
/v3/OS-OAUTH1/access_token
/v3/users/{user_id}/OS-OAUTH1/access_tokens/{access_token_id}
/v3/users/{user_id}/OS-OAUTH1/access_tokens
/v3/users/{user_id}/OS-OAUTH1/access_tokens/{access_token_id}/roles
/v3/users/{user_id}/OS-OAUTH1/access_tokens/{access_token_id}/roles/{role_id}

https://developer.openstack.org/api-ref/identity/v3-ext/index.html?expanded=#os-oauth1-api

Co-Authored-By: Nishant Kumar <nk613n@att.com>
Closes-Bug: #1682425

Change-Id: I4c5369ae3ad7a7add630e3ac6a4fc52f854bc77c
This commit is contained in:
Hemanth Nakkina 2017-04-19 11:14:40 +05:30 committed by Hemanth Nakkina
parent f6288d7057
commit d9594f5119
6 changed files with 463 additions and 4 deletions

View File

@ -0,0 +1,3 @@
---
features:
- Add a new client to handle the OAUTH token feature from the identity API.

View File

@ -223,6 +223,7 @@ class BaseIdentityV3AdminTest(BaseIdentityV3Test):
cls.projects_client = cls.os_admin.projects_client
cls.role_assignments = cls.os_admin.role_assignments_client
cls.oauth_consumers_client = cls.os_admin.oauth_consumers_client
cls.oauth_token_client = cls.os_admin.oauth_token_client
cls.domain_config_client = cls.os_admin.domain_config_client
cls.endpoint_filter_client = cls.os_admin.endpoint_filter_client
cls.endpoint_groups_client = cls.os_admin.endpoint_groups_client

View File

@ -200,6 +200,8 @@ class Manager(clients.ServiceClients):
**params_v3)
self.oauth_consumers_client = self.identity_v3.OAUTHConsumerClient(
**params_v3)
self.oauth_token_client = self.identity_v3.OAUTHTokenClient(
**params_v3)
self.domain_config_client = self.identity_v3.DomainConfigurationClient(
**params_v3)
self.endpoint_filter_client = \

View File

@ -28,6 +28,8 @@ from tempest.lib.services.identity.v3.inherited_roles_client import \
InheritedRolesClient
from tempest.lib.services.identity.v3.oauth_consumers_client import \
OAUTHConsumerClient
from tempest.lib.services.identity.v3.oauth_token_client import \
OAUTHTokenClient
from tempest.lib.services.identity.v3.policies_client import PoliciesClient
from tempest.lib.services.identity.v3.projects_client import ProjectsClient
from tempest.lib.services.identity.v3.regions_client import RegionsClient
@ -43,7 +45,7 @@ from tempest.lib.services.identity.v3.versions_client import VersionsClient
__all__ = ['CredentialsClient', 'DomainsClient', 'DomainConfigurationClient',
'EndPointGroupsClient', 'EndPointsClient', 'EndPointsFilterClient',
'GroupsClient', 'IdentityClient', 'InheritedRolesClient',
'OAUTHConsumerClient', 'PoliciesClient', 'ProjectsClient',
'RegionsClient', 'RoleAssignmentsClient', 'RolesClient',
'ServicesClient', 'V3TokenClient', 'TrustsClient', 'UsersClient',
'VersionsClient']
'OAUTHConsumerClient', 'OAUTHTokenClient', 'PoliciesClient',
'ProjectsClient', 'RegionsClient', 'RoleAssignmentsClient',
'RolesClient', 'ServicesClient', 'V3TokenClient', 'TrustsClient',
'UsersClient', 'VersionsClient']

View File

@ -0,0 +1,236 @@
# Copyright 2017 AT&T Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import hashlib
import hmac
import random
import time
import six
from six.moves.urllib import parse as urlparse
from oslo_serialization import jsonutils as json
from tempest.lib.common import rest_client
class OAUTHTokenClient(rest_client.RestClient):
api_version = "v3"
def _escape(self, s):
"""Escape a unicode string in an OAuth-compatible fashion."""
safe = b'~'
s = s.encode('utf-8') if isinstance(s, six.text_type) else s
s = urlparse.quote(s, safe)
if isinstance(s, six.binary_type):
s = s.decode('utf-8')
return s
def _generate_params_with_signature(self, client_key, uri,
client_secret=None,
resource_owner_key=None,
resource_owner_secret=None,
callback_uri=None,
verifier=None,
http_method='GET'):
"""Generate OAUTH params along with signature."""
timestamp = six.text_type(int(time.time()))
nonce = six.text_type(random.getrandbits(64)) + timestamp
oauth_params = [
('oauth_nonce', nonce),
('oauth_timestamp', timestamp),
('oauth_version', '1.0'),
('oauth_signature_method', 'HMAC-SHA1'),
('oauth_consumer_key', client_key),
]
if resource_owner_key:
oauth_params.append(('oauth_token', resource_owner_key))
if callback_uri:
oauth_params.append(('oauth_callback', callback_uri))
if verifier:
oauth_params.append(('oauth_verifier', verifier))
# normalize_params
key_values = [(self._escape(k), self._escape(v))
for k, v in oauth_params]
key_values.sort()
parameter_parts = ['{0}={1}'.format(k, v) for k, v in key_values]
normalized_params = '&'.join(parameter_parts)
# normalize_uri
scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
scheme = scheme.lower()
netloc = netloc.lower()
normalized_uri = urlparse.urlunparse((scheme, netloc, path,
params, '', ''))
# construct base string
base_string = self._escape(http_method.upper())
base_string += '&'
base_string += self._escape(normalized_uri)
base_string += '&'
base_string += self._escape(normalized_params)
# sign using hmac-sha1
key = self._escape(client_secret or '')
key += '&'
key += self._escape(resource_owner_secret or '')
key_utf8 = key.encode('utf-8')
text_utf8 = base_string.encode('utf-8')
signature = hmac.new(key_utf8, text_utf8, hashlib.sha1)
sig = binascii.b2a_base64(signature.digest())[:-1].decode('utf-8')
oauth_params.append(('oauth_signature', sig))
return oauth_params
def _generate_oauth_header(self, oauth_params):
authorization_header = {}
authorization_header_parameters_parts = []
for oauth_parameter_name, value in oauth_params:
escaped_name = self._escape(oauth_parameter_name)
escaped_value = self._escape(value)
part = '{0}="{1}"'.format(escaped_name, escaped_value)
authorization_header_parameters_parts.append(part)
authorization_header_parameters = ', '.join(
authorization_header_parameters_parts)
oauth_string = 'OAuth %s' % authorization_header_parameters
authorization_header['Authorization'] = oauth_string
return authorization_header
def create_request_token(self, consumer_key, consumer_secret, project_id):
"""Create request token.
For more information, please refer to the official API reference:
http://developer.openstack.org/api-ref/identity/v3-ext/#create-request-token
"""
endpoint = 'OS-OAUTH1/request_token'
headers = {'Requested-Project-Id': project_id}
oauth_params = self._generate_params_with_signature(
consumer_key,
self.base_url + '/' + endpoint,
client_secret=consumer_secret,
callback_uri='oob',
http_method='POST')
oauth_header = self._generate_oauth_header(oauth_params)
headers.update(oauth_header)
resp, body = self.post(endpoint,
body=None,
headers=headers)
self.expected_success(201, resp.status)
if not isinstance(body, str):
body = body.decode('utf-8')
body = dict(item.split("=") for item in body.split("&"))
return rest_client.ResponseBody(resp, body)
def authorize_request_token(self, request_token_id, role_ids):
"""Authorize request token.
For more information, please refer to the official API reference:
http://developer.openstack.org/api-ref/identity/v3-ext/#authorize-request-token
"""
roles = [{'id': role_id} for role_id in role_ids]
body = {'roles': roles}
post_body = json.dumps(body)
resp, body = self.put("OS-OAUTH1/authorize/%s" % request_token_id,
post_body)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def create_access_token(self, consumer_key, consumer_secret, request_key,
request_secret, oauth_verifier):
"""Create access token.
For more information, please refer to the official API reference:
http://developer.openstack.org/api-ref/identity/v3-ext/#create-access-token
"""
endpoint = 'OS-OAUTH1/access_token'
oauth_params = self._generate_params_with_signature(
consumer_key,
self.base_url + '/' + endpoint,
client_secret=consumer_secret,
resource_owner_key=request_key,
resource_owner_secret=request_secret,
verifier=oauth_verifier,
http_method='POST')
headers = self._generate_oauth_header(oauth_params)
resp, body = self.post(endpoint, body=None, headers=headers)
self.expected_success(201, resp.status)
if not isinstance(body, str):
body = body.decode('utf-8')
body = dict(item.split("=") for item in body.split("&"))
return rest_client.ResponseBody(resp, body)
def get_access_token(self, user_id, access_token_id):
"""Get access token.
For more information, please refer to the official API reference:
http://developer.openstack.org/api-ref/identity/v3-ext/#get-access-token
"""
resp, body = self.get("users/%s/OS-OAUTH1/access_tokens/%s"
% (user_id, access_token_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def revoke_access_token(self, user_id, access_token_id):
"""Revoke access token.
For more information, please refer to the official API reference:
http://developer.openstack.org/api-ref/identity/v3-ext/#revoke-access-token
"""
resp, body = self.delete("users/%s/OS-OAUTH1/access_tokens/%s"
% (user_id, access_token_id))
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
def list_access_tokens(self, user_id):
"""List access tokens.
For more information, please refer to the official API reference:
http://developer.openstack.org/api-ref/identity/v3-ext/#list-access-tokens
"""
resp, body = self.get("users/%s/OS-OAUTH1/access_tokens"
% (user_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def list_access_token_roles(self, user_id, access_token_id):
"""List roles for an access token.
For more information, please refer to the official API reference:
http://developer.openstack.org/api-ref/identity/v3-ext/#list-roles-for-an-access-token
"""
resp, body = self.get("users/%s/OS-OAUTH1/access_tokens/%s/roles"
% (user_id, access_token_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def get_access_token_role(self, user_id, access_token_id, role_id):
"""Show role details for an access token.
For more information, please refer to the official API reference:
http://developer.openstack.org/api-ref/identity/v3-ext/#show-role-details-for-an-access-token
"""
resp, body = self.get("users/%s/OS-OAUTH1/access_tokens/%s/roles/%s"
% (user_id, access_token_id, role_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)

View File

@ -0,0 +1,215 @@
# Copyright 2017 AT&T Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import mockpatch
from tempest.lib.services.identity.v3 import oauth_token_client
from tempest.tests.lib import fake_auth_provider
from tempest.tests.lib import fake_http
from tempest.tests.lib.services import base
class TestOAUTHTokenClient(base.BaseServiceTest):
FAKE_CREATE_REQUEST_TOKEN = {
'oauth_token': '29971f',
'oauth_token_secret': '238eb8',
'oauth_expires_at': '2013-09-11T06:07:51.501805Z'
}
FAKE_AUTHORIZE_REQUEST_TOKEN = {
'token': {
'oauth_verifier': '8171'
}
}
FAKE_CREATE_ACCESS_TOKEN = {
'oauth_token': 'accd36',
'oauth_token_secret': 'aa47da',
'oauth_expires_at': '2013-09-11T06:07:51.501805Z'
}
FAKE_ACCESS_TOKEN_INFO = {
'access_token': {
'consumer_id': '7fea2d',
'id': '6be26a',
'expires_at': '2013-09-11T06:07:51.501805Z',
'links': {
'roles': 'http://example.com/identity/v3/' +
'users/ce9e07/OS-OAUTH1/access_tokens/6be26a/roles',
'self': 'http://example.com/identity/v3/' +
'users/ce9e07/OS-OAUTH1/access_tokens/6be26a'
},
'project_id': 'b9fca3',
'authorizing_user_id': 'ce9e07'
}
}
FAKE_LIST_ACCESS_TOKENS = {
'access_tokens': [
{
'consumer_id': '7fea2d',
'id': '6be26a',
'expires_at': '2013-09-11T06:07:51.501805Z',
'links': {
'roles': 'http://example.com/identity/v3/' +
'users/ce9e07/OS-OAUTH1/access_tokens/' +
'6be26a/roles',
'self': 'http://example.com/identity/v3/' +
'users/ce9e07/OS-OAUTH1/access_tokens/6be26a'
},
'project_id': 'b9fca3',
'authorizing_user_id': 'ce9e07'
}
],
'links': {
'next': None,
'previous': None,
'self': 'http://example.com/identity/v3/' +
'users/ce9e07/OS-OAUTH1/access_tokens'
}
}
FAKE_LIST_ACCESS_TOKEN_ROLES = {
'roles': [
{
'id': '26b860',
'domain_id': 'fake_domain',
'links': {
'self': 'http://example.com/identity/v3/' +
'roles/26b860'
},
'name': 'fake_role'
}
],
'links': {
'next': None,
'previous': None,
'self': 'http://example.com/identity/v3/' +
'users/ce9e07/OS-OAUTH1/access_tokens/6be26a/roles'
}
}
FAKE_ACCESS_TOKEN_ROLE_INFO = {
'role': {
'id': '26b860',
'domain_id': 'fake_domain',
'links': {
'self': 'http://example.com/identity/v3/' +
'roles/26b860'
},
'name': 'fake_role'
}
}
def setUp(self):
super(TestOAUTHTokenClient, self).setUp()
fake_auth = fake_auth_provider.FakeAuthProvider()
self.client = oauth_token_client.OAUTHTokenClient(fake_auth,
'identity',
'regionOne')
def _mock_token_response(self, body):
temp_response = [key + '=' + value for key, value in body.items()]
return '&'.join(temp_response)
def _test_authorize_request_token(self, bytes_body=False):
self.check_service_client_function(
self.client.authorize_request_token,
'tempest.lib.common.rest_client.RestClient.put',
self.FAKE_AUTHORIZE_REQUEST_TOKEN,
bytes_body,
request_token_id=self.FAKE_CREATE_REQUEST_TOKEN['oauth_token'],
role_ids=['26b860'],
status=200)
def test_create_request_token(self):
mock_resp = self._mock_token_response(self.FAKE_CREATE_REQUEST_TOKEN)
resp = fake_http.fake_http_response(None, status=201), mock_resp
self.useFixture(mockpatch.Patch(
'tempest.lib.common.rest_client.RestClient.post',
return_value=resp))
resp = self.client.create_request_token(
consumer_key='12345',
consumer_secret='23456',
project_id='c8f58432c6f00162f04d3250f')
self.assertEqual(self.FAKE_CREATE_REQUEST_TOKEN, resp)
def test_authorize_token_request_with_str_body(self):
self._test_authorize_request_token()
def test_authorize_token_request_with_bytes_body(self):
self._test_authorize_request_token(bytes_body=True)
def test_create_access_token(self):
mock_resp = self._mock_token_response(self.FAKE_CREATE_ACCESS_TOKEN)
req_secret = self.FAKE_CREATE_REQUEST_TOKEN['oauth_token_secret']
resp = fake_http.fake_http_response(None, status=201), mock_resp
self.useFixture(mockpatch.Patch(
'tempest.lib.common.rest_client.RestClient.post',
return_value=resp))
resp = self.client.create_access_token(
consumer_key='12345',
consumer_secret='23456',
request_key=self.FAKE_CREATE_REQUEST_TOKEN['oauth_token'],
request_secret=req_secret,
oauth_verifier='8171')
self.assertEqual(self.FAKE_CREATE_ACCESS_TOKEN, resp)
def test_get_access_token(self):
self.check_service_client_function(
self.client.get_access_token,
'tempest.lib.common.rest_client.RestClient.get',
self.FAKE_ACCESS_TOKEN_INFO,
user_id='ce9e07',
access_token_id=self.FAKE_ACCESS_TOKEN_INFO['access_token']['id'],
status=200)
def test_list_access_tokens(self):
self.check_service_client_function(
self.client.list_access_tokens,
'tempest.lib.common.rest_client.RestClient.get',
self.FAKE_LIST_ACCESS_TOKENS,
user_id='ce9e07',
status=200)
def test_revoke_access_token(self):
self.check_service_client_function(
self.client.revoke_access_token,
'tempest.lib.common.rest_client.RestClient.delete',
{},
user_id=self.FAKE_ACCESS_TOKEN_INFO['access_token']['consumer_id'],
access_token_id=self.FAKE_ACCESS_TOKEN_INFO['access_token']['id'],
status=204)
def test_list_access_token_roles(self):
self.check_service_client_function(
self.client.list_access_token_roles,
'tempest.lib.common.rest_client.RestClient.get',
self.FAKE_LIST_ACCESS_TOKEN_ROLES,
user_id='ce9e07',
access_token_id=self.FAKE_ACCESS_TOKEN_INFO['access_token']['id'],
status=200)
def test_get_access_token_role(self):
self.check_service_client_function(
self.client.get_access_token_role,
'tempest.lib.common.rest_client.RestClient.get',
self.FAKE_ACCESS_TOKEN_ROLE_INFO,
user_id='ce9e07',
access_token_id=self.FAKE_ACCESS_TOKEN_INFO['access_token']['id'],
role_id=self.FAKE_ACCESS_TOKEN_ROLE_INFO['role']['id'],
status=200)