Enhance policy rules to workflow actions

Policy rules are a more flexible way (than permissions) to determine
what Dashboards/Panels/Table actions are visible for the given
user. But workflow actions (and thus workflow steps) do not use
them. As a consequence, there are situations when according to the
backend service policies (e.g. Neutron) an action represented by a
workflow step is denied for the user (e.g. user is not permitted to
create subnets), yet he sees the corresponding steps in Horizon,
provides and submits the data which leads to an error from Neutron
side. More appropriate behavior here for Horizon would be to not show
to the user the workflow steps he is not able to complete - and if
these steps are required for successfully completing the workflow,
make him unable to start the workflow itself.

Implements: blueprint add-policy-rules-to-workflow-actions
Change-Id: Idededa3dee361e4a42106921a9f332a69c14ae21
This commit is contained in:
Timur Sufiev 2014-12-24 10:23:00 -08:00
parent a45b6b9660
commit 4328c72af1
3 changed files with 72 additions and 6 deletions

View File

@ -14,6 +14,7 @@
from django import forms
from django import http
import mock
import six
@ -81,6 +82,15 @@ class AdminAction(workflows.Action):
permissions = ("horizon.test",)
class AdminForbiddenAction(workflows.Action):
admin_id = forms.CharField(label="Admin forbidden")
class Meta(object):
name = "Admin Action"
slug = "admin_action"
policy_rules = (('action', 'forbidden'),)
class TestStepOne(workflows.Step):
action_class = TestActionOne
contributes = ("project_id", "user_id")
@ -111,6 +121,10 @@ class AdminStep(workflows.Step):
before = TestStepTwo
class AdminForbiddenStep(workflows.Step):
action_class = AdminForbiddenAction
class TestWorkflow(workflows.Workflow):
slug = "test_workflow"
default_steps = (TestStepOne, TestStepTwo)
@ -135,6 +149,10 @@ class TestFullscreenWorkflowView(workflows.WorkflowView):
class WorkflowsTests(test.TestCase):
def setUp(self):
super(WorkflowsTests, self).setUp()
self.policy_patcher = mock.patch(
'openstack_auth.policy.check', lambda action, request: True)
self.policy_check = self.policy_patcher.start()
self.addCleanup(mock.patch.stopall)
def tearDown(self):
super(WorkflowsTests, self).tearDown()
@ -294,6 +312,21 @@ class WorkflowsTests(test.TestCase):
'<AdminStep: admin_action>',
'<TestStepTwo: test_action_two>'])
def test_step_is_hidden_on_policy(self):
self.policy_patcher.stop()
def policy_check(action, request):
if action == (('action', 'forbidden'),):
return False
return True
with mock.patch('openstack_auth.policy.check', policy_check):
TestWorkflow.register(AdminForbiddenStep)
flow = TestWorkflow(self.request)
output = http.HttpResponse(flow.render())
self.assertNotContains(output,
six.text_type(AdminForbiddenAction.name))
def test_entry_point(self):
req = self.factory.get("/foo")
flow = TestWorkflow(req)

View File

@ -13,6 +13,7 @@
# under the License.
import copy
from importlib import import_module # noqa
import inspect
import logging
@ -25,7 +26,7 @@ from django.template.defaultfilters import safe # noqa
from django.template.defaultfilters import slugify # noqa
from django.utils.encoding import force_text
from django.utils.translation import ugettext_lazy as _
from importlib import import_module
from openstack_auth import policy
import six
from horizon import base
@ -66,6 +67,7 @@ class ActionMetaclass(forms.forms.DeclarativeFieldsMetaclass):
cls.name = getattr(opts, "name", name)
cls.slug = getattr(opts, "slug", slugify(name))
cls.permissions = getattr(opts, "permissions", ())
cls.policy_rules = getattr(opts, "policy_rules", ())
cls.progress_message = getattr(opts,
"progress_message",
_("Processing..."))
@ -113,6 +115,23 @@ class Action(forms.Form):
A list of permission names which this action requires in order to be
completed. Defaults to an empty list (``[]``).
.. attribute:: policy_rules
list of scope and rule tuples to do policy checks on, the
composition of which is (scope, rule)
scope: service type managing the policy for action
rule: string representing the action to be checked
for a policy that requires a single rule check:
policy_rules should look like
"(("compute", "compute:create_instance"),)"
for a policy that requires multiple rule checks:
rules should look like
"(("identity", "identity:list_users"),
("identity", "identity:list_roles"))"
where two service-rule clauses are OR-ed.
.. attribute:: help_text
A string of simple help text to be displayed alongside the Action's
@ -300,6 +319,7 @@ class Step(object):
self.slug = self.action_class.slug
self.name = self.action_class.name
self.permissions = self.action_class.permissions
self.policy_rules = self.action_class.policy_rules
self.has_errors = False
self._handlers = {}
@ -673,10 +693,12 @@ class Workflow(html.HTMLElement):
for default_step in self.default_steps:
self.register(default_step)
self._registry[default_step] = default_step(self)
self._ordered_steps = [self._registry[step_class]
for step_class in ordered_step_classes
if has_permissions(self.request.user,
self._registry[step_class])]
self._ordered_steps = []
for step_class in ordered_step_classes:
cls = self._registry[step_class]
if (has_permissions(self.request.user, cls) and
policy.check(cls.policy_rules, self.request)):
self._ordered_steps.append(cls)
def _order_steps(self):
steps = list(copy.copy(self.default_steps))
@ -835,6 +857,13 @@ class Workflow(html.HTMLElement):
else:
return message
def verify_integrity(self):
provided_keys = self.contributions | set(self.context_seed.keys())
if len(self.depends_on - provided_keys):
raise exceptions.NotAvailable(
_("The current user has insufficient permission to complete "
"the requested task."))
def render(self):
"""Renders the workflow."""
workflow_template = template.loader.get_template(self.template_name)

View File

@ -89,6 +89,7 @@ class WorkflowView(hz_views.ModalBackdropMixin, generic.TemplateView):
"""
context = super(WorkflowView, self).get_context_data(**kwargs)
workflow = self.get_workflow()
workflow.verify_integrity()
context[self.context_object_name] = workflow
next = self.request.GET.get(workflow.redirect_param_name)
context['REDIRECT_URL'] = next
@ -141,7 +142,10 @@ class WorkflowView(hz_views.ModalBackdropMixin, generic.TemplateView):
def get(self, request, *args, **kwargs):
"""Handler for HTTP GET requests."""
context = self.get_context_data(**kwargs)
try:
context = self.get_context_data(**kwargs)
except exceptions.NotAvailable:
exceptions.handle(request)
self.set_workflow_step_errors(context)
return self.render_to_response(context)