# Copyright 2012 Nebula, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from django import forms from django import http from django.test.utils import override_settings import mock import six from horizon import exceptions from horizon.test import helpers as test from horizon import workflows PROJECT_ID = "a23lkjre389fwenj" INSTANCE_ID = "sdlkjhf9832roiw" def local_callback_func(request, context): return "one" def other_callback_func(request, context): return "two" def extra_callback_func(request, context): return "extra" class TestActionOne(workflows.Action): project_id = forms.ChoiceField(label="Project") user_id = forms.ChoiceField(label="User") class Meta(object): name = "Test Action One" slug = "test_action_one" def populate_project_id_choices(self, request, context): return [(PROJECT_ID, "test_project")] def populate_user_id_choices(self, request, context): return [(request.user.id, request.user.username)] def handle(self, request, context): return {"foo": "bar"} class TestActionTwo(workflows.Action): instance_id = forms.CharField(label="Instance") class Meta(object): name = "Test Action Two" slug = "test_action_two" class TestActionThree(workflows.Action): extra = forms.CharField(widget=forms.widgets.Textarea) class Meta(object): name = "Test Action Three" slug = "test_action_three" class AdminAction(workflows.Action): admin_id = forms.CharField(label="Admin") class Meta(object): name = "Admin Action" slug = "admin_action" permissions = ("horizon.test",) class TestDisabledAction(workflows.Action): disabled_id = forms.CharField(label="Disabled") class Meta(object): name = "Test Action Disabled" slug = "test_action_disabled" class AdminForbiddenAction(workflows.Action): admin_id = forms.CharField(label="Admin forbidden") class Meta(object): name = "Admin Action" slug = "admin_action" policy_rules = (('action', 'forbidden'),) class TestStepOne(workflows.Step): action_class = TestActionOne contributes = ("project_id", "user_id") class TestStepTwo(workflows.Step): action_class = TestActionTwo depends_on = ("project_id",) contributes = ("instance_id",) connections = {"project_id": (local_callback_func, "horizon.test.unit.workflows.test_workflows." "other_callback_func")} class TestExtraStep(workflows.Step): action_class = TestActionThree depends_on = ("project_id",) contributes = ("extra_data",) connections = {"project_id": (extra_callback_func,)} after = TestStepOne before = TestStepTwo class AdminStep(workflows.Step): action_class = AdminAction contributes = ("admin_id",) after = TestStepOne before = TestStepTwo class TestDisabledStep(workflows.Step): action_class = TestDisabledAction contributes = ("disabled_id",) def allowed(self, request): return False class AdminForbiddenStep(workflows.Step): action_class = AdminForbiddenAction class TestWorkflow(workflows.Workflow): slug = "test_workflow" default_steps = (TestStepOne, TestStepTwo) class TestWorkflowView(workflows.WorkflowView): workflow_class = TestWorkflow template_name = "workflow.html" class TestFullscreenWorkflow(workflows.Workflow): slug = 'test_fullscreen_workflow' default_steps = (TestStepOne, TestStepTwo) fullscreen = True class TestFullscreenWorkflowView(workflows.WorkflowView): workflow_class = TestFullscreenWorkflow template_name = "workflow.html" class WorkflowsTests(test.TestCase): def setUp(self): super(WorkflowsTests, self).setUp() self.policy_patcher = mock.patch( 'openstack_auth.policy.check', lambda action, request: True) self.policy_check = self.policy_patcher.start() self.addCleanup(mock.patch.stopall) def tearDown(self): super(WorkflowsTests, self).tearDown() self._reset_workflow() def _reset_workflow(self): TestWorkflow._cls_registry = set([]) def test_workflow_construction(self): TestWorkflow.register(TestExtraStep) flow = TestWorkflow(self.request) self.assertQuerysetEqual(flow.steps, ['', '', '']) self.assertEqual(set(['project_id']), flow.depends_on) def test_step_construction(self): step_one = TestStepOne(TestWorkflow(self.request)) # Action slug is moved from Meta by metaclass, and # Step inherits slug from action. self.assertEqual(TestActionOne.name, step_one.name) self.assertEqual(TestActionOne.slug, step_one.slug) # Handlers should be empty since there are no connections. self.assertEqual(step_one._handlers, {}) step_two = TestStepTwo(TestWorkflow(self.request)) # Handlers should be populated since we do have connections. self.assertEqual([local_callback_func, other_callback_func], step_two._handlers["project_id"]) def test_step_invalid_connections_handlers_not_list_or_tuple(self): class InvalidStepA(TestStepTwo): connections = {'project_id': {}} class InvalidStepB(TestStepTwo): connections = {'project_id': ''} with self.assertRaises(TypeError): InvalidStepA(TestWorkflow(self.request)) with self.assertRaises(TypeError): InvalidStepB(TestWorkflow(self.request)) def test_step_invalid_connection_handler_not_string_or_callable(self): class InvalidStepA(TestStepTwo): connections = {'project_id': (None,)} class InvalidStepB(TestStepTwo): connections = {'project_id': (0,)} with self.assertRaises(TypeError): InvalidStepA(TestWorkflow(self.request)) with self.assertRaises(TypeError): InvalidStepB(TestWorkflow(self.request)) def test_step_invalid_callback(self): # This should raise an exception class InvalidStep(TestStepTwo): connections = {"project_id": ('local_callback_func',)} with self.assertRaises(ValueError): InvalidStep(TestWorkflow(self.request)) def test_connection_handlers_called(self): TestWorkflow.register(TestExtraStep) flow = TestWorkflow(self.request) # This should set the value without any errors, but trigger nothing flow.context['does_not_exist'] = False self.assertFalse(flow.context['does_not_exist']) # The order here is relevant. Note that we inserted "extra" between # steps one and two, and one has no handlers, so we should see # a response from extra, then one from each of step two's handlers. val = flow.context.set('project_id', PROJECT_ID) self.assertEqual([('test_action_three', 'extra'), ('test_action_two', 'one'), ('test_action_two', 'two')], val) def test_workflow_validation(self): flow = TestWorkflow(self.request) # Missing items fail validation. with self.assertRaises(exceptions.WorkflowValidationError): flow.is_valid() # All required items pass validation. seed = {"project_id": PROJECT_ID, "user_id": self.user.id, "instance_id": INSTANCE_ID} req = self.factory.post("/", seed) req.user = self.user flow = TestWorkflow(req, context_seed={"project_id": PROJECT_ID}) for step in flow.steps: if not step.action.is_valid(): self.fail("Step %s was unexpectedly invalid: %s" % (step.slug, step.action.errors)) self.assertTrue(flow.is_valid()) # Additional items shouldn't affect validation flow.context.set("extra_data", "foo") self.assertTrue(flow.is_valid()) def test_workflow_finalization(self): flow = TestWorkflow(self.request) self.assertTrue(flow.finalize()) def test_workflow_view(self): view = TestWorkflowView.as_view() req = self.factory.get("/") res = view(req) self.assertEqual(200, res.status_code) def test_workflow_registration(self): req = self.factory.get("/foo") flow = TestWorkflow(req) self.assertQuerysetEqual(flow.steps, ['', '']) TestWorkflow.register(TestExtraStep) flow = TestWorkflow(req) self.assertQuerysetEqual(flow.steps, ['', '', '']) def test_workflow_render(self): TestWorkflow.register(TestExtraStep) req = self.factory.get("/foo") flow = TestWorkflow(req) output = http.HttpResponse(flow.render()) self.assertContains(output, six.text_type(flow.name)) self.assertContains(output, six.text_type(TestActionOne.name)) self.assertContains(output, six.text_type(TestActionTwo.name)) self.assertContains(output, six.text_type(TestActionThree.name)) def test_has_permissions(self): self.assertQuerysetEqual(TestWorkflow._cls_registry, []) TestWorkflow.register(AdminStep) flow = TestWorkflow(self.request) step = AdminStep(flow) self.assertItemsEqual(step.permissions, ("horizon.test",)) self.assertQuerysetEqual(flow.steps, ['', '']) self.set_permissions(['test']) self.request.user = self.user flow = TestWorkflow(self.request) self.assertQuerysetEqual(flow.steps, ['', '', '']) def test_has_allowed(self): TestWorkflow.register(TestDisabledStep) flow = TestWorkflow(self.request) # Check TestDisabledStep is not included # even though TestDisabledStep is registered. self.assertQuerysetEqual(flow.steps, ['', '']) def test_step_is_hidden_on_policy(self): self.policy_patcher.stop() def policy_check(action, request): if action == (('action', 'forbidden'),): return False return True with mock.patch('openstack_auth.policy.check', policy_check): TestWorkflow.register(AdminForbiddenStep) flow = TestWorkflow(self.request) output = http.HttpResponse(flow.render()) self.assertNotContains(output, six.text_type(AdminForbiddenAction.name)) def test_entry_point(self): req = self.factory.get("/foo") flow = TestWorkflow(req) self.assertEqual("test_action_one", flow.get_entry_point()) flow = TestWorkflow(req, entry_point="test_action_two") self.assertEqual("test_action_two", flow.get_entry_point()) @override_settings(ALLOWED_HOSTS=['localhost']) def test_redirect_url_safe(self): url = 'http://localhost/test' view = TestWorkflowView() request = self.factory.get("/", data={ 'next': url, }) request.META['SERVER_NAME'] = "localhost" view.request = request context = view.get_context_data() self.assertEqual(url, context['REDIRECT_URL']) @override_settings(ALLOWED_HOSTS=['localhost']) def test_redirect_url_unsafe(self): url = 'http://evilcorp/test' view = TestWorkflowView() request = self.factory.get("/", data={ 'next': url, }) request.META['SERVER_NAME'] = "localhost" view.request = request context = view.get_context_data() self.assertIsNone(context['REDIRECT_URL'])