From 5d0853567f54282662d099c39bc39d1cac3ffb60 Mon Sep 17 00:00:00 2001 From: ZhiQiang Fan Date: Sat, 24 May 2014 01:35:25 +0800 Subject: [PATCH] Fix project authorization check Currently, project authorization check in _verify_query_segregation only checks query field 'project_id', so normal user can pass the check with query field 'project'. And the check also checks query operator, which seems not right, because user will get 401 when project is his own but with wrong operator, 400 wrong operator can be more precise. This patch adds project field check and remove the operator check which can be done in _validate_query. Change-Id: I82439e4c02afd04d26ab5d419ef67bde1f4aa1ca Closes-Bug: #1322111 --- ceilometer/api/controllers/v2.py | 12 +++-- .../tests/api/v2/test_alarm_scenarios.py | 49 +++++++++++++++++++ 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index dbe870825a..8eca01ef82 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -325,11 +325,13 @@ def _verify_query_segregation(query, auth_project=None): '''Ensure non-admin queries are not constrained to another project.''' auth_project = (auth_project or acl.get_limited_to_project(pecan.request.headers)) - if auth_project: - for q in query: - if q.field == 'project_id' and (auth_project != q.value or - q.op != 'eq'): - raise ProjectNotAuthorized(q.value) + + if not auth_project: + return + + for q in query: + if q.field in ('project', 'project_id') and auth_project != q.value: + raise ProjectNotAuthorized(q.value) def _validate_query(query, db_func, internal_keys=None, diff --git a/ceilometer/tests/api/v2/test_alarm_scenarios.py b/ceilometer/tests/api/v2/test_alarm_scenarios.py index aa3481edf6..e7d0cba1f1 100644 --- a/ceilometer/tests/api/v2/test_alarm_scenarios.py +++ b/ceilometer/tests/api/v2/test_alarm_scenarios.py @@ -266,6 +266,55 @@ class TestAlarms(FunctionalTest, self.assertEqual(alarms[0]['alarm_id'], one['alarm_id']) self.assertEqual(alarms[0]['repeat_actions'], one['repeat_actions']) + def test_get_alarm_project_filter_wrong_op_normal_user(self): + project = self.auth_headers['X-Project-Id'] + + def _test(field, op): + response = self.get_json('/alarms', + q=[{'field': field, + 'op': op, + 'value': project}], + expect_errors=True, + status=400, + headers=self.auth_headers) + faultstring = ('Invalid input for field/attribute op. ' + 'Value: \'%(op)s\'. unimplemented operator ' + 'for %(field)s' % {'field': field, 'op': op}) + self.assertEqual(faultstring, + response.json['error_message']['faultstring']) + + _test('project', 'ne') + _test('project_id', 'ne') + + def test_get_alarm_project_filter_normal_user(self): + project = self.auth_headers['X-Project-Id'] + + def _test(field): + alarms = self.get_json('/alarms', + q=[{'field': field, + 'op': 'eq', + 'value': project}]) + self.assertEqual(4, len(alarms)) + + _test('project') + _test('project_id') + + def test_get_alarm_other_project_normal_user(self): + def _test(field): + response = self.get_json('/alarms', + q=[{'field': field, + 'op': 'eq', + 'value': 'other-project'}], + expect_errors=True, + status=401, + headers=self.auth_headers) + faultstring = 'Not Authorized to access project other-project' + self.assertEqual(faultstring, + response.json['error_message']['faultstring']) + + _test('project') + _test('project_id') + def test_post_alarm_wsme_workaround(self): jsons = { 'type': {