Restrict test result metadata keys

Certain keys like 'user' should not be exposed to the general
public. This patch adds conditional checks to determine whether
a user should have access to all metadata keys or a limited set of
the metadata keys.

Change-Id: Ie8109748dcd42a6add840eba43fd7af3f592e00c
This commit is contained in:
Paul Van Eck 2016-07-28 15:48:57 -07:00
parent fcac21f1bf
commit 90d4ec5199
5 changed files with 140 additions and 33 deletions

View File

@ -53,6 +53,7 @@ SHARED_TEST_RUN = 'shared'
# Roles
ROLE_USER = 'user'
ROLE_OWNER = 'owner'
ROLE_FOUNDATION = 'foundation'
# Organization types.
# OpenStack Foundation

View File

@ -14,11 +14,13 @@
# under the License.
"""Test results controller."""
import functools
from oslo_config import cfg
from oslo_log import log
import pecan
from pecan import rest
import six
from six.moves.urllib import parse
from refstack import db
@ -32,23 +34,44 @@ LOG = log.getLogger(__name__)
CONF = cfg.CONF
@api_utils.check_permissions(level=const.ROLE_USER)
class MetadataController(rest.RestController):
"""/v1/results/<test_id>/meta handler."""
rw_access_keys = ('shared', 'guideline', 'target',)
def _check_key(func):
"""Decorator to check that a specific key has write access."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
meta_key = args[2]
if meta_key not in args[0].rw_access_keys:
pecan.abort(403)
return func(*args, **kwargs)
return wrapper
@pecan.expose('json')
def get(self, test_id):
"""Get test run metadata."""
test_info = db.get_test(test_id)
return test_info['meta']
role = api_utils.get_user_role(test_id)
if role in (const.ROLE_FOUNDATION, const.ROLE_OWNER):
return test_info['meta']
elif role in (const.ROLE_USER):
return {k: v for k, v in six.iteritems(test_info['meta'])
if k in self.rw_access_keys}
pecan.abort(403)
@pecan.expose('json')
def get_one(self, test_id, key):
"""Get value for key from test run metadata."""
return db.get_test_meta_key(test_id, key)
role = api_utils.get_user_role(test_id)
if role in (const.ROLE_FOUNDATION, const.ROLE_OWNER):
return db.get_test_meta_key(test_id, key)
elif role in (const.ROLE_USER) and key in self.rw_access_keys:
return db.get_test_meta_key(test_id, key)
pecan.abort(403)
@_check_key
@api_utils.check_permissions(level=const.ROLE_OWNER)
@pecan.expose('json')
def post(self, test_id, key):
@ -56,6 +79,7 @@ class MetadataController(rest.RestController):
db.save_test_meta_item(test_id, key, pecan.request.body)
pecan.response.status = 201
@_check_key
@api_utils.check_permissions(level=const.ROLE_OWNER)
@pecan.expose('json')
def delete(self, test_id, key):
@ -75,7 +99,8 @@ class ResultsController(validation.BaseRestControllerWithValidation):
@api_utils.check_permissions(level=const.ROLE_USER)
def get_one(self, test_id):
"""Handler for getting item."""
if api_utils.get_user_role(test_id) == const.ROLE_OWNER:
user_role = api_utils.get_user_role(test_id)
if user_role in (const.ROLE_FOUNDATION, const.ROLE_OWNER):
test_info = db.get_test(
test_id, allowed_keys=['id', 'cpid', 'created_at',
'duration_seconds', 'meta']
@ -85,7 +110,13 @@ class ResultsController(validation.BaseRestControllerWithValidation):
test_list = db.get_test_results(test_id)
test_name_list = [test_dict['name'] for test_dict in test_list]
test_info.update({'results': test_name_list,
'user_role': api_utils.get_user_role(test_id)})
'user_role': user_role})
if user_role not in (const.ROLE_FOUNDATION, const.ROLE_OWNER):
test_info['meta'] = {
k: v for k, v in six.iteritems(test_info['meta'])
if k in MetadataController.rw_access_keys
}
return test_info
def store_item(self, test):
@ -143,6 +174,15 @@ class ResultsController(validation.BaseRestControllerWithValidation):
results = db.get_test_records(page_number, per_page, filters)
for result in results:
# Only show all metadata if the user is the owner or a member
# of the Foundation group.
if (not api_utils.check_owner(result['id']) and
not api_utils.check_user_is_foundation_admin()):
result['meta'] = {
k: v for k, v in six.iteritems(result['meta'])
if k in MetadataController.rw_access_keys
}
result.update({'url': parse.urljoin(
CONF.ui_url, CONF.api.test_results_url
) % result['id']})

View File

@ -207,29 +207,34 @@ def enforce_permissions(test_id, level):
role = get_user_role(test_id)
if not role:
pecan.abort(401)
if level == const.ROLE_USER:
if role in (const.ROLE_OWNER, const.ROLE_USER):
if role in (const.ROLE_OWNER, const.ROLE_USER, const.ROLE_FOUNDATION):
return
pecan.abort(403)
if level == const.ROLE_OWNER:
if get_user_role(test_id) in (const.ROLE_OWNER,):
elif level == const.ROLE_OWNER:
if role in (const.ROLE_OWNER, const.ROLE_FOUNDATION):
return
pecan.abort(403)
elif level == const.ROLE_FOUNDATION:
if role in (const.ROLE_FOUNDATION):
return
else:
raise ValueError('Permission level %s is undefined'
'' % level)
raise ValueError('Permission level %s is undefined' % level)
def get_user_role(test_id):
"""Return user role for current user and specified test run."""
if _check_owner(test_id):
if check_user_is_foundation_admin():
return const.ROLE_FOUNDATION
if check_owner(test_id):
return const.ROLE_OWNER
if _check_user(test_id):
if check_user(test_id):
return const.ROLE_USER
return
def _check_user(test_id):
def check_user(test_id):
"""Check that user has access to shared test run."""
test_owner = db.get_test_meta_key(test_id, const.USER)
if not test_owner:
@ -237,10 +242,10 @@ def _check_user(test_id):
elif db.get_test_meta_key(test_id, const.SHARED_TEST_RUN):
return True
else:
return _check_owner(test_id)
return check_owner(test_id)
def _check_owner(test_id):
def check_owner(test_id):
"""Check that user has access to specified test run as owner."""
if not is_authenticated():
return False

View File

@ -83,25 +83,37 @@ class ResultsControllerTestCase(BaseControllerTestCase):
@mock.patch('refstack.db.get_test')
@mock.patch('refstack.db.get_test_results')
def test_get(self, mock_get_test_res, mock_get_test):
self.mock_get_user_role.return_value = const.ROLE_USER
self.mock_get_user_role.return_value = const.ROLE_FOUNDATION
test_info = {'created_at': 'bar',
'duration_seconds': 999}
'duration_seconds': 999,
'meta': {'shared': 'true', 'user': 'fake-user'}}
mock_get_test.return_value = test_info
mock_get_test_res.return_value = [{'name': 'test1'},
{'name': 'test2'}]
actual_result = self.controller.get_one('fake_arg')
# All meta should be exposed when user is a Foundation admin.
expected_result = {
'created_at': 'bar',
'duration_seconds': 999,
'results': ['test1', 'test2'],
'user_role': const.ROLE_USER
'user_role': const.ROLE_FOUNDATION,
'meta': {'shared': 'true', 'user': 'fake-user'}
}
self.assertEqual(actual_result, expected_result)
self.assertEqual(expected_result, actual_result)
mock_get_test_res.assert_called_once_with('fake_arg')
mock_get_test.assert_called_once_with('fake_arg')
# If not owner or Foundation admin, don't show all metadata.
self.mock_get_user_role.return_value = const.ROLE_USER
mock_get_test.return_value = test_info
mock_get_test_res.return_value = [{'name': 'test1'},
{'name': 'test2'}]
actual_result = self.controller.get_one('fake_arg')
expected_result['meta'] = {'shared': 'true'}
expected_result['user_role'] = const.ROLE_USER
self.assertEqual(expected_result, actual_result)
@mock.patch('refstack.db.get_test')
@mock.patch('refstack.db.get_test_results')
@ -217,6 +229,8 @@ class ResultsControllerTestCase(BaseControllerTestCase):
self.assertRaises(webob.exc.HTTPError,
self.controller.get)
@mock.patch('refstack.api.utils.check_owner')
@mock.patch('refstack.api.utils.check_user_is_foundation_admin')
@mock.patch('refstack.db.get_test_records')
@mock.patch('refstack.db.get_test_records_count')
@mock.patch('refstack.api.utils.get_page_number')
@ -225,7 +239,9 @@ class ResultsControllerTestCase(BaseControllerTestCase):
parse_input,
get_page,
get_test_count,
db_get_test):
db_get_test,
check_foundation,
check_owner):
expected_input_params = [
const.START_DATE,
@ -239,6 +255,8 @@ class ResultsControllerTestCase(BaseControllerTestCase):
records_count = 50
get_test_count.return_value = records_count
get_page.return_value = (page_number, total_pages_number)
check_foundation.return_value = False
check_owner.return_value = True
self.CONF.set_override('results_per_page',
per_page,
'api')
@ -257,7 +275,7 @@ class ResultsControllerTestCase(BaseControllerTestCase):
}
actual_result = self.controller.get()
self.assertEqual(actual_result, expected_result)
self.assertEqual(expected_result, actual_result)
parse_input.assert_called_once_with(expected_input_params)
@ -578,36 +596,73 @@ class MetadataControllerTestCase(BaseControllerTestCase):
@mock.patch('refstack.db.get_test')
def test_get(self, mock_db_get_test):
self.mock_get_user_role.return_value = const.ROLE_USER
mock_db_get_test.return_value = {'meta': 'fake_meta'}
self.assertEqual('fake_meta', self.controller.get('test_id'))
mock_db_get_test.return_value = {'meta': {'shared': 'true',
'user': 'fake-user'}}
# Only the key 'shared' should be allowed through.
self.assertEqual({'shared': 'true'}, self.controller.get('test_id'))
mock_db_get_test.assert_called_once_with('test_id')
# Test that the result owner can see all metadata keys.
self.mock_get_user_role.return_value = const.ROLE_OWNER
self.assertEqual({'shared': 'true', 'user': 'fake-user'},
self.controller.get('test_id'))
# Test that a Foundation admin can see all metadata keys.
self.mock_get_user_role.return_value = const.ROLE_FOUNDATION
self.assertEqual({'shared': 'true', 'user': 'fake-user'},
self.controller.get('test_id'))
@mock.patch('refstack.db.get_test_meta_key')
def test_get_one(self, mock_db_get_test_meta_key):
self.mock_get_user_role.return_value = const.ROLE_USER
# Test when key is not an allowed key.
self.assertRaises(webob.exc.HTTPError,
self.controller.get_one, 'test_id', 'answer')
# Test when key is an allowed key.
mock_db_get_test_meta_key.return_value = 42
self.assertEqual(42, self.controller.get_one('test_id', 'answer'))
mock_db_get_test_meta_key.assert_called_once_with('test_id', 'answer')
self.assertEqual(42, self.controller.get_one('test_id', 'shared'))
mock_db_get_test_meta_key.assert_called_once_with('test_id', 'shared')
# Test when the user owns the test result.
self.mock_get_user_role.return_value = const.ROLE_OWNER
self.assertEqual(42, self.controller.get_one('test_id', 'user'))
# Test when the user is a Foundation admin.
self.mock_get_user_role.return_value = const.ROLE_FOUNDATION
self.assertEqual(42, self.controller.get_one('test_id', 'user'))
@mock.patch('refstack.db.save_test_meta_item')
def test_post(self, mock_save_test_meta_item):
self.mock_get_user_role.return_value = const.ROLE_OWNER
self.controller.post('test_id', 'answer')
# Test trying to post a valid key.
self.controller.post('test_id', 'shared')
self.assertEqual(201, self.mock_response.status)
mock_save_test_meta_item.assert_called_once_with(
'test_id', 'answer', self.mock_request.body)
'test_id', 'shared', self.mock_request.body)
# Test trying to post an invalid key.
self.assertRaises(webob.exc.HTTPError,
self.controller.post, 'test_id', 'user')
# Test when not an owner of the result.
self.mock_get_user_role.return_value = const.ROLE_USER
self.mock_abort.side_effect = webob.exc.HTTPError()
self.assertRaises(webob.exc.HTTPError,
self.controller.post, 'test_id', 'answer')
self.controller.post, 'test_id', 'shared')
@mock.patch('refstack.db.delete_test_meta_item')
def test_delete(self, mock_delete_test_meta_item):
self.mock_get_user_role.return_value = const.ROLE_OWNER
self.controller.delete('test_id', 'answer')
self.controller.delete('test_id', 'shared')
self.assertEqual(204, self.mock_response.status)
mock_delete_test_meta_item.assert_called_once_with('test_id', 'answer')
mock_delete_test_meta_item.assert_called_once_with('test_id', 'shared')
# The key 'user' is not a valid key that can be deleted.
self.assertRaises(webob.exc.HTTPError,
self.controller.delete, 'test_id', 'user')
self.mock_get_user_role.return_value = const.ROLE_USER
self.mock_abort.side_effect = webob.exc.HTTPError()

View File

@ -333,6 +333,7 @@ class APIUtilsTestCase(base.BaseTestCase):
mock_get_user.side_effect = mock_db.NotFound('User')
self.assertFalse(api_utils.is_authenticated())
@mock.patch('refstack.api.utils.check_user_is_foundation_admin')
@mock.patch('pecan.abort', side_effect=exc.HTTPError)
@mock.patch('refstack.db.get_test_meta_key')
@mock.patch.object(api_utils, 'is_authenticated')
@ -340,8 +341,10 @@ class APIUtilsTestCase(base.BaseTestCase):
def test_check_get_user_role(self, mock_get_user_id,
mock_is_authenticated,
mock_get_test_meta_key,
mock_pecan_abort):
mock_pecan_abort,
mock_check_foundation):
# Check user level
mock_check_foundation.return_value = False
mock_get_test_meta_key.return_value = None
self.assertEqual(const.ROLE_USER, api_utils.get_user_role('fake_test'))
api_utils.enforce_permissions('fake_test', const.ROLE_USER)
@ -403,6 +406,7 @@ class APIUtilsTestCase(base.BaseTestCase):
self.assertRaises(exc.HTTPError, api_utils.enforce_permissions,
'fake_test', const.ROLE_OWNER)
@mock.patch('refstack.api.utils.check_user_is_foundation_admin')
@mock.patch('pecan.abort', side_effect=exc.HTTPError)
@mock.patch('refstack.db.get_test_meta_key')
@mock.patch.object(api_utils, 'is_authenticated')
@ -410,7 +414,8 @@ class APIUtilsTestCase(base.BaseTestCase):
def test_check_permissions(self, mock_get_user_id,
mock_is_authenticated,
mock_get_test_meta_key,
mock_pecan_abort):
mock_pecan_abort,
mock_foundation_check):
@api_utils.check_permissions(level=const.ROLE_USER)
class ControllerWithPermissions(rest.RestController):
@ -439,6 +444,7 @@ class APIUtilsTestCase(base.BaseTestCase):
}.get(args)
mock_is_authenticated.return_value = True
mock_foundation_check.return_value = False
self.assertEqual(public_test, fake_controller.get(public_test))
self.assertRaises(exc.HTTPError, fake_controller.delete, public_test)
self.assertEqual(private_test, fake_controller.get(private_test))