# vim: tabstop=4 shiftwidth=4 softtabstop=4 # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import os import random import uuid from keystone.common.sql import migration from keystone import config from keystone import contrib from keystone.contrib.federation import utils as mapping_utils from keystone.openstack.common import importutils from keystone.openstack.common import jsonutils from keystone.openstack.common import log from keystone.tests import mapping_fixtures from keystone.tests import test_v3 CONF = config.CONF LOG = log.getLogger(__name__) def dummy_validator(*args, **kwargs): pass class FederationTests(test_v3.RestfulTestCase): EXTENSION_NAME = 'federation' EXTENSION_TO_ADD = 'federation_extension' def setup_database(self): super(FederationTests, self).setup_database() package_name = "%s.%s.migrate_repo" % (contrib.__name__, self.EXTENSION_NAME) package = importutils.import_module(package_name) self.repo_path = os.path.abspath(os.path.dirname(package.__file__)) migration.db_version_control(version=None, repo_path=self.repo_path) migration.db_sync(version=None, repo_path=self.repo_path) class FederatedIdentityProviderTests(FederationTests): """A test class for Identity Providers.""" idp_keys = ['description', 'enabled'] default_body = {'description': None, 'enabled': True} def base_url(self, suffix=None): if suffix is not None: return '/OS-FEDERATION/identity_providers/' + str(suffix) return '/OS-FEDERATION/identity_providers' def _fetch_attribute_from_response(self, resp, parameter, assert_is_not_none=True): """Fetch single attribute from TestResponse object.""" result = resp.result.get(parameter, None) if assert_is_not_none: self.assertIsNotNone(result) return result def _fetch_attributes_from_response(self, resp, parameters=[], assert_is_not_none=True): """Fetch parameters from the TestResponse object.""" result = dict() kwargs = {'assert_is_not_none': assert_is_not_none} for parameter in parameters: value = self._fetch_attribute_from_response(resp, parameter, **kwargs) result[parameter] = value return result def _create_and_decapsulate_response(self, body=None): """Create IdP and fetch it's random id along with entity.""" default_resp = self._create_default_idp(body=body) idp = self._fetch_attribute_from_response(default_resp, 'identity_provider') self.assertIsNotNone(idp) idp_id = idp.get('id') return (idp_id, idp) def _get_idp(self, idp_id): """Fetch IdP entity based on it's id.""" url = self.base_url(suffix=idp_id) resp = self.get(url) return resp def _create_default_idp(self, body=None): """Create default IdP.""" url = self.base_url(suffix=uuid.uuid4().hex) if body is None: body = self._http_idp_input() resp = self.put(url, body={'identity_provider': body}, expected_status=201) return resp def _http_idp_input(self, **kwargs): """Create default input for IdP data.""" body = None if 'body' not in kwargs: body = self.default_body.copy() body['description'] = uuid.uuid4().hex else: body = kwargs['body'] return body def _assign_protocol_to_idp(self, idp_id=None, proto=None, url=None, mapping_id=None, validate=True, **kwargs): if url is None: url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') if idp_id is None: idp_id, _ = self._create_and_decapsulate_response() if proto is None: proto = uuid.uuid4().hex if mapping_id is None: mapping_id = uuid.uuid4().hex body = {'mapping_id': mapping_id} url = url % {'idp_id': idp_id, 'protocol_id': proto} resp = self.put(url, body={'protocol': body}, **kwargs) if validate: self.assertValidResponse(resp, 'protocol', dummy_validator, keys_to_check=['id', 'mapping_id'], ref={'id': proto, 'mapping_id': mapping_id}) return (resp, idp_id, proto) def _get_protocol(self, idp_id, protocol_id): url = "%s/protocols/%s" % (idp_id, protocol_id) url = self.base_url(suffix=url) r = self.get(url) return r def test_create_idp(self): """Creates the IdentityProvider entity.""" keys_to_check = self.idp_keys body = self._http_idp_input() resp = self._create_default_idp(body=body) self.assertValidResponse(resp, 'identity_provider', dummy_validator, keys_to_check=keys_to_check, ref=body) def test_list_idps(self, iterations=5): """Lists all available IdentityProviders. This test collects ids of created IdPs and intersects it with the list of all available IdPs. List of all IdPs can be a superset of IdPs created in this test, because other tests also create IdPs. """ def get_id(resp): r = self._fetch_attribute_from_response(resp, 'identity_provider') return r.get('id') ids = [] for _ in xrange(iterations): id = get_id(self._create_default_idp()) ids.append(id) ids = set(ids) keys_to_check = self.idp_keys url = self.base_url() resp = self.get(url) self.assertValidListResponse(resp, 'identity_providers', dummy_validator, keys_to_check=keys_to_check) entities = self._fetch_attribute_from_response(resp, 'identity_providers') entities_ids = set([e['id'] for e in entities]) ids_intersection = entities_ids.intersection(ids) self.assertEqual(ids_intersection, ids) def test_check_idp_uniqueness(self): """Add same IdP twice. Expect HTTP 409 code for the latter call. """ url = self.base_url(suffix=uuid.uuid4().hex) body = self._http_idp_input() self.put(url, body={'identity_provider': body}, expected_status=201) self.put(url, body={'identity_provider': body}, expected_status=409) def test_get_idp(self): """Create and later fetch IdP.""" body = self._http_idp_input() default_resp = self._create_default_idp(body=body) default_idp = self._fetch_attribute_from_response(default_resp, 'identity_provider') idp_id = default_idp.get('id') url = self.base_url(suffix=idp_id) resp = self.get(url) self.assertValidResponse(resp, 'identity_provider', dummy_validator, keys_to_check=body.keys(), ref=body) def test_get_nonexisting_idp(self): """Fetch nonexisting IdP entity. Expected HTTP 404 status code. """ idp_id = uuid.uuid4().hex self.assertIsNotNone(idp_id) url = self.base_url(suffix=idp_id) self.get(url, expected_status=404) def test_delete_existing_idp(self): """Create and later delete IdP. Expect HTTP 404 for the GET IdP call. """ default_resp = self._create_default_idp() default_idp = self._fetch_attribute_from_response(default_resp, 'identity_provider') idp_id = default_idp.get('id') self.assertIsNotNone(idp_id) url = self.base_url(suffix=idp_id) self.delete(url) self.get(url, expected_status=404) def test_delete_nonexisting_idp(self): """Delete nonexisting IdP. Expect HTTP 404 for the GET IdP call. """ idp_id = uuid.uuid4().hex url = self.base_url(suffix=idp_id) self.delete(url, expected_status=404) def test_update_idp_mutable_attributes(self): """Update IdP's mutable parameters.""" default_resp = self._create_default_idp() default_idp = self._fetch_attribute_from_response(default_resp, 'identity_provider') idp_id = default_idp.get('id') url = self.base_url(suffix=idp_id) self.assertIsNotNone(idp_id) _enabled = not default_idp.get('enabled') body = {'description': uuid.uuid4().hex, 'enabled': _enabled} body = {'identity_provider': body} resp = self.patch(url, body=body) updated_idp = self._fetch_attribute_from_response(resp, 'identity_provider') body = body['identity_provider'] for key in body.keys(): self.assertEqual(body[key], updated_idp.get(key)) resp = self.get(url) updated_idp = self._fetch_attribute_from_response(resp, 'identity_provider') for key in body.keys(): self.assertEqual(body[key], updated_idp.get(key)) def test_update_idp_immutable_attributes(self): """Update IdP's immutable parameters. Expect HTTP 403 code. """ default_resp = self._create_default_idp() default_idp = self._fetch_attribute_from_response(default_resp, 'identity_provider') idp_id = default_idp.get('id') self.assertIsNotNone(idp_id) body = self._http_idp_input() body['id'] = uuid.uuid4().hex body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex] url = self.base_url(suffix=idp_id) self.patch(url, body={'identity_provider': body}, expected_status=403) def test_update_nonexistent_idp(self): """Update nonexistent IdP Expect HTTP 404 code. """ idp_id = uuid.uuid4().hex url = self.base_url(suffix=idp_id) body = self._http_idp_input() body['enabled'] = False body = {'identity_provider': body} self.patch(url, body=body, expected_status=404) def test_assign_protocol_to_idp(self): """Assign a protocol to existing IdP.""" self._assign_protocol_to_idp(expected_status=201) def test_protocol_composite_pk(self): """Test whether Keystone let's add two entities with identical names, however attached to different IdPs. 1. Add IdP and assign it protocol with predefined name 2. Add another IdP and assign it a protocol with same name. Expect HTTP 201 code """ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') kwargs = {'expected_status': 201} self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) def test_protocol_idp_pk_uniqueness(self): """Test whether Keystone checks for unique idp/protocol values. Add same protocol twice, expect Keystone to reject a latter call and return HTTP 409 code. """ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') kwargs = {'expected_status': 201} resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) kwargs = {'expected_status': 409} resp, idp_id, proto = self._assign_protocol_to_idp(idp_id=idp_id, proto='saml2', validate=False, url=url, **kwargs) def test_assign_protocol_to_nonexistent_idp(self): """Assign protocol to IdP that doesn't exist. Expect HTTP 404 code. """ idp_id = uuid.uuid4().hex kwargs = {'expected_status': 404} self._assign_protocol_to_idp(proto='saml2', idp_id=idp_id, validate=False, **kwargs) def test_get_protocol(self): """Create and later fetch protocol tied to IdP.""" resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id'] url = "%s/protocols/%s" % (idp_id, proto_id) url = self.base_url(suffix=url) resp = self.get(url) reference = {'id': proto_id} self.assertValidResponse(resp, 'protocol', dummy_validator, keys_to_check=reference.keys(), ref=reference) def test_list_protocols(self): """Create set of protocols and later list them. Compare input and output id sets. """ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) iterations = random.randint(0, 16) protocol_ids = [] for _ in xrange(iterations): resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id, expected_status=201) proto_id = self._fetch_attribute_from_response(resp, 'protocol') proto_id = proto_id['id'] protocol_ids.append(proto_id) url = "%s/protocols" % idp_id url = self.base_url(suffix=url) resp = self.get(url) self.assertValidListResponse(resp, 'protocols', dummy_validator, keys_to_check=['id']) entities = self._fetch_attribute_from_response(resp, 'protocols') entities = set([entity['id'] for entity in entities]) protocols_intersection = entities.intersection(protocol_ids) self.assertEqual(protocols_intersection, set(protocol_ids)) def test_update_protocols_attribute(self): """Update protocol's attribute.""" resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) new_mapping_id = uuid.uuid4().hex url = "%s/protocols/%s" % (idp_id, proto) url = self.base_url(suffix=url) body = {'mapping_id': new_mapping_id} resp = self.patch(url, body={'protocol': body}) self.assertValidResponse(resp, 'protocol', dummy_validator, keys_to_check=['id', 'mapping_id'], ref={'id': proto, 'mapping_id': new_mapping_id} ) def test_delete_protocol(self): """Delete protocol. Expect HTTP 404 code for the GET call after the protocol is deleted. """ url = self.base_url(suffix='/%(idp_id)s/' 'protocols/%(protocol_id)s') resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) url = url % {'idp_id': idp_id, 'protocol_id': proto} self.delete(url) self.get(url, expected_status=404) class MappingCRUDTests(FederationTests): """A class for testing CRUD operations for Mappings.""" MAPPING_URL = '/OS-FEDERATION/mappings/' def assertValidMappingListResponse(self, resp, *args, **kwargs): return self.assertValidListResponse( resp, 'mappings', self.assertValidMapping, keys_to_check=[], *args, **kwargs) def assertValidMappingResponse(self, resp, *args, **kwargs): return self.assertValidResponse( resp, 'mapping', self.assertValidMapping, keys_to_check=[], *args, **kwargs) def assertValidMapping(self, entity, ref=None): self.assertIsNotNone(entity.get('id')) self.assertIsNotNone(entity.get('rules')) if ref: self.assertEqual(jsonutils.loads(entity['rules']), ref['rules']) return entity def _create_default_mapping_entry(self): url = self.MAPPING_URL + uuid.uuid4().hex resp = self.put(url, body={'mapping': mapping_fixtures.MAPPING_LARGE}, expected_status=201) return resp def _get_id_from_response(self, resp): r = resp.result.get('mapping') return r.get('id') def test_mapping_create(self): resp = self._create_default_mapping_entry() self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE) def test_mapping_list(self): url = self.MAPPING_URL self._create_default_mapping_entry() resp = self.get(url) entities = resp.result.get('mappings') self.assertIsNotNone(entities) self.assertResponseStatus(resp, 200) self.assertValidListLinks(resp.result.get('links')) self.assertEqual(len(entities), 1) def test_mapping_delete(self): url = self.MAPPING_URL + '%(mapping_id)s' resp = self._create_default_mapping_entry() mapping_id = self._get_id_from_response(resp) url = url % {'mapping_id': str(mapping_id)} resp = self.delete(url) self.assertResponseStatus(resp, 204) self.get(url, expected_status=404) def test_mapping_get(self): url = self.MAPPING_URL + '%(mapping_id)s' resp = self._create_default_mapping_entry() mapping_id = self._get_id_from_response(resp) url = url % {'mapping_id': mapping_id} resp = self.get(url) self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE) def test_mapping_update(self): url = self.MAPPING_URL + '%(mapping_id)s' resp = self._create_default_mapping_entry() mapping_id = self._get_id_from_response(resp) url = url % {'mapping_id': mapping_id} resp = self.patch(url, body={'mapping': mapping_fixtures.MAPPING_SMALL}) self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL) resp = self.get(url) self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL) def test_delete_mapping_dne(self): url = self.MAPPING_URL + uuid.uuid4().hex self.delete(url, expected_status=404) def test_get_mapping_dne(self): url = self.MAPPING_URL + uuid.uuid4().hex self.get(url, expected_status=404) def test_create_mapping_bad_requirements(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': mapping_fixtures.MAPPING_BAD_REQ}) def test_create_mapping_no_rules(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': mapping_fixtures.MAPPING_NO_RULES}) def test_create_mapping_no_remote_objects(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE}) def test_create_mapping_bad_value(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE}) def test_create_mapping_missing_local(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL}) def test_create_mapping_missing_type(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE}) def test_create_mapping_wrong_type(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE}) def test_create_mapping_extra_remote_properties_not_any_of(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF self.put(url, expected_status=400, body={'mapping': mapping}) def test_create_mapping_extra_remote_properties_any_one_of(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF self.put(url, expected_status=400, body={'mapping': mapping}) def test_create_mapping_extra_remote_properties_just_type(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE self.put(url, expected_status=400, body={'mapping': mapping}) def test_create_mapping_empty_map(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': {}}) def test_create_mapping_extra_rules_properties(self): url = self.MAPPING_URL + uuid.uuid4().hex self.put(url, expected_status=400, body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS}) class MappingRuleEngineTests(FederationTests): """A class for testing the mapping rule engine.""" def test_rule_engine_any_one_of_and_direct_mapping(self): """Should return user's name and group id EMPLOYEE_GROUP_ID. The ADMIN_ASSERTION should successfully have a match in MAPPING_LARGE. The will test the case where `any_one_of` is valid, and there is a direct mapping for the users name. """ mapping = mapping_fixtures.MAPPING_LARGE assertion = mapping_fixtures.ADMIN_ASSERTION rp = mapping_utils.RuleProcessor(mapping['rules']) values = rp.process(assertion) fn = mapping_fixtures.ADMIN_ASSERTION.get('FirstName') ln = mapping_fixtures.ADMIN_ASSERTION.get('LastName') full_name = '%s %s' % (fn, ln) group_ids = values.get('group_ids') name = values.get('name') self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids) self.assertEqual(name, full_name) def test_rule_engine_no_regex_match(self): """Should return no values, the email of the tester won't match. This will not match since the email in the assertion will fail the regex test. It is set to match any @example.com address. But the incoming value is set to eviltester@example.org. """ mapping = mapping_fixtures.MAPPING_LARGE assertion = mapping_fixtures.BAD_TESTER_ASSERTION rp = mapping_utils.RuleProcessor(mapping['rules']) values = rp.process(assertion) group_ids = values.get('group_ids') name = values.get('name') self.assertIsNone(name) self.assertEqual(group_ids, []) def test_rule_engine_any_one_of_many_rules(self): """Should return group CONTRACTOR_GROUP_ID. The CONTRACTOR_ASSERTION should successfully have a match in MAPPING_SMALL. This will test the case where many rules must be matched, including an `any_one_of`, and a direct mapping. """ mapping = mapping_fixtures.MAPPING_SMALL assertion = mapping_fixtures.CONTRACTOR_ASSERTION rp = mapping_utils.RuleProcessor(mapping['rules']) values = rp.process(assertion) group_ids = values.get('group_ids') name = values.get('name') self.assertIsNone(name) self.assertIn(mapping_fixtures.CONTRACTOR_GROUP_ID, group_ids) def test_rule_engine_not_any_of_and_direct_mapping(self): """Should return user's name and email. The CUSTOMER_ASSERTION should successfully have a match in MAPPING_LARGE. This will test the case where a requirement has `not_any_of`, and direct mapping to a username, no group. """ mapping = mapping_fixtures.MAPPING_LARGE assertion = mapping_fixtures.CUSTOMER_ASSERTION rp = mapping_utils.RuleProcessor(mapping['rules']) values = rp.process(assertion) user_name = mapping_fixtures.CUSTOMER_ASSERTION.get('UserName') group_ids = values.get('group_ids') name = values.get('name') self.assertEqual(name, user_name) self.assertEqual(group_ids, []) def test_rule_engine_not_any_of_many_rules(self): """Should return group EMPLOYEE_GROUP_ID. The EMPLOYEE_ASSERTION should successfully have a match in MAPPING_SMALL. This will test the case where many remote rules must be matched, including a `not_any_of`. """ mapping = mapping_fixtures.MAPPING_SMALL assertion = mapping_fixtures.EMPLOYEE_ASSERTION rp = mapping_utils.RuleProcessor(mapping['rules']) values = rp.process(assertion) group_ids = values.get('group_ids') name = values.get('name') self.assertIsNone(name) self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids) def test_rule_engine_regex_match_and_many_groups(self): """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID. The TESTER_ASSERTION should successfully have a match in MAPPING_LARGE. This will test a successful regex match for an `any_one_of` evaluation type, and will have many groups returned. """ mapping = mapping_fixtures.MAPPING_LARGE assertion = mapping_fixtures.TESTER_ASSERTION rp = mapping_utils.RuleProcessor(mapping['rules']) values = rp.process(assertion) group_ids = values.get('group_ids') name = values.get('name') self.assertIsNone(name) self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids) self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids)