Avoid leaking admin-ness into combination alarms

Previously when an admin created a combination alarm on
behalf of an non-admin identity, this had the effect of leaking
visibility onto alarms that would not normally
be visible to the non-admin tenant.

Now we validate all alarm ids with the project ID of the non-admin
identity that will ultimately own the alarm instead of the project ID
of the API caller.

Fixes bug #1237632

Change-Id: I5d1cf41c9182f09bc37b93deb14dda58f1d6dcd6
This commit is contained in:
Mehdi Abaakouk
2013-10-10 19:03:15 +02:00
parent 7ee6f6d294
commit a8e93ddc9e
2 changed files with 127 additions and 18 deletions

View File

@@ -277,6 +277,25 @@ class ProjectNotAuthorized(Exception):
_("Not Authorized to access project %s") % id)
def _get_auth_project(on_behalf_of=None):
# when an alarm is created by an admin on behalf of another tenant
# we must ensure for:
# - threshold alarm, that an implicit query constraint on project_id is
# added so that admin-level visibility on statistics is not leaked
# - combination alarm, that alarm ids verification is scoped to
# alarms owned by the alarm project.
# hence for null auth_project (indicating admin-ness) we check if
# the creating tenant differs from the tenant on whose behalf the
# alarm is being created
auth_project = acl.get_limited_to_project(pecan.request.headers)
created_by = pecan.request.headers.get('X-Project-Id')
is_admin = auth_project is None
if is_admin and on_behalf_of != created_by:
auth_project = on_behalf_of
return auth_project
def _sanitize_query(query, db_func, on_behalf_of=None):
'''Check the query to see if:
1) the request is coming from admin - then allow full visibility
@@ -284,17 +303,8 @@ def _sanitize_query(query, db_func, on_behalf_of=None):
project.
'''
q = copy.copy(query)
auth_project = acl.get_limited_to_project(pecan.request.headers)
created_by = pecan.request.headers.get('X-Project-Id')
# when an alarm is created by an admin on behalf of another tenant
# we must ensure that an implicit query constraint on project_id is
# added so that admin-level visibility on statistics is not leaked,
# hence for null auth_project (indicating admin-ness) we check if
# the creating tenant differs from the tenant on whose behalf the
# alarm is being created
auth_project = (auth_project or
(on_behalf_of if on_behalf_of != created_by else None))
auth_project = _get_auth_project(on_behalf_of)
if auth_project:
_verify_query_segregation(q, auth_project)
@@ -1081,14 +1091,6 @@ class AlarmCombinationRule(_Base):
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))
for id in combination_rule.alarm_ids:
auth_project = acl.get_limited_to_project(pecan.request.headers)
alarms = list(pecan.request.storage_conn.get_alarms(
alarm_id=id, project=auth_project))
if len(alarms) < 1:
error = _("Alarm %s doesn't exists") % id
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))
return combination_rule
@@ -1178,6 +1180,7 @@ class Alarm(_Base):
error = _("%s is mandatory") % field
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))
if alarm.threshold_rule:
# ensure an implicit constraint on project_id is added to
# the query if not already present
@@ -1186,6 +1189,15 @@ class Alarm(_Base):
storage.SampleFilter.__init__,
on_behalf_of=alarm.project_id
)
elif alarm.combination_rule:
auth_project = _get_auth_project(alarm.project_id)
for id in alarm.combination_rule.alarm_ids:
alarms = list(pecan.request.storage_conn.get_alarms(
alarm_id=id, project=auth_project))
if not alarms:
error = _("Alarm %s doesn't exist") % id
pecan.response.translatable_error = error
raise wsme.exc.ClientSideError(unicode(error))
if alarm.threshold_rule and alarm.combination_rule:
error = _("threshold_rule and combination_rule "

View File

@@ -646,7 +646,104 @@ class TestAlarms(FunctionalTest,
else:
self.fail("Alarm not found")
def test_post_combination_alarm_as_user_with_unauthorized_alarm(self):
"""Test that post a combination alarm as normal user/project
with a alarm_id unauthorized for this project/user
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'combination',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'combination_rule': {
'alarm_ids': ['a',
'b'],
'operator': 'and',
}
}
an_other_user_auth = {'X-User-Id': str(uuid.uuid4()),
'X-Project-Id': str(uuid.uuid4())}
resp = self.post_json('/alarms', params=json, status=400,
headers=an_other_user_auth)
self.assertEqual(jsonutils.loads(resp.body)['error_message']
['faultstring'],
"Alarm a doesn't exist")
def test_post_combination_alarm_as_admin_1(self):
"""Test that post a combination alarm as admin on behalf of a other
user/project with a alarm_id unauthorized for this project/user
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'user_id': 'auseridthatisnotmine',
'project_id': 'aprojectidthatisnotmine',
'type': 'combination',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'combination_rule': {
'alarm_ids': ['a',
'b'],
'operator': 'and',
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
resp = self.post_json('/alarms', params=json, status=400,
headers=headers)
self.assertEqual(jsonutils.loads(resp.body)['error_message']
['faultstring'],
"Alarm a doesn't exist")
def test_post_combination_alarm_as_admin_2(self):
"""Test that post a combination alarm as admin on behalf of nobody
with a alarm_id of someone else
"""
json = {
'enabled': False,
'name': 'added_alarm',
'state': 'ok',
'type': 'combination',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'combination_rule': {
'alarm_ids': ['a',
'b'],
'operator': 'and',
}
}
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'admin'
self.post_json('/alarms', params=json, status=201,
headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
if alarms[0].name == 'added_alarm':
for key in json:
if key.endswith('_rule'):
storage_key = 'rule'
else:
storage_key = key
self.assertEqual(getattr(alarms[0], storage_key),
json[key])
else:
self.fail("Alarm not found")
def test_post_invalid_alarm_combination(self):
"""Test that post a combination alarm with a not existing alarm id
"""
json = {
'enabled': False,
'name': 'added_alarm',