From f7cb4943c73c67421f0b6adaa734df321b6c9663 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Fri, 20 Sep 2013 11:48:13 +0200 Subject: [PATCH] api: allow alarm creation for others project by admins Change-Id: I8da7e81da799b5cbd6b1ce158ac10e12567aa3d7 Fixes-Bug: #1226470 --- ceilometer/api/acl.py | 23 +++- ceilometer/api/controllers/v2.py | 12 +- tests/api/v2/test_alarm_scenarios.py | 187 +++++++++++++++++++++++++++ 3 files changed, 215 insertions(+), 7 deletions(-) diff --git a/ceilometer/api/acl.py b/ceilometer/api/acl.py index 2021426e..7d8f077a 100644 --- a/ceilometer/api/acl.py +++ b/ceilometer/api/acl.py @@ -44,12 +44,29 @@ def install(app, conf): conf=dict(conf.get(OPT_GROUP_NAME))) -def get_limited_to_project(headers): - """Return the tenant the request should be limited to.""" +def get_limited_to(headers): + """Return the user and project the request should be limited to. + + :param headers: HTTP headers dictionary + :return: A tuple of (user, project), set to None if there's no limit on + one of these. + + """ global _ENFORCER if not _ENFORCER: _ENFORCER = policy.Enforcer() if not _ENFORCER.enforce('context_is_admin', {}, {'roles': headers.get('X-Roles', "").split(",")}): - return headers.get('X-Project-Id') + return headers.get('X-User-Id'), headers.get('X-Project-Id') + return None, None + + +def get_limited_to_project(headers): + """Return the project the request should be limited to. + + :param headers: HTTP headers dictionary + :return: A project, or None if there's no limit on it. + + """ + return get_limited_to(headers)[1] diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index 9bd880da..a598382c 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -1272,8 +1272,9 @@ class AlarmController(rest.RestController): now = timeutils.utcnow() data.alarm_id = self._id - data.user_id = alarm_in.user_id - data.project_id = alarm_in.project_id + user, project = acl.get_limited_to(pecan.request.headers) + data.user_id = user or data.user_id or alarm_in.user_id + data.project_id = project or data.project_id or alarm_in.project_id data.timestamp = now if alarm_in.state != data.state: data.state_timestamp = now @@ -1391,8 +1392,11 @@ class AlarmsController(rest.RestController): now = timeutils.utcnow() data.alarm_id = str(uuid.uuid4()) - data.user_id = pecan.request.headers.get('X-User-Id') - data.project_id = pecan.request.headers.get('X-Project-Id') + user, project = acl.get_limited_to(pecan.request.headers) + data.user_id = (user or data.user_id or + pecan.request.headers.get('X-User-Id')) + data.project_id = (project or data.project_id or + pecan.request.headers.get('X-Project-Id')) data.timestamp = now data.state_timestamp = now diff --git a/tests/api/v2/test_alarm_scenarios.py b/tests/api/v2/test_alarm_scenarios.py index be42641b..abd8c3bc 100644 --- a/tests/api/v2/test_alarm_scenarios.py +++ b/tests/api/v2/test_alarm_scenarios.py @@ -455,6 +455,141 @@ class TestAlarms(FunctionalTest, else: self.fail("Alarm not found") + def test_post_alarm_as_admin(self): + """Test the creation of an alarm as admin for another project.""" + json = { + 'enabled': False, + 'name': 'added_alarm', + 'state': 'ok', + 'type': 'threshold', + 'user_id': 'auseridthatisnotmine', + 'project_id': 'aprojectidthatisnotmine', + 'threshold_rule': { + 'meter_name': 'ameter', + 'query': [{'field': 'metadata.field', + 'op': 'eq', + 'value': '5', + 'type': 'string'}, + {'field': 'project_id', 'op': 'eq', + 'value': 'aprojectidthatisnotmine'}], + 'comparison_operator': 'le', + 'statistic': 'count', + 'threshold': 50, + 'evaluation_periods': 3, + 'period': 180, + } + } + headers = {} + headers.update(self.auth_headers) + headers['X-Roles'] = 'admin' + self.post_json('/alarms', params=json, status=201, + headers=headers) + alarms = list(self.conn.get_alarms(enabled=False)) + self.assertEqual(1, len(alarms)) + self.assertEqual(alarms[0].user_id, 'auseridthatisnotmine') + self.assertEqual(alarms[0].project_id, 'aprojectidthatisnotmine') + if alarms[0].name == 'added_alarm': + for key in json: + if key.endswith('_rule'): + storage_key = 'rule' + else: + storage_key = key + self.assertEqual(getattr(alarms[0], storage_key), + json[key]) + else: + self.fail("Alarm not found") + + def test_post_alarm_as_admin_no_user(self): + """Test the creation of an alarm as admin for another project but + forgetting to set the values. + """ + json = { + 'enabled': False, + 'name': 'added_alarm', + 'state': 'ok', + 'type': 'threshold', + 'project_id': 'aprojectidthatisnotmine', + 'threshold_rule': { + 'meter_name': 'ameter', + 'query': [{'field': 'metadata.field', + 'op': 'eq', + 'value': '5', + 'type': 'string'}, + {'field': 'project_id', 'op': 'eq', + 'value': 'aprojectidthatisnotmine'}], + 'comparison_operator': 'le', + 'statistic': 'count', + 'threshold': 50, + 'evaluation_periods': 3, + 'period': 180, + } + } + headers = {} + headers.update(self.auth_headers) + headers['X-Roles'] = 'admin' + self.post_json('/alarms', params=json, status=201, + headers=headers) + alarms = list(self.conn.get_alarms(enabled=False)) + self.assertEqual(1, len(alarms)) + self.assertEqual(alarms[0].user_id, self.auth_headers['X-User-Id']) + self.assertEqual(alarms[0].project_id, 'aprojectidthatisnotmine') + if alarms[0].name == 'added_alarm': + for key in json: + if key.endswith('_rule'): + storage_key = 'rule' + else: + storage_key = key + self.assertEqual(getattr(alarms[0], storage_key), + json[key]) + else: + self.fail("Alarm not found") + + def test_post_alarm_as_admin_no_project(self): + """Test the creation of an alarm as admin for another project but + forgetting to set the values. + """ + json = { + 'enabled': False, + 'name': 'added_alarm', + 'state': 'ok', + 'type': 'threshold', + 'user_id': 'auseridthatisnotmine', + 'threshold_rule': { + 'meter_name': 'ameter', + 'query': [{'field': 'metadata.field', + 'op': 'eq', + 'value': '5', + 'type': 'string'}, + {'field': 'project_id', 'op': 'eq', + 'value': 'aprojectidthatisnotmine'}], + 'comparison_operator': 'le', + 'statistic': 'count', + 'threshold': 50, + 'evaluation_periods': 3, + 'period': 180, + } + } + headers = {} + headers.update(self.auth_headers) + headers['X-Roles'] = 'admin' + self.post_json('/alarms', params=json, status=201, + headers=headers) + alarms = list(self.conn.get_alarms(enabled=False)) + self.assertEqual(1, len(alarms)) + self.assertEqual(alarms[0].user_id, 'auseridthatisnotmine') + self.assertEqual(alarms[0].project_id, + self.auth_headers['X-Project-Id']) + if alarms[0].name == 'added_alarm': + for key in json: + if key.endswith('_rule'): + storage_key = 'rule' + else: + storage_key = key + self.assertEqual(getattr(alarms[0], storage_key), + json[key]) + else: + self.fail("Alarm not found") + def test_post_alarm_combination(self): json = { 'enabled': False, @@ -551,6 +686,58 @@ class TestAlarms(FunctionalTest, storage_key = key self.assertEqual(getattr(alarm, storage_key), json[key]) + def test_put_alarm_as_admin(self): + json = { + 'user_id': 'myuserid', + 'project_id': 'myprojectid', + 'enabled': False, + 'name': 'name_put', + 'state': 'ok', + 'type': 'threshold', + 'ok_actions': ['http://something/ok'], + 'alarm_actions': ['http://something/alarm'], + 'insufficient_data_actions': ['http://something/no'], + 'repeat_actions': True, + 'threshold_rule': { + 'meter_name': 'ameter', + 'query': [{'field': 'metadata.field', + 'op': 'eq', + 'value': '5', + 'type': 'string'}, + {'field': 'project_id', 'op': 'eq', + 'value': 'myprojectid'}], + 'comparison_operator': 'le', + 'statistic': 'count', + 'threshold': 50, + 'evaluation_periods': 3, + 'period': 180, + } + } + headers = {} + headers.update(self.auth_headers) + headers['X-Roles'] = 'admin' + + data = self.get_json('/alarms', + headers=headers, + q=[{'field': 'name', + 'value': 'name1', + }]) + self.assertEqual(1, len(data)) + alarm_id = data[0]['alarm_id'] + + self.put_json('/alarms/%s' % alarm_id, + params=json, + headers=headers) + alarm = list(self.conn.get_alarms(alarm_id=alarm_id, enabled=False))[0] + self.assertEqual(alarm.user_id, 'myuserid') + self.assertEqual(alarm.project_id, 'myprojectid') + for key in json: + if key.endswith('_rule'): + storage_key = 'rule' + else: + storage_key = key + self.assertEqual(getattr(alarm, storage_key), json[key]) + def test_put_alarm_wrong_field(self): # Note: wsme will ignore unknown fields so will just not appear in # the Alarm.