api: allow alarm creation for others project by admins

Change-Id: I8da7e81da799b5cbd6b1ce158ac10e12567aa3d7
Fixes-Bug: #1226470
This commit is contained in:
Julien Danjou 2013-09-20 11:48:13 +02:00
parent 70c4256856
commit f7cb4943c7
3 changed files with 215 additions and 7 deletions

View File

@ -44,12 +44,29 @@ def install(app, conf):
conf=dict(conf.get(OPT_GROUP_NAME)))
def get_limited_to_project(headers):
"""Return the tenant the request should be limited to."""
def get_limited_to(headers):
"""Return the user and project the request should be limited to.
:param headers: HTTP headers dictionary
:return: A tuple of (user, project), set to None if there's no limit on
one of these.
"""
global _ENFORCER
if not _ENFORCER:
_ENFORCER = policy.Enforcer()
if not _ENFORCER.enforce('context_is_admin',
{},
{'roles': headers.get('X-Roles', "").split(",")}):
return headers.get('X-Project-Id')
return headers.get('X-User-Id'), headers.get('X-Project-Id')
return None, None
def get_limited_to_project(headers):
"""Return the project the request should be limited to.
:param headers: HTTP headers dictionary
:return: A project, or None if there's no limit on it.
"""
return get_limited_to(headers)[1]

View File

@ -1272,8 +1272,9 @@ class AlarmController(rest.RestController):
now = timeutils.utcnow()
data.alarm_id = self._id
data.user_id = alarm_in.user_id
data.project_id = alarm_in.project_id
user, project = acl.get_limited_to(pecan.request.headers)
data.user_id = user or data.user_id or alarm_in.user_id
data.project_id = project or data.project_id or alarm_in.project_id
data.timestamp = now
if alarm_in.state != data.state:
data.state_timestamp = now
@ -1391,8 +1392,11 @@ class AlarmsController(rest.RestController):
now = timeutils.utcnow()
data.alarm_id = str(uuid.uuid4())
data.user_id = pecan.request.headers.get('X-User-Id')
data.project_id = pecan.request.headers.get('X-Project-Id')
user, project = acl.get_limited_to(pecan.request.headers)
data.user_id = (user or data.user_id or
pecan.request.headers.get('X-User-Id'))
data.project_id = (project or data.project_id or
pecan.request.headers.get('X-Project-Id'))
data.timestamp = now
data.state_timestamp = now

View File

@ -455,6 +455,141 @@ class TestAlarms(FunctionalTest,
else:
self.fail("Alarm not found")
def test_post_alarm_as_admin(self):
"""Test the creation of an alarm as admin for another project."""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'threshold',
'user_id': 'auseridthatisnotmine',
'project_id': 'aprojectidthatisnotmine',
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'},
{'field': 'project_id', 'op': 'eq',
'value': 'aprojectidthatisnotmine'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
self.post_json('/alarms', params=json, status=201,
headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
self.assertEqual(1, len(alarms))
self.assertEqual(alarms[0].user_id, 'auseridthatisnotmine')
self.assertEqual(alarms[0].project_id, 'aprojectidthatisnotmine')
if alarms[0].name == 'added_alarm':
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarms[0], storage_key),
json[key])
else:
self.fail("Alarm not found")
def test_post_alarm_as_admin_no_user(self):
"""Test the creation of an alarm as admin for another project but
forgetting to set the values.
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'threshold',
'project_id': 'aprojectidthatisnotmine',
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'},
{'field': 'project_id', 'op': 'eq',
'value': 'aprojectidthatisnotmine'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
self.post_json('/alarms', params=json, status=201,
headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
self.assertEqual(1, len(alarms))
self.assertEqual(alarms[0].user_id, self.auth_headers['X-User-Id'])
self.assertEqual(alarms[0].project_id, 'aprojectidthatisnotmine')
if alarms[0].name == 'added_alarm':
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarms[0], storage_key),
json[key])
else:
self.fail("Alarm not found")
def test_post_alarm_as_admin_no_project(self):
"""Test the creation of an alarm as admin for another project but
forgetting to set the values.
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'threshold',
'user_id': 'auseridthatisnotmine',
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'},
{'field': 'project_id', 'op': 'eq',
'value': 'aprojectidthatisnotmine'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
self.post_json('/alarms', params=json, status=201,
headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
self.assertEqual(1, len(alarms))
self.assertEqual(alarms[0].user_id, 'auseridthatisnotmine')
self.assertEqual(alarms[0].project_id,
self.auth_headers['X-Project-Id'])
if alarms[0].name == 'added_alarm':
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarms[0], storage_key),
json[key])
else:
self.fail("Alarm not found")
def test_post_alarm_combination(self):
json = {
'enabled': False,
@ -551,6 +686,58 @@ class TestAlarms(FunctionalTest,
storage_key = key
self.assertEqual(getattr(alarm, storage_key), json[key])
def test_put_alarm_as_admin(self):
json = {
'user_id': 'myuserid',
'project_id': 'myprojectid',
'enabled': False,
'name': 'name_put',
'state': 'ok',
'type': 'threshold',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'},
{'field': 'project_id', 'op': 'eq',
'value': 'myprojectid'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
data = self.get_json('/alarms',
headers=headers,
q=[{'field': 'name',
'value': 'name1',
}])
self.assertEqual(1, len(data))
alarm_id = data[0]['alarm_id']
self.put_json('/alarms/%s' % alarm_id,
params=json,
headers=headers)
alarm = list(self.conn.get_alarms(alarm_id=alarm_id, enabled=False))[0]
self.assertEqual(alarm.user_id, 'myuserid')
self.assertEqual(alarm.project_id, 'myprojectid')
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarm, storage_key), json[key])
def test_put_alarm_wrong_field(self):
# Note: wsme will ignore unknown fields so will just not appear in
# the Alarm.