Use webtest for v2 and v3 API testing.
The intention is to remain as close as possible to the original implementation and so leaves a number of easy cleanups and optimisations until a later patch. In writing tests their are a number of changes for API tests that are a result of webob/webtest restrictions: * response.body is now the string body and response.result is the parsed dictionary. * response.status is now a string eg. '200 OK', use response.status_code to get the integer * response.getheader no longer exists. response.headers is a dictionary like object that can be accessed (case independently) with [] or .get() Working towards: blueprint extract-eventlet Change-Id: I393b4bad2fd6eacc0b8ae98fc204d1323014b5e4
This commit is contained in:
parent
fe57d6de5b
commit
09a5c436c5
|
@ -31,10 +31,10 @@ class V2CatalogTestCase(test_content_types.RestfulTestCase):
|
|||
|
||||
def _get_token_id(self, r):
|
||||
"""Applicable only to JSON."""
|
||||
return r.body['access']['token']['id']
|
||||
return r.result['access']['token']['id']
|
||||
|
||||
def assertValidErrorResponse(self, response):
|
||||
self.assertEqual(response.status, 400)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def _endpoint_create(self, expected_status=200, missing_param=None):
|
||||
path = '/v2.0/endpoints'
|
||||
|
@ -56,20 +56,20 @@ class V2CatalogTestCase(test_content_types.RestfulTestCase):
|
|||
|
||||
def test_endpoint_create(self):
|
||||
req_body, response = self._endpoint_create(expected_status=200)
|
||||
self.assertTrue('endpoint' in response.body)
|
||||
self.assertTrue('id' in response.body['endpoint'])
|
||||
self.assertTrue('endpoint' in response.result)
|
||||
self.assertTrue('id' in response.result['endpoint'])
|
||||
for field, value in req_body['endpoint'].iteritems():
|
||||
self.assertEqual(response.body['endpoint'][field], value)
|
||||
self.assertEqual(response.result['endpoint'][field], value)
|
||||
|
||||
def test_endpoint_create_with_missing_adminurl(self):
|
||||
req_body, response = self._endpoint_create(expected_status=200,
|
||||
missing_param='adminurl')
|
||||
self.assertEqual(response.status, 200)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_endpoint_create_with_missing_internalurl(self):
|
||||
req_body, response = self._endpoint_create(expected_status=200,
|
||||
missing_param='internalurl')
|
||||
self.assertEqual(response.status, 200)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_endpoint_create_with_missing_publicurl(self):
|
||||
req_body, response = self._endpoint_create(expected_status=400,
|
||||
|
|
|
@ -14,11 +14,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import httplib
|
||||
import io
|
||||
import uuid
|
||||
|
||||
from lxml import etree
|
||||
import nose.exc
|
||||
import webtest
|
||||
|
||||
from keystone.common import serializer
|
||||
from keystone.openstack.common import jsonutils
|
||||
|
@ -63,8 +64,10 @@ class RestfulTestCase(test.TestCase):
|
|||
self.load_backends()
|
||||
self.load_fixtures(default_fixtures)
|
||||
|
||||
self.public_server = self.serveapp('keystone', name='main')
|
||||
self.admin_server = self.serveapp('keystone', name='admin')
|
||||
self.public_app = webtest.TestApp(
|
||||
self.loadapp('keystone', name='main'))
|
||||
self.admin_app = webtest.TestApp(
|
||||
self.loadapp('keystone', name='admin'))
|
||||
|
||||
# TODO(termie): is_admin is being deprecated once the policy stuff
|
||||
# is all working
|
||||
|
@ -77,40 +80,31 @@ class RestfulTestCase(test.TestCase):
|
|||
|
||||
def tearDown(self):
|
||||
"""Kill running servers and release references to avoid leaks."""
|
||||
self.public_server.kill()
|
||||
self.admin_server.kill()
|
||||
self.public_server = None
|
||||
self.admin_server = None
|
||||
self.public_app = None
|
||||
self.admin_app = None
|
||||
super(RestfulTestCase, self).tearDown()
|
||||
|
||||
def request(self, host='0.0.0.0', port=80, method='GET', path='/',
|
||||
headers=None, body=None, expected_status=None):
|
||||
"""Perform request and fetch httplib.HTTPResponse from the server."""
|
||||
|
||||
# Initialize headers dictionary
|
||||
headers = {} if not headers else headers
|
||||
|
||||
connection = httplib.HTTPConnection(host, port, timeout=100000)
|
||||
|
||||
# Perform the request
|
||||
connection.request(method, path, body, headers)
|
||||
|
||||
# Retrieve the response so we can close the connection
|
||||
response = connection.getresponse()
|
||||
|
||||
response.body = response.read()
|
||||
|
||||
# Close the connection
|
||||
connection.close()
|
||||
|
||||
# Automatically assert HTTP status code
|
||||
if expected_status:
|
||||
self.assertResponseStatus(response, expected_status)
|
||||
def request(self, app, path, body=None, headers=None, token=None,
|
||||
expected_status=None, **kwargs):
|
||||
if headers:
|
||||
headers = dict([(str(k), str(v)) for k, v in headers.iteritems()])
|
||||
else:
|
||||
self.assertResponseSuccessful(response)
|
||||
self.assertValidResponseHeaders(response)
|
||||
headers = {}
|
||||
|
||||
if token:
|
||||
headers['X-Auth-Token'] = str(token)
|
||||
|
||||
# setting body this way because of:
|
||||
# https://github.com/Pylons/webtest/issues/71
|
||||
if body:
|
||||
kwargs['body_file'] = io.BytesIO(body)
|
||||
|
||||
# sets environ['REMOTE_ADDR']
|
||||
kwargs.setdefault('remote_addr', 'localhost')
|
||||
|
||||
response = app.request(path, headers=headers,
|
||||
status=expected_status, **kwargs)
|
||||
|
||||
# Contains the response headers, body, etc
|
||||
return response
|
||||
|
||||
def assertResponseSuccessful(self, response):
|
||||
|
@ -124,7 +118,7 @@ class RestfulTestCase(test.TestCase):
|
|||
self.assertResponseSuccessful(response, 203)
|
||||
"""
|
||||
self.assertTrue(
|
||||
response.status >= 200 and response.status <= 299,
|
||||
response.status_code >= 200 and response.status_code <= 299,
|
||||
'Status code %d is outside of the expected range (2xx)\n\n%s' %
|
||||
(response.status, response.body))
|
||||
|
||||
|
@ -139,14 +133,14 @@ class RestfulTestCase(test.TestCase):
|
|||
self.assertResponseStatus(response, 203)
|
||||
"""
|
||||
self.assertEqual(
|
||||
response.status,
|
||||
response.status_code,
|
||||
expected_status,
|
||||
'Status code %s is not %s, as expected)\n\n%s' %
|
||||
(response.status, expected_status, response.body))
|
||||
(response.status_code, expected_status, response.body))
|
||||
|
||||
def assertValidResponseHeaders(self, response):
|
||||
"""Ensures that response headers appear as expected."""
|
||||
self.assertIn('X-Auth-Token', response.getheader('Vary'))
|
||||
self.assertIn('X-Auth-Token', response.headers.get('Vary'))
|
||||
|
||||
def _to_content_type(self, body, headers, content_type=None):
|
||||
"""Attempt to encode JSON and XML automatically."""
|
||||
|
@ -167,21 +161,18 @@ class RestfulTestCase(test.TestCase):
|
|||
"""Attempt to decode JSON and XML automatically, if detected."""
|
||||
content_type = content_type or self.content_type
|
||||
|
||||
# make the original response body available, for convenience
|
||||
response.raw = response.body
|
||||
|
||||
if response.body is not None and response.body.strip():
|
||||
# if a body is provided, a Content-Type is also expected
|
||||
header = response.getheader('Content-Type', None)
|
||||
header = response.headers.get('Content-Type', None)
|
||||
self.assertIn(content_type, header)
|
||||
|
||||
if content_type == 'json':
|
||||
response.body = jsonutils.loads(response.body)
|
||||
response.result = jsonutils.loads(response.body)
|
||||
elif content_type == 'xml':
|
||||
response.body = etree.fromstring(response.body)
|
||||
response.result = etree.fromstring(response.body)
|
||||
|
||||
def restful_request(self, method='GET', headers=None, body=None,
|
||||
token=None, content_type=None, **kwargs):
|
||||
content_type=None, **kwargs):
|
||||
"""Serializes/deserializes json/xml as request/response body.
|
||||
|
||||
.. WARNING::
|
||||
|
@ -193,9 +184,6 @@ class RestfulTestCase(test.TestCase):
|
|||
# Initialize headers dictionary
|
||||
headers = {} if not headers else headers
|
||||
|
||||
if token is not None:
|
||||
headers['X-Auth-Token'] = token
|
||||
|
||||
body = self._to_content_type(body, headers, content_type)
|
||||
|
||||
# Perform the HTTP request/response
|
||||
|
@ -205,32 +193,26 @@ class RestfulTestCase(test.TestCase):
|
|||
self._from_content_type(response, content_type)
|
||||
|
||||
# we can save some code & improve coverage by always doing this
|
||||
if method != 'HEAD' and response.status >= 400:
|
||||
if method != 'HEAD' and response.status_code >= 400:
|
||||
self.assertValidErrorResponse(response)
|
||||
|
||||
# Contains the decoded response.body
|
||||
return response
|
||||
|
||||
def _get_port(self, server):
|
||||
return server.socket_info['socket'][1]
|
||||
def _request(self, convert=True, **kwargs):
|
||||
if convert:
|
||||
response = self.restful_request(**kwargs)
|
||||
else:
|
||||
response = self.request(**kwargs)
|
||||
|
||||
def _public_port(self):
|
||||
return self._get_port(self.public_server)
|
||||
|
||||
def _admin_port(self):
|
||||
return self._get_port(self.admin_server)
|
||||
|
||||
def public_request(self, port=None, **kwargs):
|
||||
kwargs['port'] = port or self._public_port()
|
||||
response = self.restful_request(**kwargs)
|
||||
self.assertValidResponseHeaders(response)
|
||||
return response
|
||||
|
||||
def admin_request(self, port=None, **kwargs):
|
||||
kwargs['port'] = port or self._admin_port()
|
||||
response = self.restful_request(**kwargs)
|
||||
self.assertValidResponseHeaders(response)
|
||||
return response
|
||||
def public_request(self, **kwargs):
|
||||
return self._request(app=self.public_app, **kwargs)
|
||||
|
||||
def admin_request(self, **kwargs):
|
||||
return self._request(app=self.admin_app, **kwargs)
|
||||
|
||||
def get_scoped_token(self):
|
||||
"""Convenience method so that we can test authenticated requests."""
|
||||
|
@ -597,12 +579,12 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
|
|||
|
||||
def _get_token_id(self, r):
|
||||
"""Applicable only to JSON."""
|
||||
return r.body['access']['token']['id']
|
||||
return r.result['access']['token']['id']
|
||||
|
||||
def assertValidErrorResponse(self, r):
|
||||
self.assertIsNotNone(r.body.get('error'))
|
||||
self.assertValidError(r.body['error'])
|
||||
self.assertEqual(r.body['error']['code'], r.status)
|
||||
self.assertIsNotNone(r.result.get('error'))
|
||||
self.assertValidError(r.result['error'])
|
||||
self.assertEqual(r.result['error']['code'], r.status_code)
|
||||
|
||||
def assertValidExtension(self, extension):
|
||||
super(JsonTestCase, self).assertValidExtension(extension)
|
||||
|
@ -614,42 +596,42 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertValidExtensionLink(link)
|
||||
|
||||
def assertValidExtensionListResponse(self, r):
|
||||
self.assertIsNotNone(r.body.get('extensions'))
|
||||
self.assertIsNotNone(r.body['extensions'].get('values'))
|
||||
self.assertNotEmpty(r.body['extensions'].get('values'))
|
||||
for extension in r.body['extensions']['values']:
|
||||
self.assertIsNotNone(r.result.get('extensions'))
|
||||
self.assertIsNotNone(r.result['extensions'].get('values'))
|
||||
self.assertNotEmpty(r.result['extensions'].get('values'))
|
||||
for extension in r.result['extensions']['values']:
|
||||
self.assertValidExtension(extension)
|
||||
|
||||
def assertValidExtensionResponse(self, r):
|
||||
self.assertValidExtension(r.body.get('extension'))
|
||||
self.assertValidExtension(r.result.get('extension'))
|
||||
|
||||
def assertValidAuthenticationResponse(self, r,
|
||||
require_service_catalog=False):
|
||||
self.assertIsNotNone(r.body.get('access'))
|
||||
self.assertIsNotNone(r.body['access'].get('token'))
|
||||
self.assertIsNotNone(r.body['access'].get('user'))
|
||||
self.assertIsNotNone(r.result.get('access'))
|
||||
self.assertIsNotNone(r.result['access'].get('token'))
|
||||
self.assertIsNotNone(r.result['access'].get('user'))
|
||||
|
||||
# validate token
|
||||
self.assertIsNotNone(r.body['access']['token'].get('id'))
|
||||
self.assertIsNotNone(r.body['access']['token'].get('expires'))
|
||||
tenant = r.body['access']['token'].get('tenant')
|
||||
self.assertIsNotNone(r.result['access']['token'].get('id'))
|
||||
self.assertIsNotNone(r.result['access']['token'].get('expires'))
|
||||
tenant = r.result['access']['token'].get('tenant')
|
||||
if tenant is not None:
|
||||
# validate tenant
|
||||
self.assertIsNotNone(tenant.get('id'))
|
||||
self.assertIsNotNone(tenant.get('name'))
|
||||
|
||||
# validate user
|
||||
self.assertIsNotNone(r.body['access']['user'].get('id'))
|
||||
self.assertIsNotNone(r.body['access']['user'].get('name'))
|
||||
self.assertIsNotNone(r.result['access']['user'].get('id'))
|
||||
self.assertIsNotNone(r.result['access']['user'].get('name'))
|
||||
|
||||
if require_service_catalog:
|
||||
# roles are only provided with a service catalog
|
||||
roles = r.body['access']['user'].get('roles')
|
||||
roles = r.result['access']['user'].get('roles')
|
||||
self.assertNotEmpty(roles)
|
||||
for role in roles:
|
||||
self.assertIsNotNone(role.get('name'))
|
||||
|
||||
serviceCatalog = r.body['access'].get('serviceCatalog')
|
||||
serviceCatalog = r.result['access'].get('serviceCatalog')
|
||||
# validate service catalog
|
||||
if require_service_catalog:
|
||||
self.assertIsNotNone(serviceCatalog)
|
||||
|
@ -657,7 +639,7 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertTrue(isinstance(serviceCatalog, list))
|
||||
if require_service_catalog:
|
||||
self.assertNotEmpty(serviceCatalog)
|
||||
for service in r.body['access']['serviceCatalog']:
|
||||
for service in r.result['access']['serviceCatalog']:
|
||||
# validate service
|
||||
self.assertIsNotNone(service.get('name'))
|
||||
self.assertIsNotNone(service.get('type'))
|
||||
|
@ -670,25 +652,25 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertIsNotNone(endpoint.get('publicURL'))
|
||||
|
||||
def assertValidTenantListResponse(self, r):
|
||||
self.assertIsNotNone(r.body.get('tenants'))
|
||||
self.assertNotEmpty(r.body['tenants'])
|
||||
for tenant in r.body['tenants']:
|
||||
self.assertIsNotNone(r.result.get('tenants'))
|
||||
self.assertNotEmpty(r.result['tenants'])
|
||||
for tenant in r.result['tenants']:
|
||||
self.assertValidTenant(tenant)
|
||||
self.assertIsNotNone(tenant.get('enabled'))
|
||||
self.assertIn(tenant.get('enabled'), [True, False])
|
||||
|
||||
def assertValidUserResponse(self, r):
|
||||
self.assertIsNotNone(r.body.get('user'))
|
||||
self.assertValidUser(r.body['user'])
|
||||
self.assertIsNotNone(r.result.get('user'))
|
||||
self.assertValidUser(r.result['user'])
|
||||
|
||||
def assertValidTenantResponse(self, r):
|
||||
self.assertIsNotNone(r.body.get('tenant'))
|
||||
self.assertValidTenant(r.body['tenant'])
|
||||
self.assertIsNotNone(r.result.get('tenant'))
|
||||
self.assertValidTenant(r.result['tenant'])
|
||||
|
||||
def assertValidRoleListResponse(self, r):
|
||||
self.assertIsNotNone(r.body.get('roles'))
|
||||
self.assertNotEmpty(r.body['roles'])
|
||||
for role in r.body['roles']:
|
||||
self.assertIsNotNone(r.result.get('roles'))
|
||||
self.assertNotEmpty(r.result['roles'])
|
||||
for role in r.result['roles']:
|
||||
self.assertValidRole(role)
|
||||
|
||||
def assertValidVersion(self, version):
|
||||
|
@ -707,19 +689,19 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertIsNotNone(media.get('type'))
|
||||
|
||||
def assertValidMultipleChoiceResponse(self, r):
|
||||
self.assertIsNotNone(r.body.get('versions'))
|
||||
self.assertIsNotNone(r.body['versions'].get('values'))
|
||||
self.assertNotEmpty(r.body['versions']['values'])
|
||||
for version in r.body['versions']['values']:
|
||||
self.assertIsNotNone(r.result.get('versions'))
|
||||
self.assertIsNotNone(r.result['versions'].get('values'))
|
||||
self.assertNotEmpty(r.result['versions']['values'])
|
||||
for version in r.result['versions']['values']:
|
||||
self.assertValidVersion(version)
|
||||
|
||||
def assertValidVersionResponse(self, r):
|
||||
self.assertValidVersion(r.body.get('version'))
|
||||
self.assertValidVersion(r.result.get('version'))
|
||||
|
||||
def assertValidEndpointListResponse(self, r):
|
||||
self.assertIsNotNone(r.body.get('endpoints'))
|
||||
self.assertNotEmpty(r.body['endpoints'])
|
||||
for endpoint in r.body['endpoints']:
|
||||
self.assertIsNotNone(r.result.get('endpoints'))
|
||||
self.assertNotEmpty(r.result['endpoints'])
|
||||
for endpoint in r.result['endpoints']:
|
||||
self.assertIsNotNone(endpoint.get('id'))
|
||||
self.assertIsNotNone(endpoint.get('name'))
|
||||
self.assertIsNotNone(endpoint.get('type'))
|
||||
|
@ -778,16 +760,15 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
|
|||
|
||||
def test_fetch_revocation_list_admin_200(self):
|
||||
token = self.get_scoped_token()
|
||||
r = self.restful_request(
|
||||
r = self.admin_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/revoked',
|
||||
token=token,
|
||||
expected_status=200,
|
||||
port=self._admin_port())
|
||||
expected_status=200)
|
||||
self.assertValidRevocationListResponse(r)
|
||||
|
||||
def assertValidRevocationListResponse(self, response):
|
||||
self.assertIsNotNone(response.body['signed'])
|
||||
self.assertIsNotNone(response.result['signed'])
|
||||
|
||||
def test_create_update_user_json_invalid_enabled_type(self):
|
||||
# Enforce usage of boolean for 'enabled' field in JSON
|
||||
|
@ -831,18 +812,18 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
content_type = 'xml'
|
||||
|
||||
def _get_token_id(self, r):
|
||||
return r.body.find(self._tag('token')).get('id')
|
||||
return r.result.find(self._tag('token')).get('id')
|
||||
|
||||
def _tag(self, tag_name, xmlns=None):
|
||||
"""Helper method to build an namespaced element name."""
|
||||
return '{%(ns)s}%(tag)s' % {'ns': xmlns or self.xmlns, 'tag': tag_name}
|
||||
|
||||
def assertValidErrorResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('error'))
|
||||
|
||||
self.assertValidError(xml)
|
||||
self.assertEqual(xml.get('code'), str(r.status))
|
||||
self.assertEqual(xml.get('code'), str(r.status_code))
|
||||
|
||||
def assertValidExtension(self, extension):
|
||||
super(XmlTestCase, self).assertValidExtension(extension)
|
||||
|
@ -855,7 +836,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertValidExtensionLink(link)
|
||||
|
||||
def assertValidExtensionListResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('extensions'))
|
||||
|
||||
self.assertNotEmpty(xml.findall(self._tag('extension')))
|
||||
|
@ -863,7 +844,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertValidExtension(extension)
|
||||
|
||||
def assertValidExtensionResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('extension'))
|
||||
|
||||
self.assertValidExtension(xml)
|
||||
|
@ -886,7 +867,7 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertIsNotNone(media.get('type'))
|
||||
|
||||
def assertValidMultipleChoiceResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('versions'))
|
||||
|
||||
self.assertNotEmpty(xml.findall(self._tag('version')))
|
||||
|
@ -894,13 +875,13 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertValidVersion(version)
|
||||
|
||||
def assertValidVersionResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('version'))
|
||||
|
||||
self.assertValidVersion(xml)
|
||||
|
||||
def assertValidEndpointListResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('endpoints'))
|
||||
|
||||
self.assertNotEmpty(xml.findall(self._tag('endpoint')))
|
||||
|
@ -913,28 +894,28 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertIsNotNone(endpoint.get('adminURL'))
|
||||
|
||||
def assertValidTenantResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('tenant'))
|
||||
|
||||
self.assertValidTenant(xml)
|
||||
|
||||
def assertValidUserResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('user'))
|
||||
|
||||
self.assertValidUser(xml)
|
||||
|
||||
def assertValidRoleListResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('roles'))
|
||||
|
||||
self.assertNotEmpty(r.body.findall(self._tag('role')))
|
||||
for role in r.body.findall(self._tag('role')):
|
||||
self.assertNotEmpty(r.result.findall(self._tag('role')))
|
||||
for role in r.result.findall(self._tag('role')):
|
||||
self.assertValidRole(role)
|
||||
|
||||
def assertValidAuthenticationResponse(self, r,
|
||||
require_service_catalog=False):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('access'))
|
||||
|
||||
# validate token
|
||||
|
@ -981,18 +962,17 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
self.assertIsNotNone(endpoint.get('publicURL'))
|
||||
|
||||
def assertValidTenantListResponse(self, r):
|
||||
xml = r.body
|
||||
xml = r.result
|
||||
self.assertEqual(xml.tag, self._tag('tenants'))
|
||||
|
||||
self.assertNotEmpty(r.body)
|
||||
for tenant in r.body.findall(self._tag('tenant')):
|
||||
self.assertNotEmpty(r.result)
|
||||
for tenant in r.result.findall(self._tag('tenant')):
|
||||
self.assertValidTenant(tenant)
|
||||
self.assertIn(tenant.get('enabled'), ['true', 'false'])
|
||||
|
||||
def test_authenticate_with_invalid_xml_in_password(self):
|
||||
# public_request would auto escape the ampersand
|
||||
self.request(
|
||||
port=self._public_port(),
|
||||
self.public_request(
|
||||
method='POST',
|
||||
path='/v2.0/tokens',
|
||||
headers={
|
||||
|
@ -1005,15 +985,15 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
<passwordCredentials username="FOO" password="&"/>
|
||||
</auth>
|
||||
""",
|
||||
expected_status=400)
|
||||
expected_status=400,
|
||||
convert=False)
|
||||
|
||||
def test_add_tenant_xml(self):
|
||||
"""
|
||||
verify create a tenant without providing description field
|
||||
"""
|
||||
token = self.get_scoped_token()
|
||||
r = self.request(
|
||||
port=self._admin_port(),
|
||||
r = self.admin_request(
|
||||
method='POST',
|
||||
path='/v2.0/tenants',
|
||||
headers={
|
||||
|
@ -1026,19 +1006,19 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
enabled="true" name="ACME Corp">
|
||||
<description></description>
|
||||
</tenant>
|
||||
""")
|
||||
""",
|
||||
convert=False)
|
||||
self._from_content_type(r, 'json')
|
||||
self.assertIsNotNone(r.body.get('tenant'))
|
||||
self.assertValidTenant(r.body['tenant'])
|
||||
self.assertEqual(r.body['tenant'].get('description'), "")
|
||||
self.assertIsNotNone(r.result.get('tenant'))
|
||||
self.assertValidTenant(r.result['tenant'])
|
||||
self.assertEqual(r.result['tenant'].get('description'), "")
|
||||
|
||||
def test_add_tenant_json(self):
|
||||
"""
|
||||
verify create a tenant without providing description field
|
||||
"""
|
||||
token = self.get_scoped_token()
|
||||
r = self.request(
|
||||
port=self._admin_port(),
|
||||
r = self.admin_request(
|
||||
method='POST',
|
||||
path='/v2.0/tenants',
|
||||
headers={
|
||||
|
@ -1051,8 +1031,9 @@ class XmlTestCase(RestfulTestCase, CoreApiTests):
|
|||
"description":"",
|
||||
"enabled":"true"}
|
||||
}
|
||||
""")
|
||||
""",
|
||||
convert=False)
|
||||
self._from_content_type(r, 'json')
|
||||
self.assertIsNotNone(r.body.get('tenant'))
|
||||
self.assertValidTenant(r.body['tenant'])
|
||||
self.assertEqual(r.body['tenant'].get('description'), "")
|
||||
self.assertIsNotNone(r.result.get('tenant'))
|
||||
self.assertValidTenant(r.result['tenant'])
|
||||
self.assertEqual(r.result['tenant'].get('description'), "")
|
||||
|
|
|
@ -2,6 +2,7 @@ import datetime
|
|||
import uuid
|
||||
|
||||
from lxml import etree
|
||||
import webtest
|
||||
|
||||
from keystone import auth
|
||||
from keystone.common import serializer
|
||||
|
@ -38,6 +39,11 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
sql_util.setup_test_database()
|
||||
self.load_backends()
|
||||
|
||||
self.public_app = webtest.TestApp(
|
||||
self.loadapp('keystone', name='main'))
|
||||
self.admin_app = webtest.TestApp(
|
||||
self.loadapp('keystone', name='admin'))
|
||||
|
||||
if load_sample_data:
|
||||
self.domain_id = uuid.uuid4().hex
|
||||
self.domain = self.new_domain_ref()
|
||||
|
@ -204,8 +210,8 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
|
||||
"""
|
||||
r = super(RestfulTestCase, self).admin_request(*args, **kwargs)
|
||||
if r.getheader('Content-Type') == 'application/xml':
|
||||
r.body = serializer.from_xml(etree.tostring(r.body))
|
||||
if r.headers.get('Content-Type') == 'application/xml':
|
||||
r.result = serializer.from_xml(etree.tostring(r.result))
|
||||
return r
|
||||
|
||||
def get_scoped_token(self):
|
||||
|
@ -234,7 +240,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
}
|
||||
}
|
||||
})
|
||||
return r.getheader('X-Subject-Token')
|
||||
return r.headers.get('X-Subject-Token')
|
||||
|
||||
def get_requested_token(self, auth):
|
||||
"""Request the specific token we want."""
|
||||
|
@ -243,7 +249,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
method='POST',
|
||||
path='/v3/auth/tokens',
|
||||
body=auth)
|
||||
return r.getheader('X-Subject-Token')
|
||||
return r.headers.get('X-Subject-Token')
|
||||
|
||||
def v3_request(self, path, **kwargs):
|
||||
# Check if the caller has passed in auth details for
|
||||
|
@ -296,15 +302,15 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
return r
|
||||
|
||||
def assertValidErrorResponse(self, r):
|
||||
if r.getheader('Content-Type') == 'application/xml':
|
||||
resp = serializer.from_xml(etree.tostring(r.body))
|
||||
if r.headers.get('Content-Type') == 'application/xml':
|
||||
resp = serializer.from_xml(etree.tostring(r.result))
|
||||
else:
|
||||
resp = r.body
|
||||
resp = r.result
|
||||
self.assertIsNotNone(resp.get('error'))
|
||||
self.assertIsNotNone(resp['error'].get('code'))
|
||||
self.assertIsNotNone(resp['error'].get('title'))
|
||||
self.assertIsNotNone(resp['error'].get('message'))
|
||||
self.assertEqual(int(resp['error']['code']), r.status)
|
||||
self.assertEqual(int(resp['error']['code']), r.status_code)
|
||||
|
||||
def assertValidListLinks(self, links):
|
||||
self.assertIsNotNone(links)
|
||||
|
@ -331,7 +337,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
response, and asserted to be equal.
|
||||
|
||||
"""
|
||||
entities = resp.body.get(key)
|
||||
entities = resp.result.get(key)
|
||||
self.assertIsNotNone(entities)
|
||||
|
||||
if expected_length is not None:
|
||||
|
@ -341,7 +347,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
self.assertNotEmpty(entities)
|
||||
|
||||
# collections should have relational links
|
||||
self.assertValidListLinks(resp.body.get('links'))
|
||||
self.assertValidListLinks(resp.result.get('links'))
|
||||
|
||||
for entity in entities:
|
||||
self.assertIsNotNone(entity)
|
||||
|
@ -356,7 +362,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
def assertValidResponse(self, resp, key, entity_validator, *args,
|
||||
**kwargs):
|
||||
"""Make assertions common to all API responses."""
|
||||
entity = resp.body.get(key)
|
||||
entity = resp.result.get(key)
|
||||
self.assertIsNotNone(entity)
|
||||
self.assertValidEntity(entity, *args, **kwargs)
|
||||
entity_validator(entity, *args, **kwargs)
|
||||
|
@ -397,8 +403,8 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
|
|||
self.assertTrue(isinstance(dt, datetime.datetime))
|
||||
|
||||
def assertValidTokenResponse(self, r, user=None):
|
||||
self.assertTrue(r.getheader('X-Subject-Token'))
|
||||
token = r.body['token']
|
||||
self.assertTrue(r.headers.get('X-Subject-Token'))
|
||||
token = r.result['token']
|
||||
|
||||
self.assertIsNotNone(token.get('expires_at'))
|
||||
expires_at = self.assertValidISO8601ExtendedFormatDatetime(
|
||||
|
|
|
@ -104,9 +104,9 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
user_domain_id=self.domain_id,
|
||||
password=self.user['password'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.token_data = resp.body
|
||||
self.token = resp.getheader('X-Subject-Token')
|
||||
self.headers = {'X-Subject-Token': resp.getheader('X-Subject-Token')}
|
||||
self.token_data = resp.result
|
||||
self.token = resp.headers.get('X-Subject-Token')
|
||||
self.headers = {'X-Subject-Token': resp.headers.get('X-Subject-Token')}
|
||||
|
||||
def test_default_fixture_scope_token(self):
|
||||
self.assertIsNotNone(self.get_scoped_token())
|
||||
|
@ -117,8 +117,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
user_id=self.user['id'],
|
||||
password=self.user['password'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token_data = resp.body
|
||||
token_id = resp.getheader('X-Subject-Token')
|
||||
token_data = resp.result
|
||||
token_id = resp.headers.get('X-Subject-Token')
|
||||
self.assertIn('expires_at', token_data['token'])
|
||||
token_signed = cms.cms_sign_token(json.dumps(token_data),
|
||||
CONF.signing.certfile,
|
||||
|
@ -131,8 +131,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
user_id=self.user['id'],
|
||||
password=self.user['password'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token_data = resp.body
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token_data = resp.result
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
|
@ -152,8 +152,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
password=self.user['password'],
|
||||
domain_id=self.domain['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token_data = resp.body
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token_data = resp.result
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
|
@ -168,8 +168,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
password=self.default_domain_user['password'],
|
||||
project_id=self.project['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token_data = resp.body
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token_data = resp.result
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
|
@ -184,15 +184,15 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token_data = resp.body
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token_data = resp.result
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
resp = self.admin_request(path=path,
|
||||
token='ADMIN',
|
||||
method='GET')
|
||||
v2_token = resp.body
|
||||
v2_token = resp.result
|
||||
self.assertEqual(v2_token['access']['user']['id'],
|
||||
token_data['token']['user']['id'])
|
||||
# v2 token time has not fraction of second precision so
|
||||
|
@ -206,15 +206,15 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
user_id=self.default_domain_user['id'],
|
||||
password=self.default_domain_user['password'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token_data = resp.body
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token_data = resp.result
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
resp = self.admin_request(path=path,
|
||||
token='ADMIN',
|
||||
method='GET')
|
||||
v2_token = resp.body
|
||||
v2_token = resp.result
|
||||
self.assertEqual(v2_token['access']['user']['id'],
|
||||
token_data['token']['user']['id'])
|
||||
# v2 token time has not fraction of second precision so
|
||||
|
@ -231,15 +231,15 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token_data = resp.body
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token_data = resp.result
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
resp = self.admin_request(path=path,
|
||||
token='ADMIN',
|
||||
method='GET')
|
||||
v2_token = resp.body
|
||||
v2_token = resp.result
|
||||
self.assertEqual(v2_token['access']['user']['id'],
|
||||
token_data['token']['user']['id'])
|
||||
# v2 token time has not fraction of second precision so
|
||||
|
@ -258,15 +258,15 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token_data = resp.body
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token_data = resp.result
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
resp = self.admin_request(path=path,
|
||||
token='ADMIN',
|
||||
method='GET')
|
||||
v2_token = resp.body
|
||||
v2_token = resp.result
|
||||
self.assertEqual(v2_token['access']['user']['id'],
|
||||
token_data['token']['user']['id'])
|
||||
# v2 token time has not fraction of second precision so
|
||||
|
@ -288,11 +288,11 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
resp = self.admin_request(path='/v2.0/tokens',
|
||||
method='POST',
|
||||
body=body)
|
||||
v2_token_data = resp.body
|
||||
v2_token_data = resp.result
|
||||
v2_token = v2_token_data['access']['token']['id']
|
||||
headers = {'X-Subject-Token': v2_token}
|
||||
resp = self.get('/auth/tokens', headers=headers)
|
||||
token_data = resp.body
|
||||
token_data = resp.result
|
||||
self.assertEqual(v2_token_data['access']['user']['id'],
|
||||
token_data['token']['user']['id'])
|
||||
# v2 token time has not fraction of second precision so
|
||||
|
@ -312,11 +312,11 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
resp = self.admin_request(path='/v2.0/tokens',
|
||||
method='POST',
|
||||
body=body)
|
||||
v2_token_data = resp.body
|
||||
v2_token_data = resp.result
|
||||
v2_token = v2_token_data['access']['token']['id']
|
||||
headers = {'X-Subject-Token': v2_token}
|
||||
resp = self.get('/auth/tokens', headers=headers)
|
||||
token_data = resp.body
|
||||
token_data = resp.result
|
||||
self.assertEqual(v2_token_data['access']['user']['id'],
|
||||
token_data['token']['user']['id'])
|
||||
# v2 token time has not fraction of second precision so
|
||||
|
@ -337,11 +337,11 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
resp = self.admin_request(path='/v2.0/tokens',
|
||||
method='POST',
|
||||
body=body)
|
||||
v2_token_data = resp.body
|
||||
v2_token_data = resp.result
|
||||
v2_token = v2_token_data['access']['token']['id']
|
||||
headers = {'X-Subject-Token': v2_token}
|
||||
resp = self.get('/auth/tokens', headers=headers)
|
||||
token_data = resp.body
|
||||
token_data = resp.result
|
||||
self.assertEqual(v2_token_data['access']['user']['id'],
|
||||
token_data['token']['user']['id'])
|
||||
# v2 token time has not fraction of second precision so
|
||||
|
@ -364,11 +364,11 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
resp = self.admin_request(path='/v2.0/tokens',
|
||||
method='POST',
|
||||
body=body)
|
||||
v2_token_data = resp.body
|
||||
v2_token_data = resp.result
|
||||
v2_token = v2_token_data['access']['token']['id']
|
||||
headers = {'X-Subject-Token': v2_token}
|
||||
resp = self.get('/auth/tokens', headers=headers)
|
||||
token_data = resp.body
|
||||
token_data = resp.result
|
||||
self.assertEqual(v2_token_data['access']['user']['id'],
|
||||
token_data['token']['user']['id'])
|
||||
# v2 token time has not fraction of second precision so
|
||||
|
@ -386,7 +386,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(r)
|
||||
# make sure expires stayed the same
|
||||
self.assertEqual(expires, r.body['token']['expires_at'])
|
||||
self.assertEqual(expires, r.result['token']['expires_at'])
|
||||
|
||||
def test_check_token(self):
|
||||
self.head('/auth/tokens', headers=self.headers, expected_status=204)
|
||||
|
@ -402,7 +402,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
|
|||
|
||||
# make sure we have a CRL
|
||||
r = self.get('/auth/tokens/OS-PKI/revoked')
|
||||
self.assertIn('signed', r.body)
|
||||
self.assertIn('signed', r.result)
|
||||
|
||||
|
||||
class TestTokenRevoking(test_v3.RestfulTestCase):
|
||||
|
@ -517,7 +517,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
|
|||
password=self.user1['password'],
|
||||
project_id=self.projectA['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
# Confirm token is valid
|
||||
self.head('/auth/tokens',
|
||||
headers={'X-Subject-Token': token},
|
||||
|
@ -548,7 +548,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
|
|||
password=self.user1['password'],
|
||||
project_id=self.projectA['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
# Confirm token is valid
|
||||
self.head('/auth/tokens',
|
||||
headers={'X-Subject-Token': token},
|
||||
|
@ -583,19 +583,19 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
|
|||
password=self.user1['password'],
|
||||
project_id=self.projectA['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token1 = resp.getheader('X-Subject-Token')
|
||||
token1 = resp.headers.get('X-Subject-Token')
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.user2['id'],
|
||||
password=self.user2['password'],
|
||||
project_id=self.projectA['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token2 = resp.getheader('X-Subject-Token')
|
||||
token2 = resp.headers.get('X-Subject-Token')
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.user3['id'],
|
||||
password=self.user3['password'],
|
||||
project_id=self.projectA['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token3 = resp.getheader('X-Subject-Token')
|
||||
token3 = resp.headers.get('X-Subject-Token')
|
||||
# Confirm tokens are valid
|
||||
self.head('/auth/tokens',
|
||||
headers={'X-Subject-Token': token1},
|
||||
|
@ -640,7 +640,7 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
|
|||
password=self.user1['password'],
|
||||
project_id=self.projectA['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
# Confirm token is valid
|
||||
self.head('/auth/tokens',
|
||||
headers={'X-Subject-Token': token},
|
||||
|
@ -676,13 +676,13 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
|
|||
password=self.user1['password'],
|
||||
project_id=self.projectA['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token1 = resp.getheader('X-Subject-Token')
|
||||
token1 = resp.headers.get('X-Subject-Token')
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.user2['id'],
|
||||
password=self.user2['password'],
|
||||
project_id=self.projectA['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token2 = resp.getheader('X-Subject-Token')
|
||||
token2 = resp.headers.get('X-Subject-Token')
|
||||
# Confirm tokens are valid
|
||||
self.head('/auth/tokens',
|
||||
headers={'X-Subject-Token': token1},
|
||||
|
@ -771,7 +771,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
|
|||
password=self.user['password'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(r)
|
||||
self.assertEqual(r.body['token']['project']['id'], project['id'])
|
||||
self.assertEqual(r.result['token']['project']['id'], project['id'])
|
||||
|
||||
def test_default_project_id_scoped_token_with_user_id_401(self):
|
||||
# create a second project to work with
|
||||
|
@ -950,8 +950,8 @@ class TestAuthJSON(test_v3.RestfulTestCase):
|
|||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidUnscopedTokenResponse(r)
|
||||
|
||||
token = r.getheader('X-Subject-Token')
|
||||
headers = {'X-Subject-Token': r.getheader('X-Subject-Token')}
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')}
|
||||
|
||||
# test token auth
|
||||
auth_data = self.build_authentication_request(token=token)
|
||||
|
@ -1195,7 +1195,7 @@ class TestTrustAuth(TestAuthInfo):
|
|||
self.assertValidProjectTrustScopedTokenResponse(
|
||||
r, self.default_domain_user)
|
||||
|
||||
token = r.getheader('X-Subject-Token')
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
|
@ -1219,7 +1219,7 @@ class TestTrustAuth(TestAuthInfo):
|
|||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project_id)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
token = r.getheader('X-Subject-Token')
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
|
@ -1231,7 +1231,7 @@ class TestTrustAuth(TestAuthInfo):
|
|||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectTrustScopedTokenResponse(
|
||||
r, self.trustee_user)
|
||||
token = r.getheader('X-Subject-Token')
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
|
@ -1261,7 +1261,7 @@ class TestTrustAuth(TestAuthInfo):
|
|||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project_id)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
token = r.getheader('X-Subject-Token')
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
|
@ -1273,7 +1273,7 @@ class TestTrustAuth(TestAuthInfo):
|
|||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectTrustScopedTokenResponse(
|
||||
r, trustee_user)
|
||||
token = r.getheader('X-Subject-Token')
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
|
@ -1302,7 +1302,7 @@ class TestTrustAuth(TestAuthInfo):
|
|||
password=self.default_domain_user['password'],
|
||||
project_id=self.default_domain_project_id)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
token = r.getheader('X-Subject-Token')
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
|
@ -1314,7 +1314,7 @@ class TestTrustAuth(TestAuthInfo):
|
|||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectTrustScopedTokenResponse(
|
||||
r, trustee_user)
|
||||
token = r.getheader('X-Subject-Token')
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# now validate the v3 token with v2 API
|
||||
path = '/v2.0/tokens/%s' % (token)
|
||||
|
@ -1342,16 +1342,17 @@ class TestTrustAuth(TestAuthInfo):
|
|||
trust_id=trust['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectTrustScopedTokenResponse(r, self.trustee_user)
|
||||
self.assertEqual(r.body['token']['user']['id'],
|
||||
self.assertEqual(r.result['token']['user']['id'],
|
||||
self.trustee_user['id'])
|
||||
self.assertEqual(r.body['token']['user']['name'],
|
||||
self.assertEqual(r.result['token']['user']['name'],
|
||||
self.trustee_user['name'])
|
||||
self.assertEqual(r.body['token']['user']['domain']['id'],
|
||||
self.assertEqual(r.result['token']['user']['domain']['id'],
|
||||
self.domain['id'])
|
||||
self.assertEqual(r.body['token']['user']['domain']['name'],
|
||||
self.assertEqual(r.result['token']['user']['domain']['name'],
|
||||
self.domain['name'])
|
||||
self.assertEqual(r.body['token']['project']['id'], self.project['id'])
|
||||
self.assertEqual(r.body['token']['project']['name'],
|
||||
self.assertEqual(r.result['token']['project']['id'],
|
||||
self.project['id'])
|
||||
self.assertEqual(r.result['token']['project']['name'],
|
||||
self.project['name'])
|
||||
|
||||
def test_exercise_trust_scoped_token_with_impersonation(self):
|
||||
|
@ -1373,14 +1374,15 @@ class TestTrustAuth(TestAuthInfo):
|
|||
trust_id=trust['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectTrustScopedTokenResponse(r, self.user)
|
||||
self.assertEqual(r.body['token']['user']['id'], self.user['id'])
|
||||
self.assertEqual(r.body['token']['user']['name'], self.user['name'])
|
||||
self.assertEqual(r.body['token']['user']['domain']['id'],
|
||||
self.assertEqual(r.result['token']['user']['id'], self.user['id'])
|
||||
self.assertEqual(r.result['token']['user']['name'], self.user['name'])
|
||||
self.assertEqual(r.result['token']['user']['domain']['id'],
|
||||
self.domain['id'])
|
||||
self.assertEqual(r.body['token']['user']['domain']['name'],
|
||||
self.assertEqual(r.result['token']['user']['domain']['name'],
|
||||
self.domain['name'])
|
||||
self.assertEqual(r.body['token']['project']['id'], self.project['id'])
|
||||
self.assertEqual(r.body['token']['project']['name'],
|
||||
self.assertEqual(r.result['token']['project']['id'],
|
||||
self.project['id'])
|
||||
self.assertEqual(r.result['token']['project']['name'],
|
||||
self.project['name'])
|
||||
|
||||
def test_delete_trust(self):
|
||||
|
@ -1431,12 +1433,12 @@ class TestTrustAuth(TestAuthInfo):
|
|||
|
||||
r = self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
|
||||
self.user_id, expected_status=200)
|
||||
trusts = r.body['trusts']
|
||||
trusts = r.result['trusts']
|
||||
self.assertEqual(len(trusts), 3)
|
||||
|
||||
r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' %
|
||||
self.user_id, expected_status=200)
|
||||
trusts = r.body['trusts']
|
||||
trusts = r.result['trusts']
|
||||
self.assertEqual(len(trusts), 0)
|
||||
|
||||
def test_change_password_invalidates_trust_tokens(self):
|
||||
|
@ -1459,7 +1461,7 @@ class TestTrustAuth(TestAuthInfo):
|
|||
r = self.post('/auth/tokens', body=auth_data)
|
||||
|
||||
self.assertValidProjectTrustScopedTokenResponse(r, self.user)
|
||||
trust_token = r.getheader('X-Subject-Token')
|
||||
trust_token = r.headers.get('X-Subject-Token')
|
||||
|
||||
self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
|
||||
self.user_id, expected_status=200,
|
||||
|
|
|
@ -84,7 +84,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
|
|||
self.assertValidEndpointResponse(r, ref)
|
||||
|
||||
def assertValidErrorResponse(self, response):
|
||||
self.assertTrue(response.status in [400])
|
||||
self.assertTrue(response.status_code in [400])
|
||||
|
||||
def test_create_endpoint_400(self):
|
||||
"""POST /endpoints"""
|
||||
|
@ -135,7 +135,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
|
|||
path='/v2.0/endpoints',
|
||||
token=self.get_scoped_token(),
|
||||
body={'endpoint': ref})
|
||||
endpoint_v2 = r.body['endpoint']
|
||||
endpoint_v2 = r.result['endpoint']
|
||||
|
||||
# test the endpoint on v3
|
||||
r = self.get('/endpoints')
|
||||
|
|
|
@ -402,7 +402,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
|
|||
'group_id': self.group_id})
|
||||
self.assertValidUserListResponse(r, ref=self.user)
|
||||
self.assertIn('/groups/%(group_id)s/users' % {
|
||||
'group_id': self.group_id}, r.body['links']['self'])
|
||||
'group_id': self.group_id}, r.result['links']['self'])
|
||||
|
||||
def test_remove_user_from_group(self):
|
||||
"""DELETE /groups/{group_id}/users/{user_id}"""
|
||||
|
@ -450,7 +450,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
|
|||
password=self.user['password'],
|
||||
project_id=self.project['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
# Confirm token is valid for now
|
||||
self.head('/auth/tokens',
|
||||
headers={'X-Subject-Token': token},
|
||||
|
@ -565,14 +565,14 @@ class IdentityTestCase(test_v3.RestfulTestCase):
|
|||
self.head(member_url)
|
||||
r = self.get(collection_url)
|
||||
self.assertValidRoleListResponse(r, ref=self.role)
|
||||
self.assertIn(collection_url, r.body['links']['self'])
|
||||
self.assertIn(collection_url, r.result['links']['self'])
|
||||
|
||||
# FIXME(gyee): this test is no longer valid as user
|
||||
# have no role in the project. Can't get a scoped token
|
||||
#self.delete(member_url)
|
||||
#r = self.get(collection_url)
|
||||
#self.assertValidRoleListResponse(r, expected_length=0)
|
||||
#self.assertIn(collection_url, r.body['links']['self'])
|
||||
#self.assertIn(collection_url, r.result['links']['self'])
|
||||
|
||||
def test_crud_user_domain_role_grants(self):
|
||||
collection_url = (
|
||||
|
@ -587,12 +587,12 @@ class IdentityTestCase(test_v3.RestfulTestCase):
|
|||
self.head(member_url)
|
||||
r = self.get(collection_url)
|
||||
self.assertValidRoleListResponse(r, ref=self.role)
|
||||
self.assertIn(collection_url, r.body['links']['self'])
|
||||
self.assertIn(collection_url, r.result['links']['self'])
|
||||
|
||||
self.delete(member_url)
|
||||
r = self.get(collection_url)
|
||||
self.assertValidRoleListResponse(r, expected_length=0)
|
||||
self.assertIn(collection_url, r.body['links']['self'])
|
||||
self.assertIn(collection_url, r.result['links']['self'])
|
||||
|
||||
def test_crud_group_project_role_grants(self):
|
||||
collection_url = (
|
||||
|
@ -607,12 +607,12 @@ class IdentityTestCase(test_v3.RestfulTestCase):
|
|||
self.head(member_url)
|
||||
r = self.get(collection_url)
|
||||
self.assertValidRoleListResponse(r, ref=self.role)
|
||||
self.assertIn(collection_url, r.body['links']['self'])
|
||||
self.assertIn(collection_url, r.result['links']['self'])
|
||||
|
||||
self.delete(member_url)
|
||||
r = self.get(collection_url)
|
||||
self.assertValidRoleListResponse(r, expected_length=0)
|
||||
self.assertIn(collection_url, r.body['links']['self'])
|
||||
self.assertIn(collection_url, r.result['links']['self'])
|
||||
|
||||
def test_crud_group_domain_role_grants(self):
|
||||
collection_url = (
|
||||
|
@ -627,9 +627,9 @@ class IdentityTestCase(test_v3.RestfulTestCase):
|
|||
self.head(member_url)
|
||||
r = self.get(collection_url)
|
||||
self.assertValidRoleListResponse(r, ref=self.role)
|
||||
self.assertIn(collection_url, r.body['links']['self'])
|
||||
self.assertIn(collection_url, r.result['links']['self'])
|
||||
|
||||
self.delete(member_url)
|
||||
r = self.get(collection_url)
|
||||
self.assertValidRoleListResponse(r, expected_length=0)
|
||||
self.assertIn(collection_url, r.body['links']['self'])
|
||||
self.assertIn(collection_url, r.result['links']['self'])
|
||||
|
|
|
@ -142,7 +142,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
"""
|
||||
self._set_policy({"identity:list_users": []})
|
||||
r = self.get('/users', auth=self.auth)
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('users'))
|
||||
self.assertIn(self.user1['id'], id_list)
|
||||
self.assertIn(self.user2['id'], id_list)
|
||||
self.assertIn(self.user3['id'], id_list)
|
||||
|
@ -160,7 +160,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
url_by_name = '/users?domain_id=%s' % self.domainB['id']
|
||||
r = self.get(url_by_name, auth=self.auth)
|
||||
# We should get back two users, those in DomainB
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('users'))
|
||||
self.assertIn(self.user2['id'], id_list)
|
||||
self.assertIn(self.user3['id'], id_list)
|
||||
|
||||
|
@ -181,8 +181,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
self._set_policy(new_policy)
|
||||
url_by_name = '/users/%s' % self.user1['id']
|
||||
r = self.get(url_by_name, auth=self.auth)
|
||||
body = r.body
|
||||
self.assertEquals(self.user1['id'], body['user']['id'])
|
||||
self.assertEquals(self.user1['id'], r.result['user']['id'])
|
||||
|
||||
def test_list_users_protected_by_domain(self):
|
||||
"""GET /users?domain_id=mydomain (protected)
|
||||
|
@ -205,7 +204,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
url_by_name = '/users?domain_id=%s' % self.domainA['id']
|
||||
r = self.get(url_by_name, auth=self.auth)
|
||||
# We should only get back one user, the one in DomainA
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('users'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('users'))
|
||||
self.assertEqual(len(id_list), 1)
|
||||
self.assertIn(self.user1['id'], id_list)
|
||||
|
||||
|
@ -234,7 +233,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
url_by_name = '/groups?domain_id=%s' % self.domainA['id']
|
||||
r = self.get(url_by_name, auth=self.auth)
|
||||
# We should only get back two groups, the ones in DomainA
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('groups'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('groups'))
|
||||
self.assertEqual(len(id_list), 2)
|
||||
self.assertIn(self.group1['id'], id_list)
|
||||
self.assertIn(self.group2['id'], id_list)
|
||||
|
@ -266,7 +265,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
r = self.get(url_by_name, auth=self.auth)
|
||||
# We should only get back one user, the one in DomainA that matches
|
||||
# the name supplied
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('groups'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('groups'))
|
||||
self.assertEqual(len(id_list), 1)
|
||||
self.assertIn(self.group2['id'], id_list)
|
||||
|
||||
|
@ -285,21 +284,21 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
new_policy = {"identity:list_domains": []}
|
||||
self._set_policy(new_policy)
|
||||
r = self.get('/domains?enabled=0', auth=self.auth)
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
|
||||
self.assertEqual(len(id_list), 1)
|
||||
self.assertIn(self.domainC['id'], id_list)
|
||||
|
||||
# Now try a few ways of specifying 'true' when we should get back
|
||||
# the other two domains, plus the default domain
|
||||
r = self.get('/domains?enabled=1', auth=self.auth)
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
|
||||
self.assertEqual(len(id_list), 3)
|
||||
self.assertIn(self.domainA['id'], id_list)
|
||||
self.assertIn(self.domainB['id'], id_list)
|
||||
self.assertIn(DEFAULT_DOMAIN_ID, id_list)
|
||||
|
||||
r = self.get('/domains?enabled', auth=self.auth)
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
|
||||
self.assertEqual(len(id_list), 3)
|
||||
self.assertIn(self.domainA['id'], id_list)
|
||||
self.assertIn(self.domainB['id'], id_list)
|
||||
|
@ -319,6 +318,6 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
|
||||
my_url = '/domains?enableds&name=%s' % self.domainA['name']
|
||||
r = self.get(my_url, auth=self.auth)
|
||||
id_list = self._get_id_list_from_ref_list(r.body.get('domains'))
|
||||
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
|
||||
self.assertEqual(len(id_list), 1)
|
||||
self.assertIn(self.domainA['id'], id_list)
|
||||
|
|
Loading…
Reference in New Issue