Refactor: Don't hard code the error code

This patch replace the hard coded HTTP error code (400~410)
with the constants.

Change-Id: I952cac73a9713bde4ad757371ca8b4ded93f207e
This commit is contained in:
Dave Chen 2015-07-06 15:25:08 +08:00 committed by Brant Knudson
parent 6c4a73d969
commit a50e23b9b7
20 changed files with 535 additions and 380 deletions

View File

@ -13,6 +13,7 @@
# under the License.
from oslo_serialization import jsonutils
from six.moves import http_client
import webtest
from keystone.auth import controllers as auth_controllers
@ -125,7 +126,8 @@ class RestfulTestCase(tests.TestCase):
"""Ensures that response headers appear as expected."""
self.assertIn('X-Auth-Token', response.headers.get('Vary'))
def assertValidErrorResponse(self, response, expected_status=400):
def assertValidErrorResponse(self, response,
expected_status=http_client.BAD_REQUEST):
"""Verify that the error response is valid.
Subclasses can override this function based on the expected response.
@ -184,7 +186,8 @@ class RestfulTestCase(tests.TestCase):
self._from_content_type(response, content_type=response_content_type)
# we can save some code & improve coverage by always doing this
if method != 'HEAD' and response.status_code >= 400:
if (method != 'HEAD' and
response.status_code >= http_client.BAD_REQUEST):
self.assertValidErrorResponse(response)
# Contains the decoded response.body

View File

@ -15,6 +15,7 @@
import copy
import uuid
from six.moves import http_client
from testtools import matchers
from keystone.tests.unit import test_v3
@ -60,7 +61,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
'/endpoints/%(endpoint_id)s' % {
'project_id': uuid.uuid4().hex,
'endpoint_id': self.endpoint_id},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_create_endpoint_project_association_with_invalid_endpoint(self):
"""PUT /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@ -72,7 +73,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
'/endpoints/%(endpoint_id)s' % {
'project_id': self.default_domain_project_id,
'endpoint_id': uuid.uuid4().hex},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_create_endpoint_project_association_with_unexpected_body(self):
"""PUT /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@ -109,7 +110,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
'/endpoints/%(endpoint_id)s' % {
'project_id': uuid.uuid4().hex,
'endpoint_id': self.endpoint_id},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_check_endpoint_project_association_with_invalid_endpoint(self):
"""HEAD /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@ -122,7 +123,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
'/endpoints/%(endpoint_id)s' % {
'project_id': self.default_domain_project_id,
'endpoint_id': uuid.uuid4().hex},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_list_endpoints_associated_with_valid_project(self):
"""GET /OS-EP-FILTER/projects/{project_id}/endpoints
@ -146,7 +147,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
self.put(self.default_request_url)
self.get('/OS-EP-FILTER/projects/%(project_id)s/endpoints' % {
'project_id': uuid.uuid4().hex},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_list_projects_associated_with_endpoint(self):
"""GET /OS-EP-FILTER/endpoints/{endpoint_id}/projects
@ -180,7 +181,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
"""
self.get('/OS-EP-FILTER/endpoints/%(endpoint_id)s/projects' %
{'endpoint_id': uuid.uuid4().hex},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_remove_endpoint_project_association(self):
"""DELETE /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@ -206,7 +207,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
'/endpoints/%(endpoint_id)s' % {
'project_id': uuid.uuid4().hex,
'endpoint_id': self.endpoint_id},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_remove_endpoint_project_association_with_invalid_endpoint(self):
"""DELETE /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@ -219,7 +220,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
'/endpoints/%(endpoint_id)s' % {
'project_id': self.default_domain_project_id,
'endpoint_id': uuid.uuid4().hex},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_endpoint_project_association_cleanup_when_project_deleted(self):
self.put(self.default_request_url)
@ -589,7 +590,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
invalid_body['endpoint_group']['filters'] = {'foobar': 'admin'}
self.post(self.DEFAULT_ENDPOINT_GROUP_URL,
body=invalid_body,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_get_endpoint_group(self):
"""GET /OS-EP-FILTER/endpoint_groups/{endpoint_group}
@ -624,7 +625,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
endpoint_group_id = 'foobar'
url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % {
'endpoint_group_id': endpoint_group_id}
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_check_endpoint_group(self):
"""HEAD /OS-EP-FILTER/endpoint_groups/{endpoint_group_id}
@ -648,7 +649,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
endpoint_group_id = 'foobar'
url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % {
'endpoint_group_id': endpoint_group_id}
self.head(url, expected_status=404)
self.head(url, expected_status=http_client.NOT_FOUND)
def test_patch_endpoint_group(self):
"""PATCH /OS-EP-FILTER/endpoint_groups/{endpoint_group}
@ -685,7 +686,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
}
url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % {
'endpoint_group_id': 'ABC'}
self.patch(url, body=body, expected_status=404)
self.patch(url, body=body, expected_status=http_client.NOT_FOUND)
def test_patch_invalid_endpoint_group(self):
"""PATCH /OS-EP-FILTER/endpoint_groups/{endpoint_group}
@ -707,7 +708,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
self.DEFAULT_ENDPOINT_GROUP_URL, self.DEFAULT_ENDPOINT_GROUP_BODY)
url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % {
'endpoint_group_id': endpoint_group_id}
self.patch(url, body=body, expected_status=400)
self.patch(url, body=body, expected_status=http_client.BAD_REQUEST)
# Perform a GET call to ensure that the content remains
# the same (as DEFAULT_ENDPOINT_GROUP_BODY) after attempting to update
@ -731,7 +732,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % {
'endpoint_group_id': endpoint_group_id}
self.delete(url)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_delete_invalid_endpoint_group(self):
"""GET /OS-EP-FILTER/endpoint_groups/{endpoint_group}
@ -742,7 +743,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
endpoint_group_id = 'foobar'
url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % {
'endpoint_group_id': endpoint_group_id}
self.delete(url, expected_status=404)
self.delete(url, expected_status=http_client.NOT_FOUND)
def test_add_endpoint_group_to_project(self):
"""Create a valid endpoint group and project association."""
@ -761,7 +762,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
project_id = uuid.uuid4().hex
url = self._get_project_endpoint_group_url(
endpoint_group_id, project_id)
self.put(url, expected_status=404)
self.put(url, expected_status=http_client.NOT_FOUND)
def test_get_endpoint_group_in_project(self):
"""Test retrieving project endpoint group association."""
@ -787,7 +788,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
project_id = uuid.uuid4().hex
url = self._get_project_endpoint_group_url(
endpoint_group_id, project_id)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_list_endpoint_groups_in_project(self):
"""GET /OS-EP-FILTER/projects/{project_id}/endpoint_groups."""
@ -813,7 +814,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
project_id = uuid.uuid4().hex
url = ('/OS-EP-FILTER/projects/%(project_id)s/endpoint_groups' %
{'project_id': project_id})
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_empty_endpoint_groups_in_project(self):
"""Test when no endpoint groups associated with the project."""
@ -848,7 +849,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
project_id = uuid.uuid4().hex
url = self._get_project_endpoint_group_url(
endpoint_group_id, project_id)
self.head(url, expected_status=404)
self.head(url, expected_status=http_client.NOT_FOUND)
def test_list_endpoint_groups(self):
"""GET /OS-EP-FILTER/endpoint_groups."""
@ -992,7 +993,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
# endpoint group association again
self.delete('/projects/%(project_id)s' % {
'project_id': project['id']})
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_endpoint_group_project_cleanup_with_endpoint_group(self):
# create endpoint group
@ -1012,7 +1013,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
# now remove the project endpoint group association
self.delete(url)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_removing_an_endpoint_group_project(self):
# create an endpoint group
@ -1026,7 +1027,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
# remove the endpoint group project
self.delete(url)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_remove_endpoint_group_with_project_association(self):
# create an endpoint group
@ -1044,8 +1045,9 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
'%(endpoint_group_id)s'
% {'endpoint_group_id': endpoint_group_id})
self.delete(endpoint_group_url)
self.get(endpoint_group_url, expected_status=404)
self.get(project_endpoint_group_url, expected_status=404)
self.get(endpoint_group_url, expected_status=http_client.NOT_FOUND)
self.get(project_endpoint_group_url,
expected_status=http_client.NOT_FOUND)
def _create_valid_endpoint_group(self, url, body):
r = self.post(url, body=body)

View File

@ -14,6 +14,8 @@
import uuid
from six.moves import http_client
from keystone import catalog
from keystone.tests import unit as tests
from keystone.tests.unit.ksfixtures import database
@ -102,16 +104,20 @@ class V2CatalogTestCase(rest.RestfulTestCase):
self.assertNotIn("internalurl", response.result['endpoint'])
def test_endpoint_create_with_null_publicurl(self):
self._endpoint_create(expected_status=400, publicurl=None)
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl=None)
def test_endpoint_create_with_empty_publicurl(self):
self._endpoint_create(expected_status=400, publicurl='')
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl='')
def test_endpoint_create_with_null_service_id(self):
self._endpoint_create(expected_status=400, service_id=None)
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
service_id=None)
def test_endpoint_create_with_empty_service_id(self):
self._endpoint_create(expected_status=400, service_id='')
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
service_id='')
def test_endpoint_create_with_valid_url(self):
"""Create endpoint with valid URL should be tested, too."""
@ -146,7 +152,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
# Case one: publicurl, internalurl and adminurl are
# all invalid
for invalid_url in invalid_urls:
self._endpoint_create(expected_status=400,
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl=invalid_url,
internalurl=invalid_url,
adminurl=invalid_url)
@ -154,7 +160,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
# Case two: publicurl, internalurl are invalid
# and adminurl is valid
for invalid_url in invalid_urls:
self._endpoint_create(expected_status=400,
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl=invalid_url,
internalurl=invalid_url,
adminurl=valid_url)
@ -162,7 +168,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
# Case three: publicurl, adminurl are invalid
# and internalurl is valid
for invalid_url in invalid_urls:
self._endpoint_create(expected_status=400,
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl=invalid_url,
internalurl=valid_url,
adminurl=invalid_url)
@ -170,7 +176,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
# Case four: internalurl, adminurl are invalid
# and publicurl is valid
for invalid_url in invalid_urls:
self._endpoint_create(expected_status=400,
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl=valid_url,
internalurl=invalid_url,
adminurl=invalid_url)
@ -178,7 +184,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
# Case five: publicurl is invalid, internalurl
# and adminurl are valid
for invalid_url in invalid_urls:
self._endpoint_create(expected_status=400,
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl=invalid_url,
internalurl=valid_url,
adminurl=valid_url)
@ -186,7 +192,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
# Case six: internalurl is invalid, publicurl
# and adminurl are valid
for invalid_url in invalid_urls:
self._endpoint_create(expected_status=400,
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl=valid_url,
internalurl=invalid_url,
adminurl=valid_url)
@ -194,7 +200,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
# Case seven: adminurl is invalid, publicurl
# and internalurl are valid
for invalid_url in invalid_urls:
self._endpoint_create(expected_status=400,
self._endpoint_create(expected_status=http_client.BAD_REQUEST,
publicurl=valid_url,
internalurl=valid_url,
adminurl=invalid_url)

View File

@ -16,6 +16,7 @@ import hashlib
import uuid
from oslo_config import cfg
from six.moves import http_client
import webob
from keystone.common import authorization
@ -96,14 +97,14 @@ class JsonBodyMiddlewareTest(tests.TestCase):
content_type='application/json',
method='POST')
resp = middleware.JsonBodyMiddleware(None).process_request(req)
self.assertEqual(400, resp.status_int)
self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
def test_not_dict_body(self):
req = make_request(body='42',
content_type='application/json',
method='POST')
resp = middleware.JsonBodyMiddleware(None).process_request(req)
self.assertEqual(400, resp.status_int)
self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
self.assertTrue('valid JSON object' in resp.json['error']['message'])
def test_no_content_type(self):
@ -118,7 +119,7 @@ class JsonBodyMiddlewareTest(tests.TestCase):
content_type='text/plain',
method='POST')
resp = middleware.JsonBodyMiddleware(None).process_request(req)
self.assertEqual(400, resp.status_int)
self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
def test_unrecognized_content_type_without_body(self):
req = make_request(content_type='text/plain',

View File

@ -14,6 +14,7 @@
import os
from six.moves import http_client
import webtest
from keystone.tests import unit as tests
@ -56,4 +57,4 @@ class TestNoAdminTokenAuth(tests.TestCase):
# If the following does not raise, then the test is successful.
self.admin_app.get(REQ_PATH, headers={'X-Auth-Token': 'NotAdminToken'},
status=401)
status=http_client.UNAUTHORIZED)

