Merge "Fix project authorization check"

This commit is contained in:
Jenkins 2014-06-11 06:44:43 +00:00 committed by Gerrit Code Review
commit 1c4f604f96
2 changed files with 56 additions and 5 deletions

View File

@ -324,11 +324,13 @@ def _verify_query_segregation(query, auth_project=None):
'''Ensure non-admin queries are not constrained to another project.'''
auth_project = (auth_project or
acl.get_limited_to_project(pecan.request.headers))
if auth_project:
for q in query:
if q.field == 'project_id' and (auth_project != q.value or
q.op != 'eq'):
raise ProjectNotAuthorized(q.value)
if not auth_project:
return
for q in query:
if q.field in ('project', 'project_id') and auth_project != q.value:
raise ProjectNotAuthorized(q.value)
def _validate_query(query, db_func, internal_keys=None,

View File

@ -265,6 +265,55 @@ class TestAlarms(FunctionalTest,
self.assertEqual(alarms[0]['alarm_id'], one['alarm_id'])
self.assertEqual(alarms[0]['repeat_actions'], one['repeat_actions'])
def test_get_alarm_project_filter_wrong_op_normal_user(self):
project = self.auth_headers['X-Project-Id']
def _test(field, op):
response = self.get_json('/alarms',
q=[{'field': field,
'op': op,
'value': project}],
expect_errors=True,
status=400,
headers=self.auth_headers)
faultstring = ('Invalid input for field/attribute op. '
'Value: \'%(op)s\'. unimplemented operator '
'for %(field)s' % {'field': field, 'op': op})
self.assertEqual(faultstring,
response.json['error_message']['faultstring'])
_test('project', 'ne')
_test('project_id', 'ne')
def test_get_alarm_project_filter_normal_user(self):
project = self.auth_headers['X-Project-Id']
def _test(field):
alarms = self.get_json('/alarms',
q=[{'field': field,
'op': 'eq',
'value': project}])
self.assertEqual(4, len(alarms))
_test('project')
_test('project_id')
def test_get_alarm_other_project_normal_user(self):
def _test(field):
response = self.get_json('/alarms',
q=[{'field': field,
'op': 'eq',
'value': 'other-project'}],
expect_errors=True,
status=401,
headers=self.auth_headers)
faultstring = 'Not Authorized to access project other-project'
self.assertEqual(faultstring,
response.json['error_message']['faultstring'])
_test('project')
_test('project_id')
def test_post_alarm_wsme_workaround(self):
jsons = {
'type': {