# Copyright 2014 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy from oslo_config import cfg from mistral.db.v2.sqlalchemy import api as db_api from mistral import exceptions as exc from mistral.lang import parser as spec_parser from mistral.lang.v2 import tasks from mistral.lang.v2 import workflows from mistral.services import workflows as wf_service from mistral.tests.unit import base from mistral.workflow import states from mistral_lib import utils # Use the set_default method to set value otherwise in certain test cases # the change in value is not permanent. 
cfg.CONF.set_default('auth_enable', False, group='pecan')


# Two workflows in one document: 'wf1' (reverse, tagged) and 'wf2' (direct).
WORKFLOW_LIST = """
---
version: '2.0'

wf1:
  tags: [test, v2]
  type: reverse
  input:
    - param1
  output:
    result: "{$.result}"

  tasks:
    task1:
      action: std.echo output="{$.param1}"
      publish:
        result: "{$}"

wf2:
  type: direct
  output:
    result: "{$.result}"

  tasks:
    task1:
      workflow: my_wb.wf1 param1='Hi' task_name='task1'
      publish:
        result: "The result of subworkflow is '{$.final_result}'"
"""

# New revision of 'wf1' only: no tags and an additional 'param2' input.
UPDATED_WORKFLOW_LIST = """
---
version: '2.0'

wf1:
  type: reverse
  input:
    - param1
    - param2
  output:
    result: "{$.result}"

  tasks:
    task1:
      action: std.echo output="{$.param1}{$.param2}"
      publish:
        result: "{$}"
"""

# Template rendered with str.format(); '{task_name}' is the only placeholder.
WORKFLOW_WITH_VAR_TASK_NAME = """
---
version: '2.0'

engine_command_{task_name}:

  tasks:
    {task_name}:
      action: nova.servers_list
"""

WORKFLOW = WORKFLOW_WITH_VAR_TASK_NAME.format(task_name='task1')

# The misspelled 'verstion' key is deliberate: test_invalid_workflow_list
# expects this document to be rejected with an "Invalid DSL" error.
INVALID_WORKFLOW = """
---
verstion: '2.0'

wf:
  tasks:
    task1:
      action: std.echo output="Task 1"
"""

INVALID_WORKFLOW_1 = """
---
version: '2.0'

wf:
  tasks:
    task1:
      action: std.noop
      on-success: task2 # The task "task2" doesn't exist.

    task3:
      action: std.noop
"""

# Template rendered with str.format(); '{long_task_name}' is the placeholder.
WORKFLOW_WITH_LONG_TASK_NAME = """
---
version: '2.0'

test_workflow:
  tasks:
    {long_task_name}:
      action: std.noop

"""

# Template rendered with str.format(); '{long_task_name}' names a join task.
WORKFLOW_WITH_LONG_JOIN_TASK_NAME = """
---
version: '2.0'

test_workflow:
  tasks:
    task1:
      on-success:
        - {long_task_name}

    {long_task_name}:
      join: all

"""

# NOTE(review): test_preserve_key_ordering_in_workflow_definition reads the
# last three lines of the stored 'wf1' definition, so presumably nothing
# (not even a blank line) may separate the 'publish' entries from 'wf2' -
# confirm against how the service cuts a single definition out of the list.
WORKFLOWS_WITH_KEY_ORDER = """
---
version: '2.0'

wf1:
  tasks:
    task1:
      publish:
        we: 1
        dont_want: 2
        to_be_sorted: 3
wf2:
  tasks:
    task1:
      action: std.noop
"""

# Field values shared by the workflow executions that the environment-update
# tests create. Never mutate it directly; tests work on a deep copy with the
# 'state' field overridden.
WF_EXEC_TEMPLATE = {
    'spec': {},
    'start_params': {'task': 'my_task1'},
    'state': 'PAUSED',
    'state_info': None,
    'params': {'env': {'k1': 'abc'}},
    'created_at': None,
    'updated_at': None,
    'context': {'__env': {'k1': 'fee fi fo fum'}},
    'task_id': None,
    'trust_id': None,
    'description': None,
    'output': None
}


