keystone/keystone/tests/test_v3_filters.py

437 lines
17 KiB
Python

# Copyright 2012 OpenStack LLC
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslo.serialization import jsonutils
from keystone import config
from keystone.policy.backends import rules
from keystone.tests import filtering
from keystone.tests.ksfixtures import temporaryfile
from keystone.tests import test_v3
CONF = config.CONF
class IdentityTestFilteredCase(filtering.FilterTests,
test_v3.RestfulTestCase):
"""Test filter enforcement on the v3 Identity API."""
def setUp(self):
"""Setup for Identity Filter Test Cases."""
super(IdentityTestFilteredCase, self).setUp()
# Initialize the policy engine and allow us to write to a temp
# file in each test to create the policies
self.orig_policy_file = CONF.policy_file
rules.reset()
self.tempfile = self.useFixture(temporaryfile.SecureTempFile())
self.tmpfilename = self.tempfile.file_name
self.config_fixture.config(policy_file=self.tmpfilename)
# drop the policy rules
self.addCleanup(rules.reset)
def load_sample_data(self):
"""Create sample data for these tests.
As well as the usual housekeeping, create a set of domains,
users, roles and projects for the subsequent tests:
- Three domains: A,B & C. C is disabled.
- DomainA has user1, DomainB has user2 and user3
- DomainA has group1 and group2, DomainB has group3
- User1 has a role on DomainA
Remember that there will also be a fourth domain in existence,
the default domain.
"""
# Start by creating a few domains
self._populate_default_domain()
self.domainA = self.new_domain_ref()
self.assignment_api.create_domain(self.domainA['id'], self.domainA)
self.domainB = self.new_domain_ref()
self.assignment_api.create_domain(self.domainB['id'], self.domainB)
self.domainC = self.new_domain_ref()
self.domainC['enabled'] = False
self.assignment_api.create_domain(self.domainC['id'], self.domainC)
# Now create some users, one in domainA and two of them in domainB
self.user1 = self.new_user_ref(domain_id=self.domainA['id'])
password = uuid.uuid4().hex
self.user1['password'] = password
self.user1 = self.identity_api.create_user(self.user1)
self.user1['password'] = password
self.user2 = self.new_user_ref(domain_id=self.domainB['id'])
self.user2['password'] = password
self.user2 = self.identity_api.create_user(self.user2)
self.user2['password'] = password
self.user3 = self.new_user_ref(domain_id=self.domainB['id'])
self.user3['password'] = password
self.user3 = self.identity_api.create_user(self.user3)
self.user3['password'] = password
self.role = self.new_role_ref()
self.assignment_api.create_role(self.role['id'], self.role)
self.assignment_api.create_grant(self.role['id'],
user_id=self.user1['id'],
domain_id=self.domainA['id'])
# A default auth request we can use - un-scoped user token
self.auth = self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'])
def _get_id_list_from_ref_list(self, ref_list):
result_list = []
for x in ref_list:
result_list.append(x['id'])
return result_list
def _set_policy(self, new_policy):
with open(self.tmpfilename, "w") as policyfile:
policyfile.write(jsonutils.dumps(new_policy))
def test_list_users_filtered_by_domain(self):
"""GET /users?domain_id=mydomain (filtered)
Test Plan:
- Update policy so api is unprotected
- Use an un-scoped token to make sure we can filter the
users by domainB, getting back the 2 users in that domain
"""
self._set_policy({"identity:list_users": []})
url_by_name = '/users?domain_id=%s' % self.domainB['id']
r = self.get(url_by_name, auth=self.auth)
# We should get back two users, those in DomainB
id_list = self._get_id_list_from_ref_list(r.result.get('users'))
self.assertIn(self.user2['id'], id_list)
self.assertIn(self.user3['id'], id_list)
def test_list_filtered_domains(self):
"""GET /domains?enabled=0
Test Plan:
- Update policy for no protection on api
- Filter by the 'enabled' boolean to get disabled domains, which
should return just domainC
- Try the filter using different ways of specifying True/False
to test that our handling of booleans in filter matching is
correct
"""
new_policy = {"identity:list_domains": []}
self._set_policy(new_policy)
r = self.get('/domains?enabled=0', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainC['id'], id_list)
# Try a few ways of specifying 'false'
for val in ('0', 'false', 'False', 'FALSE', 'n', 'no', 'off'):
r = self.get('/domains?enabled=%s' % val, auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual([self.domainC['id']], id_list)
# Now try a few ways of specifying 'true' when we should get back
# the other two domains, plus the default domain
for val in ('1', 'true', 'True', 'TRUE', 'y', 'yes', 'on'):
r = self.get('/domains?enabled=%s' % val, auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(CONF.identity.default_domain_id, id_list)
r = self.get('/domains?enabled', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(CONF.identity.default_domain_id, id_list)
def test_multiple_filters(self):
"""GET /domains?enabled&name=myname
Test Plan:
- Update policy for no protection on api
- Filter by the 'enabled' boolean and name - this should
return a single domain
"""
new_policy = {"identity:list_domains": []}
self._set_policy(new_policy)
my_url = '/domains?enableds&name=%s' % self.domainA['name']
r = self.get(my_url, auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertIn(self.domainA['id'], id_list)
def test_list_users_filtered_by_funny_name(self):
"""GET /users?name=%myname%
Test Plan:
- Update policy so api is unprotected
- Update a user with name that has filter escape characters
- Ensure we can filter on it
"""
self._set_policy({"identity:list_users": []})
user = self.user1
user['name'] = '%my%name%'
self.identity_api.update_user(user['id'], user)
url_by_name = '/users?name=%my%name%'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 1)
self.assertEqual(r.result.get('users')[0]['id'], user['id'])
def test_inexact_filters(self):
# Create 20 users
user_list = self._create_test_data('user', 20)
# Set up some names that we can filter on
user = user_list[5]
user['name'] = 'The'
self.identity_api.update_user(user['id'], user)
user = user_list[6]
user['name'] = 'The Ministry'
self.identity_api.update_user(user['id'], user)
user = user_list[7]
user['name'] = 'The Ministry of'
self.identity_api.update_user(user['id'], user)
user = user_list[8]
user['name'] = 'The Ministry of Silly'
self.identity_api.update_user(user['id'], user)
user = user_list[9]
user['name'] = 'The Ministry of Silly Walks'
self.identity_api.update_user(user['id'], user)
# ...and one for useful case insensitivity testing
user = user_list[10]
user['name'] = 'the ministry of silly walks OF'
self.identity_api.update_user(user['id'], user)
self._set_policy({"identity:list_users": []})
url_by_name = '/users?name__contains=Ministry'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 4)
self._match_with_list(r.result.get('users'), user_list,
list_start=6, list_end=10)
url_by_name = '/users?name__icontains=miNIstry'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 5)
self._match_with_list(r.result.get('users'), user_list,
list_start=6, list_end=11)
url_by_name = '/users?name__startswith=The'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 5)
self._match_with_list(r.result.get('users'), user_list,
list_start=5, list_end=10)
url_by_name = '/users?name__istartswith=the'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 6)
self._match_with_list(r.result.get('users'), user_list,
list_start=5, list_end=11)
url_by_name = '/users?name__endswith=of'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 1)
self.assertEqual(r.result.get('users')[0]['id'], user_list[7]['id'])
url_by_name = '/users?name__iendswith=OF'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 2)
self.assertEqual(r.result.get('users')[0]['id'], user_list[7]['id'])
self.assertEqual(r.result.get('users')[1]['id'], user_list[10]['id'])
self._delete_test_data('user', user_list)
def test_filter_sql_injection_attack(self):
"""GET /users?name=<injected sql_statement>
Test Plan:
- Attempt to get all entities back by passing a two-term attribute
- Attempt to piggyback filter to damage DB (e.g. drop table)
"""
self._set_policy({"identity:list_users": [],
"identity:list_groups": [],
"identity:create_group": []})
url_by_name = "/users?name=anything' or 'x'='x"
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 0)
# See if we can add a SQL command...use the group table instead of the
# user table since 'user' is reserved word for SQLAlchemy.
group = self.new_group_ref(domain_id=self.domainB['id'])
group = self.identity_api.create_group(group)
url_by_name = "/users?name=x'; drop table group"
r = self.get(url_by_name, auth=self.auth)
# Check group table is still there...
url_by_name = "/groups"
r = self.get(url_by_name, auth=self.auth)
self.assertTrue(len(r.result.get('groups')) > 0)
class IdentityTestListLimitCase(IdentityTestFilteredCase):
"""Test list limiting enforcement on the v3 Identity API."""
content_type = 'json'
def setUp(self):
"""Setup for Identity Limit Test Cases."""
super(IdentityTestListLimitCase, self).setUp()
self._set_policy({"identity:list_users": [],
"identity:list_groups": [],
"identity:list_projects": [],
"identity:list_services": [],
"identity:list_policies": []})
# Create 10 entries for each of the entities we are going to test
self.ENTITY_TYPES = ['user', 'group', 'project']
self.entity_lists = {}
for entity in self.ENTITY_TYPES:
self.entity_lists[entity] = self._create_test_data(entity, 10)
# Make sure we clean up when finished
self.addCleanup(self.clean_up_entity, entity)
self.service_list = []
self.addCleanup(self.clean_up_service)
for _ in range(10):
new_entity = {'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex}
service = self.catalog_api.create_service(new_entity['id'],
new_entity)
self.service_list.append(service)
self.policy_list = []
self.addCleanup(self.clean_up_policy)
for _ in range(10):
new_entity = {'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex,
'blob': uuid.uuid4().hex}
policy = self.policy_api.create_policy(new_entity['id'],
new_entity)
self.policy_list.append(policy)
def clean_up_entity(self, entity):
"""Clean up entity test data from Identity Limit Test Cases."""
self._delete_test_data(entity, self.entity_lists[entity])
def clean_up_service(self):
"""Clean up service test data from Identity Limit Test Cases."""
for service in self.service_list:
self.catalog_api.delete_service(service['id'])
def clean_up_policy(self):
"""Clean up policy test data from Identity Limit Test Cases."""
for policy in self.policy_list:
self.policy_api.delete_policy(policy['id'])
def _test_entity_list_limit(self, entity, driver):
"""GET /<entities> (limited)
Test Plan:
- For the specified type of entity:
- Update policy for no protection on api
- Add a bunch of entities
- Set the global list limit to 5, and check that getting all
- entities only returns 5
- Set the driver list_limit to 4, and check that now only 4 are
- returned
"""
if entity == 'policy':
plural = 'policies'
else:
plural = '%ss' % entity
self.config_fixture.config(list_limit=5)
self.config_fixture.config(group=driver, list_limit=None)
r = self.get('/%s' % plural, auth=self.auth)
self.assertEqual(len(r.result.get(plural)), 5)
self.assertIs(r.result.get('truncated'), True)
self.config_fixture.config(group=driver, list_limit=4)
r = self.get('/%s' % plural, auth=self.auth)
self.assertEqual(len(r.result.get(plural)), 4)
self.assertIs(r.result.get('truncated'), True)
def test_users_list_limit(self):
self._test_entity_list_limit('user', 'identity')
def test_groups_list_limit(self):
self._test_entity_list_limit('group', 'identity')
def test_projects_list_limit(self):
self._test_entity_list_limit('project', 'assignment')
def test_services_list_limit(self):
self._test_entity_list_limit('service', 'catalog')
def test_non_driver_list_limit(self):
"""Check list can be limited without driver level support.
Policy limiting is not done at the driver level (since it
really isn't worth doing it there). So use this as a test
for ensuring the controller level will successfully limit
in this case.
"""
self._test_entity_list_limit('policy', 'policy')
def test_no_limit(self):
"""Check truncated attribute not set when list not limited."""
r = self.get('/services', auth=self.auth)
self.assertEqual(len(r.result.get('services')), 10)
self.assertIsNone(r.result.get('truncated'))
def test_at_limit(self):
"""Check truncated attribute not set when list at max size."""
# Test this by overriding the general limit with a higher
# driver-specific limit (allowing all entities to be returned
# in the collection), which should result in a non truncated list
self.config_fixture.config(list_limit=5)
self.config_fixture.config(group='catalog', list_limit=10)
r = self.get('/services', auth=self.auth)
self.assertEqual(len(r.result.get('services')), 10)
self.assertIsNone(r.result.get('truncated'))