From a50e23b9b7856807447048553e7f88c0109f18cf Mon Sep 17 00:00:00 2001 From: Dave Chen Date: Mon, 6 Jul 2015 15:25:08 +0800 Subject: [PATCH] Refactor: Don't hard code the error code This patch replace the hard coded HTTP error code (400~410) with the constants. Change-Id: I952cac73a9713bde4ad757371ca8b4ded93f207e --- keystone/tests/unit/rest.py | 7 +- ...st_associate_project_endpoint_extension.py | 50 +-- keystone/tests/unit/test_catalog.py | 28 +- keystone/tests/unit/test_middleware.py | 7 +- .../tests/unit/test_no_admin_token_auth.py | 3 +- keystone/tests/unit/test_v2.py | 65 ++-- keystone/tests/unit/test_v2_keystoneclient.py | 4 +- keystone/tests/unit/test_v3_assignment.py | 158 ++++++---- keystone/tests/unit/test_v3_auth.py | 289 +++++++++++------- keystone/tests/unit/test_v3_catalog.py | 35 ++- keystone/tests/unit/test_v3_credential.py | 10 +- keystone/tests/unit/test_v3_domain_config.py | 18 +- .../tests/unit/test_v3_endpoint_policy.py | 19 +- keystone/tests/unit/test_v3_federation.py | 99 +++--- keystone/tests/unit/test_v3_identity.py | 31 +- keystone/tests/unit/test_v3_oauth1.py | 54 ++-- keystone/tests/unit/test_v3_os_revoke.py | 4 +- keystone/tests/unit/test_v3_protection.py | 24 +- keystone/tests/unit/test_versions.py | 5 +- keystone/tests/unit/test_wsgi.py | 5 +- 20 files changed, 535 insertions(+), 380 deletions(-) diff --git a/keystone/tests/unit/rest.py b/keystone/tests/unit/rest.py index bfa52354bd..583fa37024 100644 --- a/keystone/tests/unit/rest.py +++ b/keystone/tests/unit/rest.py @@ -13,6 +13,7 @@ # under the License. from oslo_serialization import jsonutils +from six.moves import http_client import webtest from keystone.auth import controllers as auth_controllers 
@@ -125,7 +126,8 @@ class RestfulTestCase(tests.TestCase): """Ensures that response headers appear as expected.""" self.assertIn('X-Auth-Token', response.headers.get('Vary')) - def assertValidErrorResponse(self, response, expected_status=400): + def assertValidErrorResponse(self, response, + expected_status=http_client.BAD_REQUEST): """Verify that the error response is valid. Subclasses can override this function based on the expected response. @@ -184,7 +186,8 @@ class RestfulTestCase(tests.TestCase): self._from_content_type(response, content_type=response_content_type) # we can save some code & improve coverage by always doing this - if method != 'HEAD' and response.status_code >= 400: + if (method != 'HEAD' and + response.status_code >= http_client.BAD_REQUEST): self.assertValidErrorResponse(response) # Contains the decoded response.body diff --git a/keystone/tests/unit/test_associate_project_endpoint_extension.py b/keystone/tests/unit/test_associate_project_endpoint_extension.py index 9cde704e18..24fc82ddb6 100644 --- a/keystone/tests/unit/test_associate_project_endpoint_extension.py +++ b/keystone/tests/unit/test_associate_project_endpoint_extension.py @@ -15,6 +15,7 @@ import copy import uuid +from six.moves import http_client from testtools import matchers from keystone.tests.unit import test_v3 @@ -60,7 +61,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): '/endpoints/%(endpoint_id)s' % { 'project_id': uuid.uuid4().hex, 'endpoint_id': self.endpoint_id}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_create_endpoint_project_association_with_invalid_endpoint(self): """PUT /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} 
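Review bot: In the `@@ -184,7 +186,8 @@` hunk of rest.py, `response.status_code >= http_client.BAD_REQUEST` is a range test for any 4xx/5xx response, but the named constant reads like a check against one specific status. A one-line comment above the condition would keep the intent visible, for example `# BAD_REQUEST (400) is the lowest error status; validate every 4xx/5xx body`. This guard decides whether `assertValidErrorResponse` runs at all, so a later edit that misreads it as "bad request only" would silently stop validating 401/403/404 error bodies. The enclosing method starts and ends outside the hunk.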
@@ -72,7 +73,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): '/endpoints/%(endpoint_id)s' % { 'project_id': self.default_domain_project_id, 'endpoint_id': uuid.uuid4().hex}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_create_endpoint_project_association_with_unexpected_body(self): """PUT /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} @@ -109,7 +110,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): '/endpoints/%(endpoint_id)s' % { 'project_id': uuid.uuid4().hex, 'endpoint_id': self.endpoint_id}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_check_endpoint_project_association_with_invalid_endpoint(self): """HEAD /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} @@ -122,7 +123,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): '/endpoints/%(endpoint_id)s' % { 'project_id': self.default_domain_project_id, 'endpoint_id': uuid.uuid4().hex}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_list_endpoints_associated_with_valid_project(self): """GET /OS-EP-FILTER/projects/{project_id}/endpoints @@ -146,7 +147,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): self.put(self.default_request_url) self.get('/OS-EP-FILTER/projects/%(project_id)s/endpoints' % { 'project_id': uuid.uuid4().hex}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_list_projects_associated_with_endpoint(self): """GET /OS-EP-FILTER/endpoints/{endpoint_id}/projects @@ -180,7 +181,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): """ self.get('/OS-EP-FILTER/endpoints/%(endpoint_id)s/projects' % {'endpoint_id': uuid.uuid4().hex}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_remove_endpoint_project_association(self): """DELETE /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} 
@@ -206,7 +207,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): '/endpoints/%(endpoint_id)s' % { 'project_id': uuid.uuid4().hex, 'endpoint_id': self.endpoint_id}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_remove_endpoint_project_association_with_invalid_endpoint(self): """DELETE /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} @@ -219,7 +220,7 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): '/endpoints/%(endpoint_id)s' % { 'project_id': self.default_domain_project_id, 'endpoint_id': uuid.uuid4().hex}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_endpoint_project_association_cleanup_when_project_deleted(self): self.put(self.default_request_url) @@ -589,7 +590,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): invalid_body['endpoint_group']['filters'] = {'foobar': 'admin'} self.post(self.DEFAULT_ENDPOINT_GROUP_URL, body=invalid_body, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_get_endpoint_group(self): """GET /OS-EP-FILTER/endpoint_groups/{endpoint_group} @@ -624,7 +625,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): endpoint_group_id = 'foobar' url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % { 'endpoint_group_id': endpoint_group_id} - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_check_endpoint_group(self): """HEAD /OS-EP-FILTER/endpoint_groups/{endpoint_group_id} @@ -648,7 +649,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): endpoint_group_id = 'foobar' url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % { 'endpoint_group_id': endpoint_group_id} - self.head(url, expected_status=404) + self.head(url, expected_status=http_client.NOT_FOUND) def test_patch_endpoint_group(self): """PATCH /OS-EP-FILTER/endpoint_groups/{endpoint_group} 
@@ -685,7 +686,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): } url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % { 'endpoint_group_id': 'ABC'} - self.patch(url, body=body, expected_status=404) + self.patch(url, body=body, expected_status=http_client.NOT_FOUND) def test_patch_invalid_endpoint_group(self): """PATCH /OS-EP-FILTER/endpoint_groups/{endpoint_group} @@ -707,7 +708,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): self.DEFAULT_ENDPOINT_GROUP_URL, self.DEFAULT_ENDPOINT_GROUP_BODY) url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % { 'endpoint_group_id': endpoint_group_id} - self.patch(url, body=body, expected_status=400) + self.patch(url, body=body, expected_status=http_client.BAD_REQUEST) # Perform a GET call to ensure that the content remains # the same (as DEFAULT_ENDPOINT_GROUP_BODY) after attempting to update @@ -731,7 +732,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % { 'endpoint_group_id': endpoint_group_id} self.delete(url) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_delete_invalid_endpoint_group(self): """GET /OS-EP-FILTER/endpoint_groups/{endpoint_group} @@ -742,7 +743,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): endpoint_group_id = 'foobar' url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % { 'endpoint_group_id': endpoint_group_id} - self.delete(url, expected_status=404) + self.delete(url, expected_status=http_client.NOT_FOUND) def test_add_endpoint_group_to_project(self): """Create a valid endpoint group and project association.""" 
@@ -761,7 +762,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): project_id = uuid.uuid4().hex url = self._get_project_endpoint_group_url( endpoint_group_id, project_id) - self.put(url, expected_status=404) + self.put(url, expected_status=http_client.NOT_FOUND) def test_get_endpoint_group_in_project(self): """Test retrieving project endpoint group association.""" @@ -787,7 +788,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): project_id = uuid.uuid4().hex url = self._get_project_endpoint_group_url( endpoint_group_id, project_id) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_list_endpoint_groups_in_project(self): """GET /OS-EP-FILTER/projects/{project_id}/endpoint_groups.""" @@ -813,7 +814,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): project_id = uuid.uuid4().hex url = ('/OS-EP-FILTER/projects/%(project_id)s/endpoint_groups' % {'project_id': project_id}) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_empty_endpoint_groups_in_project(self): """Test when no endpoint groups associated with the project.""" @@ -848,7 +849,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): project_id = uuid.uuid4().hex url = self._get_project_endpoint_group_url( endpoint_group_id, project_id) - self.head(url, expected_status=404) + self.head(url, expected_status=http_client.NOT_FOUND) def test_list_endpoint_groups(self): """GET /OS-EP-FILTER/endpoint_groups.""" @@ -992,7 +993,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): # endpoint group association again self.delete('/projects/%(project_id)s' % { 'project_id': project['id']}) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_endpoint_group_project_cleanup_with_endpoint_group(self): # create endpoint group 
@@ -1012,7 +1013,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): # now remove the project endpoint group association self.delete(url) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_removing_an_endpoint_group_project(self): # create an endpoint group @@ -1026,7 +1027,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): # remove the endpoint group project self.delete(url) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_remove_endpoint_group_with_project_association(self): # create an endpoint group @@ -1044,8 +1045,9 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): '%(endpoint_group_id)s' % {'endpoint_group_id': endpoint_group_id}) self.delete(endpoint_group_url) - self.get(endpoint_group_url, expected_status=404) - self.get(project_endpoint_group_url, expected_status=404) + self.get(endpoint_group_url, expected_status=http_client.NOT_FOUND) + self.get(project_endpoint_group_url, + expected_status=http_client.NOT_FOUND) def _create_valid_endpoint_group(self, url, body): r = self.post(url, body=body) diff --git a/keystone/tests/unit/test_catalog.py b/keystone/tests/unit/test_catalog.py index 9f33e4fc6f..e5a59e3698 100644 --- a/keystone/tests/unit/test_catalog.py +++ b/keystone/tests/unit/test_catalog.py @@ -14,6 +14,8 @@ import uuid +from six.moves import http_client + from keystone import catalog from keystone.tests import unit as tests from keystone.tests.unit.ksfixtures import database 
@@ -102,16 +104,20 @@ class V2CatalogTestCase(rest.RestfulTestCase): self.assertNotIn("internalurl", response.result['endpoint']) def test_endpoint_create_with_null_publicurl(self): - self._endpoint_create(expected_status=400, publicurl=None) + self._endpoint_create(expected_status=http_client.BAD_REQUEST, + publicurl=None) def test_endpoint_create_with_empty_publicurl(self): - self._endpoint_create(expected_status=400, publicurl='') + self._endpoint_create(expected_status=http_client.BAD_REQUEST, + publicurl='') def test_endpoint_create_with_null_service_id(self): - self._endpoint_create(expected_status=400, service_id=None) + self._endpoint_create(expected_status=http_client.BAD_REQUEST, + service_id=None) def test_endpoint_create_with_empty_service_id(self): - self._endpoint_create(expected_status=400, service_id='') + self._endpoint_create(expected_status=http_client.BAD_REQUEST, + service_id='') def test_endpoint_create_with_valid_url(self): """Create endpoint with valid URL should be tested, too.""" @@ -146,7 +152,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): # Case one: publicurl, internalurl and adminurl are # all invalid for invalid_url in invalid_urls: - self._endpoint_create(expected_status=400, + self._endpoint_create(expected_status=http_client.BAD_REQUEST, publicurl=invalid_url, internalurl=invalid_url, adminurl=invalid_url) @@ -154,7 +160,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): # Case two: publicurl, internalurl are invalid # and adminurl is valid for invalid_url in invalid_urls: - self._endpoint_create(expected_status=400, + self._endpoint_create(expected_status=http_client.BAD_REQUEST, publicurl=invalid_url, internalurl=invalid_url, adminurl=valid_url) 
@@ -162,7 +168,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): # Case three: publicurl, adminurl are invalid # and internalurl is valid for invalid_url in invalid_urls: - self._endpoint_create(expected_status=400, + self._endpoint_create(expected_status=http_client.BAD_REQUEST, publicurl=invalid_url, internalurl=valid_url, adminurl=invalid_url) @@ -170,7 +176,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): # Case four: internalurl, adminurl are invalid # and publicurl is valid for invalid_url in invalid_urls: - self._endpoint_create(expected_status=400, + self._endpoint_create(expected_status=http_client.BAD_REQUEST, publicurl=valid_url, internalurl=invalid_url, adminurl=invalid_url) @@ -178,7 +184,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): # Case five: publicurl is invalid, internalurl # and adminurl are valid for invalid_url in invalid_urls: - self._endpoint_create(expected_status=400, + self._endpoint_create(expected_status=http_client.BAD_REQUEST, publicurl=invalid_url, internalurl=valid_url, adminurl=valid_url) @@ -186,7 +192,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): # Case six: internalurl is invalid, publicurl # and adminurl are valid for invalid_url in invalid_urls: - self._endpoint_create(expected_status=400, + self._endpoint_create(expected_status=http_client.BAD_REQUEST, publicurl=valid_url, internalurl=invalid_url, adminurl=valid_url) @@ -194,7 +200,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): # Case seven: adminurl is invalid, publicurl # and internalurl are valid for invalid_url in invalid_urls: - self._endpoint_create(expected_status=400, + self._endpoint_create(expected_status=http_client.BAD_REQUEST, publicurl=valid_url, internalurl=valid_url, adminurl=invalid_url) diff --git a/keystone/tests/unit/test_middleware.py b/keystone/tests/unit/test_middleware.py index d420a568a0..17c143dbf4 100644 --- a/keystone/tests/unit/test_middleware.py +++ b/keystone/tests/unit/test_middleware.py 
@@ -16,6 +16,7 @@ import hashlib import uuid from oslo_config import cfg +from six.moves import http_client import webob from keystone.common import authorization @@ -96,14 +97,14 @@ class JsonBodyMiddlewareTest(tests.TestCase): content_type='application/json', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(400, resp.status_int) + self.assertEqual(http_client.BAD_REQUEST, resp.status_int) def test_not_dict_body(self): req = make_request(body='42', content_type='application/json', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(400, resp.status_int) + self.assertEqual(http_client.BAD_REQUEST, resp.status_int) self.assertTrue('valid JSON object' in resp.json['error']['message']) def test_no_content_type(self): @@ -118,7 +119,7 @@ class JsonBodyMiddlewareTest(tests.TestCase): content_type='text/plain', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(400, resp.status_int) + self.assertEqual(http_client.BAD_REQUEST, resp.status_int) def test_unrecognized_content_type_without_body(self): req = make_request(content_type='text/plain', diff --git a/keystone/tests/unit/test_no_admin_token_auth.py b/keystone/tests/unit/test_no_admin_token_auth.py index 9f67fbd71d..887175dcf8 100644 --- a/keystone/tests/unit/test_no_admin_token_auth.py +++ b/keystone/tests/unit/test_no_admin_token_auth.py @@ -14,6 +14,7 @@ import os +from six.moves import http_client import webtest from keystone.tests import unit as tests @@ -56,4 +57,4 @@ class TestNoAdminTokenAuth(tests.TestCase): # If the following does not raise, then the test is successful. self.admin_app.get(REQ_PATH, headers={'X-Auth-Token': 'NotAdminToken'}, - status=401) + status=http_client.UNAUTHORIZED) diff --git a/keystone/tests/unit/test_v2.py b/keystone/tests/unit/test_v2.py index 415150cf34..acdfca5f06 100644 --- a/keystone/tests/unit/test_v2.py 
+++ b/keystone/tests/unit/test_v2.py @@ -19,6 +19,7 @@ import uuid from keystoneclient.common import cms from oslo_config import cfg import six +from six.moves import http_client from testtools import matchers from keystone.common import extension as keystone_extension @@ -70,13 +71,13 @@ class CoreApiTests(object): def test_public_not_found(self): r = self.public_request( path='/%s' % uuid.uuid4().hex, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.assertValidErrorResponse(r) def test_admin_not_found(self): r = self.admin_request( path='/%s' % uuid.uuid4().hex, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.assertValidErrorResponse(r) def test_public_multiple_choice(self): @@ -107,11 +108,11 @@ class CoreApiTests(object): def test_admin_extensions_404(self): self.admin_request(path='/v2.0/extensions/invalid-extension', - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_public_osksadm_extension_404(self): self.public_request(path='/v2.0/extensions/OS-KSADM', - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_admin_osksadm_extension(self): r = self.admin_request(path='/v2.0/extensions/OS-KSADM') @@ -170,7 +171,7 @@ class CoreApiTests(object): 'token_id': 'invalid', }, token=token, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_validate_token_service_role(self): self.md_foobar = self.assignment_api.add_role_to_user_and_project( @@ -204,7 +205,7 @@ class CoreApiTests(object): r = self.admin_request( path='/v2.0/tokens/%s' % token, token=token, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_validate_token_belongs_to(self): token = self.get_scoped_token() @@ -306,7 +307,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(r) r = self.admin_request( 
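Review bot: The test names `test_admin_extensions_404` and `test_public_osksadm_extension_404` in the `@@ -107,11 +108,11 @@` hunk still embed the numeric code, although their bodies now use `http_client.NOT_FOUND`. The same patch renames `test_create_domain_400` to `test_create_domain_bad_request` in test_v3_assignment.py. For consistency these two could become `test_admin_extensions_not_found` and `test_public_osksadm_extension_not_found`. Anything that selects tests by name would need the same update.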
@@ -321,7 +322,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(r) # Test UPDATE request @@ -338,7 +339,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(r) r = self.admin_request( @@ -351,7 +352,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(r) def test_create_update_user_valid_enabled_type(self): @@ -373,7 +374,8 @@ class CoreApiTests(object): def test_error_response(self): """This triggers assertValidErrorResponse by convention.""" - self.public_request(path='/v2.0/tenants', expected_status=401) + self.public_request(path='/v2.0/tenants', + expected_status=http_client.UNAUTHORIZED) def test_invalid_parameter_error_response(self): token = self.get_scoped_token() @@ -387,13 +389,13 @@ class CoreApiTests(object): path='/v2.0/OS-KSADM/services', body=bad_body, token=token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(res) res = self.admin_request(method='POST', path='/v2.0/users', body=bad_body, token=token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(res) def _get_user_id(self, r): @@ -552,7 +554,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_update_user_with_invalid_tenant_no_prev_tenant(self): token = self.get_scoped_token() @@ -584,7 +586,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_update_user_with_old_tenant(self): token = self.get_scoped_token() 
@@ -669,13 +671,13 @@ class CoreApiTests(object): }, }, }, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) def test_www_authenticate_header(self): r = self.public_request( path='/v2.0/tenants', - expected_status=401) + expected_status=http_client.UNAUTHORIZED) self.assertEqual('Keystone uri="http://localhost"', r.headers.get('WWW-Authenticate')) @@ -684,7 +686,7 @@ class CoreApiTests(object): self.config_fixture.config(public_endpoint=test_url) r = self.public_request( path='/v2.0/tenants', - expected_status=401) + expected_status=http_client.UNAUTHORIZED) self.assertEqual('Keystone uri="%s"' % test_url, r.headers.get('WWW-Authenticate')) @@ -1141,8 +1143,9 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): return r.result['user'][attribute_name] def test_service_crud_requires_auth(self): - """Service CRUD should 401 without an X-Auth-Token (bug 1006822).""" - # values here don't matter because we should 401 before they're checked + """Service CRUD should return unauthorized without an X-Auth-Token.""" + # values here don't matter because it will be unauthorized before + # they're checked (bug 1006822). service_path = '/v2.0/OS-KSADM/services/%s' % uuid.uuid4().hex service_body = { 'OS-KSADM:service': { 
@@ -1153,41 +1156,43 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): r = self.admin_request(method='GET', path='/v2.0/OS-KSADM/services', - expected_status=401) + expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) r = self.admin_request(method='POST', path='/v2.0/OS-KSADM/services', body=service_body, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) r = self.admin_request(method='GET', path=service_path, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) r = self.admin_request(method='DELETE', path=service_path, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) def test_user_role_list_requires_auth(self): - """User role list should 401 without an X-Auth-Token (bug 1006815).""" - # values here don't matter because we should 401 before they're checked + """User role list return unauthorized without an X-Auth-Token.""" + # values here don't matter because it will be unauthorized before + # they're checked (bug 1006815). path = '/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % { 'tenant_id': uuid.uuid4().hex, 'user_id': uuid.uuid4().hex, } - r = self.admin_request(path=path, expected_status=401) + r = self.admin_request(path=path, + expected_status=http_client.UNAUTHORIZED) self.assertValidErrorResponse(r) def test_fetch_revocation_list_nonadmin_fails(self): self.admin_request( method='GET', path='/v2.0/tokens/revoked', - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_fetch_revocation_list_admin_200(self): token = self.get_scoped_token() @@ -1278,7 +1283,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): }, }, token=token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(r) # Test UPDATE request 
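Review bot: The rewritten docstring of `test_user_role_list_requires_auth` in the `@@ -1153,41 +1156,43 @@` hunk lost its verb and now reads `"""User role list return unauthorized without an X-Auth-Token."""`. The sibling `test_service_crud_requires_auth` kept "should return unauthorized", so this one should match. `"""User role list should be unauthorized without an X-Auth-Token."""` says the same thing and is short enough that it should still fit the usual line-length limit, which is presumably why the word was dropped.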
@@ -1294,7 +1299,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): }, }, token=token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(r) def test_authenticating_a_user_with_an_OSKSADM_password(self): diff --git a/keystone/tests/unit/test_v2_keystoneclient.py b/keystone/tests/unit/test_v2_keystoneclient.py index abd22a273c..54489f22c5 100644 --- a/keystone/tests/unit/test_v2_keystoneclient.py +++ b/keystone/tests/unit/test_v2_keystoneclient.py @@ -22,6 +22,7 @@ import mock from oslo_config import cfg from oslo_serialization import jsonutils from oslo_utils import timeutils +from six.moves import http_client from six.moves import range import webob @@ -1032,7 +1033,8 @@ class ClientDrivenTestCase(tests.TestCase): (new_password, self.user_two['password'])) self.public_server.application(req.environ, responseobject.start_fake_response) - self.assertEqual(403, responseobject.response_status) + self.assertEqual(http_client.FORBIDDEN, + responseobject.response_status) self.user_two['password'] = new_password self.assertRaises(client_exceptions.Unauthorized, diff --git a/keystone/tests/unit/test_v3_assignment.py b/keystone/tests/unit/test_v3_assignment.py index 03e5d30b28..f2e98d6392 100644 --- a/keystone/tests/unit/test_v3_assignment.py +++ b/keystone/tests/unit/test_v3_assignment.py @@ -14,6 +14,7 @@ import random import uuid from oslo_config import cfg +from six.moves import http_client from six.moves import range from keystone.common import controller @@ -75,9 +76,10 @@ class AssignmentTestCase(test_v3.RestfulTestCase, body={'domain': ref}) self.assertValidDomainResponse(r, ref) - def test_create_domain_400(self): + def test_create_domain_bad_request(self): """Call ``POST /domains``.""" - self.post('/domains', body={'domain': {}}, expected_status=400) + self.post('/domains', body={'domain': {}}, + expected_status=http_client.BAD_REQUEST) def test_list_domains(self): """Call ``GET /domains``.""" 
@@ -133,7 +135,8 @@ class AssignmentTestCase(test_v3.RestfulTestCase, } } self.admin_request( - path='/v2.0/tokens', method='POST', body=body, expected_status=401) + path='/v2.0/tokens', method='POST', body=body, + expected_status=http_client.UNAUTHORIZED) auth_data = self.build_authentication_request( user_id=self.user2['id'], @@ -160,21 +163,24 @@ class AssignmentTestCase(test_v3.RestfulTestCase, } } self.admin_request( - path='/v2.0/tokens', method='POST', body=body, expected_status=401) + path='/v2.0/tokens', method='POST', body=body, + expected_status=http_client.UNAUTHORIZED) # Try looking up in v3 by name and id auth_data = self.build_authentication_request( user_id=self.user2['id'], password=self.user2['password'], project_id=self.project2['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) auth_data = self.build_authentication_request( username=self.user2['name'], user_domain_id=self.domain2['id'], password=self.user2['password'], project_id=self.project2['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_delete_enabled_domain_fails(self): """Call ``DELETE /domains/{domain_id}`` (when domain enabled).""" @@ -370,7 +376,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # as the domain has already been disabled. self.head('/auth/tokens', headers={'x-subject-token': subject_token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_delete_domain_hierarchy(self): """Call ``DELETE /domains/{domain_id}``.""" 
@@ -485,14 +491,16 @@ class AssignmentTestCase(test_v3.RestfulTestCase, body={'project': ref}) self.assertValidProjectResponse(r, ref) - def test_create_project_400(self): + def test_create_project_bad_request(self): """Call ``POST /projects``.""" - self.post('/projects', body={'project': {}}, expected_status=400) + self.post('/projects', body={'project': {}}, + expected_status=http_client.BAD_REQUEST) def test_create_project_invalid_domain_id(self): """Call ``POST /projects``.""" ref = self.new_project_ref(domain_id=uuid.uuid4().hex) - self.post('/projects', body={'project': ref}, expected_status=400) + self.post('/projects', body={'project': ref}, + expected_status=http_client.BAD_REQUEST) def test_create_project_is_domain_not_allowed(self): """Call ``POST /projects``. @@ -644,18 +652,20 @@ class AssignmentTestCase(test_v3.RestfulTestCase, def test_get_project_with_parents_as_list_with_invalid_id(self): """Call ``GET /projects/{project_id}?parents_as_list``.""" self.get('/projects/%(project_id)s?parents_as_list' % { - 'project_id': None}, expected_status=404) + 'project_id': None}, expected_status=http_client.NOT_FOUND) self.get('/projects/%(project_id)s?parents_as_list' % { - 'project_id': uuid.uuid4().hex}, expected_status=404) + 'project_id': uuid.uuid4().hex}, + expected_status=http_client.NOT_FOUND) def test_get_project_with_subtree_as_list_with_invalid_id(self): """Call ``GET /projects/{project_id}?subtree_as_list``.""" self.get('/projects/%(project_id)s?subtree_as_list' % { - 'project_id': None}, expected_status=404) + 'project_id': None}, expected_status=http_client.NOT_FOUND) self.get('/projects/%(project_id)s?subtree_as_list' % { - 'project_id': uuid.uuid4().hex}, expected_status=404) + 'project_id': uuid.uuid4().hex}, + expected_status=http_client.NOT_FOUND) def test_get_project_with_parents_as_ids(self): """Call ``GET /projects/{project_id}?parents_as_ids``.""" 
@@ -766,7 +776,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.get( '/projects/%(project_id)s?parents_as_list&parents_as_ids' % { 'project_id': projects[1]['project']['id']}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_get_project_with_subtree_as_ids(self): """Call ``GET /projects/{project_id}?subtree_as_ids``. @@ -928,7 +938,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.get( '/projects/%(project_id)s?subtree_as_list&subtree_as_ids' % { 'project_id': projects[1]['project']['id']}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_update_project(self): """Call ``PATCH /projects/{project_id}``.""" @@ -965,7 +975,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, '/projects/%(project_id)s' % { 'project_id': leaf_project['id']}, body={'project': leaf_project}, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_update_project_is_domain_not_allowed(self): """Call ``PATCH /projects/{project_id}`` with is_domain. @@ -981,7 +991,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.patch('/projects/%(project_id)s' % { 'project_id': resp.result['project']['id']}, body={'project': project}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_disable_leaf_project(self): """Call ``PATCH /projects/{project_id}``.""" @@ -1004,7 +1014,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, '/projects/%(project_id)s' % { 'project_id': root_project['id']}, body={'project': root_project}, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_delete_project(self): """Call ``DELETE /projects/{project_id}`` @@ -1048,7 +1058,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.delete( '/projects/%(project_id)s' % { 'project_id': projects[0]['project']['id']}, - expected_status=403) + expected_status=http_client.FORBIDDEN) # Role CRUD tests 
@@ -1060,9 +1070,10 @@ class AssignmentTestCase(test_v3.RestfulTestCase, body={'role': ref}) return self.assertValidRoleResponse(r, ref) - def test_create_role_400(self): + def test_create_role_bad_request(self): """Call ``POST /roles``.""" - self.post('/roles', body={'role': {}}, expected_status=400) + self.post('/roles', body={'role': {}}, + expected_status=http_client.BAD_REQUEST) def test_list_roles(self): """Call ``GET /roles``.""" @@ -1132,7 +1143,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, """Grant role on a project to a user that doesn't exist, 404 result. When grant a role on a project to a user that doesn't exist, the server - returns 404 Not Found for the user. + returns Not Found for the user. """ @@ -1145,7 +1156,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, 'collection_url': collection_url, 'role_id': self.role_id} - self.put(member_url, expected_status=404) + self.put(member_url, expected_status=http_client.NOT_FOUND) def test_crud_user_domain_role_grants(self): collection_url = ( @@ -1184,7 +1195,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, 'collection_url': collection_url, 'role_id': self.role_id} - self.put(member_url, expected_status=404) + self.put(member_url, expected_status=http_client.NOT_FOUND) def test_crud_group_project_role_grants(self): collection_url = ( @@ -1224,7 +1235,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, 'collection_url': collection_url, 'role_id': self.role_id} - self.put(member_url, expected_status=404) + self.put(member_url, expected_status=http_client.NOT_FOUND) def test_crud_group_domain_role_grants(self): collection_url = ( @@ -1264,7 +1275,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, 'collection_url': collection_url, 'role_id': self.role_id} - self.put(member_url, expected_status=404) + self.put(member_url, expected_status=http_client.NOT_FOUND) def _create_new_user_and_assign_role_on_project(self): """Create a new user and assign user a role on a project.""" 
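Review bot: The docstring touched in the `-1132,7` hunk now names the status two different ways: the summary line still ends in `404 result.` while the body line was changed from `returns 404 Not Found for the user.` to `returns Not Found for the user.`. Either keep the number in both places or drop it from the summary too, for example `"""Grant role on a project to a user that doesn't exist, Not Found result.`. The `def` line of this test is outside the hunk, so its name could not be checked for the same leftover.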
@@ -1292,7 +1303,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # Clean up the role assignment self.delete(member_url, expected_status=204) # Make sure the role is gone - self.head(member_url, expected_status=404) + self.head(member_url, expected_status=http_client.NOT_FOUND) def test_delete_user_and_check_role_assignment_fails(self): """Call ``DELETE`` on the user and check the role assignment.""" @@ -1301,7 +1312,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.identity_api.delete_user(user['id']) # We should get a 404 when looking for the user in the identity # backend because we're not performing a delete operation on the role. - self.head(member_url, expected_status=404) + self.head(member_url, expected_status=http_client.NOT_FOUND) def test_token_revoked_once_group_role_grant_revoked(self): """Test token is revoked when group role grant is revoked @@ -1343,7 +1354,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # validates the same token again; it should not longer be valid. self.head('/auth/tokens', headers={'x-subject-token': token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) # Role Assignments tests 
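Review bot: In the `-1292,7` hunk, `self.delete(member_url, expected_status=204)` keeps a bare integer directly above the new `expected_status=http_client.NOT_FOUND`, so one test now spells status codes two ways. The commit message limits the change to 400~410, so this may be deliberate. The same mix shows up in the token-revocation hunks of test_v3_auth.py, where `expected_status=204` and `expected_status=200` sit beside `http_client.UNAUTHORIZED` and `http_client.NOT_FOUND`. A follow-up could use `http_client.NO_CONTENT` and `http_client.OK`, so each revoke-then-verify sequence reads uniformly.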
@@ -1903,24 +1914,24 @@ class RoleAssignmentFailureTestCase(RoleAssignmentBaseTestCase): def test_get_role_assignments_by_domain_and_project(self): self.get_role_assignments(domain_id=self.domain_id, project_id=self.project_id, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_get_role_assignments_by_user_and_group(self): self.get_role_assignments(user_id=self.default_user_id, group_id=self.default_group_id, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_get_role_assignments_by_effective_and_inherited(self): self.config_fixture.config(group='os_inherit', enabled=True) self.get_role_assignments(domain_id=self.domain_id, effective=True, inherited_to_projects=True, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_get_role_assignments_by_effective_and_group(self): self.get_role_assignments(effective=True, group_id=self.default_group_id, - expected_status=400) + expected_status=http_client.BAD_REQUEST) class RoleAssignmentDirectTestCase(RoleAssignmentBaseTestCase): @@ -2193,8 +2204,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, project_id=self.project_id) # Check the user cannot get a domain nor a project token - self.v3_authenticate_token(domain_auth_data, expected_status=401) - self.v3_authenticate_token(project_auth_data, expected_status=401) + self.v3_authenticate_token(domain_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_authenticate_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Grant non-inherited role for user on domain non_inher_ud_link = self.build_role_assignment_link( 
@@ -2203,7 +2216,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, # Check the user can get only a domain token self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data, expected_status=401) + self.v3_authenticate_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Create inherited role inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'} @@ -2224,13 +2238,15 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, # Check the user can only get a domain token self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data, expected_status=401) + self.v3_authenticate_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Delete non-inherited grant self.delete(non_inher_ud_link) # Check the user cannot get a domain token anymore - self.v3_authenticate_token(domain_auth_data, expected_status=401) + self.v3_authenticate_token(domain_auth_data, + expected_status=http_client.UNAUTHORIZED) def test_get_token_from_inherited_group_domain_role_grants(self): # Create a new group and put a new user in it to @@ -2255,8 +2271,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, project_id=self.project_id) # Check the user cannot get a domain nor a project token - self.v3_authenticate_token(domain_auth_data, expected_status=401) - self.v3_authenticate_token(project_auth_data, expected_status=401) + self.v3_authenticate_token(domain_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_authenticate_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Grant non-inherited role for user on domain non_inher_gd_link = self.build_role_assignment_link( 
@@ -2265,7 +2283,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, # Check the user can get only a domain token self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data, expected_status=401) + self.v3_authenticate_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Create inherited role inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'} @@ -2286,13 +2305,15 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, # Check the user can only get a domain token self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data, expected_status=401) + self.v3_authenticate_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Delete non-inherited grant self.delete(non_inher_gd_link) # Check the user cannot get a domain token anymore - self.v3_authenticate_token(domain_auth_data, expected_status=401) + self.v3_authenticate_token(domain_auth_data, + expected_status=http_client.UNAUTHORIZED) def _test_crud_inherited_and_direct_assignment_on_target(self, target_url): # Create a new role to avoid assignments loaded from sample data @@ -2308,7 +2329,7 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(direct_url) # Check the direct assignment exists, but the inherited one does not self.head(direct_url) - self.head(inherited_url, expected_status=404) + self.head(inherited_url, expected_status=http_client.NOT_FOUND) # Now add the inherited assignment self.put(inherited_url) 
@@ -2320,13 +2341,13 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.delete(inherited_url) # Check the direct assignment exists, but the inherited one does not self.head(direct_url) - self.head(inherited_url, expected_status=404) + self.head(inherited_url, expected_status=http_client.NOT_FOUND) # Now delete the inherited assignment self.delete(direct_url) # Check that none of them exist - self.head(direct_url, expected_status=404) - self.head(inherited_url, expected_status=404) + self.head(direct_url, expected_status=http_client.NOT_FOUND) + self.head(inherited_url, expected_status=http_client.NOT_FOUND) def test_crud_inherited_and_direct_assignment_on_domains(self): self._test_crud_inherited_and_direct_assignment_on_target( @@ -2801,8 +2822,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, project_id=leaf_id) # Check the user cannot get a token on root nor leaf project - self.v3_authenticate_token(root_project_auth_data, expected_status=401) - self.v3_authenticate_token(leaf_project_auth_data, expected_status=401) + self.v3_authenticate_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_authenticate_token(leaf_project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Grant non-inherited role for user on leaf project non_inher_up_link = self.build_role_assignment_link( @@ -2811,7 +2834,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(non_inher_up_link) # Check the user can only get a token on leaf project - self.v3_authenticate_token(root_project_auth_data, expected_status=401) + self.v3_authenticate_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) self.v3_authenticate_token(leaf_project_auth_data) # Grant inherited role for user on root project 
@@ -2821,21 +2845,24 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(inher_up_link) # Check the user still can get a token only on leaf project - self.v3_authenticate_token(root_project_auth_data, expected_status=401) + self.v3_authenticate_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) self.v3_authenticate_token(leaf_project_auth_data) # Delete non-inherited grant self.delete(non_inher_up_link) # Check the inherited role still applies for leaf project - self.v3_authenticate_token(root_project_auth_data, expected_status=401) + self.v3_authenticate_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) self.v3_authenticate_token(leaf_project_auth_data) # Delete inherited grant self.delete(inher_up_link) # Check the user cannot get a token on leaf project anymore - self.v3_authenticate_token(leaf_project_auth_data, expected_status=401) + self.v3_authenticate_token(leaf_project_auth_data, + expected_status=http_client.UNAUTHORIZED) def test_get_token_from_inherited_group_project_role_grants(self): # Create default scenario @@ -2858,8 +2885,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, project_id=leaf_id) # Check the user cannot get a token on root nor leaf project - self.v3_authenticate_token(root_project_auth_data, expected_status=401) - self.v3_authenticate_token(leaf_project_auth_data, expected_status=401) + self.v3_authenticate_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_authenticate_token(leaf_project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Grant non-inherited role for group on leaf project non_inher_gp_link = self.build_role_assignment_link( 
@@ -2868,7 +2897,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(non_inher_gp_link) # Check the user can only get a token on leaf project - self.v3_authenticate_token(root_project_auth_data, expected_status=401) + self.v3_authenticate_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) self.v3_authenticate_token(leaf_project_auth_data) # Grant inherited role for group on root project @@ -2878,7 +2908,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(inher_gp_link) # Check the user still can get a token only on leaf project - self.v3_authenticate_token(root_project_auth_data, expected_status=401) + self.v3_authenticate_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) self.v3_authenticate_token(leaf_project_auth_data) # Delete no-inherited grant @@ -2891,7 +2922,8 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.delete(inher_gp_link) # Check the user cannot get a token on leaf project anymore - self.v3_authenticate_token(leaf_project_auth_data, expected_status=401) + self.v3_authenticate_token(leaf_project_auth_data, + expected_status=http_client.UNAUTHORIZED) def test_get_role_assignments_for_project_hierarchy(self): """Call ``GET /role_assignments``. @@ -3069,10 +3101,10 @@ class AssignmentInheritanceDisabledTestCase(test_v3.RestfulTestCase): 'role_id': role['id']} collection_url = base_collection_url + '/inherited_to_projects' - self.put(member_url, expected_status=404) - self.head(member_url, expected_status=404) - self.get(collection_url, expected_status=404) - self.delete(member_url, expected_status=404) + self.put(member_url, expected_status=http_client.NOT_FOUND) + self.head(member_url, expected_status=http_client.NOT_FOUND) + self.get(collection_url, expected_status=http_client.NOT_FOUND) + self.delete(member_url, expected_status=http_client.NOT_FOUND) class AssignmentV3toV2MethodsTestCase(tests.TestCase): 
diff --git a/keystone/tests/unit/test_v3_auth.py b/keystone/tests/unit/test_v3_auth.py index 96f0ff1f17..726d626a72 100644 --- a/keystone/tests/unit/test_v3_auth.py +++ b/keystone/tests/unit/test_v3_auth.py @@ -22,6 +22,7 @@ from keystoneclient.common import cms import mock from oslo_config import cfg from oslo_utils import timeutils +from six.moves import http_client from six.moves import range from testtools import matchers from testtools import testcase @@ -141,7 +142,7 @@ class TokenAPITests(object): path='/v2.0/tokens/%s' % v3_token, token=CONF.admin_token, method='GET', - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_v3_v2_intermix_new_default_domain(self): # If the default_domain_id config option is changed, then should be @@ -199,7 +200,7 @@ class TokenAPITests(object): method='GET', path='/v2.0/tokens/%s' % v3_token, token=CONF.admin_token, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_v3_v2_intermix_non_default_project_failed(self): # self.project is in a non-default domain @@ -213,7 +214,7 @@ class TokenAPITests(object): method='GET', path='/v2.0/tokens/%s' % v3_token, token=CONF.admin_token, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_v3_v2_intermix_non_default_user_failed(self): self.assignment_api.create_grant( @@ -232,7 +233,7 @@ class TokenAPITests(object): method='GET', path='/v2.0/tokens/%s' % v3_token, token=CONF.admin_token, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_v3_v2_intermix_domain_scope_failed(self): self.assignment_api.create_grant( @@ -250,7 +251,7 @@ class TokenAPITests(object): path='/v2.0/tokens/%s' % v3_token, token=CONF.admin_token, method='GET', - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_v3_v2_unscoped_token_intermix(self): r = self.v3_authenticate_token(self.build_authentication_request( 
@@ -390,7 +391,7 @@ class TokenAPITests(object): # Attempting to use the deleted token on v2 should fail. self.admin_request( path='/v2.0/tenants', method='GET', token=v2_token, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_rescoping_token(self): expires = self.v3_token_data['token']['expires_at'] @@ -434,7 +435,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): self.build_authentication_request( token=self.get_scoped_token(), project_id=self.project_id), - expected_status=403) + expected_status=http_client.FORBIDDEN) def _v2_token(self): body = { @@ -460,7 +461,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): self.admin_request(path='/v2.0/tokens', method='POST', body=body, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_rescoping_v2_to_v3_disabled(self): token = self._v2_token() @@ -468,7 +469,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): self.build_authentication_request( token=token['access']['token']['id'], project_id=self.project_id), - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_rescoping_v3_to_v2_disabled(self): token = {'id': self.get_scoped_token()} @@ -498,7 +499,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): self.build_authentication_request( token=domain_scoped_token, project_id=self.project_id), - expected_status=403) + expected_status=http_client.FORBIDDEN) class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests): 
@@ -660,17 +661,21 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): token=user_token) self.delete('/auth/tokens', headers=headers, expected_status=204, token=user_token) - # invalid X-Auth-Token and invalid X-Subject-Token (401) - self.head('/auth/tokens', headers=headers, expected_status=401, + # invalid X-Auth-Token and invalid X-Subject-Token + self.head('/auth/tokens', headers=headers, + expected_status=http_client.UNAUTHORIZED, token=user_token) - # invalid X-Auth-Token and invalid X-Subject-Token (401) - self.delete('/auth/tokens', headers=headers, expected_status=401, + # invalid X-Auth-Token and invalid X-Subject-Token + self.delete('/auth/tokens', headers=headers, + expected_status=http_client.UNAUTHORIZED, token=user_token) - # valid X-Auth-Token and invalid X-Subject-Token (404) - self.delete('/auth/tokens', headers=headers, expected_status=404, + # valid X-Auth-Token and invalid X-Subject-Token + self.delete('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND, token=adminA_token) - # valid X-Auth-Token and invalid X-Subject-Token (404) - self.head('/auth/tokens', headers=headers, expected_status=404, + # valid X-Auth-Token and invalid X-Subject-Token + self.head('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND, token=adminA_token) def test_adminA_revokes_userA_token(self): 
@@ -694,14 +699,17 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): token=user_token) self.delete('/auth/tokens', headers=headers, expected_status=204, token=adminA_token) - # invalid X-Auth-Token and invalid X-Subject-Token (401) - self.head('/auth/tokens', headers=headers, expected_status=401, + # invalid X-Auth-Token and invalid X-Subject-Token + self.head('/auth/tokens', headers=headers, + expected_status=http_client.UNAUTHORIZED, token=user_token) - # valid X-Auth-Token and invalid X-Subject-Token (404) - self.delete('/auth/tokens', headers=headers, expected_status=404, + # valid X-Auth-Token and invalid X-Subject-Token + self.delete('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND, token=adminA_token) - # valid X-Auth-Token and invalid X-Subject-Token (404) - self.head('/auth/tokens', headers=headers, expected_status=404, + # valid X-Auth-Token and invalid X-Subject-Token + self.head('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND, token=adminA_token) def test_adminB_fails_revoking_userA_token(self): @@ -729,9 +737,11 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): password=self.userAdminB['password'], domain_name=self.domainB['name'])) - self.head('/auth/tokens', headers=headers, expected_status=403, + self.head('/auth/tokens', headers=headers, + expected_status=http_client.FORBIDDEN, token=adminB_token) - self.delete('/auth/tokens', headers=headers, expected_status=403, + self.delete('/auth/tokens', headers=headers, + expected_status=http_client.FORBIDDEN, token=adminB_token) @@ -907,7 +917,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.delete(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def role_data_fixtures(self): self.projectC = self.new_project_ref(domain_id=self.domainA['id']) 
@@ -1019,16 +1029,16 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Check the tokens that used role1 is invalid self.head('/auth/tokens', headers={'X-Subject-Token': tokenA}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.head('/auth/tokens', headers={'X-Subject-Token': tokenB}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.head('/auth/tokens', headers={'X-Subject-Token': tokenD}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.head('/auth/tokens', headers={'X-Subject-Token': tokenE}, - expected_status=404) + expected_status=http_client.NOT_FOUND) # ...but the one using role2 is still valid self.head('/auth/tokens', @@ -1086,13 +1096,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # user should no longer have access to the project self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.v3_authenticate_token( self.build_authentication_request( user_id=self.user3['id'], password=self.user3['password'], project_id=self.projectA['id']), - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_deleting_project_revokes_token(self): token = self.get_requested_token( @@ -1113,13 +1123,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # user should no longer have access to the project self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.v3_authenticate_token( self.build_authentication_request( user_id=self.user3['id'], password=self.user3['password'], project_id=self.projectA['id']), - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_deleting_group_grant_revokes_tokens(self): """Test deleting a group grant revokes tokens. 
@@ -1171,15 +1181,15 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.delete(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token1}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=404) + expected_status=http_client.NOT_FOUND) # But user3's token should be invalid too as revocation is done for # scope role & project self.head('/auth/tokens', headers={'X-Subject-Token': token3}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_domain_group_role_assignment_maintains_token(self): """Test domain-group role assignment maintains existing token. @@ -1251,7 +1261,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): 'user_id': self.user1['id']}) self.head('/auth/tokens', headers={'X-Subject-Token': token1}, - expected_status=404) + expected_status=http_client.NOT_FOUND) # But user2's token should still be valid self.head('/auth/tokens', headers={'X-Subject-Token': token2}, @@ -1295,13 +1305,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # authorization for the first user should now fail self.head('/auth/tokens', headers={'X-Subject-Token': user1_token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.v3_authenticate_token( self.build_authentication_request( user_id=self.user1['id'], password=self.user1['password'], project_id=self.projectA['id']), - expected_status=401) + expected_status=http_client.UNAUTHORIZED) # authorization for the second user should still succeed self.head('/auth/tokens', @@ -1329,7 +1339,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): '/projects/%(project_id)s' % {'project_id': self.projectA['id']}) # Make sure that we get a NotFound(404) when heading that role. - self.head(role_path, expected_status=404) + self.head(role_path, expected_status=http_client.NOT_FOUND) def get_v2_token(self, token=None, project_id=None): body = {'auth': {}, } 
@@ -1361,7 +1371,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_revoke_token_from_token(self): # Test that a scoped token can be requested from an unscoped token, @@ -1393,7 +1403,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The project-scoped token is invalidated. self.head('/auth/tokens', headers={'X-Subject-Token': project_scoped_token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) # The unscoped token should still be valid. self.head('/auth/tokens', @@ -1413,7 +1423,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The domain-scoped token is invalid. self.head('/auth/tokens', headers={'X-Subject-Token': domain_scoped_token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) # The unscoped token should still be valid. self.head('/auth/tokens', @@ -1442,7 +1452,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The project-scoped token is invalidated. self.head('/auth/tokens', headers={'X-Subject-Token': project_scoped_token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) # The unscoped token should still be valid. self.head('/auth/tokens', @@ -1495,7 +1505,7 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById): # while token for the projectB should not self.head('/auth/tokens', headers={'X-Subject-Token': project_token}, - expected_status=404) + expected_status=http_client.NOT_FOUND) revoked_tokens = [ t['id'] for t in self.token_provider_api.list_revoked_tokens()] # token is in token revocation list 
@@ -1557,7 +1567,8 @@ class TestTokenRevokeApi(TestTokenRevokeById): expected_status=200).json_body['token'] self.delete('/auth/tokens', headers=headers, expected_status=204) - self.head('/auth/tokens', headers=headers, expected_status=404) + self.head('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND) events_response = self.get('/OS-REVOKE/events', expected_status=200).json_body self.assertValidRevokedTokenResponse(events_response, @@ -1569,7 +1580,8 @@ class TestTokenRevokeApi(TestTokenRevokeById): response = self.get('/auth/tokens', headers=headers, expected_status=200).json_body['token'] self.delete('/auth/tokens', headers=headers, expected_status=204) - self.head('/auth/tokens', headers=headers, expected_status=404) + self.head('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND) events_response = self.get('/OS-REVOKE/events', expected_status=200).json_body @@ -1578,7 +1590,8 @@ class TestTokenRevokeApi(TestTokenRevokeById): audit_id=response['audit_ids'][0]) def test_revoke_by_id_false_410(self): - self.get('/auth/tokens/OS-PKI/revoked', expected_status=410) + self.get('/auth/tokens/OS-PKI/revoked', + expected_status=http_client.GONE) def test_list_delete_project_shows_in_event_list(self): self.role_data_fixtures() @@ -1662,7 +1675,8 @@ class TestTokenRevokeApi(TestTokenRevokeById): self.assertEventDataInList( events, audit_id=token2['audit_ids'][1]) - self.head('/auth/tokens', headers=headers, expected_status=404) + self.head('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND) self.head('/auth/tokens', headers=headers2, expected_status=200) self.head('/auth/tokens', headers=headers3, expected_status=200) 
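Review bot: `test_revoke_by_id_false_410` in the `-1578,7` hunk keeps its numeric suffix although its assertion now uses `http_client.GONE`, while other tests in this patch are renamed (`test_create_project_400` became `test_create_project_bad_request`). Renaming it the same way, for example `def test_revoke_by_id_false_returns_gone(self):`, would keep the scheme consistent. `test_trusts_404` in the later `-2712,15` hunk has the same leftover, directly next to the renamed `test_auth_with_scope_in_trust_forbidden`.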
@@ -2002,7 +2016,7 @@ class TestAuth(test_v3.RestfulTestCase): self._check_disabled_endpoint_result(r.result['token']['catalog'], disabled_endpoint_id) - def test_project_id_scoped_token_with_user_id_401(self): + def test_project_id_scoped_token_with_user_id_unauthorized(self): project = self.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project['id'], project) @@ -2010,7 +2024,8 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], project_id=project['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_user_and_group_roles_scoped_token(self): """Test correct roles are returned in scoped token. @@ -2346,7 +2361,8 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_auth_with_id(self): auth_data = self.build_authentication_request( 
@@ -2395,34 +2411,39 @@ class TestAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( user_id=uuid.uuid4().hex, password=self.user['password']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_invalid_user_name(self): auth_data = self.build_authentication_request( username=uuid.uuid4().hex, user_domain_id=self.domain['id'], password=self.user['password']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_invalid_domain_id(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=uuid.uuid4().hex, password=self.user['password']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_invalid_domain_name(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=uuid.uuid4().hex, password=self.user['password']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_invalid_password(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=uuid.uuid4().hex) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_remote_user_no_realm(self): api = auth.controllers.Auth() @@ -2588,7 +2609,8 @@ class TestAuth(test_v3.RestfulTestCase): user_id=user['id'], password='password') - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_disabled_default_project_result_in_unscoped_token(self): # create a disabled project to work with 
@@ -2666,7 +2688,8 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], project_id=project['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) # user should not be able to auth with project_name & domain auth_data = self.build_authentication_request( @@ -2674,7 +2697,8 @@ class TestAuth(test_v3.RestfulTestCase): password=self.user['password'], project_name=project['name'], project_domain_id=domain['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_auth_methods_with_different_identities_fails(self): # get the token for a user. This is self.user which is different from @@ -2686,7 +2710,8 @@ class TestAuth(test_v3.RestfulTestCase): token=token, user_id=self.default_domain_user['id'], password=self.default_domain_user['password']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) class TestAuthJSONExternal(test_v3.RestfulTestCase): 
@@ -2712,15 +2737,18 @@ class TestTrustOptional(test_v3.RestfulTestCase): self.config_fixture.config(group='trust', enabled=False) def test_trusts_404(self): - self.get('/OS-TRUST/trusts', body={'trust': {}}, expected_status=404) - self.post('/OS-TRUST/trusts', body={'trust': {}}, expected_status=404) + self.get('/OS-TRUST/trusts', body={'trust': {}}, + expected_status=http_client.NOT_FOUND) + self.post('/OS-TRUST/trusts', body={'trust': {}}, + expected_status=http_client.NOT_FOUND) - def test_auth_with_scope_in_trust_403(self): + def test_auth_with_scope_in_trust_forbidden(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], trust_id=uuid.uuid4().hex) - self.v3_authenticate_token(auth_data, expected_status=403) + self.v3_authenticate_token(auth_data, + expected_status=http_client.FORBIDDEN) class TestTrustRedelegation(test_v3.RestfulTestCase): @@ -2804,7 +2832,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': self.chained_trust_ref}, token=trust_token, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_modified_redelegation_count_error(self): r = self.post('/OS-TRUST/trusts', @@ -2820,14 +2848,14 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': self.chained_trust_ref}, token=trust_token, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_max_redelegation_count_constraint(self): incorrect = CONF.trust.max_redelegation_count + 1 self.redelegated_trust_ref['redelegation_count'] = incorrect self.post('/OS-TRUST/trusts', body={'trust': self.redelegated_trust_ref}, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_redelegation_expiry(self): r = self.post('/OS-TRUST/trusts', 
@@ -2847,7 +2875,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': too_long_live_chained_trust_ref}, token=trust_token, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_redelegation_remaining_uses(self): r = self.post('/OS-TRUST/trusts', @@ -2862,7 +2890,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': self.chained_trust_ref}, token=trust_token, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_roles_subset(self): # Build second role @@ -2949,7 +2977,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': self.chained_trust_ref}, token=trust_token, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_redelegation_terminator(self): r = self.post('/OS-TRUST/trusts', @@ -2977,7 +3005,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': ref}, token=trust_token, - expected_status=403) + expected_status=http_client.FORBIDDEN) class TestTrustChain(test_v3.RestfulTestCase): @@ -3088,7 +3116,8 @@ class TestTrustChain(test_v3.RestfulTestCase): expected_status=204) headers = {'X-Subject-Token': self.last_token} - self.head('/auth/tokens', headers=headers, expected_status=404) + self.head('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND) self.assert_trust_tokens_revoked(self.trust_chain[0]['id']) def test_delete_broken_chain(self): @@ -3111,7 +3140,8 @@ class TestTrustChain(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( token=self.last_token, trust_id=self.trust_chain[-1]['id']) - self.v3_authenticate_token(auth_data, expected_status=404) + self.v3_authenticate_token(auth_data, + expected_status=http_client.NOT_FOUND) def test_intermediate_user_disabled(self): self.assert_user_authenticate(self.user_chain[0]) 
@@ -3123,7 +3153,8 @@ class TestTrustChain(test_v3.RestfulTestCase): # Bypass policy enforcement with mock.patch.object(rules, 'enforce', return_value=True): headers = {'X-Subject-Token': self.last_token} - self.head('/auth/tokens', headers=headers, expected_status=403) + self.head('/auth/tokens', headers=headers, + expected_status=http_client.FORBIDDEN) def test_intermediate_user_deleted(self): self.assert_user_authenticate(self.user_chain[0]) @@ -3133,7 +3164,8 @@ class TestTrustChain(test_v3.RestfulTestCase): # Bypass policy enforcement with mock.patch.object(rules, 'enforce', return_value=True): headers = {'X-Subject-Token': self.last_token} - self.head('/auth/tokens', headers=headers, expected_status=403) + self.head('/auth/tokens', headers=headers, + expected_status=http_client.FORBIDDEN) class TestTrustAuth(test_v3.RestfulTestCase): @@ -3159,9 +3191,10 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.trustee_user['password'] = password self.trustee_user_id = self.trustee_user['id'] - def test_create_trust_400(self): + def test_create_trust_bad_request(self): # The server returns a 403 Forbidden rather than a 400, see bug 1133435 - self.post('/OS-TRUST/trusts', body={'trust': {}}, expected_status=403) + self.post('/OS-TRUST/trusts', body={'trust': {}}, + expected_status=http_client.FORBIDDEN) def test_create_unscoped_trust(self): ref = self.new_trust_ref( @@ -3175,7 +3208,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id) - self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=403) + self.post('/OS-TRUST/trusts', body={'trust': ref}, + expected_status=http_client.FORBIDDEN) def _initialize_test_consume_trust(self, count): # Make sure remaining_uses is decremented as we consume the trust 
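Review bot: The new name `test_create_trust_bad_request` in the `TestTrustAuth` hunk (old line 3159) contradicts its own assertion, which expects `http_client.FORBIDDEN`. The retained comment explains that the server answers 403 rather than 400 (bug 1133435), but someone scanning test names for authorization coverage will read this as an input-validation test. Name it after the behaviour that is asserted, e.g. `def test_create_trust_with_empty_body_forbidden(self):`. Alternatively, keep the name and extend the comment to `# Name records the intended status; the server currently returns 403 Forbidden, see bug 1133435`.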
@@ -3219,13 +3253,14 @@ class TestTrustAuth(test_v3.RestfulTestCase): # No more uses, the trust is made unavailable self.get( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=404) + expected_status=http_client.NOT_FOUND) # this time we can't get a trust token auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_create_trust_with_bad_values_for_remaining_uses(self): # negative values for the remaining_uses parameter are forbidden @@ -3245,7 +3280,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): role_ids=[self.role_id]) self.post('/OS-TRUST/trusts', body={'trust': ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_invalid_trust_request_without_impersonation(self): ref = self.new_trust_ref( @@ -3258,7 +3293,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_invalid_trust_request_without_trustee(self): ref = self.new_trust_ref( @@ -3271,7 +3306,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_create_unlimited_use_trust(self): # by default trusts are unlimited in terms of tokens that can be @@ -3343,7 +3378,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.patch( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, body={'trust': ref}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.delete( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, 
@@ -3351,7 +3386,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.get( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_create_trust_trustee_404(self): ref = self.new_trust_ref( @@ -3359,7 +3394,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trustee_user_id=uuid.uuid4().hex, project_id=self.project_id, role_ids=[self.role_id]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) + self.post('/OS-TRUST/trusts', body={'trust': ref}, + expected_status=http_client.NOT_FOUND) def test_create_trust_trustor_trustee_backwards(self): ref = self.new_trust_ref( @@ -3367,7 +3403,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trustee_user_id=self.user_id, project_id=self.project_id, role_ids=[self.role_id]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=403) + self.post('/OS-TRUST/trusts', body={'trust': ref}, + expected_status=http_client.FORBIDDEN) def test_create_trust_project_404(self): ref = self.new_trust_ref( @@ -3375,7 +3412,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trustee_user_id=self.trustee_user_id, project_id=uuid.uuid4().hex, role_ids=[self.role_id]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) + self.post('/OS-TRUST/trusts', body={'trust': ref}, + expected_status=http_client.NOT_FOUND) def test_create_trust_role_id_404(self): ref = self.new_trust_ref( @@ -3383,7 +3421,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_ids=[uuid.uuid4().hex]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) + self.post('/OS-TRUST/trusts', body={'trust': ref}, + expected_status=http_client.NOT_FOUND) def test_create_trust_role_name_404(self): ref = self.new_trust_ref( 
@@ -3391,7 +3430,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_names=[uuid.uuid4().hex]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) + self.post('/OS-TRUST/trusts', body={'trust': ref}, + expected_status=http_client.NOT_FOUND) def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self): ref = self.new_trust_ref( @@ -3419,7 +3459,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token=CONF.admin_token, - method='GET', expected_status=401) + method='GET', expected_status=http_client.UNAUTHORIZED) def test_v3_v2_intermix_trustor_not_in_default_domaini_failed(self): ref = self.new_trust_ref( @@ -3452,7 +3492,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token=CONF.admin_token, - method='GET', expected_status=401) + method='GET', expected_status=http_client.UNAUTHORIZED) def test_v3_v2_intermix_project_not_in_default_domaini_failed(self): # create a trustee in default domain to delegate stuff to @@ -3492,7 +3532,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token=CONF.admin_token, - method='GET', expected_status=401) + method='GET', expected_status=http_client.UNAUTHORIZED) def test_v3_v2_intermix(self): # create a trustee in default domain to delegate stuff to @@ -3624,7 +3664,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': ref}, token=trust_token, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_trust_deleted_grant(self): # create a new role 
@@ -3662,7 +3702,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data, expected_status=403) + r = self.v3_authenticate_token(auth_data, + expected_status=http_client.FORBIDDEN) def test_trust_chained(self): """Test that a trust token can't be used to execute another trust. @@ -3730,7 +3771,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( token=trust_token, trust_id=trust1['id']) - r = self.v3_authenticate_token(auth_data, expected_status=403) + r = self.v3_authenticate_token(auth_data, + expected_status=http_client.FORBIDDEN) def assertTrustTokensRevoked(self, trust_id): revocation_response = self.get('/OS-REVOKE/events', @@ -3766,7 +3808,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): 'trust_id': trust_id}, expected_status=204) headers = {'X-Subject-Token': trust_token} - self.head('/auth/tokens', headers=headers, expected_status=404) + self.head('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND) self.assertTrustTokensRevoked(trust_id) def disable_user(self, user): @@ -3798,7 +3841,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, expected_status=403) + self.v3_authenticate_token(auth_data, + expected_status=http_client.FORBIDDEN) def test_trust_get_token_fails_if_trustee_disabled(self): ref = self.new_trust_ref( @@ -3825,7 +3869,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_delete_trust(self): ref = self.new_trust_ref( 
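Review bot: The two reflowed calls `r = self.v3_authenticate_token(auth_data, expected_status=http_client.FORBIDDEN)` (hunks at old lines 3662 and 3730) bind the response to `r`, yet each is the last statement of its test, so the value is never read. Since the lines are being rewritten anyway, drop the binding and write `self.v3_authenticate_token(auth_data,` followed by `expected_status=http_client.FORBIDDEN)`, matching every other call in this file. Both test bodies begin outside their hunks.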
@@ -3846,17 +3891,18 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.get('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}, - expected_status=404) + expected_status=http_client.NOT_FOUND) self.get('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}, - expected_status=404) + expected_status=http_client.NOT_FOUND) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, expected_status=401) + self.v3_authenticate_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_list_trusts(self): ref = self.new_trust_ref( @@ -3918,7 +3964,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): expected_status=200)) self.get('/OS-TRUST/trusts?trustor_user_id=%s' % - self.user_id, expected_status=401, + self.user_id, expected_status=http_client.UNAUTHORIZED, token=trust_token) def test_trustee_can_do_role_ops(self): @@ -3977,7 +4023,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], trust_id=trust_id) - self.v3_authenticate_token(auth_data, expected_status=403) + self.v3_authenticate_token(auth_data, + expected_status=http_client.FORBIDDEN) r = self.get('/OS-TRUST/trusts/%s' % trust_id) self.assertEqual(3, r.result.get('trust').get('remaining_uses')) @@ -4075,7 +4122,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']), - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_get_catalog_unscoped_token(self): """Call ``GET /auth/catalog`` with an unscoped token.""" 
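Review bot: The hunk at old line 3846 now carries two byte-identical `self.get('/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, expected_status=http_client.NOT_FOUND)` calls back to back, and the second adds no coverage. Remove one of them. If the repeat was meant to show that the deleted trust stays unavailable on a later request, say so with a comment such as `# a repeated lookup of the deleted trust must also fail`. The test's opening lines, including the delete call itself, are outside the hunk.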
@@ -4084,14 +4131,14 @@ class TestAuthSpecificData(test_v3.RestfulTestCase): auth=self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password']), - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_get_catalog_no_token(self): """Call ``GET /auth/catalog`` without a token.""" self.get( '/auth/catalog', noauth=True, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_get_projects_project_scoped_token(self): r = self.get('/auth/projects', expected_status=200) @@ -4190,13 +4237,15 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): unscoped_token = self._get_unscoped_token() tampered_token = (unscoped_token[:50] + uuid.uuid4().hex + unscoped_token[50 + 32:]) - self._validate_token(tampered_token, expected_status=404) + self._validate_token(tampered_token, + expected_status=http_client.NOT_FOUND) def test_revoke_unscoped_token(self): unscoped_token = self._get_unscoped_token() self._validate_token(unscoped_token) self._revoke_token(unscoped_token) - self._validate_token(unscoped_token, expected_status=404) + self._validate_token(unscoped_token, + expected_status=http_client.NOT_FOUND) def test_unscoped_token_is_invalid_after_disabling_user(self): unscoped_token = self._get_unscoped_token() 
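Review bot: The tamper test in the hunk at old line 4190 asserts `http_client.NOT_FOUND` for a token with 32 random hex characters spliced in at offset 50, with no comment on why a token that fails validation is expected as not found rather than unauthorized. The project-scoped and trust-scoped twins in the hunks at old lines 4270 and 4378 share this gap. Now that the status is spelled out by name, the choice stands out more than the bare `404` did. If that is the intended contract, add a line above the assertion such as `# a token that no longer validates is reported as a non-existent subject token`. The helper `_validate_token` is defined outside these hunks, so the mapping cannot be confirmed from here.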
@@ -4270,13 +4319,15 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): project_scoped_token = self._get_project_scoped_token() tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex + project_scoped_token[50 + 32:]) - self._validate_token(tampered_token, expected_status=404) + self._validate_token(tampered_token, + expected_status=http_client.NOT_FOUND) def test_revoke_project_scoped_token(self): project_scoped_token = self._get_project_scoped_token() self._validate_token(project_scoped_token) self._revoke_token(project_scoped_token) - self._validate_token(project_scoped_token, expected_status=404) + self._validate_token(project_scoped_token, + expected_status=http_client.NOT_FOUND) def test_project_scoped_token_is_invalid_after_disabling_user(self): project_scoped_token = self._get_project_scoped_token() @@ -4378,7 +4429,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): # Get a trust scoped token tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex + trust_scoped_token[50 + 32:]) - self._validate_token(tampered_token, expected_status=404) + self._validate_token(tampered_token, + expected_status=http_client.NOT_FOUND) def test_revoke_trust_scoped_token(self): trustee_user, trust = self._create_trust() @@ -4386,7 +4438,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): # Validate a trust scoped token self._validate_token(trust_scoped_token) self._revoke_token(trust_scoped_token) - self._validate_token(trust_scoped_token, expected_status=404) + self._validate_token(trust_scoped_token, + expected_status=http_client.NOT_FOUND) def test_trust_scoped_token_is_invalid_after_disabling_trustee(self): trustee_user, trust = self._create_trust() 
@@ -4460,7 +4513,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): self.token_provider_api.validate_token, trust_scoped_token) - def test_v2_validate_unscoped_token_returns_401(self): + def test_v2_validate_unscoped_token_returns_unauthorized(self): """Test raised exception when validating unscoped token. Test that validating an unscoped token in v2.0 of a v3 user of a @@ -4471,7 +4524,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): self.token_provider_api.validate_v2_token, unscoped_token) - def test_v2_validate_domain_scoped_token_returns_401(self): + def test_v2_validate_domain_scoped_token_returns_unauthorized(self): """Test raised exception when validating a domain scoped token. Test that validating an domain scoped token in v2.0 diff --git a/keystone/tests/unit/test_v3_catalog.py b/keystone/tests/unit/test_v3_catalog.py index f96b2a12fb..bf439cefc4 100644 --- a/keystone/tests/unit/test_v3_catalog.py +++ b/keystone/tests/unit/test_v3_catalog.py @@ -15,6 +15,7 @@ import copy import uuid +from six.moves import http_client from testtools import matchers from keystone import catalog @@ -184,7 +185,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): self.put( '/regions/%s' % uuid.uuid4().hex, body={'region': ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_list_regions(self): """Call ``GET /regions``.""" 
@@ -326,19 +327,22 @@ class CatalogTestCase(test_v3.RestfulTestCase): """Call ``POST /services``.""" ref = self.new_service_ref() ref['enabled'] = 'True' - self.post('/services', body={'service': ref}, expected_status=400) + self.post('/services', body={'service': ref}, + expected_status=http_client.BAD_REQUEST) def test_create_service_enabled_str_false(self): """Call ``POST /services``.""" ref = self.new_service_ref() ref['enabled'] = 'False' - self.post('/services', body={'service': ref}, expected_status=400) + self.post('/services', body={'service': ref}, + expected_status=http_client.BAD_REQUEST) def test_create_service_enabled_str_random(self): """Call ``POST /services``.""" ref = self.new_service_ref() ref['enabled'] = 'puppies' - self.post('/services', body={'service': ref}, expected_status=400) + self.post('/services', body={'service': ref}, + expected_status=http_client.BAD_REQUEST) def test_list_services(self): """Call ``GET /services``.""" @@ -575,7 +579,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): self.post( '/endpoints', body={'endpoint': ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_create_endpoint_enabled_str_false(self): """Call ``POST /endpoints`` with enabled: 'False'.""" @@ -584,7 +588,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): self.post( '/endpoints', body={'endpoint': ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_create_endpoint_enabled_str_random(self): """Call ``POST /endpoints`` with enabled: 'puppies'.""" 
@@ -593,13 +597,14 @@ class CatalogTestCase(test_v3.RestfulTestCase): self.post( '/endpoints', body={'endpoint': ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_create_endpoint_with_invalid_region_id(self): """Call ``POST /endpoints``.""" ref = self.new_endpoint_ref(service_id=self.service_id) ref["region_id"] = uuid.uuid4().hex - self.post('/endpoints', body={'endpoint': ref}, expected_status=400) + self.post('/endpoints', body={'endpoint': ref}, + expected_status=http_client.BAD_REQUEST) def test_create_endpoint_with_region(self): """EndpointV3 creates the region before creating the endpoint, if @@ -623,7 +628,8 @@ class CatalogTestCase(test_v3.RestfulTestCase): """Call ``POST /endpoints``.""" ref = self.new_endpoint_ref(service_id=self.service_id) ref["url"] = '' - self.post('/endpoints', body={'endpoint': ref}, expected_status=400) + self.post('/endpoints', body={'endpoint': ref}, + expected_status=http_client.BAD_REQUEST) def test_get_endpoint(self): """Call ``GET /endpoints/{endpoint_id}``.""" @@ -667,7 +673,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): '/endpoints/%(endpoint_id)s' % { 'endpoint_id': self.endpoint_id}, body={'endpoint': {'enabled': 'True'}}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_update_endpoint_enabled_str_false(self): """Call ``PATCH /endpoints/{endpoint_id}`` with enabled: 'False'.""" @@ -675,7 +681,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): '/endpoints/%(endpoint_id)s' % { 'endpoint_id': self.endpoint_id}, body={'endpoint': {'enabled': 'False'}}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_update_endpoint_enabled_str_random(self): """Call ``PATCH /endpoints/{endpoint_id}`` with enabled: 'kitties'.""" 
@@ -683,7 +689,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): '/endpoints/%(endpoint_id)s' % { 'endpoint_id': self.endpoint_id}, body={'endpoint': {'enabled': 'kitties'}}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_delete_endpoint(self): """Call ``DELETE /endpoints/{endpoint_id}``.""" @@ -762,7 +768,8 @@ class CatalogTestCase(test_v3.RestfulTestCase): self.delete('/endpoints/%s' % ref['id']) # make sure it's deleted (GET should return 404) - self.get('/endpoints/%s' % ref['id'], expected_status=404) + self.get('/endpoints/%s' % ref['id'], + expected_status=http_client.NOT_FOUND) def test_endpoint_create_with_valid_url(self): """Create endpoint with valid url should be tested,too.""" @@ -798,7 +805,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): ref['url'] = invalid_url self.post('/endpoints', body={'endpoint': ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) class TestCatalogAPISQL(tests.TestCase): diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index 2587b95998..dd8cf2dd82 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -18,6 +18,7 @@ import uuid from keystoneclient.contrib.ec2 import utils as ec2_utils from oslo_config import cfg +from six.moves import http_client from testtools import matchers from keystone import exception @@ -252,10 +253,10 @@ class CredentialTestCase(CredentialBaseTestCase): "secret": uuid.uuid4().hex} ref['blob'] = json.dumps(blob) ref['type'] = 'ec2' - # Assert 400 status for bad request with missing project_id + # Assert bad request status when missing project_id self.post( '/credentials', - body={'credential': ref}, expected_status=400) + body={'credential': ref}, expected_status=http_client.BAD_REQUEST) def test_create_ec2_credential_with_invalid_blob(self): """Call ``POST /credentials`` for creating ec2 
@@ -265,11 +266,10 @@ class CredentialTestCase(CredentialBaseTestCase): project_id=self.project_id) ref['blob'] = '{"abc":"def"d}' ref['type'] = 'ec2' - # Assert 400 status for bad request containing invalid - # blob + # Assert bad request status when request contains invalid blob response = self.post( '/credentials', - body={'credential': ref}, expected_status=400) + body={'credential': ref}, expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(response) def test_create_credential_with_admin_token(self): diff --git a/keystone/tests/unit/test_v3_domain_config.py b/keystone/tests/unit/test_v3_domain_config.py index 6f96f0e733..701cd3cfa8 100644 --- a/keystone/tests/unit/test_v3_domain_config.py +++ b/keystone/tests/unit/test_v3_domain_config.py @@ -14,6 +14,7 @@ import copy import uuid from oslo_config import cfg +from six.moves import http_client from keystone import exception from keystone.tests.unit import test_v3 
@@ -103,21 +104,24 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): def test_get_non_existant_config(self): """Call ``GET /domains{domain_id}/config when no config defined``.""" self.get('/domains/%(domain_id)s/config' % { - 'domain_id': self.domain['id']}, expected_status=404) + 'domain_id': self.domain['id']}, + expected_status=http_client.NOT_FOUND) def test_get_non_existant_config_group(self): """Call ``GET /domains{domain_id}/config/{group_not_exist}``.""" config = {'ldap': {'url': uuid.uuid4().hex}} self.domain_config_api.create_config(self.domain['id'], config) self.get('/domains/%(domain_id)s/config/identity' % { - 'domain_id': self.domain['id']}, expected_status=404) + 'domain_id': self.domain['id']}, + expected_status=http_client.NOT_FOUND) def test_get_non_existant_config_option(self): """Call ``GET /domains{domain_id}/config/group/{option_not_exist}``.""" config = {'ldap': {'url': uuid.uuid4().hex}} self.domain_config_api.create_config(self.domain['id'], config) self.get('/domains/%(domain_id)s/config/ldap/user_tree_dn' % { - 'domain_id': self.domain['id']}, expected_status=404) + 'domain_id': self.domain['id']}, + expected_status=http_client.NOT_FOUND) def test_update_config(self): """Call ``PATCH /domains/{domain_id}/config``.""" @@ -163,7 +167,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): self.patch('/domains/%(domain_id)s/config/%(invalid_group)s' % { 'domain_id': self.domain['id'], 'invalid_group': invalid_group}, body={'config': new_config}, - expected_status=403) + expected_status=http_client.FORBIDDEN) # Trying to update a valid group, but one that is not in the current # config should result in NotFound config = {'ldap': {'suffix': uuid.uuid4().hex}} 
@@ -172,7 +176,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): self.patch('/domains/%(domain_id)s/config/identity' % { 'domain_id': self.domain['id']}, body={'config': new_config}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_update_config_option(self): """Call ``PATCH /domains/{domain_id}/config/{group}/{option}``.""" @@ -199,7 +203,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): 'domain_id': self.domain['id'], 'invalid_option': invalid_option}, body={'config': new_config}, - expected_status=403) + expected_status=http_client.FORBIDDEN) # Trying to update a valid option, but one that is not in the current # config should result in NotFound new_config = {'suffix': uuid.uuid4().hex} @@ -207,4 +211,4 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): '/domains/%(domain_id)s/config/ldap/suffix' % { 'domain_id': self.domain['id']}, body={'config': new_config}, - expected_status=404) + expected_status=http_client.NOT_FOUND) diff --git a/keystone/tests/unit/test_v3_endpoint_policy.py b/keystone/tests/unit/test_v3_endpoint_policy.py index 4daeff4dcc..3423d2d816 100644 --- a/keystone/tests/unit/test_v3_endpoint_policy.py +++ b/keystone/tests/unit/test_v3_endpoint_policy.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +from six.moves import http_client from testtools import matchers from keystone.tests.unit import test_v3 @@ -48,7 +49,9 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): # Test when the resource does not exist also ensures # that there is not a false negative after creation. - self.assert_head_and_get_return_same_response(url, expected_status=404) + self.assert_head_and_get_return_same_response( + url, + expected_status=http_client.NOT_FOUND) self.put(url, expected_status=204) 
@@ -58,7 +61,9 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): self.delete(url, expected_status=204) # test that the deleted resource is no longer accessible - self.assert_head_and_get_return_same_response(url, expected_status=404) + self.assert_head_and_get_return_same_response( + url, + expected_status=http_client.NOT_FOUND) def test_crud_for_policy_for_explicit_endpoint(self): """PUT, HEAD and DELETE for explicit endpoint policy.""" @@ -136,7 +141,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): self.delete('/endpoints/%(endpoint_id)s' % { 'endpoint_id': self.endpoint['id']}) - self.head(url, expected_status=404) + self.head(url, expected_status=http_client.NOT_FOUND) def test_region_service_association_cleanup_when_region_deleted(self): url = ('/policies/%(policy_id)s/OS-ENDPOINT-POLICY' @@ -151,7 +156,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): self.delete('/regions/%(region_id)s' % { 'region_id': self.region['id']}) - self.head(url, expected_status=404) + self.head(url, expected_status=http_client.NOT_FOUND) def test_region_service_association_cleanup_when_service_deleted(self): url = ('/policies/%(policy_id)s/OS-ENDPOINT-POLICY' @@ -166,7 +171,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): self.delete('/services/%(service_id)s' % { 'service_id': self.service['id']}) - self.head(url, expected_status=404) + self.head(url, expected_status=http_client.NOT_FOUND) def test_service_association_cleanup_when_service_deleted(self): url = ('/policies/%(policy_id)s/OS-ENDPOINT-POLICY' @@ -180,7 +185,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): self.delete('/policies/%(policy_id)s' % { 'policy_id': self.policy['id']}) - self.head(url, expected_status=404) + self.head(url, expected_status=http_client.NOT_FOUND) def test_service_association_cleanup_when_policy_deleted(self): url = ('/policies/%(policy_id)s/OS-ENDPOINT-POLICY' 
@@ -194,7 +199,7 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): self.delete('/services/%(service_id)s' % { 'service_id': self.service['id']}) - self.head(url, expected_status=404) + self.head(url, expected_status=http_client.NOT_FOUND) class JsonHomeTests(test_v3.JsonHomeTestMixin): diff --git a/keystone/tests/unit/test_v3_federation.py b/keystone/tests/unit/test_v3_federation.py index 4f30bdcacc..e14ed8ad1f 100644 --- a/keystone/tests/unit/test_v3_federation.py +++ b/keystone/tests/unit/test_v3_federation.py @@ -26,6 +26,7 @@ from oslotest import mockpatch import saml2 from saml2 import saml from saml2 import sigver +from six.moves import http_client from six.moves import range, urllib, zip xmldsig = importutils.try_import("saml2.xmldsig") if not xmldsig: @@ -899,7 +900,7 @@ class FederatedIdentityProviderTests(FederationTests): body['remote_ids'] = [uuid.uuid4().hex, repeated_remote_id] self.put(url, body={'identity_provider': body}, - expected_status=409) + expected_status=http_client.CONFLICT) def test_create_idp_remote_empty(self): """Creates an IdP with empty remote_ids.""" @@ -1026,7 +1027,7 @@ class FederatedIdentityProviderTests(FederationTests): self.put(url, body={'identity_provider': body}, expected_status=201) self.put(url, body={'identity_provider': body}, - expected_status=409) + expected_status=http_client.CONFLICT) def test_get_idp(self): """Create and later fetch IdP.""" @@ -1051,7 +1052,7 @@ class FederatedIdentityProviderTests(FederationTests): self.assertIsNotNone(idp_id) url = self.base_url(suffix=idp_id) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_delete_existing_idp(self): """Create and later delete IdP. 
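Review bot: In the hunk at old line 1026 of test_v3_federation.py the two consecutive `self.put` calls now state their expectation in different forms: the first keeps the literal `expected_status=201`, the second uses `http_client.CONFLICT`. The commit message limits the change to 400–410, so this is a follow-up rather than a defect, but the pair would read more consistently with `expected_status=http_client.CREATED` on the first call. The same mix appears in the `kwargs = {'expected_status': 201}` / `kwargs = {'expected_status': http_client.CONFLICT}` pair in the hunk at old line 1208.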
@@ -1065,7 +1066,7 @@ class FederatedIdentityProviderTests(FederationTests): self.assertIsNotNone(idp_id) url = self.base_url(suffix=idp_id) self.delete(url) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_delete_idp_also_deletes_assigned_protocols(self): """Deleting an IdP will delete its assigned protocol.""" @@ -1091,7 +1092,7 @@ class FederatedIdentityProviderTests(FederationTests): # removing IdP will remove the assigned protocol as well self.assertEqual(1, len(self.federation_api.list_protocols(idp_id))) self.delete(idp_url) - self.get(idp_url, expected_status=404) + self.get(idp_url, expected_status=http_client.NOT_FOUND) self.assertEqual(0, len(self.federation_api.list_protocols(idp_id))) def test_delete_nonexisting_idp(self): @@ -1101,7 +1102,7 @@ class FederatedIdentityProviderTests(FederationTests): """ idp_id = uuid.uuid4().hex url = self.base_url(suffix=idp_id) - self.delete(url, expected_status=404) + self.delete(url, expected_status=http_client.NOT_FOUND) def test_update_idp_mutable_attributes(self): """Update IdP's mutable parameters.""" @@ -1142,7 +1143,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_update_idp_immutable_attributes(self): """Update IdP's immutable parameters. - Expect HTTP 403 code. + Expect HTTP FORBIDDEN. """ default_resp = self._create_default_idp() @@ -1156,7 +1157,8 @@ class FederatedIdentityProviderTests(FederationTests): body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex] url = self.base_url(suffix=idp_id) - self.patch(url, body={'identity_provider': body}, expected_status=403) + self.patch(url, body={'identity_provider': body}, + expected_status=http_client.FORBIDDEN) def test_update_nonexistent_idp(self): """Update nonexistent IdP 
@@ -1170,7 +1172,7 @@ class FederatedIdentityProviderTests(FederationTests): body['enabled'] = False body = {'identity_provider': body} - self.patch(url, body=body, expected_status=404) + self.patch(url, body=body, expected_status=http_client.NOT_FOUND) def test_assign_protocol_to_idp(self): """Assign a protocol to existing IdP.""" @@ -1208,7 +1210,7 @@ class FederatedIdentityProviderTests(FederationTests): kwargs = {'expected_status': 201} resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) - kwargs = {'expected_status': 409} + kwargs = {'expected_status': http_client.CONFLICT} resp, idp_id, proto = self._assign_protocol_to_idp(idp_id=idp_id, proto='saml2', validate=False, @@ -1222,7 +1224,7 @@ class FederatedIdentityProviderTests(FederationTests): """ idp_id = uuid.uuid4().hex - kwargs = {'expected_status': 404} + kwargs = {'expected_status': http_client.NOT_FOUND} self._assign_protocol_to_idp(proto='saml2', idp_id=idp_id, validate=False, @@ -1299,7 +1301,7 @@ class FederatedIdentityProviderTests(FederationTests): url = url % {'idp_id': idp_id, 'protocol_id': proto} self.delete(url) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) class MappingCRUDTests(FederationTests): @@ -1364,7 +1366,7 @@ class MappingCRUDTests(FederationTests): url = url % {'mapping_id': str(mapping_id)} resp = self.delete(url) self.assertResponseStatus(resp, 204) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_mapping_get(self): url = self.MAPPING_URL + '%(mapping_id)s' 
@@ -1387,70 +1389,73 @@ class MappingCRUDTests(FederationTests): def test_delete_mapping_dne(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.delete(url, expected_status=404) + self.delete(url, expected_status=http_client.NOT_FOUND) def test_get_mapping_dne(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_create_mapping_bad_requirements(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_BAD_REQ}) def test_create_mapping_no_rules(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_NO_RULES}) def test_create_mapping_no_remote_objects(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE}) def test_create_mapping_bad_value(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE}) def test_create_mapping_missing_local(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL}) def test_create_mapping_missing_type(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE}) def test_create_mapping_wrong_type(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, 
body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE}) def test_create_mapping_extra_remote_properties_not_any_of(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF - self.put(url, expected_status=400, body={'mapping': mapping}) + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': mapping}) def test_create_mapping_extra_remote_properties_any_one_of(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF - self.put(url, expected_status=400, body={'mapping': mapping}) + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': mapping}) def test_create_mapping_extra_remote_properties_just_type(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE - self.put(url, expected_status=400, body={'mapping': mapping}) + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': mapping}) def test_create_mapping_empty_map(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': {}}) def test_create_mapping_extra_rules_properties(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS}) def test_create_mapping_with_blacklist_and_whitelist(self): @@ -1462,7 +1467,8 @@ class MappingCRUDTests(FederationTests): """ url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST - self.put(url, expected_status=400, body={'mapping': mapping}) + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': mapping}) class FederatedTokenTests(FederationTests, FederatedSetupMixin): 
@@ -1679,14 +1685,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.federation_api.update_idp(self.IDP, enabled_false) self.v3_authenticate_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_scope_to_bad_project(self): """Scope unscoped token with a project we don't have access to.""" self.v3_authenticate_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_scope_to_project_multiple_times(self): """Try to scope the unscoped token multiple times. @@ -1725,7 +1731,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): """Try to scope token from non-existent unscoped token.""" self.v3_authenticate_token( self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_issue_token_from_rules_without_user(self): api = auth_controllers.Auth() @@ -1779,7 +1785,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): """Try to scope to a domain that has no direct roles.""" self.v3_authenticate_token( self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_list_projects(self): urls = ('/OS-FEDERATION/projects', '/auth/projects') @@ -2729,7 +2735,7 @@ class SAMLGenerationTests(FederationTests): with mock.patch.object(keystone_idp, '_sign_assertion', return_value=self.signed_assertion): self.post(self.SAML_GENERATION_ROUTE, body=body, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_generate_saml_route(self): """Test that the SAML generation endpoint produces XML. 
@@ -2792,7 +2798,8 @@ class SAMLGenerationTests(FederationTests): self.SERVICE_PROVDIER_ID) del body['auth']['scope'] - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.BAD_REQUEST) def test_invalid_token_body(self): """Test that missing the token in request body raises an exception. @@ -2806,7 +2813,8 @@ class SAMLGenerationTests(FederationTests): self.SERVICE_PROVDIER_ID) del body['auth']['identity']['token'] - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.BAD_REQUEST) def test_sp_not_found(self): """Test SAML generation with an invalid service provider ID. @@ -2817,7 +2825,8 @@ class SAMLGenerationTests(FederationTests): sp_id = uuid.uuid4().hex token_id = self._fetch_valid_token() body = self._create_generate_saml_request(token_id, sp_id) - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.NOT_FOUND) def test_sp_disabled(self): """Try generating assertion for disabled Service Provider.""" @@ -2829,7 +2838,8 @@ class SAMLGenerationTests(FederationTests): token_id = self._fetch_valid_token() body = self._create_generate_saml_request(token_id, self.SERVICE_PROVDIER_ID) - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=403) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.FORBIDDEN) def test_token_not_found(self): """Test that an invalid token in the request body raises an exception. 
@@ -2841,7 +2851,8 @@ class SAMLGenerationTests(FederationTests): token_id = uuid.uuid4().hex body = self._create_generate_saml_request(token_id, self.SERVICE_PROVDIER_ID) - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.NOT_FOUND) def test_generate_ecp_route(self): """Test that the ECP generation endpoint produces XML. @@ -3113,7 +3124,7 @@ class ServiceProviderTests(FederationTests): def test_get_service_provider_fail(self): url = self.base_url(suffix=uuid.uuid4().hex) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_create_service_provider(self): url = self.base_url(suffix=uuid.uuid4().hex) @@ -3152,7 +3163,7 @@ class ServiceProviderTests(FederationTests): sp = self.sp_ref() sp[uuid.uuid4().hex] = uuid.uuid4().hex self.put(url, body={'service_provider': sp}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_list_service_providers(self): """Test listing of service provider objects. 
@@ -3219,21 +3230,21 @@ class ServiceProviderTests(FederationTests): new_sp_ref = {'id': uuid.uuid4().hex} url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_update_service_provider_unknown_parameter(self): new_sp_ref = self.sp_ref() new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_update_service_provider_404(self): new_sp_ref = self.sp_ref() new_sp_ref['description'] = uuid.uuid4().hex url = self.base_url(suffix=uuid.uuid4().hex) self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_update_sp_relay_state(self): """Update an SP with custome relay state.""" @@ -3253,7 +3264,7 @@ class ServiceProviderTests(FederationTests): def test_delete_service_provider_404(self): url = self.base_url(suffix=uuid.uuid4().hex) - self.delete(url, expected_status=404) + self.delete(url, expected_status=http_client.NOT_FOUND) class WebSSOTests(FederatedTokenTests): diff --git a/keystone/tests/unit/test_v3_identity.py b/keystone/tests/unit/test_v3_identity.py index e009082906..5fbed08d93 100644 --- a/keystone/tests/unit/test_v3_identity.py +++ b/keystone/tests/unit/test_v3_identity.py @@ -17,6 +17,7 @@ import uuid import fixtures from oslo_config import cfg +from six.moves import http_client from testtools import matchers from keystone.common import controller 
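Review bot: `test_update_service_provider_404` and `test_delete_service_provider_404` keep the numeric code in their names although their bodies now assert `http_client.NOT_FOUND`. The `test_v3_identity.py` part of this same commit renames `test_create_user_400` to `test_create_user_bad_request`, so the naming convention is applied unevenly. Renaming these two to `test_update_service_provider_not_found` and `test_delete_service_provider_not_found` would match. Both `def` lines are context lines in their hunks, so the rename needs its own changed lines.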
@@ -104,9 +105,10 @@ class IdentityTestCase(test_v3.RestfulTestCase): ref['domain_id'] = CONF.identity.default_domain_id return self.assertValidUserResponse(r, ref) - def test_create_user_400(self): + def test_create_user_bad_request(self): """Call ``POST /users``.""" - self.post('/users', body={'user': {}}, expected_status=400) + self.post('/users', body={'user': {}}, + expected_status=http_client.BAD_REQUEST) def test_list_users(self): """Call ``GET /users``.""" @@ -300,10 +302,12 @@ class IdentityTestCase(test_v3.RestfulTestCase): expected_status=200) # auth as user with original password should not work after change - self.v3_authenticate_token(old_password_auth, expected_status=401) + self.v3_authenticate_token(old_password_auth, + expected_status=http_client.UNAUTHORIZED) # auth as user with an old token should not work after change - self.v3_authenticate_token(old_token_auth, expected_status=404) + self.v3_authenticate_token(old_token_auth, + expected_status=http_client.NOT_FOUND) # new password should work new_password_auth = self.build_authentication_request( @@ -389,9 +393,10 @@ class IdentityTestCase(test_v3.RestfulTestCase): body={'group': ref}) return self.assertValidGroupResponse(r, ref) - def test_create_group_400(self): + def test_create_group_bad_request(self): """Call ``POST /groups``.""" - self.post('/groups', body={'group': {}}, expected_status=400) + self.post('/groups', body={'group': {}}, + expected_status=http_client.BAD_REQUEST) def test_list_groups(self): """Call ``GET /groups``.""" 
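Review bot: In the `@@ -300,10 +302,12 @@` hunk the old password is expected to fail with `http_client.UNAUTHORIZED` while the old token is expected to fail with `http_client.NOT_FOUND`, and the existing comments say only that neither "should work after change". With named constants the difference is now obvious, but nothing tells a reader whether it is intended, which invites a later "correction" to UNAUTHORIZED. A one-line comment would settle it, for example `# the old token was revoked by the password change, so it is reported as not found`, if that is indeed the reason; confirm this against the token validation path. The same pair appears in the `UserSelfServiceChangingPasswordsTestCase` hunk at `@@ -581,30 +586,32 @@`. The enclosing test starts outside the hunk.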
@@ -581,30 +586,32 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): expected_status=204) # old password fails - self.get_request_token(self.user_ref['password'], expected_status=401) + self.get_request_token(self.user_ref['password'], + expected_status=http_client.UNAUTHORIZED) # old token fails - self.v3_authenticate_token(old_token_auth, expected_status=404) + self.v3_authenticate_token(old_token_auth, + expected_status=http_client.NOT_FOUND) # new password works self.get_request_token(new_password, expected_status=201) def test_changing_password_with_missing_original_password_fails(self): r = self.change_password(password=uuid.uuid4().hex, - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertThat(r.result['error']['message'], matchers.Contains('original_password')) def test_changing_password_with_missing_password_fails(self): r = self.change_password(original_password=self.user_ref['password'], - expected_status=400) + expected_status=http_client.BAD_REQUEST) self.assertThat(r.result['error']['message'], matchers.Contains('password')) def test_changing_password_with_incorrect_password_fails(self): self.change_password(password=uuid.uuid4().hex, original_password=uuid.uuid4().hex, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_changing_password_with_disabled_user_fails(self): # disable the user account @@ -614,7 +621,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): self.change_password(password=uuid.uuid4().hex, original_password=self.user_ref['password'], - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_changing_password_not_logged(self): # When a user changes their password, the password isn't logged at any diff --git a/keystone/tests/unit/test_v3_oauth1.py b/keystone/tests/unit/test_v3_oauth1.py index 6c063c5eb6..8794a426d6 100644 --- a/keystone/tests/unit/test_v3_oauth1.py +++ b/keystone/tests/unit/test_v3_oauth1.py 
@@ -18,6 +18,7 @@ import uuid from oslo_config import cfg from oslo_serialization import jsonutils from pycadf import cadftaxonomy +from six.moves import http_client from six.moves import urllib from keystone.contrib import oauth1 @@ -182,7 +183,7 @@ class ConsumerCRUDTests(OAuth1Tests): update_ref['secret'] = uuid.uuid4().hex self.patch(self.CONSUMER_URL + '/%s' % original_id, body={'consumer': update_ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_consumer_update_bad_id(self): consumer = self._create_single_consumer() @@ -195,7 +196,7 @@ class ConsumerCRUDTests(OAuth1Tests): update_ref['id'] = update_description self.patch(self.CONSUMER_URL + '/%s' % original_id, body={'consumer': update_ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_consumer_update_normalize_field(self): # If update a consumer with a field with : or - in the name, @@ -236,7 +237,7 @@ class ConsumerCRUDTests(OAuth1Tests): def test_consumer_get_bad_id(self): self.get(self.CONSUMER_URL + '/%(consumer_id)s' % {'consumer_id': uuid.uuid4().hex}, - expected_status=404) + expected_status=http_client.NOT_FOUND) class OAuthFlowTests(OAuth1Tests): @@ -291,7 +292,7 @@ class AccessTokenCRUDTests(OAuthFlowTests): self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' % {'user': self.user_id, 'auth': uuid.uuid4().hex}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_list_no_access_tokens(self): resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' @@ -316,7 +317,7 @@ class AccessTokenCRUDTests(OAuthFlowTests): self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' % {'user_id': self.user_id, 'key': uuid.uuid4().hex}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_list_all_roles_in_access_token(self): self.test_oauth_flow() 
@@ -341,7 +342,7 @@ class AccessTokenCRUDTests(OAuthFlowTests): url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s' % {'id': self.user_id, 'key': self.access_token.key, 'role': uuid.uuid4().hex}) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_list_and_delete_access_tokens(self): self.test_oauth_flow() @@ -405,7 +406,7 @@ class AuthTokenTests(OAuthFlowTests): headers = {'X-Subject-Token': self.keystone_token_id, 'X-Auth-Token': self.keystone_token_id} self.get('/auth/tokens', headers=headers, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_deleting_consumer_also_deletes_tokens(self): self.test_oauth_flow() @@ -426,7 +427,7 @@ class AuthTokenTests(OAuthFlowTests): headers = {'X-Subject-Token': self.keystone_token_id, 'X-Auth-Token': self.keystone_token_id} self.head('/auth/tokens', headers=headers, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_change_user_password_also_deletes_tokens(self): self.test_oauth_flow() @@ -445,7 +446,7 @@ class AuthTokenTests(OAuthFlowTests): headers = {'X-Subject-Token': self.keystone_token_id, 'X-Auth-Token': self.keystone_token_id} self.admin_request(path='/auth/tokens', headers=headers, - method='GET', expected_status=404) + method='GET', expected_status=http_client.NOT_FOUND) def test_deleting_project_also_invalidates_tokens(self): self.test_oauth_flow() @@ -462,7 +463,7 @@ class AuthTokenTests(OAuthFlowTests): headers = {'X-Subject-Token': self.keystone_token_id, 'X-Auth-Token': self.keystone_token_id} self.admin_request(path='/auth/tokens', headers=headers, - method='GET', expected_status=404) + method='GET', expected_status=http_client.NOT_FOUND) def test_token_chaining_is_not_allowed(self): self.test_oauth_flow() 
@@ -477,7 +478,7 @@ class AuthTokenTests(OAuthFlowTests): body=auth_data, token=self.keystone_token_id, method='POST', - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_delete_keystone_tokens_by_consumer_id(self): self.test_oauth_flow() @@ -545,14 +546,14 @@ class AuthTokenTests(OAuthFlowTests): self.post('/OS-TRUST/trusts', body={'trust': ref}, token=self.keystone_token_id, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_oauth_token_cannot_authorize_request_token(self): self.test_oauth_flow() url = self._approve_request_token_url() body = {'roles': [{'id': self.role_id}]} self.put(url, body=body, token=self.keystone_token_id, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_oauth_token_cannot_list_request_tokens(self): self._set_policy({"identity:list_access_tokens": [], @@ -561,7 +562,7 @@ class AuthTokenTests(OAuthFlowTests): self.test_oauth_flow() url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id self.get(url, token=self.keystone_token_id, - expected_status=403) + expected_status=http_client.FORBIDDEN) def _set_policy(self, new_policy): self.tempfile = self.useFixture(temporaryfile.SecureTempFile()) @@ -575,14 +576,16 @@ class AuthTokenTests(OAuthFlowTests): trust_token = self._create_trust_get_token() url = self._approve_request_token_url() body = {'roles': [{'id': self.role_id}]} - self.put(url, body=body, token=trust_token, expected_status=403) + self.put(url, body=body, token=trust_token, + expected_status=http_client.FORBIDDEN) def test_trust_token_cannot_list_request_tokens(self): self._set_policy({"identity:list_access_tokens": [], "identity:create_trust": []}) trust_token = self._create_trust_get_token() url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id - self.get(url, token=trust_token, expected_status=403) + self.get(url, token=trust_token, + expected_status=http_client.FORBIDDEN) class MaliciousOAuth1Tests(OAuth1Tests): 
@@ -592,7 +595,8 @@ class MaliciousOAuth1Tests(OAuth1Tests): consumer_id = consumer['id'] consumer = {'key': consumer_id, 'secret': uuid.uuid4().hex} url, headers = self._create_request_token(consumer, self.project_id) - self.post(url, headers=headers, expected_status=401) + self.post(url, headers=headers, + expected_status=http_client.UNAUTHORIZED) def test_bad_request_token_key(self): consumer = self._create_single_consumer() @@ -605,7 +609,7 @@ class MaliciousOAuth1Tests(OAuth1Tests): response_content_type='application/x-www-urlformencoded') url = self._authorize_request_token(uuid.uuid4().hex) body = {'roles': [{'id': self.role_id}]} - self.put(url, body=body, expected_status=404) + self.put(url, body=body, expected_status=http_client.NOT_FOUND) def test_bad_consumer_id(self): consumer = self._create_single_consumer() @@ -613,7 +617,7 @@ class MaliciousOAuth1Tests(OAuth1Tests): consumer_secret = consumer['secret'] consumer = {'key': consumer_id, 'secret': consumer_secret} url, headers = self._create_request_token(consumer, self.project_id) - self.post(url, headers=headers, expected_status=404) + self.post(url, headers=headers, expected_status=http_client.NOT_FOUND) def test_bad_requested_project_id(self): consumer = self._create_single_consumer() @@ -622,7 +626,7 @@ class MaliciousOAuth1Tests(OAuth1Tests): consumer = {'key': consumer_id, 'secret': consumer_secret} project_id = uuid.uuid4().hex url, headers = self._create_request_token(consumer, project_id) - self.post(url, headers=headers, expected_status=404) + self.post(url, headers=headers, expected_status=http_client.NOT_FOUND) def test_bad_verifier(self): consumer = self._create_single_consumer() 
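Review bot: `test_bad_consumer_id` (changed in the `@@ -613,7 +617,7 @@` hunk) pins `http_client.NOT_FOUND` for an unknown consumer, while the test in the `@@ -592,7 +595,8 @@` hunk, which sends a random secret for an existing consumer, pins `http_client.UNAUTHORIZED`. The suite therefore asserts that a caller can tell the two failures apart. If that is intended, a comment such as `# unknown consumer is reported as NOT_FOUND, unlike a wrong secret (UNAUTHORIZED)` would record the decision. If it is not intended, the expected codes deserve a follow-up, because a uniform error reveals less about which consumer ids exist. Only the tests are visible here, so the server-side behaviour is inferred from the asserted codes.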
@@ -647,7 +651,8 @@ class MaliciousOAuth1Tests(OAuth1Tests): request_token.set_verifier(uuid.uuid4().hex) url, headers = self._create_access_token(consumer, request_token) - self.post(url, headers=headers, expected_status=401) + self.post(url, headers=headers, + expected_status=http_client.UNAUTHORIZED) def test_bad_authorizing_roles(self): consumer = self._create_single_consumer() @@ -667,7 +672,7 @@ class MaliciousOAuth1Tests(OAuth1Tests): url = self._authorize_request_token(request_key) body = {'roles': [{'id': self.role_id}]} self.admin_request(path=url, method='PUT', - body=body, expected_status=404) + body=body, expected_status=http_client.NOT_FOUND) def test_expired_authorizing_request_token(self): self.config_fixture.config(group='oauth1', request_token_duration=-1) @@ -691,7 +696,7 @@ class MaliciousOAuth1Tests(OAuth1Tests): url = self._authorize_request_token(request_key) body = {'roles': [{'id': self.role_id}]} - self.put(url, body=body, expected_status=401) + self.put(url, body=body, expected_status=http_client.UNAUTHORIZED) def test_expired_creating_keystone_token(self): self.config_fixture.config(group='oauth1', access_token_duration=-1) @@ -731,7 +736,8 @@ class MaliciousOAuth1Tests(OAuth1Tests): url, headers, body = self._get_oauth_token(self.consumer, self.access_token) - self.post(url, headers=headers, body=body, expected_status=401) + self.post(url, headers=headers, body=body, + expected_status=http_client.UNAUTHORIZED) def test_missing_oauth_headers(self): endpoint = '/OS-OAUTH1/request_token' diff --git a/keystone/tests/unit/test_v3_os_revoke.py b/keystone/tests/unit/test_v3_os_revoke.py index 48226cd41e..86ced7248d 100644 --- a/keystone/tests/unit/test_v3_os_revoke.py +++ b/keystone/tests/unit/test_v3_os_revoke.py @@ -15,6 +15,7 @@ import uuid from oslo_utils import timeutils import six +from six.moves import http_client from testtools import matchers from keystone.common import utils 
@@ -112,7 +113,8 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin): self.assertReportedEventMatchesRecorded(events[0], sample, before_time) def test_list_since_invalid(self): - self.get('/OS-REVOKE/events?since=blah', expected_status=400) + self.get('/OS-REVOKE/events?since=blah', + expected_status=http_client.BAD_REQUEST) def test_list_since_valid(self): resp = self.get('/OS-REVOKE/events?since=2013-02-27T18:30:59.999999Z') diff --git a/keystone/tests/unit/test_v3_protection.py b/keystone/tests/unit/test_v3_protection.py index 458c61de12..578045b39a 100644 --- a/keystone/tests/unit/test_v3_protection.py +++ b/keystone/tests/unit/test_v3_protection.py @@ -17,6 +17,7 @@ import uuid from oslo_config import cfg from oslo_serialization import jsonutils +from six.moves import http_client from keystone import exception from keystone.policy.backends import rules @@ -428,7 +429,8 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): user2_token = self.get_requested_token(user2_auth) self.get('/auth/tokens', token=user1_token, - headers={'X-Subject-Token': user2_token}, expected_status=403) + headers={'X-Subject-Token': user2_token}, + expected_status=http_client.FORBIDDEN) def test_admin_validate_user_token(self): # An admin can validate a user's token. @@ -490,7 +492,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): self.head('/auth/tokens', token=user1_token, headers={'X-Subject-Token': user2_token}, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_admin_check_user_token(self): # An admin can check a user's token. @@ -552,7 +554,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): self.delete('/auth/tokens', token=user1_token, headers={'X-Subject-Token': user2_token}, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_admin_revoke_user_token(self): # An admin can revoke a user's token. 
@@ -948,7 +950,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, collection_url = self.build_role_assignment_query_url( domain_id=self.domainB['id']) - self.get(collection_url, auth=self.auth, expected_status=403) + self.get(collection_url, auth=self.auth, + expected_status=http_client.FORBIDDEN) def test_domain_user_list_assignments_of_domain_failed(self): self.auth = self.build_authentication_request( @@ -958,7 +961,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, collection_url = self.build_role_assignment_query_url( domain_id=self.domainA['id']) - self.get(collection_url, auth=self.auth, expected_status=403) + self.get(collection_url, auth=self.auth, + expected_status=http_client.FORBIDDEN) def test_cloud_admin_list_assignments_of_project(self): self.auth = self.build_authentication_request( @@ -1021,7 +1025,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, collection_url = self.build_role_assignment_query_url( project_id=self.project['id']) - self.get(collection_url, auth=self.auth, expected_status=403) + self.get(collection_url, auth=self.auth, + expected_status=http_client.FORBIDDEN) def test_cloud_admin(self): self.auth = self.build_authentication_request( @@ -1145,7 +1150,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, user2_token = self.get_requested_token(user2_auth) self.get('/auth/tokens', token=user1_token, - headers={'X-Subject-Token': user2_token}, expected_status=403) + headers={'X-Subject-Token': user2_token}, + expected_status=http_client.FORBIDDEN) def test_admin_validate_user_token(self): # An admin can validate a user's token. @@ -1207,7 +1213,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.head('/auth/tokens', token=user1_token, headers={'X-Subject-Token': user2_token}, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_admin_check_user_token(self): # An admin can check a user's token. 
@@ -1269,7 +1275,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.delete('/auth/tokens', token=user1_token, headers={'X-Subject-Token': user2_token}, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_admin_revoke_user_token(self): # An admin can revoke a user's token. diff --git a/keystone/tests/unit/test_versions.py b/keystone/tests/unit/test_versions.py index 744386ffde..e84b129554 100644 --- a/keystone/tests/unit/test_versions.py +++ b/keystone/tests/unit/test_versions.py @@ -20,6 +20,7 @@ import random import mock from oslo_config import cfg from oslo_serialization import jsonutils +from six.moves import http_client from testtools import matchers as tt_matchers from keystone.common import json_home @@ -788,7 +789,7 @@ class VersionTestCase(tests.TestCase): client = tests.TestClient(self.public_app) # request to /v2.0 should fail resp = client.get('/v2.0/') - self.assertEqual(404, resp.status_int) + self.assertEqual(http_client.NOT_FOUND, resp.status_int) # request to /v3 should pass resp = client.get('/v3/') @@ -821,7 +822,7 @@ class VersionTestCase(tests.TestCase): client = tests.TestClient(self.public_app) # request to /v3 should fail resp = client.get('/v3/') - self.assertEqual(404, resp.status_int) + self.assertEqual(http_client.NOT_FOUND, resp.status_int) # request to /v2.0 should pass resp = client.get('/v2.0/') diff --git a/keystone/tests/unit/test_wsgi.py b/keystone/tests/unit/test_wsgi.py index 62156bd589..13b4a43888 100644 --- a/keystone/tests/unit/test_wsgi.py +++ b/keystone/tests/unit/test_wsgi.py @@ -23,6 +23,7 @@ import mock import oslo_i18n from oslo_serialization import jsonutils import six +from six.moves import http_client from testtools import matchers import webob 
@@ -195,14 +196,14 @@ class ApplicationTest(BaseWSGITest): def test_render_exception(self): e = exception.Unauthorized(message=u'\u7f51\u7edc') resp = wsgi.render_exception(e) - self.assertEqual(401, resp.status_int) + self.assertEqual(http_client.UNAUTHORIZED, resp.status_int) def test_render_exception_host(self): e = exception.Unauthorized(message=u'\u7f51\u7edc') context = {'host_url': 'http://%s:5000' % uuid.uuid4().hex} resp = wsgi.render_exception(e, context=context) - self.assertEqual(401, resp.status_int) + self.assertEqual(http_client.UNAUTHORIZED, resp.status_int) def test_improperly_encoded_params(self): class FakeApp(wsgi.Application):