Merge "Verify user/project ID for alarm created by non-admin user"

This commit is contained in:
Jenkins 2014-03-27 16:28:34 +00:00 committed by Gerrit Code Review
commit e8a7eed15c
2 changed files with 105 additions and 11 deletions

View File

@ -272,9 +272,10 @@ class Query(_Base):
class ProjectNotAuthorized(ClientSideError):
def __init__(self, id):
def __init__(self, id, aspect='project'):
params = dict(aspect=aspect, id=id)
super(ProjectNotAuthorized, self).__init__(
_("Not Authorized to access project %s") % id,
_("Not Authorized to access %(aspect)s %(id)s") % params,
status_code=401)
@ -2037,15 +2038,24 @@ class AlarmsController(rest.RestController):
now = timeutils.utcnow()
data.alarm_id = str(uuid.uuid4())
user, project = acl.get_limited_to(pecan.request.headers)
if user:
data.user_id = user
elif data.user_id == wtypes.Unset:
data.user_id = pecan.request.headers.get('X-User-Id')
if project:
data.project_id = project
elif data.project_id == wtypes.Unset:
data.project_id = pecan.request.headers.get('X-Project-Id')
user_limit, project_limit = acl.get_limited_to(pecan.request.headers)
def _set_ownership(aspect, owner_limitation, header):
attr = '%s_id' % aspect
requested_owner = getattr(data, attr)
explicit_owner = requested_owner != wtypes.Unset
caller = pecan.request.headers.get(header)
if (owner_limitation and explicit_owner
and requested_owner != caller):
raise ProjectNotAuthorized(requested_owner, aspect)
actual_owner = (owner_limitation or
requested_owner if explicit_owner else caller)
setattr(data, attr, actual_owner)
_set_ownership('user', user_limit, 'X-User-Id')
_set_ownership('project', project_limit, 'X-Project-Id')
data.timestamp = now
data.state_timestamp = now

View File

@ -857,6 +857,90 @@ class TestAlarms(FunctionalTest,
alarms[0].project_id)
self._verify_alarm(json, alarms[0], 'added_alarm')
@staticmethod
def _alarm_representation_owned_by(identifiers):
json = {
'name': 'added_alarm',
'enabled': False,
'type': 'threshold',
'ok_actions': ['http://something/ok'],
'threshold_rule': {
'meter_name': 'ameter',
'query': [{'field': 'metadata.field',
'op': 'eq',
'value': '5',
'type': 'string'}],
'comparison_operator': 'le',
'statistic': 'count',
'threshold': 50,
'evaluation_periods': 3,
'period': 180,
}
}
for aspect, id in identifiers.iteritems():
json['%s_id' % aspect] = id
return json
def _do_test_post_alarm_as_nonadmin_on_behalf_of_another(self,
identifiers):
"""Test that posting an alarm as non-admin on behalf of another
user/project fails with an explicit 401 instead of reverting
to the requestor's identity.
"""
json = self._alarm_representation_owned_by(identifiers)
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'demo'
resp = self.post_json('/alarms', params=json, status=401,
headers=headers)
aspect = 'user' if 'user' in identifiers else 'project'
params = dict(aspect=aspect, id=identifiers[aspect])
self.assertEqual("Not Authorized to access %(aspect)s %(id)s" % params,
jsonutils.loads(resp.body)['error_message']
['faultstring'])
def test_post_alarm_as_nonadmin_on_behalf_of_another_user(self):
identifiers = dict(user='auseridthatisnotmine')
self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers)
def test_post_alarm_as_nonadmin_on_behalf_of_another_project(self):
identifiers = dict(project='aprojectidthatisnotmine')
self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers)
def test_post_alarm_as_nonadmin_on_behalf_of_another_creds(self):
identifiers = dict(user='auseridthatisnotmine',
project='aprojectidthatisnotmine')
self._do_test_post_alarm_as_nonadmin_on_behalf_of_another(identifiers)
def _do_test_post_alarm_as_nonadmin_on_behalf_of_self(self, identifiers):
"""Test posting an alarm as non-admin on behalf of own user/project
creates alarm associated with the requestor's identity.
"""
json = self._alarm_representation_owned_by(identifiers)
headers = {}
headers.update(self.auth_headers)
headers['X-Roles'] = 'demo'
self.post_json('/alarms', params=json, status=201, headers=headers)
alarms = list(self.conn.get_alarms(enabled=False))
self.assertEqual(1, len(alarms))
self.assertEqual(alarms[0].user_id,
self.auth_headers['X-User-Id'])
self.assertEqual(alarms[0].project_id,
self.auth_headers['X-Project-Id'])
def test_post_alarm_as_nonadmin_on_behalf_of_own_user(self):
identifiers = dict(user=self.auth_headers['X-User-Id'])
self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers)
def test_post_alarm_as_nonadmin_on_behalf_of_own_project(self):
identifiers = dict(project=self.auth_headers['X-Project-Id'])
self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers)
def test_post_alarm_as_nonadmin_on_behalf_of_own_creds(self):
identifiers = dict(user=self.auth_headers['X-User-Id'],
project=self.auth_headers['X-Project-Id'])
self._do_test_post_alarm_as_nonadmin_on_behalf_of_self(identifiers)
def test_post_alarm_combination(self):
json = {
'enabled': False,