From 8f7a1a221a5fdf89a6be79f3bfe6b80fddaa8efb Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Thu, 17 Jul 2014 14:31:43 +0700 Subject: [PATCH] Initial commit for the new engine * Skeleton for the new engine * Main modules and interfaces (Engine, WorkflowHandler, policies) * Sketches of main engine methods * Reverse workflow handler * Function to check valid state transitions * Unit tests Change-Id: Id6ae293154bca7cab69965725ea6960d5f8d8790 --- mistral/engine1/__init__.py | 0 mistral/engine1/base.py | 126 +++++++++++ mistral/engine1/default_engine.py | 199 ++++++++++++++++++ mistral/engine1/policies.py | 25 +++ mistral/engine1/rpc.py | 156 ++++++++++++++ mistral/engine1/states.py | 61 ++++++ mistral/exceptions.py | 6 +- mistral/tests/unit/engine1/__init__.py | 0 .../tests/unit/engine1/test_default_engine.py | 26 +++ mistral/tests/unit/engine1/test_states.py | 72 +++++++ mistral/tests/workflow/__init__.py | 0 .../tests/workflow/test_direct_workflow.py | 38 ++++ .../tests/workflow/test_reverse_workflow.py | 38 ++++ mistral/workbook/v1/__init__.py | 0 mistral/workbook/v2/__init__.py | 0 mistral/workflow/__init__.py | 0 mistral/workflow/base.py | 121 +++++++++++ mistral/workflow/direct_workflow.py | 29 +++ mistral/workflow/reverse_workflow.py | 77 +++++++ mistral/workflow/selector.py | 33 +++ 20 files changed, 1006 insertions(+), 1 deletion(-) create mode 100644 mistral/engine1/__init__.py create mode 100644 mistral/engine1/base.py create mode 100644 mistral/engine1/default_engine.py create mode 100644 mistral/engine1/policies.py create mode 100644 mistral/engine1/rpc.py create mode 100644 mistral/engine1/states.py create mode 100644 mistral/tests/unit/engine1/__init__.py create mode 100644 mistral/tests/unit/engine1/test_default_engine.py create mode 100644 mistral/tests/unit/engine1/test_states.py create mode 100644 mistral/tests/workflow/__init__.py create mode 100644 mistral/tests/workflow/test_direct_workflow.py create mode 100644 mistral/tests/workflow/test_reverse_workflow.py create mode 100644 mistral/workbook/v1/__init__.py create mode 100644 mistral/workbook/v2/__init__.py create mode 100644 mistral/workflow/__init__.py create mode 100644 mistral/workflow/base.py create mode 100644 mistral/workflow/direct_workflow.py create mode 100644 mistral/workflow/reverse_workflow.py create mode 100644 mistral/workflow/selector.py diff --git a/mistral/engine1/__init__.py b/mistral/engine1/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/engine1/base.py b/mistral/engine1/base.py new file mode 100644 index 00000000..65b4f8d2 --- /dev/null +++ b/mistral/engine1/base.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import abc +import six + + +class TaskResult(object): + """Explicit data structure containing a result of task execution.""" + + def __init__(self, data=None, error=None): + self.data = data + self.error = error + + def is_error(self): + return self.error is not None + + def is_success(self): + return not self.is_error() + + +@six.add_metaclass(abc.ABCMeta) +class Engine(object): + """Engine interface.""" + + @abc.abstractmethod + def start_workflow(self, workbook_name, workflow_name, task_name, input): + """Starts the specified workflow. + + :param workbook_name: Workbook name. + :param workflow_name: Workflow name. + :param task_name: Task name. + :param input: Workflow input data as a dictionary. + :return: Workflow execution object. + """ + raise NotImplemented + + @abc.abstractmethod + def on_task_result(self, task_id, task_result): + """Accepts workflow task raw result and continues the workflow. + + Task result here is a raw task result which comes from a corresponding + action/workflow associated which the task is associated with. + :param task_id: Task id. + :param task_result: Task result object. + :return: + """ + raise NotImplemented + + @abc.abstractmethod + def stop_workflow(self, execution_id): + """Stops workflow execution. + + :param execution_id: Execution id. + :return: Workflow execution object. + """ + raise NotImplemented + + @abc.abstractmethod + def resume_workflow(self, execution_id): + """Resumes workflow execution. + + :param execution_id: Execution id. + :return: Workflow execution object. + """ + raise NotImplemented + + @abc.abstractmethod + def rollback_workflow(self, execution_id): + """Rolls back workflow execution. + + :param execution_id: Execution id. + :return: Workflow execution object. + """ + raise NotImplemented + + +@six.add_metaclass(abc.ABCMeta) +class WorkflowPolicy(object): + """Workflow policy. + + Provides the interface to change the workflow state depending on certain + conditions. + """ + + @abc.abstractmethod + def on_task_finish(self, exec_db, task_db): + """Calculates workflow state after task completion. + + :param task_db: Completed task. + :return: New workflow state. + """ + raise NotImplemented + + +@six.add_metaclass(abc.ABCMeta) +class TaskPolicy(object): + """Task policy. + + Provides the interface to perform any work after a has completed. + An example of task policy may be 'retry' policy that makes engine + to run a task repeatedly if it finishes with a failure. + """ + + @abc.abstractmethod + def on_task_finish(self, task_db): + """Calculates workflow state after task completion. + + :param task_db: Completed task. + :return: New task state. + """ + raise NotImplemented diff --git a/mistral/engine1/default_engine.py b/mistral/engine1/default_engine.py new file mode 100644 index 00000000..7d1ba410 --- /dev/null +++ b/mistral/engine1/default_engine.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg + +from mistral.db import api as db_api +from mistral.engine1 import base +from mistral.engine1 import states +from mistral import exceptions as exc +from mistral.openstack.common import log as logging +from mistral.workflow import selector as wf_selector + +LOG = logging.getLogger(__name__) +WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) + +# TODO(rakhmerov): Add necessary logging including WF_TRACE. +# TODO(rakhmerov): All is written here assuming data models are not dicts. + + +def _select_workflow_handler(exec_db): + handler_cls = wf_selector.select_workflow_handler(exec_db['wf_spec']) + + if not handler_cls: + msg = 'Failed to find a workflow handler [workflow=%s.%s]' % \ + (exec_db['wb_spec'].name, exec_db['wf_name']) + raise exc.EngineException(msg) + + return handler_cls(exec_db) + + +def _create_db_execution(workbook_name, workflow_name, task_name, input): + # TODO(rakhmerov): Change DB model attributes. + return db_api.execution_create(workbook_name, { + "workbook_name": workbook_name, + "workflow_name": workflow_name, + "task": task_name, + "state": states.RUNNING, + "input": input + }) + + +def _create_db_tasks(exec_db, task_specs): + tasks_db = [] + + for task_spec in task_specs: + task_db = db_api.task_create(exec_db.id, { + 'execution_id': exec_db.id, + 'name': task_spec.name, + 'state': states.IDLE, + 'specification': task_spec.to_dict(), + 'parameters': {}, # TODO(rakhmerov): Evaluate. + 'in_context': {}, # TODO(rakhmerov): Evaluate. + 'output': {}, # TODO(rakhmerov): Evaluate. + 'runtime_context': None + }) + + tasks_db.append(task_db) + + return tasks_db + + +def _apply_task_policies(task_db): + # TODO(rakhmerov): Implement. + pass + + +def _apply_workflow_policies(exec_db, task_db): + # TODO(rakhmerov): Implement. + pass + + +def _run_action(t): + # TODO(rakhmerov): Implement. + pass + + +def _run_workflow(t): + # TODO(rakhmerov): Implement. + pass + + +def _process(exec_db, task_specs): + LOG.debug('Processing workflow tasks: %s' % task_specs) + + tasks_db = _create_db_tasks(exec_db, task_specs) + + for t in tasks_db: + if t.action: + _run_action(t) + elif t.workflow: + _run_workflow(t) + else: + msg = "Neither 'action' nor 'workflow' is defined in task" \ + " specification [task_spec=%s]" % t + raise exc.WorkflowException(msg) + + +class DefaultEngine(base.Engine): + def start_workflow(self, workbook_name, workflow_name, task_name, input): + db_api.start_tx() + + try: + exec_db = _create_db_execution( + workbook_name, + workflow_name, + task_name, + input + ) + + wf_handler = _select_workflow_handler(exec_db) + + task_specs = wf_handler.start_workflow(task_name=task_name) + + if len(task_specs) > 0: + _process(exec_db, task_specs) + + db_api.commit_tx() + finally: + db_api.end_tx() + + return exec_db + + def on_task_result(self, task_id, task_result): + db_api.start_tx() + + try: + task_db = db_api.task_get(task_id) + exec_db = db_api.execution_get(task_db.execution_id) + + wf_handler = _select_workflow_handler(exec_db) + + task_specs = wf_handler.on_task_result( + task_db, + task_result + ) + + if len(task_specs) > 0: + _apply_task_policies(task_db) + _apply_workflow_policies(exec_db, task_db) + + _process(exec_db, task_specs) + + db_api.commit_tx() + finally: + db_api.end_tx() + + return task_db + + def stop_workflow(self, execution_id): + db_api.start_tx() + + try: + exec_db = db_api.execution_get(execution_id) + + wf_handler = _select_workflow_handler(exec_db) + + wf_handler.stop_workflow() + + db_api.commit_tx() + finally: + db_api.end_tx() + + return exec_db + + def resume_workflow(self, execution_id): + db_api.start_tx() + + try: + exec_db = db_api.execution_get(execution_id) + + wf_handler = _select_workflow_handler(exec_db) + + task_specs = wf_handler.resume_workflow() + + if len(task_specs) > 0: + _process(exec_db, task_specs) + + db_api.commit_tx() + finally: + db_api.end_tx() + + return exec_db + + def rollback_workflow(self, execution_id): + # TODO(rakhmerov): Implement. + raise NotImplemented diff --git a/mistral/engine1/policies.py b/mistral/engine1/policies.py new file mode 100644 index 00000000..158f8020 --- /dev/null +++ b/mistral/engine1/policies.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.engine1 import base + +# TODO(rakhmerov): Add all needed policies. + + +class RetryTaskPolicy(base.TaskPolicy): + def on_task_finish(self, task_db): + # TODO(rakhmerov): Implement. + raise NotImplemented diff --git a/mistral/engine1/rpc.py b/mistral/engine1/rpc.py new file mode 100644 index 00000000..63d5d6e5 --- /dev/null +++ b/mistral/engine1/rpc.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo import messaging + +from mistral import context as auth_ctx +from mistral.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +# TODO(rakhmerov): Add engine and executor servers so that we don't need to +# adopt them to work with rpc (taking care about transport, signatures etc.). + +class EngineClient(object): + """RPC client for the Engine.""" + + def __init__(self, transport): + """Construct an RPC client for the Engine. + + :param transport: Messaging transport. + :type transport: Transport. + """ + serializer = auth_ctx.RpcContextSerializer( + auth_ctx.JsonPayloadSerializer()) + + # TODO(rakhmerov): Clarify topic. + target = messaging.Target( + topic='mistral.engine1.default_engine:DefaultEngine' + ) + + self._client = messaging.RPCClient( + transport, + target, + serializer=serializer + ) + + def start_workflow(self, workbook_name, workflow_name, task_name, input): + """Starts a workflow execution based on the specified workbook name + and target task. + + :param workbook_name: Workbook name. + :param task_name: Target task name. + :param input: Workflow input data. + :return: Workflow execution. + """ + kwargs = { + 'workbook_name': workbook_name, + 'workflow_name': workflow_name, + 'task_name': task_name, + 'input': input + } + + return self._client.call(auth_ctx.ctx(), 'start_workflow', **kwargs) + + def on_task_result(self, task_id, task_result): + """Conveys task result to Mistral Engine. + + This method should be used by clients of Mistral Engine to update + state of a task once task action has been performed. One of the + clients of this method is Mistral REST API server that receives + task result from the outside action handlers. + + Note: calling this method serves an event notifying Mistral that + it possibly needs to move the workflow on, i.e. run other workflow + tasks for which all dependencies are satisfied. + + :param task_id: Task id. + :param task_result: Task result data. + :return: Task. + """ + kwargs = { + 'task_id': task_id, + 'task_result': task_result + } + + return self._client.call(auth_ctx.ctx(), 'on_task_result', **kwargs) + + def stop_workflow(self, execution_id): + """Stops the workflow with the given execution id. + + :param execution_id: Workflow execution id. + :return: Workflow execution. + """ + kwargs = {'execution_id': execution_id} + + return self._client.call(auth_ctx.ctx(), 'stop_workflow', **kwargs) + + def resume_workflow(self, execution_id): + """Resumes the workflow with the given execution id. + + :param execution_id: Workflow execution id. + :return: Workflow execution. + """ + kwargs = {'execution_id': execution_id} + + return self._client.call(auth_ctx.ctx(), 'resume_workflow', **kwargs) + + def rollback_workflow(self, execution_id): + """Rolls back the workflow with the given execution id. + + :param execution_id: Workflow execution id. + :return: Workflow execution. + """ + kwargs = {'execution_id': execution_id} + + return self._client.call(auth_ctx.ctx(), 'rollback_workflow', **kwargs) + + +class ExecutorClient(object): + """RPC client for Executor.""" + + def __init__(self, transport): + """Construct an RPC client for the Executor. + + :param transport: Messaging transport. + :type transport: Transport. + """ + serializer = auth_ctx.RpcContextSerializer( + auth_ctx.JsonPayloadSerializer()) + + # TODO(rakhmerov): Clarify topic. + target = messaging.Target( + topic='mistral.engine1.default_engine:DefaultExecutor' + ) + + self._client = messaging.RPCClient( + transport, + target, + serializer=serializer + ) + + # TODO(rakhmerov): Most likely it will be a different method. + def handle_task(self, cntx, **kwargs): + """Send the task request to Executor for execution. + + :param cntx: a request context dict + :type cntx: MistralContext + :param kwargs: a dict of method arguments + :type kwargs: dict + """ + return self._client.cast(cntx, 'handle_task', **kwargs) diff --git a/mistral/engine1/states.py b/mistral/engine1/states.py new file mode 100644 index 00000000..af02a4dd --- /dev/null +++ b/mistral/engine1/states.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Valid task and workflow states.""" + +IDLE = 'IDLE' +RUNNING = 'RUNNING' +STOPPED = 'STOPPED' +DELAYED = 'DELAYED' +SUCCESS = 'SUCCESS' +ERROR = 'ERROR' + +_ALL = [IDLE, RUNNING, SUCCESS, ERROR, STOPPED, DELAYED] + +_VALID_TRANSITIONS = { + IDLE: [RUNNING, ERROR], + RUNNING: [STOPPED, DELAYED, SUCCESS, ERROR], + STOPPED: [RUNNING, ERROR], + DELAYED: [RUNNING, ERROR], + SUCCESS: [], + ERROR: [] +} + + +def is_valid(state): + return state in _ALL + + +def is_invalid(state): + return not is_valid(state) + + +def is_finished(state): + return state in [SUCCESS, ERROR] + + +def is_stopped_or_finished(state): + return state == STOPPED or is_finished(state) + + +def is_valid_transition(from_state, to_state): + if is_invalid(from_state) or is_invalid(to_state): + return False + + if from_state == to_state: + return True + + return to_state in _VALID_TRANSITIONS[from_state] diff --git a/mistral/exceptions.py b/mistral/exceptions.py index cacd359d..48cbbb8c 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -74,7 +74,11 @@ class ActionRegistrationException(MistralException): class EngineException(MistralException): - pass + http_code = 500 + + +class WorkflowException(MistralException): + http_code = 400 class ApplicationContextNotFoundException(MistralException): diff --git a/mistral/tests/unit/engine1/__init__.py b/mistral/tests/unit/engine1/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/tests/unit/engine1/test_default_engine.py b/mistral/tests/unit/engine1/test_default_engine.py new file mode 100644 index 00000000..09af5bcd --- /dev/null +++ b/mistral/tests/unit/engine1/test_default_engine.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.openstack.common import log as logging +from mistral.tests import base + +LOG = logging.getLogger(__name__) + + +class DefaultEngineTest(base.BaseTest): + def test_start_workflow(self): + # TODO(rakhmerov): Implement. + pass diff --git a/mistral/tests/unit/engine1/test_states.py b/mistral/tests/unit/engine1/test_states.py new file mode 100644 index 00000000..a30d5161 --- /dev/null +++ b/mistral/tests/unit/engine1/test_states.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.engine1 import states as s +from mistral.openstack.common import log as logging +from mistral.tests import base + +LOG = logging.getLogger(__name__) + + +class StatesModuleTest(base.BaseTest): + def test_is_valid_transition(self): + # From IDLE + self.assertTrue(s.is_valid_transition(s.IDLE, s.IDLE)) + self.assertTrue(s.is_valid_transition(s.IDLE, s.RUNNING)) + self.assertTrue(s.is_valid_transition(s.IDLE, s.ERROR)) + self.assertFalse(s.is_valid_transition(s.IDLE, s.STOPPED)) + self.assertFalse(s.is_valid_transition(s.IDLE, s.DELAYED)) + self.assertFalse(s.is_valid_transition(s.IDLE, s.SUCCESS)) + + # From RUNNING + self.assertTrue(s.is_valid_transition(s.RUNNING, s.RUNNING)) + self.assertTrue(s.is_valid_transition(s.RUNNING, s.ERROR)) + self.assertTrue(s.is_valid_transition(s.RUNNING, s.STOPPED)) + self.assertTrue(s.is_valid_transition(s.RUNNING, s.DELAYED)) + self.assertTrue(s.is_valid_transition(s.RUNNING, s.SUCCESS)) + self.assertFalse(s.is_valid_transition(s.RUNNING, s.IDLE)) + + # From STOPPED + self.assertTrue(s.is_valid_transition(s.STOPPED, s.STOPPED)) + self.assertTrue(s.is_valid_transition(s.STOPPED, s.RUNNING)) + self.assertTrue(s.is_valid_transition(s.STOPPED, s.ERROR)) + self.assertFalse(s.is_valid_transition(s.STOPPED, s.DELAYED)) + self.assertFalse(s.is_valid_transition(s.STOPPED, s.SUCCESS)) + self.assertFalse(s.is_valid_transition(s.STOPPED, s.IDLE)) + + # From DELAYED + self.assertTrue(s.is_valid_transition(s.DELAYED, s.DELAYED)) + self.assertTrue(s.is_valid_transition(s.DELAYED, s.RUNNING)) + self.assertTrue(s.is_valid_transition(s.DELAYED, s.ERROR)) + self.assertFalse(s.is_valid_transition(s.DELAYED, s.STOPPED)) + self.assertFalse(s.is_valid_transition(s.DELAYED, s.SUCCESS)) + self.assertFalse(s.is_valid_transition(s.DELAYED, s.IDLE)) + + # From SUCCESS + self.assertTrue(s.is_valid_transition(s.SUCCESS, s.SUCCESS)) + self.assertFalse(s.is_valid_transition(s.SUCCESS, s.RUNNING)) + self.assertFalse(s.is_valid_transition(s.SUCCESS, s.ERROR)) + self.assertFalse(s.is_valid_transition(s.SUCCESS, s.STOPPED)) + self.assertFalse(s.is_valid_transition(s.SUCCESS, s.DELAYED)) + self.assertFalse(s.is_valid_transition(s.SUCCESS, s.IDLE)) + + # From ERROR + self.assertTrue(s.is_valid_transition(s.ERROR, s.ERROR)) + self.assertFalse(s.is_valid_transition(s.ERROR, s.RUNNING)) + self.assertFalse(s.is_valid_transition(s.ERROR, s.STOPPED)) + self.assertFalse(s.is_valid_transition(s.ERROR, s.DELAYED)) + self.assertFalse(s.is_valid_transition(s.ERROR, s.SUCCESS)) + self.assertFalse(s.is_valid_transition(s.ERROR, s.IDLE)) diff --git a/mistral/tests/workflow/__init__.py b/mistral/tests/workflow/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/tests/workflow/test_direct_workflow.py b/mistral/tests/workflow/test_direct_workflow.py new file mode 100644 index 00000000..d5adf162 --- /dev/null +++ b/mistral/tests/workflow/test_direct_workflow.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.openstack.common import log as logging +from mistral.tests import base + +LOG = logging.getLogger(__name__) + + +class DirectWorkflowHandlerTest(base.BaseTest): + def test_start_workflow(self): + # TODO(rakhmerov): Implement. + pass + + def test_stop_workflow(self): + # TODO(rakhmerov): Implement. + pass + + def test_resume_workflow(self): + # TODO(rakhmerov): Implement. + pass + + def test_on_task_result_workflow(self): + # TODO(rakhmerov): Implement. + pass diff --git a/mistral/tests/workflow/test_reverse_workflow.py b/mistral/tests/workflow/test_reverse_workflow.py new file mode 100644 index 00000000..48b2053c --- /dev/null +++ b/mistral/tests/workflow/test_reverse_workflow.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.openstack.common import log as logging +from mistral.tests import base + +LOG = logging.getLogger(__name__) + + +class ReverseWorkflowHandlerTest(base.BaseTest): + def test_start_workflow(self): + # TODO(rakhmerov): Implement. + pass + + def test_stop_workflow(self): + # TODO(rakhmerov): Implement. + pass + + def test_resume_workflow(self): + # TODO(rakhmerov): Implement. + pass + + def test_on_task_result_workflow(self): + # TODO(rakhmerov): Implement. + pass diff --git a/mistral/workbook/v1/__init__.py b/mistral/workbook/v1/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/workbook/v2/__init__.py b/mistral/workbook/v2/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/workflow/__init__.py b/mistral/workflow/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py new file mode 100644 index 00000000..3bcd0ed1 --- /dev/null +++ b/mistral/workflow/base.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from mistral.engine1 import states +from mistral.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class WorkflowHandler(object): + """Workflow Handler base class. + + Different workflow handler implement different workflow algorithms. + In practice it may actually mean that there may be multiple ways of + describing workflow models (and even languages) that will be supported + by Mistral. + """ + + def __init__(self, exec_db): + """Creates new workflow handler. + + :param exec_db: Execution. + """ + self.exec_db = exec_db + + def is_stopped_or_finished(self): + return states.is_stopped_or_finished(self.exec_db.state) + + def stop_workflow(self): + """Stops workflow this handler is associated with. + + :return: Execution object. + """ + state = self.exec_db.state + + if states.is_valid_transition(state, states.STOPPED): + self.exec_db.state = states.STOPPED + + LOG.info('Stopped workflow [execution=%s]' % self.exec_db) + else: + LOG.info("Can't change workflow state [execution=%s," + " state=%s, new state=%s]" % + (self.exec_db, state, states.STOPPED)) + + return self.exec_db + + def resume_workflow(self): + """Resumes workflow this handler is associated with. + + :return: Tasks available to run. + """ + state = self.exec_db.state + + if states.is_valid_transition(state, states.RUNNING): + self.exec_db.state = states.RUNNING + + LOG.info('Resumed workflow [execution=%s]' % self.exec_db) + else: + LOG.info("Can't change workflow state [execution=%s," + " state=%s, new state=%s]" % + (self.exec_db, state, states.RUNNING)) + + # TODO(rakhmerov): A concrete handler should also find tasks to run. + + return [] + + @abc.abstractmethod + def start_workflow(self, **kwargs): + """Starts workflow. + + Given a workflow specification this method makes required analysis + according to this workflow type rules and identifies a list of + tasks that can be scheduled for execution. + :param kwargs: Additional parameters specific to workflow type. + :return: List of tasks that can be scheduled for execution. + """ + raise NotImplemented + + @abc.abstractmethod + def on_task_result(self, task, task_result): + """Handles event of arriving a task result. + + Given task result performs analysis of the workflow execution and + identifies tasks that can be scheduled for execution. + :param task: Task that the result corresponds to. + :param task_result: Task result. + :return List of tasks that can be scheduled for execution. + """ + raise NotImplemented + + +class FlowControl(object): + """Flow control structure. + + Expresses a control structure that influences the way how workflow + execution goes at a certain point. + """ + + def decide(self, upstream_tasks, downstream_tasks): + """Makes a decision in a form of changed states of downstream tasks. + + :param upstream_tasks: Upstream workflow tasks. + :param downstream_tasks: Downstream workflow tasks. + :return: Dictionary {task: state} for those tasks whose states + have changed. {task} is a subset of {downstream_tasks}. + """ + raise NotImplemented diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py new file mode 100644 index 00000000..8b9b823d --- /dev/null +++ b/mistral/workflow/direct_workflow.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.workflow import base + + +class DirectWorkflowHandler(base.WorkflowHandler): + + @classmethod + def start_workflow(cls, workflow_spec, execution, **kwargs): + # TODO(rakhmerov): Implement. + raise NotImplemented + + def on_task_result(cls, workflow_spec, execution, task, task_result): + # TODO(rakhmerov): Implement. + raise NotImplemented diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py new file mode 100644 index 00000000..43d75f5a --- /dev/null +++ b/mistral/workflow/reverse_workflow.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.engine1 import states +from mistral import exceptions as exc +from mistral.workflow import base + + +class ReverseWorkflowHandler(base.WorkflowHandler): + """'Reverse workflow' handler. + + This handler implements the workflow pattern which is based on + dependencies between tasks, i.e. each task in a workflow graph + may be dependent on other tasks. To run this type of workflow + user must specify a task name that serves a target node in the + graph that the algorithm should come to by resolving all + dependencies. + For example, if there's a workflow consisting of two tasks + 'A' and 'B' where 'A' depends on 'B' and if we specify a target + task name 'A' then the handler first will run task 'B' and then, + when a dependency of 'A' is resolved, will run task 'A'. + """ + + def start_workflow(self, **kwargs): + wf_spec = self.exec_db.wf_spec + + task_name = kwargs.get('task_name') + + if task_name not in wf_spec.tasks: + msg = 'Invalid task name [wf_spec=%s, task_name=%s]' % \ + (wf_spec, task_name) + raise exc.WorkflowException(msg) + + return self._find_tasks_with_no_dependencies(task_name) + + def on_task_result(self, task, task_result): + task.state = states.ERROR if task_result.is_error() else states.SUCCESS + task.output = task_result.data + + if task.state == states.ERROR: + # No need to check state transition since it's possible to switch + # to ERROR state from any other state. + self.exec_db.state = states.ERROR + return [] + + return self._find_resolved_tasks() + + def _find_tasks_with_no_dependencies(self, target_task_name): + """Given a target task name finds tasks with no dependencies. + + :param target_task_name: Name of the target task in the workflow graph + that dependencies are unwound from. + :return: Tasks with no dependencies. + """ + # TODO(rakhmerov): Implement. + raise NotImplemented + + def _find_resolved_tasks(self): + """Finds all tasks with resolved dependencies. + + :return: Tasks with resolved dependencies. + """ + # TODO(rakhmerov): Implement. + raise NotImplemented diff --git a/mistral/workflow/selector.py b/mistral/workflow/selector.py new file mode 100644 index 00000000..6980d12e --- /dev/null +++ b/mistral/workflow/selector.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.workflow import direct_workflow +from mistral.workflow import reverse_workflow + +# TODO(rakhmerov): Take DSL versions into account. + + +def select_workflow_handler(workflow_spec): + # TODO(rakhmerov): This algorithm is actually for DSL v2. + wf_type = workflow_spec.type or 'direct' + + if wf_type == 'reverse': + return reverse_workflow.ReverseWorkflowHandler + + if wf_type == 'direct': + return direct_workflow.DirectWorkflowHandler + + return None