Control Events RBAC from policy.json
The ceilometer events RBAC is currently hard-coded so that only an admin user can view events.The end-user should be able to customize who should be able to view events rather than hard-coding the control to admins. This changeset adds two new rules into the policy.json so that RBAC for events index and show methods can be configured using the policy.json file. DocImpact Change-Id: I7bf4d385b9ee8fa8f1097b6400cbbc4135f2a9b6 blueprint: events-rbac
This commit is contained in:
parent
d79593c943
commit
52235d0748
@ -30,6 +30,7 @@ import wsmeext.pecan as wsme_pecan
|
||||
|
||||
from ceilometer.api.controllers.v2 import base
|
||||
from ceilometer.api.controllers.v2 import utils as v2_utils
|
||||
from ceilometer.api import rbac
|
||||
from ceilometer.event.storage import models as event_models
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer import storage
|
||||
@ -238,7 +239,6 @@ class EventsController(rest.RestController):
|
||||
"""Works on Events."""
|
||||
|
||||
@v2_utils.requires_context
|
||||
@v2_utils.requires_admin
|
||||
@wsme_pecan.wsexpose([Event], [EventQuery], int)
|
||||
def get_all(self, q=None, limit=None):
|
||||
"""Return all events matching the query filters.
|
||||
@ -246,6 +246,7 @@ class EventsController(rest.RestController):
|
||||
:param q: Filter arguments for which Events to return
|
||||
:param limit: Maximum number of samples to be returned.
|
||||
"""
|
||||
rbac.enforce("events:index", pecan.request)
|
||||
q = q or []
|
||||
limit = v2_utils.enforce_limit(limit)
|
||||
event_filter = _event_query_to_event_filter(q)
|
||||
@ -259,13 +260,13 @@ class EventsController(rest.RestController):
|
||||
limit)]
|
||||
|
||||
@v2_utils.requires_context
|
||||
@v2_utils.requires_admin
|
||||
@wsme_pecan.wsexpose(Event, wtypes.text)
|
||||
def get_one(self, message_id):
|
||||
"""Return a single event with the given message id.
|
||||
|
||||
:param message_id: Message ID of the Event to be returned
|
||||
"""
|
||||
rbac.enforce("events:show", pecan.request)
|
||||
event_filter = storage.EventFilter(message_id=message_id)
|
||||
events = [event for event
|
||||
in pecan.request.event_storage_conn.get_events(event_filter)]
|
||||
|
@ -17,8 +17,11 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
|
||||
from oslo_utils import fileutils
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
import webtest
|
||||
|
||||
from ceilometer.api import app
|
||||
@ -217,14 +220,6 @@ class TestAPIEventACL(TestAPIACL):
|
||||
|
||||
PATH = '/events'
|
||||
|
||||
def test_non_admin_get_events(self):
|
||||
data = self.get_json(self.PATH, expect_errors=True,
|
||||
headers={"X-Roles": "Member",
|
||||
"X-Auth-Token": VALID_TOKEN2,
|
||||
"X-Project-Id": "project-good",
|
||||
"X-User-Id": "user-good"})
|
||||
self.assertEqual(401, data.status_int)
|
||||
|
||||
def test_non_admin_get_event_types(self):
|
||||
data = self.get_json('/event_types', expect_errors=True,
|
||||
headers={"X-Roles": "Member",
|
||||
@ -238,6 +233,48 @@ class TestApiEventRBAC(v2.FunctionalTest,
|
||||
|
||||
PATH = '/events'
|
||||
|
||||
def setUp(self):
|
||||
super(TestApiEventRBAC, self).setUp()
|
||||
content = ('{"context_is_admin": "role:admin",'
|
||||
'"segregation": "rule:context_is_admin",'
|
||||
'"default" : "!",'
|
||||
'"telemetry:events:index": "rule:context_is_admin",'
|
||||
'"telemetry:events:show": "rule:context_is_admin"}')
|
||||
if six.PY3:
|
||||
content = content.encode('utf-8')
|
||||
self.tempfile = fileutils.write_to_tempfile(content=content,
|
||||
prefix='policy',
|
||||
suffix='.json')
|
||||
|
||||
self.CONF.set_override("policy_file",
|
||||
self.path_get(self.tempfile),
|
||||
group='oslo_policy')
|
||||
self.app = self._make_app()
|
||||
|
||||
def tearDown(self):
|
||||
os.remove(self.tempfile)
|
||||
super(TestApiEventRBAC, self).tearDown()
|
||||
|
||||
def test_get_event_by_message_rbac(self):
|
||||
headers_rbac = {"X-Roles": "non-admin"}
|
||||
data = self.get_json(self.PATH + "/100",
|
||||
expect_errors=True,
|
||||
headers=headers_rbac,
|
||||
status=403)
|
||||
self.assertEqual(u'403 Forbidden\n\nAccess was denied to this '
|
||||
'resource.\n\n RBAC Authorization Failed ',
|
||||
data.json['error_message'])
|
||||
|
||||
def test_get_events_rbac(self):
|
||||
headers_rbac = {"X-Roles": "non-admin"}
|
||||
data = self.get_json(self.PATH,
|
||||
expect_errors=True,
|
||||
headers=headers_rbac,
|
||||
status=403)
|
||||
self.assertEqual(u'403 Forbidden\n\nAccess was denied to this '
|
||||
'resource.\n\n RBAC Authorization Failed ',
|
||||
data.json['error_message'])
|
||||
|
||||
def test_get_events_without_project(self):
|
||||
headers_no_proj = {"X-Roles": "admin", "X-User-Id": "user-good"}
|
||||
resp = self.get_json(self.PATH, expect_errors=True,
|
||||
|
@ -27,5 +27,8 @@
|
||||
|
||||
"telemetry:alarm_history": "rule:context_is_admin",
|
||||
"telemetry:change_alarm_state": "rule:context_is_admin",
|
||||
"telemetry:query_alarm_history": "rule:context_is_admin"
|
||||
"telemetry:query_alarm_history": "rule:context_is_admin",
|
||||
|
||||
"telemetry:events:index": "rule:context_is_admin",
|
||||
"telemetry:events:show": "rule:context_is_admin"
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user