class WorkflowServiceTest(base.DbTestCase):
    """Tests for creating/updating workflows through the workflow service."""

    @staticmethod
    def _new_wf_exec(state):
        # Returns an independent copy of WF_EXEC_TEMPLATE in the given state
        # so that nested dicts are never shared between loop iterations.
        wf_exec = copy.deepcopy(WF_EXEC_TEMPLATE)
        wf_exec['state'] = state

        return wf_exec

    def test_create_workflows(self):
        # Both workflows of WORKFLOW_LIST must be created and their parsed
        # specs must expose the declared name, tags and type.
        db_wfs = wf_service.create_workflows(WORKFLOW_LIST)

        self.assertEqual(2, len(db_wfs))

        # Workflow 1.
        wf1_db = self._assert_single_item(db_wfs, name='wf1')
        wf1_spec = spec_parser.get_workflow_spec(wf1_db.spec)

        self.assertEqual('wf1', wf1_spec.get_name())
        self.assertListEqual(['test', 'v2'], wf1_spec.get_tags())
        self.assertEqual('reverse', wf1_spec.get_type())

        # Workflow 2.
        wf2_db = self._assert_single_item(db_wfs, name='wf2')
        wf2_spec = spec_parser.get_workflow_spec(wf2_db.spec)

        self.assertEqual('wf2', wf2_spec.get_name())
        self.assertEqual('direct', wf2_spec.get_type())

    def test_preserve_key_ordering_in_workflow_definition(self):
        # The stored definition text must keep the 'publish' keys in the
        # order they were written rather than sorting them.
        db_wfs = wf_service.create_workflows(WORKFLOWS_WITH_KEY_ORDER)

        self.assertEqual(2, len(db_wfs))

        wf1_db = self._assert_single_item(db_wfs, name='wf1')
        wf1_def = wf1_db.definition

        # The three 'publish' entries are the trailing lines of wf1.
        published_values = wf1_def.splitlines()[-3:]
        wf1_publish = [
            item.strip()
            for item in published_values
        ]

        self.assertEqual(
            ['we: 1', 'dont_want: 2', 'to_be_sorted: 3'],
            wf1_publish
        )

    def test_engine_commands_are_valid_task_names(self):
        # Every engine command name must also be accepted as a task name.
        for name in workflows.ENGINE_COMMANDS:
            wf_text = WORKFLOW_WITH_VAR_TASK_NAME.format(task_name=name)

            wf_defs = wf_service.create_workflows(wf_text)

            self.assertIsNotNone(wf_defs)
            self.assertEqual(1, len(wf_defs))

    def test_update_workflows(self):
        # Updating 'wf1' must replace its spec: tags disappear and the new
        # 'param2' input shows up next to 'param1'.
        db_wfs = wf_service.create_workflows(WORKFLOW_LIST)

        self.assertEqual(2, len(db_wfs))

        # Workflow 1.
        wf1_db = self._assert_single_item(db_wfs, name='wf1')
        wf1_spec = spec_parser.get_workflow_spec(wf1_db.spec)

        self.assertEqual('wf1', wf1_spec.get_name())
        self.assertEqual('reverse', wf1_spec.get_type())
        self.assertIn('param1', wf1_spec.get_input())
        self.assertIs(
            wf1_spec.get_input().get('param1'),
            utils.NotDefined
        )

        db_wfs = wf_service.update_workflows(UPDATED_WORKFLOW_LIST)

        self.assertEqual(1, len(db_wfs))

        wf1_db = self._assert_single_item(db_wfs, name='wf1')
        wf1_spec = spec_parser.get_workflow_spec(wf1_db.spec)

        self.assertEqual('wf1', wf1_spec.get_name())
        self.assertListEqual([], wf1_spec.get_tags())
        self.assertEqual('reverse', wf1_spec.get_type())
        self.assertIn('param1', wf1_spec.get_input())
        self.assertIn('param2', wf1_spec.get_input())
        self.assertIs(
            wf1_spec.get_input().get('param1'),
            utils.NotDefined
        )
        self.assertIs(
            wf1_spec.get_input().get('param2'),
            utils.NotDefined
        )

    def test_update_non_existing_workflow_failed(self):
        # Nothing was created beforehand, so the update must not find it.
        exception = self.assertRaises(
            exc.DBEntityNotFoundError,
            wf_service.update_workflows,
            WORKFLOW
        )

        self.assertIn("Workflow not found", str(exception))

    def test_invalid_workflow_list(self):
        exception = self.assertRaises(
            exc.InvalidModelException,
            wf_service.create_workflows,
            INVALID_WORKFLOW
        )

        self.assertIn("Invalid DSL", str(exception))

    def test_update_workflow_execution_env(self):
        # In each of these states the environment update must be applied
        # and persisted.
        states_permitted = [
            states.IDLE,
            states.PAUSED,
            states.ERROR
        ]

        update_env = {'k1': 'foobar'}

        for state in states_permitted:
            wf_exec = self._new_wf_exec(state)

            with db_api.transaction():
                created = db_api.create_workflow_execution(wf_exec)

                self.assertIsNone(created.updated_at)

                updated = wf_service.update_workflow_execution_env(
                    created,
                    update_env
                )

            self.assertDictEqual(update_env, updated.params['env'])

            fetched = db_api.get_workflow_execution(created.id)

            self.assertEqual(updated, fetched)
            self.assertIsNotNone(fetched.updated_at)

    def test_update_workflow_execution_env_wrong_state(self):
        # In each of these states the update must be rejected and the
        # stored environment must stay as it was created.
        states_not_permitted = [
            states.RUNNING,
            states.RUNNING_DELAYED,
            states.SUCCESS,
            states.WAITING
        ]

        update_env = {'k1': 'foobar'}

        for state in states_not_permitted:
            wf_exec = self._new_wf_exec(state)

            with db_api.transaction():
                created = db_api.create_workflow_execution(wf_exec)

                self.assertIsNone(created.updated_at)

                self.assertRaises(
                    exc.NotAllowedException,
                    wf_service.update_workflow_execution_env,
                    created,
                    update_env
                )

            fetched = db_api.get_workflow_execution(created.id)

            self.assertDictEqual(
                wf_exec['params']['env'],
                fetched.params['env']
            )

            self.assertDictEqual(
                wf_exec['context']['__env'],
                fetched.context['__env']
            )

    def test_with_long_task_name(self):
        # One character over the limit must be rejected.
        long_task_name = utils.generate_string(tasks.MAX_LENGTH_TASK_NAME + 1)

        workflow = WORKFLOW_WITH_LONG_TASK_NAME.format(
            long_task_name=long_task_name
        )

        self.assertRaises(
            exc.InvalidModelException,
            wf_service.create_workflows,
            workflow
        )

    def test_upper_bound_length_task_name(self):
        # A name of exactly the maximum length must be accepted.
        long_task_name = utils.generate_string(tasks.MAX_LENGTH_TASK_NAME)

        wf_text = WORKFLOW_WITH_LONG_TASK_NAME.format(
            long_task_name=long_task_name
        )

        wf_defs = wf_service.create_workflows(wf_text)

        self.assertIsNotNone(wf_defs)
        self.assertEqual(1, len(wf_defs))

    def test_with_long_join_task_name(self):
        # One character over the join task limit must be rejected.
        long_task_name = utils.generate_string(
            tasks.MAX_LENGTH_JOIN_TASK_NAME + 1
        )

        wf_text = WORKFLOW_WITH_LONG_JOIN_TASK_NAME.format(
            long_task_name=long_task_name
        )

        self.assertRaises(
            exc.InvalidModelException,
            wf_service.create_workflows,
            wf_text
        )

    def test_upper_bound_length_join_task_name(self):
        # A join task name of exactly the maximum length must be accepted.
        long_task_name = utils.generate_string(tasks.MAX_LENGTH_JOIN_TASK_NAME)

        wf_text = WORKFLOW_WITH_LONG_JOIN_TASK_NAME.format(
            long_task_name=long_task_name
        )

        wf_defs = wf_service.create_workflows(wf_text)

        self.assertIsNotNone(wf_defs)
        self.assertEqual(1, len(wf_defs))

    def test_validation_mode_enabled_by_default(self):
        # In 'enabled' mode validation runs unless the caller opts out.
        self.override_config('validation_mode', 'enabled', 'api')

        self.assertRaises(
            exc.InvalidModelException,
            wf_service.create_workflows,
            INVALID_WORKFLOW_1
        )

        wf_defs = wf_service.create_workflows(
            INVALID_WORKFLOW_1,
            validate=False
        )

        # The workflow is created but it will never succeed since it's broken.
        self.assertIsNotNone(wf_defs)
        self.assertEqual(1, len(wf_defs))

    def test_validation_mode_always_enabled(self):
        # In 'mandatory' mode validate=False must not bypass validation.
        self.override_config('validation_mode', 'mandatory', 'api')

        self.assertRaises(
            exc.InvalidModelException,
            wf_service.create_workflows,
            INVALID_WORKFLOW_1
        )

        self.assertRaises(
            exc.InvalidModelException,
            wf_service.create_workflows,
            INVALID_WORKFLOW_1,
            validate=False
        )

    def test_validation_mode_always_disabled(self):
        # In 'disabled' mode the broken workflow is accepted regardless of
        # the 'validate' argument.
        self.override_config('validation_mode', 'disabled', 'api')

        wf_defs = wf_service.create_workflows(INVALID_WORKFLOW_1)

        self.assertIsNotNone(wf_defs)
        self.assertEqual(1, len(wf_defs))

        # Remove the definition so the same workflow can be created again.
        db_api.delete_workflow_definition(wf_defs[0].id)

        # Even an explicit validate=True must not raise in this mode.
        wf_defs = wf_service.create_workflows(
            INVALID_WORKFLOW_1,
            validate=True
        )

        self.assertIsNotNone(wf_defs)
        self.assertEqual(1, len(wf_defs))