Fix project authorization check

Currently, project authorization check in _verify_query_segregation
only checks query field 'project_id', so normal user can pass the check
with query field 'project'. And the check also checks query operator, which
seems not right, because user will get 401 when project is his own but with
wrong operator, 400 wrong operator can be more precise.

This patch adds project field check and remove the operator check which can
be done in _validate_query.

Change-Id: I82439e4c02afd04d26ab5d419ef67bde1f4aa1ca
Closes-Bug: #1322111
This commit is contained in:
ZhiQiang Fan 2014-05-24 01:35:25 +08:00
parent 4bfa2f4a7d
commit 5d0853567f
2 changed files with 56 additions and 5 deletions

View File

@ -325,11 +325,13 @@ def _verify_query_segregation(query, auth_project=None):
'''Ensure non-admin queries are not constrained to another project.'''
auth_project = (auth_project or
acl.get_limited_to_project(pecan.request.headers))
if auth_project:
for q in query:
if q.field == 'project_id' and (auth_project != q.value or
q.op != 'eq'):
raise ProjectNotAuthorized(q.value)
if not auth_project:
return
for q in query:
if q.field in ('project', 'project_id') and auth_project != q.value:
raise ProjectNotAuthorized(q.value)
def _validate_query(query, db_func, internal_keys=None,

View File

@ -266,6 +266,55 @@ class TestAlarms(FunctionalTest,
self.assertEqual(alarms[0]['alarm_id'], one['alarm_id'])
self.assertEqual(alarms[0]['repeat_actions'], one['repeat_actions'])
def test_get_alarm_project_filter_wrong_op_normal_user(self):
project = self.auth_headers['X-Project-Id']
def _test(field, op):
response = self.get_json('/alarms',
q=[{'field': field,
'op': op,
'value': project}],
expect_errors=True,
status=400,
headers=self.auth_headers)
faultstring = ('Invalid input for field/attribute op. '
'Value: \'%(op)s\'. unimplemented operator '
'for %(field)s' % {'field': field, 'op': op})
self.assertEqual(faultstring,
response.json['error_message']['faultstring'])
_test('project', 'ne')
_test('project_id', 'ne')
def test_get_alarm_project_filter_normal_user(self):
project = self.auth_headers['X-Project-Id']
def _test(field):
alarms = self.get_json('/alarms',
q=[{'field': field,
'op': 'eq',
'value': project}])
self.assertEqual(4, len(alarms))
_test('project')
_test('project_id')
def test_get_alarm_other_project_normal_user(self):
def _test(field):
response = self.get_json('/alarms',
q=[{'field': field,
'op': 'eq',
'value': 'other-project'}],
expect_errors=True,
status=401,
headers=self.auth_headers)
faultstring = 'Not Authorized to access project other-project'
self.assertEqual(faultstring,
response.json['error_message']['faultstring'])
_test('project')
_test('project_id')
def test_post_alarm_wsme_workaround(self):
jsons = {
'type': {