View File

@ -19,6 +19,7 @@ import uuid
from keystoneclient.common import cms
from oslo_config import cfg
import six
from six.moves import http_client
from testtools import matchers
from keystone.common import extension as keystone_extension
@ -70,13 +71,13 @@ class CoreApiTests(object):
def test_public_not_found(self):
r = self.public_request(
path='/%s' % uuid.uuid4().hex,
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.assertValidErrorResponse(r)
def test_admin_not_found(self):
r = self.admin_request(
path='/%s' % uuid.uuid4().hex,
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.assertValidErrorResponse(r)
def test_public_multiple_choice(self):
@ -107,11 +108,11 @@ class CoreApiTests(object):
def test_admin_extensions_404(self):
self.admin_request(path='/v2.0/extensions/invalid-extension',
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_public_osksadm_extension_404(self):
self.public_request(path='/v2.0/extensions/OS-KSADM',
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_admin_osksadm_extension(self):
r = self.admin_request(path='/v2.0/extensions/OS-KSADM')
@ -170,7 +171,7 @@ class CoreApiTests(object):
'token_id': 'invalid',
},
token=token,
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_validate_token_service_role(self):
self.md_foobar = self.assignment_api.add_role_to_user_and_project(
@ -204,7 +205,7 @@ class CoreApiTests(object):
r = self.admin_request(
path='/v2.0/tokens/%s' % token,
token=token,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_validate_token_belongs_to(self):
token = self.get_scoped_token()
@ -306,7 +307,7 @@ class CoreApiTests(object):
},
},
token=token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(r)
r = self.admin_request(
@ -321,7 +322,7 @@ class CoreApiTests(object):
},
},
token=token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(r)
# Test UPDATE request
@ -338,7 +339,7 @@ class CoreApiTests(object):
},
},
token=token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(r)
r = self.admin_request(
@ -351,7 +352,7 @@ class CoreApiTests(object):
},
},
token=token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(r)
def test_create_update_user_valid_enabled_type(self):
@ -373,7 +374,8 @@ class CoreApiTests(object):
def test_error_response(self):
"""This triggers assertValidErrorResponse by convention."""
self.public_request(path='/v2.0/tenants', expected_status=401)
self.public_request(path='/v2.0/tenants',
expected_status=http_client.UNAUTHORIZED)
def test_invalid_parameter_error_response(self):
token = self.get_scoped_token()
@ -387,13 +389,13 @@ class CoreApiTests(object):
path='/v2.0/OS-KSADM/services',
body=bad_body,
token=token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(res)
res = self.admin_request(method='POST',
path='/v2.0/users',
body=bad_body,
token=token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(res)
def _get_user_id(self, r):
@ -552,7 +554,7 @@ class CoreApiTests(object):
},
},
token=token,
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_update_user_with_invalid_tenant_no_prev_tenant(self):
token = self.get_scoped_token()
@ -584,7 +586,7 @@ class CoreApiTests(object):
},
},
token=token,
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_update_user_with_old_tenant(self):
token = self.get_scoped_token()
@ -669,13 +671,13 @@ class CoreApiTests(object):
},
},
},
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
self.assertValidErrorResponse(r)
def test_www_authenticate_header(self):
r = self.public_request(
path='/v2.0/tenants',
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
self.assertEqual('Keystone uri="http://localhost"',
r.headers.get('WWW-Authenticate'))
@ -684,7 +686,7 @@ class CoreApiTests(object):
self.config_fixture.config(public_endpoint=test_url)
r = self.public_request(
path='/v2.0/tenants',
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
self.assertEqual('Keystone uri="%s"' % test_url,
r.headers.get('WWW-Authenticate'))
@ -1141,8 +1143,9 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
return r.result['user'][attribute_name]
def test_service_crud_requires_auth(self):
"""Service CRUD should 401 without an X-Auth-Token (bug 1006822)."""
# values here don't matter because we should 401 before they're checked
"""Service CRUD should return unauthorized without an X-Auth-Token."""
# values here don't matter because it will be unauthorized before
# they're checked (bug 1006822).
service_path = '/v2.0/OS-KSADM/services/%s' % uuid.uuid4().hex
service_body = {
'OS-KSADM:service': {
@ -1153,41 +1156,43 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
r = self.admin_request(method='GET',
path='/v2.0/OS-KSADM/services',
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
self.assertValidErrorResponse(r)
r = self.admin_request(method='POST',
path='/v2.0/OS-KSADM/services',
body=service_body,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
self.assertValidErrorResponse(r)
r = self.admin_request(method='GET',
path=service_path,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
self.assertValidErrorResponse(r)
r = self.admin_request(method='DELETE',
path=service_path,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
self.assertValidErrorResponse(r)
def test_user_role_list_requires_auth(self):
"""User role list should 401 without an X-Auth-Token (bug 1006815)."""
# values here don't matter because we should 401 before they're checked
"""User role list return unauthorized without an X-Auth-Token."""
# values here don't matter because it will be unauthorized before
# they're checked (bug 1006815).
path = '/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
'tenant_id': uuid.uuid4().hex,
'user_id': uuid.uuid4().hex,
}
r = self.admin_request(path=path, expected_status=401)
r = self.admin_request(path=path,
expected_status=http_client.UNAUTHORIZED)
self.assertValidErrorResponse(r)
def test_fetch_revocation_list_nonadmin_fails(self):
self.admin_request(
method='GET',
path='/v2.0/tokens/revoked',
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_fetch_revocation_list_admin_200(self):
token = self.get_scoped_token()
@ -1278,7 +1283,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
},
},
token=token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(r)
# Test UPDATE request
@ -1294,7 +1299,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
},
},
token=token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(r)
def test_authenticating_a_user_with_an_OSKSADM_password(self):

View File

@ -22,6 +22,7 @@ import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from six.moves import http_client
from six.moves import range
import webob
@ -1032,7 +1033,8 @@ class ClientDrivenTestCase(tests.TestCase):
(new_password, self.user_two['password']))
self.public_server.application(req.environ,
responseobject.start_fake_response)
self.assertEqual(403, responseobject.response_status)
self.assertEqual(http_client.FORBIDDEN,
responseobject.response_status)
self.user_two['password'] = new_password
self.assertRaises(client_exceptions.Unauthorized,

View File

@ -14,6 +14,7 @@ import random
import uuid
from oslo_config import cfg
from six.moves import http_client
from six.moves import range
from keystone.common import controller
@ -75,9 +76,10 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
body={'domain': ref})
self.assertValidDomainResponse(r, ref)
def test_create_domain_400(self):
def test_create_domain_bad_request(self):
"""Call ``POST /domains``."""
self.post('/domains', body={'domain': {}}, expected_status=400)
self.post('/domains', body={'domain': {}},
expected_status=http_client.BAD_REQUEST)
def test_list_domains(self):
"""Call ``GET /domains``."""
@ -133,7 +135,8 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
}
}
self.admin_request(
path='/v2.0/tokens', method='POST', body=body, expected_status=401)
path='/v2.0/tokens', method='POST', body=body,
expected_status=http_client.UNAUTHORIZED)
auth_data = self.build_authentication_request(
user_id=self.user2['id'],
@ -160,21 +163,24 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
}
}
self.admin_request(
path='/v2.0/tokens', method='POST', body=body, expected_status=401)
path='/v2.0/tokens', method='POST', body=body,
expected_status=http_client.UNAUTHORIZED)
# Try looking up in v3 by name and id
auth_data = self.build_authentication_request(
user_id=self.user2['id'],
password=self.user2['password'],
project_id=self.project2['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
auth_data = self.build_authentication_request(
username=self.user2['name'],
user_domain_id=self.domain2['id'],
password=self.user2['password'],
project_id=self.project2['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_delete_enabled_domain_fails(self):
"""Call ``DELETE /domains/{domain_id}`` (when domain enabled)."""
@ -370,7 +376,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# as the domain has already been disabled.
self.head('/auth/tokens',
headers={'x-subject-token': subject_token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_delete_domain_hierarchy(self):
"""Call ``DELETE /domains/{domain_id}``."""
@ -485,14 +491,16 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
body={'project': ref})
self.assertValidProjectResponse(r, ref)
def test_create_project_400(self):
def test_create_project_bad_request(self):
"""Call ``POST /projects``."""
self.post('/projects', body={'project': {}}, expected_status=400)
self.post('/projects', body={'project': {}},
expected_status=http_client.BAD_REQUEST)
def test_create_project_invalid_domain_id(self):
"""Call ``POST /projects``."""
ref = self.new_project_ref(domain_id=uuid.uuid4().hex)
self.post('/projects', body={'project': ref}, expected_status=400)
self.post('/projects', body={'project': ref},
expected_status=http_client.BAD_REQUEST)
def test_create_project_is_domain_not_allowed(self):
"""Call ``POST /projects``.
@ -644,18 +652,20 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
def test_get_project_with_parents_as_list_with_invalid_id(self):
"""Call ``GET /projects/{project_id}?parents_as_list``."""
self.get('/projects/%(project_id)s?parents_as_list' % {
'project_id': None}, expected_status=404)
'project_id': None}, expected_status=http_client.NOT_FOUND)
self.get('/projects/%(project_id)s?parents_as_list' % {
'project_id': uuid.uuid4().hex}, expected_status=404)
'project_id': uuid.uuid4().hex},
expected_status=http_client.NOT_FOUND)
def test_get_project_with_subtree_as_list_with_invalid_id(self):
"""Call ``GET /projects/{project_id}?subtree_as_list``."""
self.get('/projects/%(project_id)s?subtree_as_list' % {
'project_id': None}, expected_status=404)
'project_id': None}, expected_status=http_client.NOT_FOUND)
self.get('/projects/%(project_id)s?subtree_as_list' % {
'project_id': uuid.uuid4().hex}, expected_status=404)
'project_id': uuid.uuid4().hex},
expected_status=http_client.NOT_FOUND)
def test_get_project_with_parents_as_ids(self):
"""Call ``GET /projects/{project_id}?parents_as_ids``."""
@ -766,7 +776,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.get(
'/projects/%(project_id)s?parents_as_list&parents_as_ids' % {
'project_id': projects[1]['project']['id']},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_get_project_with_subtree_as_ids(self):
"""Call ``GET /projects/{project_id}?subtree_as_ids``.
@ -928,7 +938,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.get(
'/projects/%(project_id)s?subtree_as_list&subtree_as_ids' % {
'project_id': projects[1]['project']['id']},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_update_project(self):
"""Call ``PATCH /projects/{project_id}``."""
@ -965,7 +975,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
'/projects/%(project_id)s' % {
'project_id': leaf_project['id']},
body={'project': leaf_project},
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_update_project_is_domain_not_allowed(self):
"""Call ``PATCH /projects/{project_id}`` with is_domain.
@ -981,7 +991,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.patch('/projects/%(project_id)s' % {
'project_id': resp.result['project']['id']},
body={'project': project},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_disable_leaf_project(self):
"""Call ``PATCH /projects/{project_id}``."""
@ -1004,7 +1014,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
'/projects/%(project_id)s' % {
'project_id': root_project['id']},
body={'project': root_project},
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_delete_project(self):
"""Call ``DELETE /projects/{project_id}``
@ -1048,7 +1058,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.delete(
'/projects/%(project_id)s' % {
'project_id': projects[0]['project']['id']},
expected_status=403)
expected_status=http_client.FORBIDDEN)
# Role CRUD tests
@ -1060,9 +1070,10 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
body={'role': ref})
return self.assertValidRoleResponse(r, ref)
def test_create_role_400(self):
def test_create_role_bad_request(self):
"""Call ``POST /roles``."""
self.post('/roles', body={'role': {}}, expected_status=400)
self.post('/roles', body={'role': {}},
expected_status=http_client.BAD_REQUEST)
def test_list_roles(self):
"""Call ``GET /roles``."""
@ -1132,7 +1143,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
"""Grant role on a project to a user that doesn't exist, 404 result.
When grant a role on a project to a user that doesn't exist, the server
returns 404 Not Found for the user.
returns Not Found for the user.
"""
@ -1145,7 +1156,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
'collection_url': collection_url,
'role_id': self.role_id}
self.put(member_url, expected_status=404)
self.put(member_url, expected_status=http_client.NOT_FOUND)
def test_crud_user_domain_role_grants(self):
collection_url = (
@ -1184,7 +1195,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
'collection_url': collection_url,
'role_id': self.role_id}
self.put(member_url, expected_status=404)
self.put(member_url, expected_status=http_client.NOT_FOUND)
def test_crud_group_project_role_grants(self):
collection_url = (
@ -1224,7 +1235,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
'collection_url': collection_url,
'role_id': self.role_id}
self.put(member_url, expected_status=404)
self.put(member_url, expected_status=http_client.NOT_FOUND)
def test_crud_group_domain_role_grants(self):
collection_url = (
@ -1264,7 +1275,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
'collection_url': collection_url,
'role_id': self.role_id}
self.put(member_url, expected_status=404)
self.put(member_url, expected_status=http_client.NOT_FOUND)
def _create_new_user_and_assign_role_on_project(self):
"""Create a new user and assign user a role on a project."""
@ -1292,7 +1303,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# Clean up the role assignment
self.delete(member_url, expected_status=204)
# Make sure the role is gone
self.head(member_url, expected_status=404)
self.head(member_url, expected_status=http_client.NOT_FOUND)
def test_delete_user_and_check_role_assignment_fails(self):
"""Call ``DELETE`` on the user and check the role assignment."""
@ -1301,7 +1312,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.identity_api.delete_user(user['id'])
# We should get a 404 when looking for the user in the identity
# backend because we're not performing a delete operation on the role.
self.head(member_url, expected_status=404)
self.head(member_url, expected_status=http_client.NOT_FOUND)
def test_token_revoked_once_group_role_grant_revoked(self):
"""Test token is revoked when group role grant is revoked
@ -1343,7 +1354,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# validates the same token again; it should not longer be valid.
self.head('/auth/tokens',
headers={'x-subject-token': token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
# Role Assignments tests
@ -1903,24 +1914,24 @@ class RoleAssignmentFailureTestCase(RoleAssignmentBaseTestCase):
def test_get_role_assignments_by_domain_and_project(self):
self.get_role_assignments(domain_id=self.domain_id,
project_id=self.project_id,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_get_role_assignments_by_user_and_group(self):
self.get_role_assignments(user_id=self.default_user_id,
group_id=self.default_group_id,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_get_role_assignments_by_effective_and_inherited(self):
self.config_fixture.config(group='os_inherit', enabled=True)
self.get_role_assignments(domain_id=self.domain_id, effective=True,
inherited_to_projects=True,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_get_role_assignments_by_effective_and_group(self):
self.get_role_assignments(effective=True,
group_id=self.default_group_id,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
class RoleAssignmentDirectTestCase(RoleAssignmentBaseTestCase):
@ -2193,8 +2204,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
project_id=self.project_id)
# Check the user cannot get a domain nor a project token
self.v3_authenticate_token(domain_auth_data, expected_status=401)
self.v3_authenticate_token(project_auth_data, expected_status=401)
self.v3_authenticate_token(domain_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(project_auth_data,
expected_status=http_client.UNAUTHORIZED)
# Grant non-inherited role for user on domain
non_inher_ud_link = self.build_role_assignment_link(
@ -2203,7 +2216,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
# Check the user can get only a domain token
self.v3_authenticate_token(domain_auth_data)
self.v3_authenticate_token(project_auth_data, expected_status=401)
self.v3_authenticate_token(project_auth_data,
expected_status=http_client.UNAUTHORIZED)
# Create inherited role
inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'}
@ -2224,13 +2238,15 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
# Check the user can only get a domain token
self.v3_authenticate_token(domain_auth_data)
self.v3_authenticate_token(project_auth_data, expected_status=401)
self.v3_authenticate_token(project_auth_data,
expected_status=http_client.UNAUTHORIZED)
# Delete non-inherited grant
self.delete(non_inher_ud_link)
# Check the user cannot get a domain token anymore
self.v3_authenticate_token(domain_auth_data, expected_status=401)
self.v3_authenticate_token(domain_auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_get_token_from_inherited_group_domain_role_grants(self):
# Create a new group and put a new user in it to
@ -2255,8 +2271,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
project_id=self.project_id)
# Check the user cannot get a domain nor a project token
self.v3_authenticate_token(domain_auth_data, expected_status=401)
self.v3_authenticate_token(project_auth_data, expected_status=401)
self.v3_authenticate_token(domain_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(project_auth_data,
expected_status=http_client.UNAUTHORIZED)
# Grant non-inherited role for user on domain
non_inher_gd_link = self.build_role_assignment_link(
@ -2265,7 +2283,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
# Check the user can get only a domain token
self.v3_authenticate_token(domain_auth_data)
self.v3_authenticate_token(project_auth_data, expected_status=401)
self.v3_authenticate_token(project_auth_data,
expected_status=http_client.UNAUTHORIZED)
# Create inherited role
inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'}
@ -2286,13 +2305,15 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
# Check the user can only get a domain token
self.v3_authenticate_token(domain_auth_data)
self.v3_authenticate_token(project_auth_data, expected_status=401)
self.v3_authenticate_token(project_auth_data,
expected_status=http_client.UNAUTHORIZED)
# Delete non-inherited grant
self.delete(non_inher_gd_link)
# Check the user cannot get a domain token anymore
self.v3_authenticate_token(domain_auth_data, expected_status=401)
self.v3_authenticate_token(domain_auth_data,
expected_status=http_client.UNAUTHORIZED)
def _test_crud_inherited_and_direct_assignment_on_target(self, target_url):
# Create a new role to avoid assignments loaded from sample data
@ -2308,7 +2329,7 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(direct_url)
# Check the direct assignment exists, but the inherited one does not
self.head(direct_url)
self.head(inherited_url, expected_status=404)
self.head(inherited_url, expected_status=http_client.NOT_FOUND)
# Now add the inherited assignment
self.put(inherited_url)
@ -2320,13 +2341,13 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.delete(inherited_url)
# Check the direct assignment exists, but the inherited one does not
self.head(direct_url)
self.head(inherited_url, expected_status=404)
self.head(inherited_url, expected_status=http_client.NOT_FOUND)
# Now delete the inherited assignment
self.delete(direct_url)
# Check that none of them exist
self.head(direct_url, expected_status=404)
self.head(inherited_url, expected_status=404)
self.head(direct_url, expected_status=http_client.NOT_FOUND)
self.head(inherited_url, expected_status=http_client.NOT_FOUND)
def test_crud_inherited_and_direct_assignment_on_domains(self):
self._test_crud_inherited_and_direct_assignment_on_target(
@ -2801,8 +2822,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
project_id=leaf_id)
# Check the user cannot get a token on root nor leaf project
self.v3_authenticate_token(root_project_auth_data, expected_status=401)
self.v3_authenticate_token(leaf_project_auth_data, expected_status=401)
self.v3_authenticate_token(root_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(leaf_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
# Grant non-inherited role for user on leaf project
non_inher_up_link = self.build_role_assignment_link(
@ -2811,7 +2834,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(non_inher_up_link)
# Check the user can only get a token on leaf project
self.v3_authenticate_token(root_project_auth_data, expected_status=401)
self.v3_authenticate_token(root_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(leaf_project_auth_data)
# Grant inherited role for user on root project
@ -2821,21 +2845,24 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(inher_up_link)
# Check the user still can get a token only on leaf project
self.v3_authenticate_token(root_project_auth_data, expected_status=401)
self.v3_authenticate_token(root_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(leaf_project_auth_data)
# Delete non-inherited grant
self.delete(non_inher_up_link)
# Check the inherited role still applies for leaf project
self.v3_authenticate_token(root_project_auth_data, expected_status=401)
self.v3_authenticate_token(root_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(leaf_project_auth_data)
# Delete inherited grant
self.delete(inher_up_link)
# Check the user cannot get a token on leaf project anymore
self.v3_authenticate_token(leaf_project_auth_data, expected_status=401)
self.v3_authenticate_token(leaf_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_get_token_from_inherited_group_project_role_grants(self):
# Create default scenario
@ -2858,8 +2885,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
project_id=leaf_id)
# Check the user cannot get a token on root nor leaf project
self.v3_authenticate_token(root_project_auth_data, expected_status=401)
self.v3_authenticate_token(leaf_project_auth_data, expected_status=401)
self.v3_authenticate_token(root_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(leaf_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
# Grant non-inherited role for group on leaf project
non_inher_gp_link = self.build_role_assignment_link(
@ -2868,7 +2897,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(non_inher_gp_link)
# Check the user can only get a token on leaf project
self.v3_authenticate_token(root_project_auth_data, expected_status=401)
self.v3_authenticate_token(root_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(leaf_project_auth_data)
# Grant inherited role for group on root project
@ -2878,7 +2908,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(inher_gp_link)
# Check the user still can get a token only on leaf project
self.v3_authenticate_token(root_project_auth_data, expected_status=401)
self.v3_authenticate_token(root_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
self.v3_authenticate_token(leaf_project_auth_data)
# Delete no-inherited grant
@ -2891,7 +2922,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.delete(inher_gp_link)
# Check the user cannot get a token on leaf project anymore
self.v3_authenticate_token(leaf_project_auth_data, expected_status=401)
self.v3_authenticate_token(leaf_project_auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_get_role_assignments_for_project_hierarchy(self):
"""Call ``GET /role_assignments``.
@ -3069,10 +3101,10 @@ class AssignmentInheritanceDisabledTestCase(test_v3.RestfulTestCase):
'role_id': role['id']}
collection_url = base_collection_url + '/inherited_to_projects'
self.put(member_url, expected_status=404)
self.head(member_url, expected_status=404)
self.get(collection_url, expected_status=404)
self.delete(member_url, expected_status=404)
self.put(member_url, expected_status=http_client.NOT_FOUND)
self.head(member_url, expected_status=http_client.NOT_FOUND)
self.get(collection_url, expected_status=http_client.NOT_FOUND)
self.delete(member_url, expected_status=http_client.NOT_FOUND)
class AssignmentV3toV2MethodsTestCase(tests.TestCase):

View File

@ -22,6 +22,7 @@ from keystoneclient.common import cms
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from six.moves import http_client
from six.moves import range
from testtools import matchers
from testtools import testcase
@ -141,7 +142,7 @@ class TokenAPITests(object):
path='/v2.0/tokens/%s' % v3_token,
token=CONF.admin_token,
method='GET',
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_intermix_new_default_domain(self):
# If the default_domain_id config option is changed, then should be
@ -199,7 +200,7 @@ class TokenAPITests(object):
method='GET',
path='/v2.0/tokens/%s' % v3_token,
token=CONF.admin_token,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_intermix_non_default_project_failed(self):
# self.project is in a non-default domain
@ -213,7 +214,7 @@ class TokenAPITests(object):
method='GET',
path='/v2.0/tokens/%s' % v3_token,
token=CONF.admin_token,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_intermix_non_default_user_failed(self):
self.assignment_api.create_grant(
@ -232,7 +233,7 @@ class TokenAPITests(object):
method='GET',
path='/v2.0/tokens/%s' % v3_token,
token=CONF.admin_token,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_intermix_domain_scope_failed(self):
self.assignment_api.create_grant(
@ -250,7 +251,7 @@ class TokenAPITests(object):
path='/v2.0/tokens/%s' % v3_token,
token=CONF.admin_token,
method='GET',
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_unscoped_token_intermix(self):
r = self.v3_authenticate_token(self.build_authentication_request(
@ -390,7 +391,7 @@ class TokenAPITests(object):
# Attempting to use the deleted token on v2 should fail.
self.admin_request(
path='/v2.0/tenants', method='GET', token=v2_token,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_rescoping_token(self):
expires = self.v3_token_data['token']['expires_at']
@ -434,7 +435,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
self.build_authentication_request(
token=self.get_scoped_token(),
project_id=self.project_id),
expected_status=403)
expected_status=http_client.FORBIDDEN)
def _v2_token(self):
body = {
@ -460,7 +461,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
self.admin_request(path='/v2.0/tokens',
method='POST',
body=body,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_rescoping_v2_to_v3_disabled(self):
token = self._v2_token()
@ -468,7 +469,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
self.build_authentication_request(
token=token['access']['token']['id'],
project_id=self.project_id),
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_rescoping_v3_to_v2_disabled(self):
token = {'id': self.get_scoped_token()}
@ -498,7 +499,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
self.build_authentication_request(
token=domain_scoped_token,
project_id=self.project_id),
expected_status=403)
expected_status=http_client.FORBIDDEN)
class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
@ -660,17 +661,21 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
token=user_token)
self.delete('/auth/tokens', headers=headers, expected_status=204,
token=user_token)
# invalid X-Auth-Token and invalid X-Subject-Token (401)
self.head('/auth/tokens', headers=headers, expected_status=401,
# invalid X-Auth-Token and invalid X-Subject-Token
self.head('/auth/tokens', headers=headers,
expected_status=http_client.UNAUTHORIZED,
token=user_token)
# invalid X-Auth-Token and invalid X-Subject-Token (401)
self.delete('/auth/tokens', headers=headers, expected_status=401,
# invalid X-Auth-Token and invalid X-Subject-Token
self.delete('/auth/tokens', headers=headers,
expected_status=http_client.UNAUTHORIZED,
token=user_token)
# valid X-Auth-Token and invalid X-Subject-Token (404)
self.delete('/auth/tokens', headers=headers, expected_status=404,
# valid X-Auth-Token and invalid X-Subject-Token
self.delete('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND,
token=adminA_token)
# valid X-Auth-Token and invalid X-Subject-Token (404)
self.head('/auth/tokens', headers=headers, expected_status=404,
# valid X-Auth-Token and invalid X-Subject-Token
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND,
token=adminA_token)
def test_adminA_revokes_userA_token(self):
@ -694,14 +699,17 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
token=user_token)
self.delete('/auth/tokens', headers=headers, expected_status=204,
token=adminA_token)
# invalid X-Auth-Token and invalid X-Subject-Token (401)
self.head('/auth/tokens', headers=headers, expected_status=401,
# invalid X-Auth-Token and invalid X-Subject-Token
self.head('/auth/tokens', headers=headers,
expected_status=http_client.UNAUTHORIZED,
token=user_token)
# valid X-Auth-Token and invalid X-Subject-Token (404)
self.delete('/auth/tokens', headers=headers, expected_status=404,
# valid X-Auth-Token and invalid X-Subject-Token
self.delete('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND,
token=adminA_token)
# valid X-Auth-Token and invalid X-Subject-Token (404)
self.head('/auth/tokens', headers=headers, expected_status=404,
# valid X-Auth-Token and invalid X-Subject-Token
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND,
token=adminA_token)
def test_adminB_fails_revoking_userA_token(self):
@ -729,9 +737,11 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
password=self.userAdminB['password'],
domain_name=self.domainB['name']))
self.head('/auth/tokens', headers=headers, expected_status=403,
self.head('/auth/tokens', headers=headers,
expected_status=http_client.FORBIDDEN,
token=adminB_token)
self.delete('/auth/tokens', headers=headers, expected_status=403,
self.delete('/auth/tokens', headers=headers,
expected_status=http_client.FORBIDDEN,
token=adminB_token)
@ -907,7 +917,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.delete(grant_url)
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def role_data_fixtures(self):
self.projectC = self.new_project_ref(domain_id=self.domainA['id'])
@ -1019,16 +1029,16 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Check the tokens that used role1 is invalid
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenA},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenB},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenD},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenE},
expected_status=404)
expected_status=http_client.NOT_FOUND)
# ...but the one using role2 is still valid
self.head('/auth/tokens',
@ -1086,13 +1096,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# user should no longer have access to the project
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.v3_authenticate_token(
self.build_authentication_request(
user_id=self.user3['id'],
password=self.user3['password'],
project_id=self.projectA['id']),
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_deleting_project_revokes_token(self):
token = self.get_requested_token(
@ -1113,13 +1123,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# user should no longer have access to the project
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.v3_authenticate_token(
self.build_authentication_request(
user_id=self.user3['id'],
password=self.user3['password'],
project_id=self.projectA['id']),
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_deleting_group_grant_revokes_tokens(self):
"""Test deleting a group grant revokes tokens.
@ -1171,15 +1181,15 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.delete(grant_url)
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
expected_status=404)
expected_status=http_client.NOT_FOUND)
# But user3's token should be invalid too as revocation is done for
# scope role & project
self.head('/auth/tokens',
headers={'X-Subject-Token': token3},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_domain_group_role_assignment_maintains_token(self):
"""Test domain-group role assignment maintains existing token.
@ -1251,7 +1261,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
'user_id': self.user1['id']})
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
expected_status=404)
expected_status=http_client.NOT_FOUND)
# But user2's token should still be valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
@ -1295,13 +1305,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# authorization for the first user should now fail
self.head('/auth/tokens',
headers={'X-Subject-Token': user1_token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.v3_authenticate_token(
self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
project_id=self.projectA['id']),
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
# authorization for the second user should still succeed
self.head('/auth/tokens',
@ -1329,7 +1339,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
'/projects/%(project_id)s' % {'project_id': self.projectA['id']})
# Make sure that we get a NotFound(404) when heading that role.
self.head(role_path, expected_status=404)
self.head(role_path, expected_status=http_client.NOT_FOUND)
def get_v2_token(self, token=None, project_id=None):
body = {'auth': {}, }
@ -1361,7 +1371,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_revoke_token_from_token(self):
# Test that a scoped token can be requested from an unscoped token,
@ -1393,7 +1403,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The project-scoped token is invalidated.
self.head('/auth/tokens',
headers={'X-Subject-Token': project_scoped_token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
# The unscoped token should still be valid.
self.head('/auth/tokens',
@ -1413,7 +1423,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The domain-scoped token is invalid.
self.head('/auth/tokens',
headers={'X-Subject-Token': domain_scoped_token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
# The unscoped token should still be valid.
self.head('/auth/tokens',
@ -1442,7 +1452,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The project-scoped token is invalidated.
self.head('/auth/tokens',
headers={'X-Subject-Token': project_scoped_token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
# The unscoped token should still be valid.
self.head('/auth/tokens',
@ -1495,7 +1505,7 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById):
# while token for the projectB should not
self.head('/auth/tokens',
headers={'X-Subject-Token': project_token},
expected_status=404)
expected_status=http_client.NOT_FOUND)
revoked_tokens = [
t['id'] for t in self.token_provider_api.list_revoked_tokens()]
# token is in token revocation list
@ -1557,7 +1567,8 @@ class TestTokenRevokeApi(TestTokenRevokeById):
expected_status=200).json_body['token']
self.delete('/auth/tokens', headers=headers, expected_status=204)
self.head('/auth/tokens', headers=headers, expected_status=404)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
events_response = self.get('/OS-REVOKE/events',
expected_status=200).json_body
self.assertValidRevokedTokenResponse(events_response,
@ -1569,7 +1580,8 @@ class TestTokenRevokeApi(TestTokenRevokeById):
response = self.get('/auth/tokens', headers=headers,
expected_status=200).json_body['token']
self.delete('/auth/tokens', headers=headers, expected_status=204)
self.head('/auth/tokens', headers=headers, expected_status=404)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
events_response = self.get('/OS-REVOKE/events',
expected_status=200).json_body
@ -1578,7 +1590,8 @@ class TestTokenRevokeApi(TestTokenRevokeById):
audit_id=response['audit_ids'][0])
def test_revoke_by_id_false_410(self):
self.get('/auth/tokens/OS-PKI/revoked', expected_status=410)
self.get('/auth/tokens/OS-PKI/revoked',
expected_status=http_client.GONE)
def test_list_delete_project_shows_in_event_list(self):
self.role_data_fixtures()
@ -1662,7 +1675,8 @@ class TestTokenRevokeApi(TestTokenRevokeById):
self.assertEventDataInList(
events,
audit_id=token2['audit_ids'][1])
self.head('/auth/tokens', headers=headers, expected_status=404)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
self.head('/auth/tokens', headers=headers2, expected_status=200)
self.head('/auth/tokens', headers=headers3, expected_status=200)
@ -2002,7 +2016,7 @@ class TestAuth(test_v3.RestfulTestCase):
self._check_disabled_endpoint_result(r.result['token']['catalog'],
disabled_endpoint_id)
def test_project_id_scoped_token_with_user_id_401(self):
def test_project_id_scoped_token_with_user_id_unauthorized(self):
project = self.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project['id'], project)
@ -2010,7 +2024,8 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_user_and_group_roles_scoped_token(self):
"""Test correct roles are returned in scoped token.
@ -2346,7 +2361,8 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_auth_with_id(self):
auth_data = self.build_authentication_request(
@ -2395,34 +2411,39 @@ class TestAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
user_id=uuid.uuid4().hex,
password=self.user['password'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_invalid_user_name(self):
auth_data = self.build_authentication_request(
username=uuid.uuid4().hex,
user_domain_id=self.domain['id'],
password=self.user['password'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_invalid_domain_id(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=uuid.uuid4().hex,
password=self.user['password'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_invalid_domain_name(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=uuid.uuid4().hex,
password=self.user['password'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_invalid_password(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=uuid.uuid4().hex)
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_remote_user_no_realm(self):
api = auth.controllers.Auth()
@ -2588,7 +2609,8 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=user['id'],
password='password')
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_disabled_default_project_result_in_unscoped_token(self):
# create a disabled project to work with
@ -2666,7 +2688,8 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
# user should not be able to auth with project_name & domain
auth_data = self.build_authentication_request(
@ -2674,7 +2697,8 @@ class TestAuth(test_v3.RestfulTestCase):
password=self.user['password'],
project_name=project['name'],
project_domain_id=domain['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_auth_methods_with_different_identities_fails(self):
# get the token for a user. This is self.user which is different from
@ -2686,7 +2710,8 @@ class TestAuth(test_v3.RestfulTestCase):
token=token,
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
class TestAuthJSONExternal(test_v3.RestfulTestCase):
@ -2712,15 +2737,18 @@ class TestTrustOptional(test_v3.RestfulTestCase):
self.config_fixture.config(group='trust', enabled=False)
def test_trusts_404(self):
self.get('/OS-TRUST/trusts', body={'trust': {}}, expected_status=404)
self.post('/OS-TRUST/trusts', body={'trust': {}}, expected_status=404)
self.get('/OS-TRUST/trusts', body={'trust': {}},
expected_status=http_client.NOT_FOUND)
self.post('/OS-TRUST/trusts', body={'trust': {}},
expected_status=http_client.NOT_FOUND)
def test_auth_with_scope_in_trust_403(self):
def test_auth_with_scope_in_trust_forbidden(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
trust_id=uuid.uuid4().hex)
self.v3_authenticate_token(auth_data, expected_status=403)
self.v3_authenticate_token(auth_data,
expected_status=http_client.FORBIDDEN)
class TestTrustRedelegation(test_v3.RestfulTestCase):
@ -2804,7 +2832,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref},
token=trust_token,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_modified_redelegation_count_error(self):
r = self.post('/OS-TRUST/trusts',
@ -2820,14 +2848,14 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref},
token=trust_token,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_max_redelegation_count_constraint(self):
incorrect = CONF.trust.max_redelegation_count + 1
self.redelegated_trust_ref['redelegation_count'] = incorrect
self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref},
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_redelegation_expiry(self):
r = self.post('/OS-TRUST/trusts',
@ -2847,7 +2875,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': too_long_live_chained_trust_ref},
token=trust_token,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_redelegation_remaining_uses(self):
r = self.post('/OS-TRUST/trusts',
@ -2862,7 +2890,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref},
token=trust_token,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_roles_subset(self):
# Build second role
@ -2949,7 +2977,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref},
token=trust_token,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_redelegation_terminator(self):
r = self.post('/OS-TRUST/trusts',
@ -2977,7 +3005,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=trust_token,
expected_status=403)
expected_status=http_client.FORBIDDEN)
class TestTrustChain(test_v3.RestfulTestCase):
@ -3088,7 +3116,8 @@ class TestTrustChain(test_v3.RestfulTestCase):
expected_status=204)
headers = {'X-Subject-Token': self.last_token}
self.head('/auth/tokens', headers=headers, expected_status=404)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
self.assert_trust_tokens_revoked(self.trust_chain[0]['id'])
def test_delete_broken_chain(self):
@ -3111,7 +3140,8 @@ class TestTrustChain(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
token=self.last_token,
trust_id=self.trust_chain[-1]['id'])
self.v3_authenticate_token(auth_data, expected_status=404)
self.v3_authenticate_token(auth_data,
expected_status=http_client.NOT_FOUND)
def test_intermediate_user_disabled(self):
self.assert_user_authenticate(self.user_chain[0])
@ -3123,7 +3153,8 @@ class TestTrustChain(test_v3.RestfulTestCase):
# Bypass policy enforcement
with mock.patch.object(rules, 'enforce', return_value=True):
headers = {'X-Subject-Token': self.last_token}
self.head('/auth/tokens', headers=headers, expected_status=403)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.FORBIDDEN)
def test_intermediate_user_deleted(self):
self.assert_user_authenticate(self.user_chain[0])
@ -3133,7 +3164,8 @@ class TestTrustChain(test_v3.RestfulTestCase):
# Bypass policy enforcement
with mock.patch.object(rules, 'enforce', return_value=True):
headers = {'X-Subject-Token': self.last_token}
self.head('/auth/tokens', headers=headers, expected_status=403)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.FORBIDDEN)
class TestTrustAuth(test_v3.RestfulTestCase):
@ -3159,9 +3191,10 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.trustee_user['password'] = password
self.trustee_user_id = self.trustee_user['id']
def test_create_trust_400(self):
def test_create_trust_bad_request(self):
# The server returns a 403 Forbidden rather than a 400, see bug 1133435
self.post('/OS-TRUST/trusts', body={'trust': {}}, expected_status=403)
self.post('/OS-TRUST/trusts', body={'trust': {}},
expected_status=http_client.FORBIDDEN)
def test_create_unscoped_trust(self):
ref = self.new_trust_ref(
@ -3175,7 +3208,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id)
self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=403)
self.post('/OS-TRUST/trusts', body={'trust': ref},
expected_status=http_client.FORBIDDEN)
def _initialize_test_consume_trust(self, count):
# Make sure remaining_uses is decremented as we consume the trust
@ -3219,13 +3253,14 @@ class TestTrustAuth(test_v3.RestfulTestCase):
# No more uses, the trust is made unavailable
self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=404)
expected_status=http_client.NOT_FOUND)
# this time we can't get a trust token
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_create_trust_with_bad_values_for_remaining_uses(self):
# negative values for the remaining_uses parameter are forbidden
@ -3245,7 +3280,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
role_ids=[self.role_id])
self.post('/OS-TRUST/trusts',
body={'trust': ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_invalid_trust_request_without_impersonation(self):
ref = self.new_trust_ref(
@ -3258,7 +3293,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_invalid_trust_request_without_trustee(self):
ref = self.new_trust_ref(
@ -3271,7 +3306,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_create_unlimited_use_trust(self):
# by default trusts are unlimited in terms of tokens that can be
@ -3343,7 +3378,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.patch(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
body={'trust': ref},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.delete(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
@ -3351,7 +3386,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_create_trust_trustee_404(self):
ref = self.new_trust_ref(
@ -3359,7 +3394,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trustee_user_id=uuid.uuid4().hex,
project_id=self.project_id,
role_ids=[self.role_id])
self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404)
self.post('/OS-TRUST/trusts', body={'trust': ref},
expected_status=http_client.NOT_FOUND)
def test_create_trust_trustor_trustee_backwards(self):
ref = self.new_trust_ref(
@ -3367,7 +3403,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trustee_user_id=self.user_id,
project_id=self.project_id,
role_ids=[self.role_id])
self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=403)
self.post('/OS-TRUST/trusts', body={'trust': ref},
expected_status=http_client.FORBIDDEN)
def test_create_trust_project_404(self):
ref = self.new_trust_ref(
@ -3375,7 +3412,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trustee_user_id=self.trustee_user_id,
project_id=uuid.uuid4().hex,
role_ids=[self.role_id])
self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404)
self.post('/OS-TRUST/trusts', body={'trust': ref},
expected_status=http_client.NOT_FOUND)
def test_create_trust_role_id_404(self):
ref = self.new_trust_ref(
@ -3383,7 +3421,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
role_ids=[uuid.uuid4().hex])
self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404)
self.post('/OS-TRUST/trusts', body={'trust': ref},
expected_status=http_client.NOT_FOUND)
def test_create_trust_role_name_404(self):
ref = self.new_trust_ref(
@ -3391,7 +3430,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
role_names=[uuid.uuid4().hex])
self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404)
self.post('/OS-TRUST/trusts', body={'trust': ref},
expected_status=http_client.NOT_FOUND)
def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self):
ref = self.new_trust_ref(
@ -3419,7 +3459,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
path = '/v2.0/tokens/%s' % (token)
self.admin_request(
path=path, token=CONF.admin_token,
method='GET', expected_status=401)
method='GET', expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_intermix_trustor_not_in_default_domaini_failed(self):
ref = self.new_trust_ref(
@ -3452,7 +3492,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
path = '/v2.0/tokens/%s' % (token)
self.admin_request(
path=path, token=CONF.admin_token,
method='GET', expected_status=401)
method='GET', expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_intermix_project_not_in_default_domaini_failed(self):
# create a trustee in default domain to delegate stuff to
@ -3492,7 +3532,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
path = '/v2.0/tokens/%s' % (token)
self.admin_request(
path=path, token=CONF.admin_token,
method='GET', expected_status=401)
method='GET', expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_intermix(self):
# create a trustee in default domain to delegate stuff to
@ -3624,7 +3664,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=trust_token,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_trust_deleted_grant(self):
# create a new role
@ -3662,7 +3702,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
r = self.v3_authenticate_token(auth_data, expected_status=403)
r = self.v3_authenticate_token(auth_data,
expected_status=http_client.FORBIDDEN)
def test_trust_chained(self):
"""Test that a trust token can't be used to execute another trust.
@ -3730,7 +3771,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
token=trust_token,
trust_id=trust1['id'])
r = self.v3_authenticate_token(auth_data, expected_status=403)
r = self.v3_authenticate_token(auth_data,
expected_status=http_client.FORBIDDEN)
def assertTrustTokensRevoked(self, trust_id):
revocation_response = self.get('/OS-REVOKE/events',
@ -3766,7 +3808,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
'trust_id': trust_id},
expected_status=204)
headers = {'X-Subject-Token': trust_token}
self.head('/auth/tokens', headers=headers, expected_status=404)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
self.assertTrustTokensRevoked(trust_id)
def disable_user(self, user):
@ -3798,7 +3841,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
self.v3_authenticate_token(auth_data, expected_status=403)
self.v3_authenticate_token(auth_data,
expected_status=http_client.FORBIDDEN)
def test_trust_get_token_fails_if_trustee_disabled(self):
ref = self.new_trust_ref(
@ -3825,7 +3869,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_delete_trust(self):
ref = self.new_trust_ref(
@ -3846,17 +3891,18 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.get('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': trust['id']},
expected_status=404)
expected_status=http_client.NOT_FOUND)
self.get('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': trust['id']},
expected_status=404)
expected_status=http_client.NOT_FOUND)
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
self.v3_authenticate_token(auth_data, expected_status=401)
self.v3_authenticate_token(auth_data,
expected_status=http_client.UNAUTHORIZED)
def test_list_trusts(self):
ref = self.new_trust_ref(
@ -3918,7 +3964,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
expected_status=200))
self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
self.user_id, expected_status=401,
self.user_id, expected_status=http_client.UNAUTHORIZED,
token=trust_token)
def test_trustee_can_do_role_ops(self):
@ -3977,7 +4023,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
trust_id=trust_id)
self.v3_authenticate_token(auth_data, expected_status=403)
self.v3_authenticate_token(auth_data,
expected_status=http_client.FORBIDDEN)
r = self.get('/OS-TRUST/trusts/%s' % trust_id)
self.assertEqual(3, r.result.get('trust').get('remaining_uses'))
@ -4075,7 +4122,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id']),
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_get_catalog_unscoped_token(self):
"""Call ``GET /auth/catalog`` with an unscoped token."""
@ -4084,14 +4131,14 @@ class TestAuthSpecificData(test_v3.RestfulTestCase):
auth=self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password']),
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_get_catalog_no_token(self):
"""Call ``GET /auth/catalog`` without a token."""
self.get(
'/auth/catalog',
noauth=True,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_get_projects_project_scoped_token(self):
r = self.get('/auth/projects', expected_status=200)
@ -4190,13 +4237,15 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
unscoped_token = self._get_unscoped_token()
tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
unscoped_token[50 + 32:])
self._validate_token(tampered_token, expected_status=404)
self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND)
def test_revoke_unscoped_token(self):
unscoped_token = self._get_unscoped_token()
self._validate_token(unscoped_token)
self._revoke_token(unscoped_token)
self._validate_token(unscoped_token, expected_status=404)
self._validate_token(unscoped_token,
expected_status=http_client.NOT_FOUND)
def test_unscoped_token_is_invalid_after_disabling_user(self):
unscoped_token = self._get_unscoped_token()
@ -4270,13 +4319,15 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
project_scoped_token = self._get_project_scoped_token()
tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
project_scoped_token[50 + 32:])
self._validate_token(tampered_token, expected_status=404)
self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND)
def test_revoke_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token()
self._validate_token(project_scoped_token)
self._revoke_token(project_scoped_token)
self._validate_token(project_scoped_token, expected_status=404)
self._validate_token(project_scoped_token,
expected_status=http_client.NOT_FOUND)
def test_project_scoped_token_is_invalid_after_disabling_user(self):
project_scoped_token = self._get_project_scoped_token()
@ -4378,7 +4429,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
# Get a trust scoped token
tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
trust_scoped_token[50 + 32:])
self._validate_token(tampered_token, expected_status=404)
self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND)
def test_revoke_trust_scoped_token(self):
trustee_user, trust = self._create_trust()
@ -4386,7 +4438,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
# Validate a trust scoped token
self._validate_token(trust_scoped_token)
self._revoke_token(trust_scoped_token)
self._validate_token(trust_scoped_token, expected_status=404)
self._validate_token(trust_scoped_token,
expected_status=http_client.NOT_FOUND)
def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
trustee_user, trust = self._create_trust()
@ -4460,7 +4513,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
self.token_provider_api.validate_token,
trust_scoped_token)
def test_v2_validate_unscoped_token_returns_401(self):
def test_v2_validate_unscoped_token_returns_unauthorized(self):
"""Test raised exception when validating unscoped token.
Test that validating an unscoped token in v2.0 of a v3 user of a
@ -4471,7 +4524,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
self.token_provider_api.validate_v2_token,
unscoped_token)
def test_v2_validate_domain_scoped_token_returns_401(self):
def test_v2_validate_domain_scoped_token_returns_unauthorized(self):
"""Test raised exception when validating a domain scoped token.
Test that validating an domain scoped token in v2.0

View File

@ -15,6 +15,7 @@
import copy
import uuid
from six.moves import http_client
from testtools import matchers
from keystone import catalog
@ -184,7 +185,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
self.put(
'/regions/%s' % uuid.uuid4().hex,
body={'region': ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_list_regions(self):
"""Call ``GET /regions``."""
@ -326,19 +327,22 @@ class CatalogTestCase(test_v3.RestfulTestCase):
"""Call ``POST /services``."""
ref = self.new_service_ref()
ref['enabled'] = 'True'
self.post('/services', body={'service': ref}, expected_status=400)
self.post('/services', body={'service': ref},
expected_status=http_client.BAD_REQUEST)
def test_create_service_enabled_str_false(self):
"""Call ``POST /services``."""
ref = self.new_service_ref()
ref['enabled'] = 'False'
self.post('/services', body={'service': ref}, expected_status=400)
self.post('/services', body={'service': ref},
expected_status=http_client.BAD_REQUEST)
def test_create_service_enabled_str_random(self):
"""Call ``POST /services``."""
ref = self.new_service_ref()
ref['enabled'] = 'puppies'
self.post('/services', body={'service': ref}, expected_status=400)
self.post('/services', body={'service': ref},
expected_status=http_client.BAD_REQUEST)
def test_list_services(self):
"""Call ``GET /services``."""
@ -575,7 +579,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
self.post(
'/endpoints',
body={'endpoint': ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_create_endpoint_enabled_str_false(self):
"""Call ``POST /endpoints`` with enabled: 'False'."""
@ -584,7 +588,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
self.post(
'/endpoints',
body={'endpoint': ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_create_endpoint_enabled_str_random(self):
"""Call ``POST /endpoints`` with enabled: 'puppies'."""
@ -593,13 +597,14 @@ class CatalogTestCase(test_v3.RestfulTestCase):
self.post(
'/endpoints',
body={'endpoint': ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_create_endpoint_with_invalid_region_id(self):
"""Call ``POST /endpoints``."""
ref = self.new_endpoint_ref(service_id=self.service_id)
ref["region_id"] = uuid.uuid4().hex
self.post('/endpoints', body={'endpoint': ref}, expected_status=400)
self.post('/endpoints', body={'endpoint': ref},
expected_status=http_client.BAD_REQUEST)
def test_create_endpoint_with_region(self):
"""EndpointV3 creates the region before creating the endpoint, if
@ -623,7 +628,8 @@ class CatalogTestCase(test_v3.RestfulTestCase):
"""Call ``POST /endpoints``."""
ref = self.new_endpoint_ref(service_id=self.service_id)
ref["url"] = ''
self.post('/endpoints', body={'endpoint': ref}, expected_status=400)
self.post('/endpoints', body={'endpoint': ref},
expected_status=http_client.BAD_REQUEST)
def test_get_endpoint(self):
"""Call ``GET /endpoints/{endpoint_id}``."""
@ -667,7 +673,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
'/endpoints/%(endpoint_id)s' % {
'endpoint_id': self.endpoint_id},
body={'endpoint': {'enabled': 'True'}},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_update_endpoint_enabled_str_false(self):
"""Call ``PATCH /endpoints/{endpoint_id}`` with enabled: 'False'."""
@ -675,7 +681,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
'/endpoints/%(endpoint_id)s' % {
'endpoint_id': self.endpoint_id},
body={'endpoint': {'enabled': 'False'}},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_update_endpoint_enabled_str_random(self):
"""Call ``PATCH /endpoints/{endpoint_id}`` with enabled: 'kitties'."""
@ -683,7 +689,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
'/endpoints/%(endpoint_id)s' % {
'endpoint_id': self.endpoint_id},
body={'endpoint': {'enabled': 'kitties'}},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_delete_endpoint(self):
"""Call ``DELETE /endpoints/{endpoint_id}``."""
@ -762,7 +768,8 @@ class CatalogTestCase(test_v3.RestfulTestCase):
self.delete('/endpoints/%s' % ref['id'])
# make sure it's deleted (GET should return 404)
self.get('/endpoints/%s' % ref['id'], expected_status=404)
self.get('/endpoints/%s' % ref['id'],
expected_status=http_client.NOT_FOUND)
def test_endpoint_create_with_valid_url(self):
"""Create endpoint with valid url should be tested,too."""
@ -798,7 +805,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
ref['url'] = invalid_url
self.post('/endpoints',
body={'endpoint': ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
class TestCatalogAPISQL(tests.TestCase):

View File

@ -18,6 +18,7 @@ import uuid
from keystoneclient.contrib.ec2 import utils as ec2_utils
from oslo_config import cfg
from six.moves import http_client
from testtools import matchers
from keystone import exception
@ -252,10 +253,10 @@ class CredentialTestCase(CredentialBaseTestCase):
"secret": uuid.uuid4().hex}
ref['blob'] = json.dumps(blob)
ref['type'] = 'ec2'
# Assert 400 status for bad request with missing project_id
# Assert bad request status when missing project_id
self.post(
'/credentials',
body={'credential': ref}, expected_status=400)
body={'credential': ref}, expected_status=http_client.BAD_REQUEST)
def test_create_ec2_credential_with_invalid_blob(self):
"""Call ``POST /credentials`` for creating ec2
@ -265,11 +266,10 @@ class CredentialTestCase(CredentialBaseTestCase):
project_id=self.project_id)
ref['blob'] = '{"abc":"def"d}'
ref['type'] = 'ec2'
# Assert 400 status for bad request containing invalid
# blob
# Assert bad request status when request contains invalid blob
response = self.post(
'/credentials',
body={'credential': ref}, expected_status=400)
body={'credential': ref}, expected_status=http_client.BAD_REQUEST)
self.assertValidErrorResponse(response)
def test_create_credential_with_admin_token(self):

View File

@ -14,6 +14,7 @@ import copy
import uuid
from oslo_config import cfg
from six.moves import http_client
from keystone import exception
from keystone.tests.unit import test_v3
@ -103,21 +104,24 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
def test_get_non_existant_config(self):
"""Call ``GET /domains{domain_id}/config when no config defined``."""
self.get('/domains/%(domain_id)s/config' % {
'domain_id': self.domain['id']}, expected_status=404)
'domain_id': self.domain['id']},
expected_status=http_client.NOT_FOUND)
def test_get_non_existant_config_group(self):
"""Call ``GET /domains{domain_id}/config/{group_not_exist}``."""
config = {'ldap': {'url': uuid.uuid4().hex}}
self.domain_config_api.create_config(self.domain['id'], config)
self.get('/domains/%(domain_id)s/config/identity' % {
'domain_id': self.domain['id']}, expected_status=404)
'domain_id': self.domain['id']},
expected_status=http_client.NOT_FOUND)
def test_get_non_existant_config_option(self):
"""Call ``GET /domains{domain_id}/config/group/{option_not_exist}``."""
config = {'ldap': {'url': uuid.uuid4().hex}}
self.domain_config_api.create_config(self.domain['id'], config)
self.get('/domains/%(domain_id)s/config/ldap/user_tree_dn' % {
'domain_id': self.domain['id']}, expected_status=404)
'domain_id': self.domain['id']},
expected_status=http_client.NOT_FOUND)
def test_update_config(self):
"""Call ``PATCH /domains/{domain_id}/config``."""
@ -163,7 +167,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
self.patch('/domains/%(domain_id)s/config/%(invalid_group)s' % {
'domain_id': self.domain['id'], 'invalid_group': invalid_group},
body={'config': new_config},
expected_status=403)
expected_status=http_client.FORBIDDEN)
# Trying to update a valid group, but one that is not in the current
# config should result in NotFound
config = {'ldap': {'suffix': uuid.uuid4().hex}}
@ -172,7 +176,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
self.patch('/domains/%(domain_id)s/config/identity' % {
'domain_id': self.domain['id']},
body={'config': new_config},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_update_config_option(self):
"""Call ``PATCH /domains/{domain_id}/config/{group}/{option}``."""
@ -199,7 +203,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
'domain_id': self.domain['id'],
'invalid_option': invalid_option},
body={'config': new_config},
expected_status=403)
expected_status=http_client.FORBIDDEN)
# Trying to update a valid option, but one that is not in the current
# config should result in NotFound
new_config = {'suffix': uuid.uuid4().hex}
@ -207,4 +211,4 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
'/domains/%(domain_id)s/config/ldap/suffix' % {
'domain_id': self.domain['id']},
body={'config': new_config},
expected_status=404)
expected_status=http_client.NOT_FOUND)

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import http_client
from testtools import matchers
from keystone.tests.unit import test_v3
@ -48,7 +49,9 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
# Test when the resource does not exist also ensures
# that there is not a false negative after creation.
self.assert_head_and_get_return_same_response(url, expected_status=404)
self.assert_head_and_get_return_same_response(
url,
expected_status=http_client.NOT_FOUND)
self.put(url, expected_status=204)
@ -58,7 +61,9 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
self.delete(url, expected_status=204)
# test that the deleted resource is no longer accessible
self.assert_head_and_get_return_same_response(url, expected_status=404)
self.assert_head_and_get_return_same_response(
url,
expected_status=http_client.NOT_FOUND)
def test_crud_for_policy_for_explicit_endpoint(self):
"""PUT, HEAD and DELETE for explicit endpoint policy."""
@ -136,7 +141,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
self.delete('/endpoints/%(endpoint_id)s' % {
'endpoint_id': self.endpoint['id']})
self.head(url, expected_status=404)
self.head(url, expected_status=http_client.NOT_FOUND)
def test_region_service_association_cleanup_when_region_deleted(self):
url = ('/policies/%(policy_id)s/OS-ENDPOINT-POLICY'
@ -151,7 +156,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
self.delete('/regions/%(region_id)s' % {
'region_id': self.region['id']})
self.head(url, expected_status=404)
self.head(url, expected_status=http_client.NOT_FOUND)
def test_region_service_association_cleanup_when_service_deleted(self):
url = ('/policies/%(policy_id)s/OS-ENDPOINT-POLICY'
@ -166,7 +171,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
self.delete('/services/%(service_id)s' % {
'service_id': self.service['id']})
self.head(url, expected_status=404)
self.head(url, expected_status=http_client.NOT_FOUND)
def test_service_association_cleanup_when_service_deleted(self):
url = ('/policies/%(policy_id)s/OS-ENDPOINT-POLICY'
@ -180,7 +185,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
self.delete('/policies/%(policy_id)s' % {
'policy_id': self.policy['id']})
self.head(url, expected_status=404)
self.head(url, expected_status=http_client.NOT_FOUND)
def test_service_association_cleanup_when_policy_deleted(self):
url = ('/policies/%(policy_id)s/OS-ENDPOINT-POLICY'
@ -194,7 +199,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
self.delete('/services/%(service_id)s' % {
'service_id': self.service['id']})
self.head(url, expected_status=404)
self.head(url, expected_status=http_client.NOT_FOUND)
class JsonHomeTests(test_v3.JsonHomeTestMixin):

View File

@ -26,6 +26,7 @@ from oslotest import mockpatch
import saml2
from saml2 import saml
from saml2 import sigver
from six.moves import http_client
from six.moves import range, urllib, zip
xmldsig = importutils.try_import("saml2.xmldsig")
if not xmldsig:
@ -899,7 +900,7 @@ class FederatedIdentityProviderTests(FederationTests):
body['remote_ids'] = [uuid.uuid4().hex,
repeated_remote_id]
self.put(url, body={'identity_provider': body},
expected_status=409)
expected_status=http_client.CONFLICT)
def test_create_idp_remote_empty(self):
"""Creates an IdP with empty remote_ids."""
@ -1026,7 +1027,7 @@ class FederatedIdentityProviderTests(FederationTests):
self.put(url, body={'identity_provider': body},
expected_status=201)
self.put(url, body={'identity_provider': body},
expected_status=409)
expected_status=http_client.CONFLICT)
def test_get_idp(self):
"""Create and later fetch IdP."""
@ -1051,7 +1052,7 @@ class FederatedIdentityProviderTests(FederationTests):
self.assertIsNotNone(idp_id)
url = self.base_url(suffix=idp_id)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_delete_existing_idp(self):
"""Create and later delete IdP.
@ -1065,7 +1066,7 @@ class FederatedIdentityProviderTests(FederationTests):
self.assertIsNotNone(idp_id)
url = self.base_url(suffix=idp_id)
self.delete(url)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_delete_idp_also_deletes_assigned_protocols(self):
"""Deleting an IdP will delete its assigned protocol."""
@ -1091,7 +1092,7 @@ class FederatedIdentityProviderTests(FederationTests):
# removing IdP will remove the assigned protocol as well
self.assertEqual(1, len(self.federation_api.list_protocols(idp_id)))
self.delete(idp_url)
self.get(idp_url, expected_status=404)
self.get(idp_url, expected_status=http_client.NOT_FOUND)
self.assertEqual(0, len(self.federation_api.list_protocols(idp_id)))
def test_delete_nonexisting_idp(self):
@ -1101,7 +1102,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
idp_id = uuid.uuid4().hex
url = self.base_url(suffix=idp_id)
self.delete(url, expected_status=404)
self.delete(url, expected_status=http_client.NOT_FOUND)
def test_update_idp_mutable_attributes(self):
"""Update IdP's mutable parameters."""
@ -1142,7 +1143,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_update_idp_immutable_attributes(self):
"""Update IdP's immutable parameters.
Expect HTTP 403 code.
Expect HTTP FORBIDDEN.
"""
default_resp = self._create_default_idp()
@ -1156,7 +1157,8 @@ class FederatedIdentityProviderTests(FederationTests):
body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex]
url = self.base_url(suffix=idp_id)
self.patch(url, body={'identity_provider': body}, expected_status=403)
self.patch(url, body={'identity_provider': body},
expected_status=http_client.FORBIDDEN)
def test_update_nonexistent_idp(self):
"""Update nonexistent IdP
@ -1170,7 +1172,7 @@ class FederatedIdentityProviderTests(FederationTests):
body['enabled'] = False
body = {'identity_provider': body}
self.patch(url, body=body, expected_status=404)
self.patch(url, body=body, expected_status=http_client.NOT_FOUND)
def test_assign_protocol_to_idp(self):
"""Assign a protocol to existing IdP."""
@ -1208,7 +1210,7 @@ class FederatedIdentityProviderTests(FederationTests):
kwargs = {'expected_status': 201}
resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs)
kwargs = {'expected_status': 409}
kwargs = {'expected_status': http_client.CONFLICT}
resp, idp_id, proto = self._assign_protocol_to_idp(idp_id=idp_id,
proto='saml2',
validate=False,
@ -1222,7 +1224,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
idp_id = uuid.uuid4().hex
kwargs = {'expected_status': 404}
kwargs = {'expected_status': http_client.NOT_FOUND}
self._assign_protocol_to_idp(proto='saml2',
idp_id=idp_id,
validate=False,
@ -1299,7 +1301,7 @@ class FederatedIdentityProviderTests(FederationTests):
url = url % {'idp_id': idp_id,
'protocol_id': proto}
self.delete(url)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
class MappingCRUDTests(FederationTests):
@ -1364,7 +1366,7 @@ class MappingCRUDTests(FederationTests):
url = url % {'mapping_id': str(mapping_id)}
resp = self.delete(url)
self.assertResponseStatus(resp, 204)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_mapping_get(self):
url = self.MAPPING_URL + '%(mapping_id)s'
@ -1387,70 +1389,73 @@ class MappingCRUDTests(FederationTests):
def test_delete_mapping_dne(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.delete(url, expected_status=404)
self.delete(url, expected_status=http_client.NOT_FOUND)
def test_get_mapping_dne(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_create_mapping_bad_requirements(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_BAD_REQ})
def test_create_mapping_no_rules(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_NO_RULES})
def test_create_mapping_no_remote_objects(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE})
def test_create_mapping_bad_value(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE})
def test_create_mapping_missing_local(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL})
def test_create_mapping_missing_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE})
def test_create_mapping_wrong_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE})
def test_create_mapping_extra_remote_properties_not_any_of(self):
url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF
self.put(url, expected_status=400, body={'mapping': mapping})
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping})
def test_create_mapping_extra_remote_properties_any_one_of(self):
url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF
self.put(url, expected_status=400, body={'mapping': mapping})
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping})
def test_create_mapping_extra_remote_properties_just_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE
self.put(url, expected_status=400, body={'mapping': mapping})
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping})
def test_create_mapping_empty_map(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': {}})
def test_create_mapping_extra_rules_properties(self):
url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=400,
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS})
def test_create_mapping_with_blacklist_and_whitelist(self):
@ -1462,7 +1467,8 @@ class MappingCRUDTests(FederationTests):
"""
url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST
self.put(url, expected_status=400, body={'mapping': mapping})
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping})
class FederatedTokenTests(FederationTests, FederatedSetupMixin):
@ -1679,14 +1685,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.federation_api.update_idp(self.IDP, enabled_false)
self.v3_authenticate_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_scope_to_bad_project(self):
"""Scope unscoped token with a project we don't have access to."""
self.v3_authenticate_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_scope_to_project_multiple_times(self):
"""Try to scope the unscoped token multiple times.
@ -1725,7 +1731,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
"""Try to scope token from non-existent unscoped token."""
self.v3_authenticate_token(
self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN,
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_issue_token_from_rules_without_user(self):
api = auth_controllers.Auth()
@ -1779,7 +1785,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
"""Try to scope to a domain that has no direct roles."""
self.v3_authenticate_token(
self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_list_projects(self):
urls = ('/OS-FEDERATION/projects', '/auth/projects')
@ -2729,7 +2735,7 @@ class SAMLGenerationTests(FederationTests):
with mock.patch.object(keystone_idp, '_sign_assertion',
return_value=self.signed_assertion):
self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_generate_saml_route(self):
"""Test that the SAML generation endpoint produces XML.
@ -2792,7 +2798,8 @@ class SAMLGenerationTests(FederationTests):
self.SERVICE_PROVDIER_ID)
del body['auth']['scope']
self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400)
self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.BAD_REQUEST)
def test_invalid_token_body(self):
"""Test that missing the token in request body raises an exception.
@ -2806,7 +2813,8 @@ class SAMLGenerationTests(FederationTests):
self.SERVICE_PROVDIER_ID)
del body['auth']['identity']['token']
self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400)
self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.BAD_REQUEST)
def test_sp_not_found(self):
"""Test SAML generation with an invalid service provider ID.
@ -2817,7 +2825,8 @@ class SAMLGenerationTests(FederationTests):
sp_id = uuid.uuid4().hex
token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id, sp_id)
self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404)
self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.NOT_FOUND)
def test_sp_disabled(self):
"""Try generating assertion for disabled Service Provider."""
@ -2829,7 +2838,8 @@ class SAMLGenerationTests(FederationTests):
token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID)
self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=403)
self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.FORBIDDEN)
def test_token_not_found(self):
"""Test that an invalid token in the request body raises an exception.
@ -2841,7 +2851,8 @@ class SAMLGenerationTests(FederationTests):
token_id = uuid.uuid4().hex
body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID)
self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404)
self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.NOT_FOUND)
def test_generate_ecp_route(self):
"""Test that the ECP generation endpoint produces XML.
@ -3113,7 +3124,7 @@ class ServiceProviderTests(FederationTests):
def test_get_service_provider_fail(self):
url = self.base_url(suffix=uuid.uuid4().hex)
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_create_service_provider(self):
url = self.base_url(suffix=uuid.uuid4().hex)
@ -3152,7 +3163,7 @@ class ServiceProviderTests(FederationTests):
sp = self.sp_ref()
sp[uuid.uuid4().hex] = uuid.uuid4().hex
self.put(url, body={'service_provider': sp},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_list_service_providers(self):
"""Test listing of service provider objects.
@ -3219,21 +3230,21 @@ class ServiceProviderTests(FederationTests):
new_sp_ref = {'id': uuid.uuid4().hex}
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.patch(url, body={'service_provider': new_sp_ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_update_service_provider_unknown_parameter(self):
new_sp_ref = self.sp_ref()
new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.patch(url, body={'service_provider': new_sp_ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_update_service_provider_404(self):
new_sp_ref = self.sp_ref()
new_sp_ref['description'] = uuid.uuid4().hex
url = self.base_url(suffix=uuid.uuid4().hex)
self.patch(url, body={'service_provider': new_sp_ref},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_update_sp_relay_state(self):
"""Update an SP with custome relay state."""
@ -3253,7 +3264,7 @@ class ServiceProviderTests(FederationTests):
def test_delete_service_provider_404(self):
url = self.base_url(suffix=uuid.uuid4().hex)
self.delete(url, expected_status=404)
self.delete(url, expected_status=http_client.NOT_FOUND)
class WebSSOTests(FederatedTokenTests):

View File

@ -17,6 +17,7 @@ import uuid
import fixtures
from oslo_config import cfg
from six.moves import http_client
from testtools import matchers
from keystone.common import controller
@ -104,9 +105,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
ref['domain_id'] = CONF.identity.default_domain_id
return self.assertValidUserResponse(r, ref)
def test_create_user_400(self):
def test_create_user_bad_request(self):
"""Call ``POST /users``."""
self.post('/users', body={'user': {}}, expected_status=400)
self.post('/users', body={'user': {}},
expected_status=http_client.BAD_REQUEST)
def test_list_users(self):
"""Call ``GET /users``."""
@ -300,10 +302,12 @@ class IdentityTestCase(test_v3.RestfulTestCase):
expected_status=200)
# auth as user with original password should not work after change
self.v3_authenticate_token(old_password_auth, expected_status=401)
self.v3_authenticate_token(old_password_auth,
expected_status=http_client.UNAUTHORIZED)
# auth as user with an old token should not work after change
self.v3_authenticate_token(old_token_auth, expected_status=404)
self.v3_authenticate_token(old_token_auth,
expected_status=http_client.NOT_FOUND)
# new password should work
new_password_auth = self.build_authentication_request(
@ -389,9 +393,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
body={'group': ref})
return self.assertValidGroupResponse(r, ref)
def test_create_group_400(self):
def test_create_group_bad_request(self):
"""Call ``POST /groups``."""
self.post('/groups', body={'group': {}}, expected_status=400)
self.post('/groups', body={'group': {}},
expected_status=http_client.BAD_REQUEST)
def test_list_groups(self):
"""Call ``GET /groups``."""
@ -581,30 +586,32 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase):
expected_status=204)
# old password fails
self.get_request_token(self.user_ref['password'], expected_status=401)
self.get_request_token(self.user_ref['password'],
expected_status=http_client.UNAUTHORIZED)
# old token fails
self.v3_authenticate_token(old_token_auth, expected_status=404)
self.v3_authenticate_token(old_token_auth,
expected_status=http_client.NOT_FOUND)
# new password works
self.get_request_token(new_password, expected_status=201)
def test_changing_password_with_missing_original_password_fails(self):
r = self.change_password(password=uuid.uuid4().hex,
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertThat(r.result['error']['message'],
matchers.Contains('original_password'))
def test_changing_password_with_missing_password_fails(self):
r = self.change_password(original_password=self.user_ref['password'],
expected_status=400)
expected_status=http_client.BAD_REQUEST)
self.assertThat(r.result['error']['message'],
matchers.Contains('password'))
def test_changing_password_with_incorrect_password_fails(self):
self.change_password(password=uuid.uuid4().hex,
original_password=uuid.uuid4().hex,
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_changing_password_with_disabled_user_fails(self):
# disable the user account
@ -614,7 +621,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase):
self.change_password(password=uuid.uuid4().hex,
original_password=self.user_ref['password'],
expected_status=401)
expected_status=http_client.UNAUTHORIZED)
def test_changing_password_not_logged(self):
# When a user changes their password, the password isn't logged at any

View File

@ -18,6 +18,7 @@ import uuid
from oslo_config import cfg
from oslo_serialization import jsonutils
from pycadf import cadftaxonomy
from six.moves import http_client
from six.moves import urllib
from keystone.contrib import oauth1
@ -182,7 +183,7 @@ class ConsumerCRUDTests(OAuth1Tests):
update_ref['secret'] = uuid.uuid4().hex
self.patch(self.CONSUMER_URL + '/%s' % original_id,
body={'consumer': update_ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_consumer_update_bad_id(self):
consumer = self._create_single_consumer()
@ -195,7 +196,7 @@ class ConsumerCRUDTests(OAuth1Tests):
update_ref['id'] = update_description
self.patch(self.CONSUMER_URL + '/%s' % original_id,
body={'consumer': update_ref},
expected_status=400)
expected_status=http_client.BAD_REQUEST)
def test_consumer_update_normalize_field(self):
# If update a consumer with a field with : or - in the name,
@ -236,7 +237,7 @@ class ConsumerCRUDTests(OAuth1Tests):
def test_consumer_get_bad_id(self):
self.get(self.CONSUMER_URL + '/%(consumer_id)s'
% {'consumer_id': uuid.uuid4().hex},
expected_status=404)
expected_status=http_client.NOT_FOUND)
class OAuthFlowTests(OAuth1Tests):
@ -291,7 +292,7 @@ class AccessTokenCRUDTests(OAuthFlowTests):
self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
% {'user': self.user_id,
'auth': uuid.uuid4().hex},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_list_no_access_tokens(self):
resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
@ -316,7 +317,7 @@ class AccessTokenCRUDTests(OAuthFlowTests):
self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s'
% {'user_id': self.user_id,
'key': uuid.uuid4().hex},
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_list_all_roles_in_access_token(self):
self.test_oauth_flow()
@ -341,7 +342,7 @@ class AccessTokenCRUDTests(OAuthFlowTests):
url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
% {'id': self.user_id, 'key': self.access_token.key,
'role': uuid.uuid4().hex})
self.get(url, expected_status=404)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_list_and_delete_access_tokens(self):
self.test_oauth_flow()
@ -405,7 +406,7 @@ class AuthTokenTests(OAuthFlowTests):
headers = {'X-Subject-Token': self.keystone_token_id,
'X-Auth-Token': self.keystone_token_id}
self.get('/auth/tokens', headers=headers,
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_deleting_consumer_also_deletes_tokens(self):
self.test_oauth_flow()
@ -426,7 +427,7 @@ class AuthTokenTests(OAuthFlowTests):
headers = {'X-Subject-Token': self.keystone_token_id,
'X-Auth-Token': self.keystone_token_id}
self.head('/auth/tokens', headers=headers,
expected_status=404)
expected_status=http_client.NOT_FOUND)
def test_change_user_password_also_deletes_tokens(self):
self.test_oauth_flow()
@ -445,7 +446,7 @@ class AuthTokenTests(OAuthFlowTests):
headers = {'X-Subject-Token': self.keystone_token_id,
'X-Auth-Token': self.keystone_token_id}
self.admin_request(path='/auth/tokens', headers=headers,
method='GET', expected_status=404)
method='GET', expected_status=http_client.NOT_FOUND)
def test_deleting_project_also_invalidates_tokens(self):
self.test_oauth_flow()
@ -462,7 +463,7 @@ class AuthTokenTests(OAuthFlowTests):
headers = {'X-Subject-Token': self.keystone_token_id,
'X-Auth-Token': self.keystone_token_id}
self.admin_request(path='/auth/tokens', headers=headers,
method='GET', expected_status=404)
method='GET', expected_status=http_client.NOT_FOUND)
def test_token_chaining_is_not_allowed(self):
self.test_oauth_flow()
@ -477,7 +478,7 @@ class AuthTokenTests(OAuthFlowTests):
body=auth_data,
token=self.keystone_token_id,
method='POST',
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_delete_keystone_tokens_by_consumer_id(self):
self.test_oauth_flow()
@ -545,14 +546,14 @@ class AuthTokenTests(OAuthFlowTests):
self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=self.keystone_token_id,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_oauth_token_cannot_authorize_request_token(self):
self.test_oauth_flow()
url = self._approve_request_token_url()
body = {'roles': [{'id': self.role_id}]}
self.put(url, body=body, token=self.keystone_token_id,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_oauth_token_cannot_list_request_tokens(self):
self._set_policy({"identity:list_access_tokens": [],
@ -561,7 +562,7 @@ class AuthTokenTests(OAuthFlowTests):
self.test_oauth_flow()
url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
self.get(url, token=self.keystone_token_id,
expected_status=403)
expected_status=http_client.FORBIDDEN)
def _set_policy(self, new_policy):
self.tempfile = self.useFixture(temporaryfile.SecureTempFile())
@ -575,14 +576,16 @@ class AuthTokenTests(OAuthFlowTests):
trust_token = self._create_trust_get_token()
url = self._approve_request_token_url()
body = {'roles': [{'id': self.role_id}]}
self.put(url, body=body, token=trust_token, expected_status=403)
self.put(url, body=body, token=trust_token,
expected_status=http_client.FORBIDDEN)
def test_trust_token_cannot_list_request_tokens(self):
self._set_policy({"identity:list_access_tokens": [],
"identity:create_trust": []})
trust_token = self._create_trust_get_token()
url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
self.get(url, token=trust_token, expected_status=403)
self.get(url, token=trust_token,
expected_status=http_client.FORBIDDEN)
class MaliciousOAuth1Tests(OAuth1Tests):
@ -592,7 +595,8 @@ class MaliciousOAuth1Tests(OAuth1Tests):
consumer_id = consumer['id']
consumer = {'key': consumer_id, 'secret': uuid.uuid4().hex}
url, headers = self._create_request_token(consumer, self.project_id)
self.post(url, headers=headers, expected_status=401)
self.post(url, headers=headers,
expected_status=http_client.UNAUTHORIZED)
def test_bad_request_token_key(self):
consumer = self._create_single_consumer()
@ -605,7 +609,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
response_content_type='application/x-www-urlformencoded')
url = self._authorize_request_token(uuid.uuid4().hex)
body = {'roles': [{'id': self.role_id}]}
self.put(url, body=body, expected_status=404)
self.put(url, body=body, expected_status=http_client.NOT_FOUND)
def test_bad_consumer_id(self):
consumer = self._create_single_consumer()
@ -613,7 +617,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
consumer_secret = consumer['secret']
consumer = {'key': consumer_id, 'secret': consumer_secret}
url, headers = self._create_request_token(consumer, self.project_id)
self.post(url, headers=headers, expected_status=404)
self.post(url, headers=headers, expected_status=http_client.NOT_FOUND)
def test_bad_requested_project_id(self):
consumer = self._create_single_consumer()
@ -622,7 +626,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
consumer = {'key': consumer_id, 'secret': consumer_secret}
project_id = uuid.uuid4().hex
url, headers = self._create_request_token(consumer, project_id)
self.post(url, headers=headers, expected_status=404)
self.post(url, headers=headers, expected_status=http_client.NOT_FOUND)
def test_bad_verifier(self):
consumer = self._create_single_consumer()
@ -647,7 +651,8 @@ class MaliciousOAuth1Tests(OAuth1Tests):
request_token.set_verifier(uuid.uuid4().hex)
url, headers = self._create_access_token(consumer, request_token)
self.post(url, headers=headers, expected_status=401)
self.post(url, headers=headers,
expected_status=http_client.UNAUTHORIZED)
def test_bad_authorizing_roles(self):
consumer = self._create_single_consumer()
@ -667,7 +672,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
self.admin_request(path=url, method='PUT',
body=body, expected_status=404)
body=body, expected_status=http_client.NOT_FOUND)
def test_expired_authorizing_request_token(self):
self.config_fixture.config(group='oauth1', request_token_duration=-1)
@ -691,7 +696,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
self.put(url, body=body, expected_status=401)
self.put(url, body=body, expected_status=http_client.UNAUTHORIZED)
def test_expired_creating_keystone_token(self):
self.config_fixture.config(group='oauth1', access_token_duration=-1)
@ -731,7 +736,8 @@ class MaliciousOAuth1Tests(OAuth1Tests):
url, headers, body = self._get_oauth_token(self.consumer,
self.access_token)
self.post(url, headers=headers, body=body, expected_status=401)
self.post(url, headers=headers, body=body,
expected_status=http_client.UNAUTHORIZED)
def test_missing_oauth_headers(self):
endpoint = '/OS-OAUTH1/request_token'

View File

@ -15,6 +15,7 @@ import uuid
from oslo_utils import timeutils
import six
from six.moves import http_client
from testtools import matchers
from keystone.common import utils
@ -112,7 +113,8 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
self.assertReportedEventMatchesRecorded(events[0], sample, before_time)
def test_list_since_invalid(self):
self.get('/OS-REVOKE/events?since=blah', expected_status=400)
self.get('/OS-REVOKE/events?since=blah',
expected_status=http_client.BAD_REQUEST)
def test_list_since_valid(self):
resp = self.get('/OS-REVOKE/events?since=2013-02-27T18:30:59.999999Z')

View File

@ -17,6 +17,7 @@ import uuid
from oslo_config import cfg
from oslo_serialization import jsonutils
from six.moves import http_client
from keystone import exception
from keystone.policy.backends import rules
@ -428,7 +429,8 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
user2_token = self.get_requested_token(user2_auth)
self.get('/auth/tokens', token=user1_token,
headers={'X-Subject-Token': user2_token}, expected_status=403)
headers={'X-Subject-Token': user2_token},
expected_status=http_client.FORBIDDEN)
def test_admin_validate_user_token(self):
# An admin can validate a user's token.
@ -490,7 +492,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
self.head('/auth/tokens', token=user1_token,
headers={'X-Subject-Token': user2_token},
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_admin_check_user_token(self):
# An admin can check a user's token.
@ -552,7 +554,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
self.delete('/auth/tokens', token=user1_token,
headers={'X-Subject-Token': user2_token},
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_admin_revoke_user_token(self):
# An admin can revoke a user's token.
@ -948,7 +950,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
collection_url = self.build_role_assignment_query_url(
domain_id=self.domainB['id'])
self.get(collection_url, auth=self.auth, expected_status=403)
self.get(collection_url, auth=self.auth,
expected_status=http_client.FORBIDDEN)
def test_domain_user_list_assignments_of_domain_failed(self):
self.auth = self.build_authentication_request(
@ -958,7 +961,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
collection_url = self.build_role_assignment_query_url(
domain_id=self.domainA['id'])
self.get(collection_url, auth=self.auth, expected_status=403)
self.get(collection_url, auth=self.auth,
expected_status=http_client.FORBIDDEN)
def test_cloud_admin_list_assignments_of_project(self):
self.auth = self.build_authentication_request(
@ -1021,7 +1025,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
collection_url = self.build_role_assignment_query_url(
project_id=self.project['id'])
self.get(collection_url, auth=self.auth, expected_status=403)
self.get(collection_url, auth=self.auth,
expected_status=http_client.FORBIDDEN)
def test_cloud_admin(self):
self.auth = self.build_authentication_request(
@ -1145,7 +1150,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
user2_token = self.get_requested_token(user2_auth)
self.get('/auth/tokens', token=user1_token,
headers={'X-Subject-Token': user2_token}, expected_status=403)
headers={'X-Subject-Token': user2_token},
expected_status=http_client.FORBIDDEN)
def test_admin_validate_user_token(self):
# An admin can validate a user's token.
@ -1207,7 +1213,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.head('/auth/tokens', token=user1_token,
headers={'X-Subject-Token': user2_token},
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_admin_check_user_token(self):
# An admin can check a user's token.
@ -1269,7 +1275,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.delete('/auth/tokens', token=user1_token,
headers={'X-Subject-Token': user2_token},
expected_status=403)
expected_status=http_client.FORBIDDEN)
def test_admin_revoke_user_token(self):
# An admin can revoke a user's token.

View File

@ -20,6 +20,7 @@ import random
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
from six.moves import http_client
from testtools import matchers as tt_matchers
from keystone.common import json_home
@ -788,7 +789,7 @@ class VersionTestCase(tests.TestCase):
client = tests.TestClient(self.public_app)
# request to /v2.0 should fail
resp = client.get('/v2.0/')
self.assertEqual(404, resp.status_int)
self.assertEqual(http_client.NOT_FOUND, resp.status_int)
# request to /v3 should pass
resp = client.get('/v3/')
@ -821,7 +822,7 @@ class VersionTestCase(tests.TestCase):
client = tests.TestClient(self.public_app)
# request to /v3 should fail
resp = client.get('/v3/')
self.assertEqual(404, resp.status_int)
self.assertEqual(http_client.NOT_FOUND, resp.status_int)
# request to /v2.0 should pass
resp = client.get('/v2.0/')

View File

@ -23,6 +23,7 @@ import mock
import oslo_i18n
from oslo_serialization import jsonutils
import six
from six.moves import http_client
from testtools import matchers
import webob
@ -195,14 +196,14 @@ class ApplicationTest(BaseWSGITest):
def test_render_exception(self):
e = exception.Unauthorized(message=u'\u7f51\u7edc')
resp = wsgi.render_exception(e)
self.assertEqual(401, resp.status_int)
self.assertEqual(http_client.UNAUTHORIZED, resp.status_int)
def test_render_exception_host(self):
e = exception.Unauthorized(message=u'\u7f51\u7edc')
context = {'host_url': 'http://%s:5000' % uuid.uuid4().hex}
resp = wsgi.render_exception(e, context=context)
self.assertEqual(401, resp.status_int)
self.assertEqual(http_client.UNAUTHORIZED, resp.status_int)
def test_improperly_encoded_params(self):
class FakeApp(wsgi.Application):