diff --git a/etc/mistral.conf.example b/etc/mistral.conf.example index fdcf2740e..2b1861fb2 100644 --- a/etc/mistral.conf.example +++ b/etc/mistral.conf.example @@ -35,8 +35,8 @@ host = 0.0.0.0 port = 8989 [engine] -# Mistral engine class (string value) -#engine=mistral.engine.scalable.engine +# Mistral engine plugin (string value) +#engine=default # Name of the engine node. This can be an opaque identifier. # It is not necessarily a hostname, FQDN, or IP address. (string value) diff --git a/mistral/api/hooks/engine.py b/mistral/api/hooks/engine.py index cf13980b9..8605313a1 100644 --- a/mistral/api/hooks/engine.py +++ b/mistral/api/hooks/engine.py @@ -15,7 +15,6 @@ from pecan.hooks import PecanHook from mistral import engine -from mistral.engine import client from mistral.openstack.common import log as logging @@ -26,7 +25,7 @@ class EngineHook(PecanHook): def __init__(self, transport=None): self.transport = engine.get_transport(transport) - self.engine = client.EngineClient(self.transport) + self.engine = engine.EngineClient(self.transport) def before(self, state): state.request.context['engine'] = self.engine diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 7e72ec8ec..223dbd4d3 100755 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -38,7 +38,7 @@ from oslo.config import cfg from mistral import config from mistral import engine -from mistral.engine.scalable.executor import server +from mistral.engine import executor from mistral.api import app from wsgiref import simple_server from mistral.openstack.common import log as logging @@ -50,7 +50,9 @@ LOG = logging.getLogger(__name__) def launch_executor(transport): target = messaging.Target(topic=cfg.CONF.executor.topic, server=cfg.CONF.executor.host) - endpoints = [server.Executor(transport)] + # Since engine and executor are tightly coupled, use the engine + # configuration to decide which executor to get. + endpoints = [executor.get_executor(cfg.CONF.engine.engine, transport)] ex_server = messaging.get_rpc_server( transport, target, endpoints, executor='eventlet') ex_server.start() @@ -60,7 +62,7 @@ def launch_executor(transport): def launch_engine(transport): target = messaging.Target(topic=cfg.CONF.engine.topic, server=cfg.CONF.engine.host) - endpoints = [engine.Engine(transport)] + endpoints = [engine.get_engine(cfg.CONF.engine.engine, transport)] en_server = messaging.get_rpc_server( transport, target, endpoints, executor='eventlet') en_server.start() diff --git a/mistral/config.py b/mistral/config.py index 99ac50a00..80ccb0007 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -30,8 +30,8 @@ api_opts = [ ] engine_opts = [ - cfg.StrOpt('engine', default='mistral.engine.scalable.engine', - help='Mistral engine class'), + cfg.StrOpt('engine', default='default', + help='Mistral engine plugin'), cfg.StrOpt('host', default='0.0.0.0', help='Name of the engine node. This can be an opaque ' 'identifier. It is not necessarily a hostname, ' diff --git a/mistral/engine/__init__.py b/mistral/engine/__init__.py index 07fe7411d..94ddaf9b2 100644 --- a/mistral/engine/__init__.py +++ b/mistral/engine/__init__.py @@ -12,28 +12,62 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc +import copy +import eventlet + from oslo import messaging from oslo.config import cfg +from stevedore import driver +# If mistral.config is not imported here, nosetests will fail on import +# because workflow_trace_log_name is not registered. The use of importutils +# to import mistral.config instead of simply "from mistral import config" is +# to avoid pep8 error on module referenced but not used. +# TODO(m4dcoder): Refactor and clean up configuration registration. from mistral.openstack.common import importutils +importutils.import_module("mistral.config") + from mistral.openstack.common import log as logging +from mistral.db import api as db_api +from mistral import dsl_parser as parser +from mistral import exceptions as exc +from mistral.engine import states +from mistral.engine import workflow +from mistral.engine import data_flow +from mistral.engine import retry LOG = logging.getLogger(__name__) +WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) def get_transport(transport=None): return (transport if transport else messaging.get_transport(cfg.CONF)) +def get_engine(name, transport): + mgr = driver.DriverManager( + namespace='mistral.engine.drivers', + name=name, + invoke_on_load=True, + invoke_kwds={'transport': transport}) + return mgr.driver + + class Engine(object): + """Abstract engine for workflow execution.""" + + __metaclass__ = abc.ABCMeta + + transport = None def __init__(self, transport=None): - module_name = cfg.CONF.engine.engine - module = importutils.import_module(module_name) self.transport = get_transport(transport) - self.backend = module.get_engine() - self.backend.transport = self.transport + + @abc.abstractmethod + def _run_tasks(cls, tasks): + raise NotImplementedError() def start_workflow_execution(self, cntx, **kwargs): """Starts a workflow execution based on the specified workbook name @@ -48,8 +82,47 @@ class Engine(object): workbook_name = kwargs.get('workbook_name') task_name = kwargs.get('task_name') context = kwargs.get('context', None) - return self.backend.start_workflow_execution( - workbook_name, task_name, context) + + context = copy.copy(context) if context else {} + + db_api.start_tx() + + WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', " + "task_name = '%s']" % (workbook_name, task_name)) + + # Persist execution and tasks in DB. + try: + workbook = self._get_workbook(workbook_name) + execution = self._create_execution(workbook_name, + task_name, + context) + + tasks = self._create_tasks( + workflow.find_workflow_tasks(workbook, task_name), + workbook, + workbook_name, execution['id'] + ) + + tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) + + self._add_variables_to_data_flow_context(context, execution) + + data_flow.prepare_tasks(tasks_to_start, context) + + db_api.commit_tx() + except Exception as e: + LOG.exception("Failed to create necessary DB objects.") + raise exc.EngineException("Failed to create necessary DB objects:" + " %s" % e) + finally: + db_api.end_tx() + + for task in delayed_tasks: + self._schedule_run(workbook, task, context) + + self._run_tasks(tasks_to_start) + + return execution def stop_workflow_execution(self, cntx, **kwargs): """Stops the workflow execution with the given id. @@ -62,8 +135,9 @@ class Engine(object): """ workbook_name = kwargs.get('workbook_name') execution_id = kwargs.get('execution_id') - return self.backend.stop_workflow_execution( - workbook_name, execution_id) + + return db_api.execution_update(workbook_name, execution_id, + {"state": states.STOPPED}) def convey_task_result(self, cntx, **kwargs): """Conveys task result to Mistral Engine. @@ -88,8 +162,74 @@ class Engine(object): task_id = kwargs.get('task_id') state = kwargs.get('state') result = kwargs.get('result') - return self.backend.convey_task_result( - workbook_name, execution_id, task_id, state, result) + + db_api.start_tx() + + try: + workbook = self._get_workbook(workbook_name) + #TODO(rakhmerov): validate state transition + task = db_api.task_get(workbook_name, execution_id, task_id) + + wf_trace_msg = "Task '%s' [%s -> %s" % \ + (task['name'], task['state'], state) + + wf_trace_msg += ']' if state == states.ERROR \ + else ", result = %s]" % result + WORKFLOW_TRACE.info(wf_trace_msg) + + task_output = data_flow.get_task_output(task, result) + + # Update task state. + task, outbound_context = self._update_task(workbook, task, state, + task_output) + + execution = db_api.execution_get(workbook_name, execution_id) + + self._create_next_tasks(task, workbook) + + # Determine what tasks need to be started. + tasks = db_api.tasks_get(workbook_name, execution_id) + + new_exec_state = self._determine_execution_state(execution, tasks) + + if execution['state'] != new_exec_state: + wf_trace_msg = \ + "Execution '%s' [%s -> %s]" % \ + (execution_id, execution['state'], new_exec_state) + WORKFLOW_TRACE.info(wf_trace_msg) + + execution = \ + db_api.execution_update(workbook_name, execution_id, { + "state": new_exec_state + }) + + LOG.info("Changed execution state: %s" % execution) + + tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) + + self._add_variables_to_data_flow_context(outbound_context, + execution) + + data_flow.prepare_tasks(tasks_to_start, outbound_context) + + db_api.commit_tx() + except Exception as e: + LOG.exception("Failed to create necessary DB objects.") + raise exc.EngineException("Failed to create necessary DB objects:" + " %s" % e) + finally: + db_api.end_tx() + + if states.is_stopped_or_finished(execution["state"]): + return task + + for task in delayed_tasks: + self._schedule_run(workbook, task, outbound_context) + + if tasks_to_start: + self._run_tasks(tasks_to_start) + + return task def get_workflow_execution_state(self, cntx, **kwargs): """Gets the workflow execution state. @@ -102,8 +242,15 @@ class Engine(object): """ workbook_name = kwargs.get('workbook_name') execution_id = kwargs.get('execution_id') - return self.backend.get_workflow_execution_state( - workbook_name, execution_id) + + execution = db_api.execution_get(workbook_name, execution_id) + + if not execution: + raise exc.EngineException("Workflow execution not found " + "[workbook_name=%s, execution_id=%s]" + % (workbook_name, execution_id)) + + return execution["state"] def get_task_state(self, cntx, **kwargs): """Gets task state. @@ -117,5 +264,252 @@ class Engine(object): workbook_name = kwargs.get('workbook_name') execution_id = kwargs.get('execution_id') task_id = kwargs.get('task_id') - return self.backend.get_task_state( - workbook_name, execution_id, task_id) + + task = db_api.task_get(workbook_name, execution_id, task_id) + + if not task: + raise exc.EngineException("Task not found.") + + return task["state"] + + @classmethod + def _create_execution(cls, workbook_name, task_name, context): + return db_api.execution_create(workbook_name, { + "workbook_name": workbook_name, + "task": task_name, + "state": states.RUNNING, + "context": context + }) + + @classmethod + def _add_variables_to_data_flow_context(cls, context, execution): + db_workbook = db_api.workbook_get(execution['workbook_name']) + + data_flow.add_token_to_context(context, db_workbook) + data_flow.add_execution_to_context(context, execution) + + @classmethod + def _create_next_tasks(cls, task, workbook): + tasks = workflow.find_tasks_after_completion(task, workbook) + + db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'], + task['execution_id']) + return workflow.find_resolved_tasks(db_tasks) + + @classmethod + def _create_tasks(cls, task_list, workbook, workbook_name, execution_id): + tasks = [] + + for task in task_list: + state, task_runtime_context = retry.get_task_runtime(task) + action_ns = workbook.namespaces.get(task.get_action_namespace()) + + action_spec = None + if action_ns: + action_spec = \ + action_ns.actions.get(task.get_action_name()) + + db_task = db_api.task_create(workbook_name, execution_id, { + "name": task.name, + "requires": task.requires, + "task_spec": task.to_dict(), + "action_spec": {} if not action_spec + else action_spec.to_dict(), + "state": state, + "tags": task.get_property("tags", None), + "task_runtime_context": task_runtime_context + }) + + tasks.append(db_task) + + return tasks + + @classmethod + def _get_workbook(cls, workbook_name): + wb = db_api.workbook_get(workbook_name) + return parser.get_workbook(wb["definition"]) + + @classmethod + def _determine_execution_state(cls, execution, tasks): + if workflow.is_error(tasks): + return states.ERROR + + if workflow.is_success(tasks) or workflow.is_finished(tasks): + return states.SUCCESS + + return execution['state'] + + @classmethod + def _update_task(cls, workbook, task, state, task_output): + """ + Update the task with the runtime information. The outbound_context + for this task is also calculated. + :return: task, outbound_context. task is the updated task and + computed outbound context. + """ + workbook_name = task['workbook_name'] + execution_id = task['execution_id'] + task_spec = workbook.tasks.get(task["name"]) + task_runtime_context = task["task_runtime_context"] + + # Compute the outbound_context, state and exec_flow_context. + outbound_context = data_flow.get_outbound_context(task, task_output) + state, task_runtime_context = retry.get_task_runtime( + task_spec, state, outbound_context, task_runtime_context) + + # Update the task. + update_values = {"state": state, + "output": task_output, + "task_runtime_context": task_runtime_context} + task = db_api.task_update(workbook_name, execution_id, task["id"], + update_values) + + return task, outbound_context + + def _schedule_run(cls, workbook, task, outbound_context): + """ + Schedules task to run after the delay defined in the task + specification. If no delay is specified this method is a no-op. + """ + + def run_delayed_task(): + """ + Runs the delayed task. Performs all the steps required to setup + a task to run which are not already done. This is mostly code + copied over from convey_task_result. + """ + db_api.start_tx() + try: + workbook_name = task['workbook_name'] + execution_id = task['execution_id'] + execution = db_api.execution_get(workbook_name, execution_id) + + # Change state from DELAYED to IDLE to unblock processing. + + WORKFLOW_TRACE.info("Task '%s' [%s -> %s]" + % (task['name'], + task['state'], states.IDLE)) + + db_task = db_api.task_update(workbook_name, + execution_id, + task['id'], + {"state": states.IDLE}) + task_to_start = [db_task] + data_flow.prepare_tasks(task_to_start, outbound_context) + db_api.commit_tx() + finally: + db_api.end_tx() + + if not states.is_stopped_or_finished(execution["state"]): + cls._run_tasks(task_to_start) + + task_spec = workbook.tasks.get(task['name']) + retries, break_on, delay_sec = task_spec.get_retry_parameters() + if delay_sec > 0: + # Run the task after the specified delay. + eventlet.spawn_after(delay_sec, run_delayed_task) + else: + LOG.warn("No delay specified for task(id=%s) name=%s. Not " + "scheduling for execution." % (task['id'], task['name'])) + + +class EngineClient(object): + """ + RPC client for the Engine. + """ + + def __init__(self, transport): + """Construct an RPC client for the Engine. + + :param transport: a messaging transport handle + :type transport: Transport + """ + target = messaging.Target(topic=cfg.CONF.engine.topic) + self._client = messaging.RPCClient(transport, target) + + def start_workflow_execution(self, workbook_name, task_name, context=None): + """Starts a workflow execution based on the specified workbook name + and target task. + + :param workbook_name: Workbook name + :param task_name: Target task name + :param context: Execution context which defines a workflow input + :return: Workflow execution. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'task_name': task_name, + 'context': context} + return self._client.call(cntx, 'start_workflow_execution', **kwargs) + + def stop_workflow_execution(self, workbook_name, execution_id): + """Stops the workflow execution with the given id. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :return: Workflow execution. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id} + return self._client.call(cntx, 'stop_workflow_execution', **kwargs) + + def convey_task_result(self, workbook_name, execution_id, + task_id, state, result): + """Conveys task result to Mistral Engine. + + This method should be used by clients of Mistral Engine to update + state of a task once task action has been performed. One of the + clients of this method is Mistral REST API server that receives + task result from the outside action handlers. + + Note: calling this method serves an event notifying Mistral that + it possibly needs to move the workflow on, i.e. run other workflow + tasks for which all dependencies are satisfied. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :param task_id: Task id. + :param state: New task state. + :param result: Task result data. + :return: Task. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id, + 'task_id': task_id, + 'state': state, + 'result': result} + return self._client.call(cntx, 'convey_task_result', **kwargs) + + def get_workflow_execution_state(self, workbook_name, execution_id): + """Gets the workflow execution state. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :return: Current workflow state. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id} + return self._client.call( + cntx, 'get_workflow_execution_state', **kwargs) + + def get_task_state(self, workbook_name, execution_id, task_id): + """Gets task state. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :param task_id: Task id. + :return: Current task state. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'executioin_id': execution_id, + 'task_id': task_id} + return self._client.call(cntx, 'get_task_state', **kwargs) diff --git a/mistral/engine/abstract_engine.py b/mistral/engine/abstract_engine.py deleted file mode 100644 index d30d5390d..000000000 --- a/mistral/engine/abstract_engine.py +++ /dev/null @@ -1,323 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2013 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import abc -import copy -import eventlet - -from oslo.config import cfg - -from mistral.openstack.common import log as logging -from mistral.db import api as db_api -from mistral import dsl_parser as parser -from mistral import exceptions as exc -from mistral.engine import states -from mistral.engine import workflow -from mistral.engine import data_flow -from mistral.engine import retry - - -LOG = logging.getLogger(__name__) -WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) - - -class AbstractEngine(object): - transport = None - - @classmethod - @abc.abstractmethod - def _run_tasks(cls, tasks): - pass - - @classmethod - def start_workflow_execution(cls, workbook_name, task_name, context): - context = copy.copy(context) if context else {} - - db_api.start_tx() - - WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', " - "task_name = '%s']" % (workbook_name, task_name)) - - # Persist execution and tasks in DB. - try: - workbook = cls._get_workbook(workbook_name) - execution = cls._create_execution(workbook_name, - task_name, - context) - - tasks = cls._create_tasks( - workflow.find_workflow_tasks(workbook, task_name), - workbook, - workbook_name, execution['id'] - ) - - tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) - - cls._add_variables_to_data_flow_context(context, execution) - - data_flow.prepare_tasks(tasks_to_start, context) - - db_api.commit_tx() - except Exception as e: - LOG.exception("Failed to create necessary DB objects.") - raise exc.EngineException("Failed to create necessary DB objects:" - " %s" % e) - finally: - db_api.end_tx() - - for task in delayed_tasks: - cls._schedule_run(workbook, task, context) - - cls._run_tasks(tasks_to_start) - - return execution - - @classmethod - def convey_task_result(cls, workbook_name, execution_id, - task_id, state, result): - db_api.start_tx() - - try: - workbook = cls._get_workbook(workbook_name) - #TODO(rakhmerov): validate state transition - task = db_api.task_get(workbook_name, execution_id, task_id) - - wf_trace_msg = "Task '%s' [%s -> %s" % \ - (task['name'], task['state'], state) - - wf_trace_msg += ']' if state == states.ERROR \ - else ", result = %s]" % result - WORKFLOW_TRACE.info(wf_trace_msg) - - task_output = data_flow.get_task_output(task, result) - - # Update task state. - task, outbound_context = cls._update_task(workbook, task, state, - task_output) - - execution = db_api.execution_get(workbook_name, execution_id) - - cls._create_next_tasks(task, workbook) - - # Determine what tasks need to be started. - tasks = db_api.tasks_get(workbook_name, execution_id) - - new_exec_state = cls._determine_execution_state(execution, tasks) - - if execution['state'] != new_exec_state: - wf_trace_msg = \ - "Execution '%s' [%s -> %s]" % \ - (execution_id, execution['state'], new_exec_state) - WORKFLOW_TRACE.info(wf_trace_msg) - - execution = \ - db_api.execution_update(workbook_name, execution_id, { - "state": new_exec_state - }) - - LOG.info("Changed execution state: %s" % execution) - - tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) - - cls._add_variables_to_data_flow_context(outbound_context, - execution) - - data_flow.prepare_tasks(tasks_to_start, outbound_context) - - db_api.commit_tx() - except Exception as e: - LOG.exception("Failed to create necessary DB objects.") - raise exc.EngineException("Failed to create necessary DB objects:" - " %s" % e) - finally: - db_api.end_tx() - - if states.is_stopped_or_finished(execution["state"]): - return task - - for task in delayed_tasks: - cls._schedule_run(workbook, task, outbound_context) - - if tasks_to_start: - cls._run_tasks(tasks_to_start) - - return task - - @classmethod - def stop_workflow_execution(cls, workbook_name, execution_id): - return db_api.execution_update(workbook_name, execution_id, - {"state": states.STOPPED}) - - @classmethod - def get_workflow_execution_state(cls, workbook_name, execution_id): - execution = db_api.execution_get(workbook_name, execution_id) - - if not execution: - raise exc.EngineException("Workflow execution not found " - "[workbook_name=%s, execution_id=%s]" - % (workbook_name, execution_id)) - - return execution["state"] - - @classmethod - def get_task_state(cls, workbook_name, execution_id, task_id): - task = db_api.task_get(workbook_name, execution_id, task_id) - - if not task: - raise exc.EngineException("Task not found.") - - return task["state"] - - @classmethod - def _create_execution(cls, workbook_name, task_name, context): - return db_api.execution_create(workbook_name, { - "workbook_name": workbook_name, - "task": task_name, - "state": states.RUNNING, - "context": context - }) - - @classmethod - def _add_variables_to_data_flow_context(cls, context, execution): - db_workbook = db_api.workbook_get(execution['workbook_name']) - - data_flow.add_token_to_context(context, db_workbook) - data_flow.add_execution_to_context(context, execution) - - @classmethod - def _create_next_tasks(cls, task, workbook): - tasks = workflow.find_tasks_after_completion(task, workbook) - - db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'], - task['execution_id']) - return workflow.find_resolved_tasks(db_tasks) - - @classmethod - def _create_tasks(cls, task_list, workbook, workbook_name, execution_id): - tasks = [] - - for task in task_list: - state, task_runtime_context = retry.get_task_runtime(task) - action_ns = workbook.namespaces.get(task.get_action_namespace()) - - action_spec = None - if action_ns: - action_spec = \ - action_ns.actions.get(task.get_action_name()) - - db_task = db_api.task_create(workbook_name, execution_id, { - "name": task.name, - "requires": task.requires, - "task_spec": task.to_dict(), - "action_spec": {} if not action_spec - else action_spec.to_dict(), - "state": state, - "tags": task.get_property("tags", None), - "task_runtime_context": task_runtime_context - }) - - tasks.append(db_task) - - return tasks - - @classmethod - def _get_workbook(cls, workbook_name): - wb = db_api.workbook_get(workbook_name) - return parser.get_workbook(wb["definition"]) - - @classmethod - def _determine_execution_state(cls, execution, tasks): - if workflow.is_error(tasks): - return states.ERROR - - if workflow.is_success(tasks) or workflow.is_finished(tasks): - return states.SUCCESS - - return execution['state'] - - @classmethod - def _update_task(cls, workbook, task, state, task_output): - """ - Update the task with the runtime information. The outbound_context - for this task is also calculated. - :return: task, outbound_context. task is the updated task and - computed outbound context. - """ - workbook_name = task['workbook_name'] - execution_id = task['execution_id'] - task_spec = workbook.tasks.get(task["name"]) - task_runtime_context = task["task_runtime_context"] - - # Compute the outbound_context, state and exec_flow_context. - outbound_context = data_flow.get_outbound_context(task, task_output) - state, task_runtime_context = retry.get_task_runtime( - task_spec, state, outbound_context, task_runtime_context) - - # Update the task. - update_values = {"state": state, - "output": task_output, - "task_runtime_context": task_runtime_context} - task = db_api.task_update(workbook_name, execution_id, task["id"], - update_values) - - return task, outbound_context - - @classmethod - def _schedule_run(cls, workbook, task, outbound_context): - """ - Schedules task to run after the delay defined in the task - specification. If no delay is specified this method is a no-op. - """ - - def run_delayed_task(): - """ - Runs the delayed task. Performs all the steps required to setup - a task to run which are not already done. This is mostly code - copied over from convey_task_result. - """ - db_api.start_tx() - try: - workbook_name = task['workbook_name'] - execution_id = task['execution_id'] - execution = db_api.execution_get(workbook_name, execution_id) - - # Change state from DELAYED to IDLE to unblock processing. - - WORKFLOW_TRACE.info("Task '%s' [%s -> %s]" - % (task['name'], - task['state'], states.IDLE)) - - db_task = db_api.task_update(workbook_name, - execution_id, - task['id'], - {"state": states.IDLE}) - task_to_start = [db_task] - data_flow.prepare_tasks(task_to_start, outbound_context) - db_api.commit_tx() - finally: - db_api.end_tx() - - if not states.is_stopped_or_finished(execution["state"]): - cls._run_tasks(task_to_start) - - task_spec = workbook.tasks.get(task['name']) - retries, break_on, delay_sec = task_spec.get_retry_parameters() - if delay_sec > 0: - # Run the task after the specified delay. - eventlet.spawn_after(delay_sec, run_delayed_task) - else: - LOG.warn("No delay specified for task(id=%s) name=%s. Not " - "scheduling for execution." % (task['id'], task['name'])) diff --git a/mistral/engine/client.py b/mistral/engine/client.py deleted file mode 100644 index bd5a29e65..000000000 --- a/mistral/engine/client.py +++ /dev/null @@ -1,123 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo import messaging -from oslo.config import cfg - -from mistral.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) - - -class EngineClient(object): - """ - RPC client for the Engine. - """ - - def __init__(self, transport): - """Construct an RPC client for the Engine. - - :param transport: a messaging transport handle - :type transport: Transport - """ - target = messaging.Target(topic=cfg.CONF.engine.topic) - self._client = messaging.RPCClient(transport, target) - - def start_workflow_execution(self, workbook_name, task_name, context=None): - """Starts a workflow execution based on the specified workbook name - and target task. - - :param workbook_name: Workbook name - :param task_name: Target task name - :param context: Execution context which defines a workflow input - :return: Workflow execution. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'task_name': task_name, - 'context': context} - return self._client.call(cntx, 'start_workflow_execution', **kwargs) - - def stop_workflow_execution(self, workbook_name, execution_id): - """Stops the workflow execution with the given id. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :return: Workflow execution. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'execution_id': execution_id} - return self._client.call(cntx, 'stop_workflow_execution', **kwargs) - - def convey_task_result(self, workbook_name, execution_id, - task_id, state, result): - """Conveys task result to Mistral Engine. - - This method should be used by clients of Mistral Engine to update - state of a task once task action has been performed. One of the - clients of this method is Mistral REST API server that receives - task result from the outside action handlers. - - Note: calling this method serves an event notifying Mistral that - it possibly needs to move the workflow on, i.e. run other workflow - tasks for which all dependencies are satisfied. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :param task_id: Task id. - :param state: New task state. - :param result: Task result data. - :return: Task. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'execution_id': execution_id, - 'task_id': task_id, - 'state': state, - 'result': result} - return self._client.call(cntx, 'convey_task_result', **kwargs) - - def get_workflow_execution_state(self, workbook_name, execution_id): - """Gets the workflow execution state. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :return: Current workflow state. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'execution_id': execution_id} - return self._client.call( - cntx, 'get_workflow_execution_state', **kwargs) - - def get_task_state(self, workbook_name, execution_id, task_id): - """Gets task state. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :param task_id: Task id. - :return: Current task state. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'executioin_id': execution_id, - 'task_id': task_id} - return self._client.call(cntx, 'get_task_state', **kwargs) diff --git a/mistral/engine/scalable/__init__.py b/mistral/engine/drivers/__init__.py similarity index 100% rename from mistral/engine/scalable/__init__.py rename to mistral/engine/drivers/__init__.py diff --git a/mistral/engine/scalable/executor/__init__.py b/mistral/engine/drivers/default/__init__.py similarity index 100% rename from mistral/engine/scalable/executor/__init__.py rename to mistral/engine/drivers/default/__init__.py diff --git a/mistral/engine/scalable/engine.py b/mistral/engine/drivers/default/engine.py similarity index 71% rename from mistral/engine/scalable/engine.py rename to mistral/engine/drivers/default/engine.py index bd2653897..42820e6d5 100644 --- a/mistral/engine/scalable/engine.py +++ b/mistral/engine/drivers/default/engine.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- # -# Copyright 2013 - Mirantis, Inc. -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -16,29 +14,28 @@ from oslo import messaging from oslo.config import cfg + from mistral.openstack.common import log as logging -from mistral.engine.scalable.executor import client -from mistral.engine import abstract_engine as abs_eng +from mistral import engine +from mistral.engine import executor LOG = logging.getLogger(__name__) -class ScalableEngine(abs_eng.AbstractEngine): - @classmethod - def _notify_task_executors(cls, tasks): +class DefaultEngine(engine.Engine): + def _notify_task_executors(self, tasks): # TODO(m4dcoder): Use a pool for transport and client - if not cls.transport: - cls.transport = messaging.get_transport(cfg.CONF) - ex_client = client.ExecutorClient(cls.transport) + if not self.transport: + self.transport = messaging.get_transport(cfg.CONF) + exctr = executor.ExecutorClient(self.transport) for task in tasks: # TODO(m4dcoder): Fill request context argument with auth info context = {} - ex_client.handle_task(context, task=task) + exctr.handle_task(context, task=task) LOG.info("Submitted task for execution: '%s'" % task) - @classmethod - def _run_tasks(cls, tasks): + def _run_tasks(self, tasks): # TODO(rakhmerov): # This call outside of DB transaction creates a window # when the engine may crash and DB will not be consistent with @@ -47,8 +44,4 @@ class ScalableEngine(abs_eng.AbstractEngine): # However, making this call in DB transaction is really bad # since it makes transaction much longer in time and under load # may overload DB with open transactions. - cls._notify_task_executors(tasks) - - -def get_engine(): - return ScalableEngine + self._notify_task_executors(tasks) diff --git a/mistral/engine/scalable/executor/server.py b/mistral/engine/drivers/default/executor.py similarity index 95% rename from mistral/engine/scalable/executor/server.py rename to mistral/engine/drivers/default/executor.py index 2dad685f3..907511187 100644 --- a/mistral/engine/scalable/executor/server.py +++ b/mistral/engine/drivers/default/executor.py @@ -19,9 +19,8 @@ from oslo.config import cfg from mistral.openstack.common import log as logging from mistral.db import api as db_api from mistral import exceptions as exc -from mistral import engine -from mistral.engine import client from mistral.engine import states +from mistral.engine import executor from mistral.actions import action_factory as a_f @@ -29,10 +28,7 @@ LOG = logging.getLogger(__name__) WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) -class Executor(object): - def __init__(self, transport=None): - self.transport = engine.get_transport(transport) - self.engine = client.EngineClient(self.transport) +class DefaultExecutor(executor.Executor): def _do_task_action(self, task): """Executes the action defined by the task and return result. diff --git a/mistral/engine/scalable/executor/client.py b/mistral/engine/executor.py similarity index 68% rename from mistral/engine/scalable/executor/client.py rename to mistral/engine/executor.py index f189caa25..5e89389cf 100644 --- a/mistral/engine/scalable/executor/client.py +++ b/mistral/engine/executor.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- # -# Copyright 2013 - Mirantis, Inc. -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -14,15 +12,42 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc + from oslo import messaging from oslo.config import cfg +from stevedore import driver from mistral.openstack.common import log as logging +from mistral import engine LOG = logging.getLogger(__name__) +def get_executor(name, transport): + mgr = driver.DriverManager( + namespace='mistral.executor.drivers', + name=name, + invoke_on_load=True, + invoke_kwds={'transport': transport}) + return mgr.driver + + +class Executor(object): + """Abstract class for task execution.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, transport=None): + self.transport = engine.get_transport(transport) + self.engine = engine.EngineClient(self.transport) + + @abc.abstractmethod + def handle_task(self, cntx, **kwargs): + raise NotImplementedError() + + class ExecutorClient(object): """ RPC client for the Executor. diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index 2925bf49f..7a1a8504d 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -16,7 +16,6 @@ from mistral.db import api as db_api from mistral import engine -from mistral.engine import client from mistral.openstack.common import log from mistral.openstack.common import periodic_task from mistral.openstack.common import threadgroup @@ -34,7 +33,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks): def __init__(self, transport=None): super(MistralPeriodicTasks, self).__init__() self.transport = engine.get_transport(transport) - self.engine = client.EngineClient(self.transport) + self.engine = engine.EngineClient(self.transport) @periodic_task.periodic_task(spacing=1, run_immediately=True) def scheduler_triggers(self, ctx): diff --git a/mistral/tests/api/v1/controllers/test_executions.py b/mistral/tests/api/v1/controllers/test_executions.py index fafaf57c8..03e652743 100644 --- a/mistral/tests/api/v1/controllers/test_executions.py +++ b/mistral/tests/api/v1/controllers/test_executions.py @@ -21,7 +21,7 @@ from mistral import exceptions as ex from webtest.app import AppError from mistral.tests.api import base from mistral.db import api as db_api -from mistral.engine import client +from mistral import engine # TODO: later we need additional tests verifying all the errors etc. @@ -75,7 +75,7 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertDictEqual(UPDATED_EXEC, canonize(resp.json)) - @mock.patch.object(client.EngineClient, 'start_workflow_execution', + @mock.patch.object(engine.EngineClient, 'start_workflow_execution', mock.MagicMock(return_value=EXECS[0])) def test_post(self): new_exec = EXECS[0].copy() @@ -86,7 +86,7 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 201) self.assertDictEqual(EXECS[0], canonize(resp.json)) - @mock.patch.object(client.EngineClient, 'start_workflow_execution', + @mock.patch.object(engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=ex.MistralException)) def test_post_throws_exception(self): with self.assertRaises(AppError) as context: diff --git a/mistral/tests/api/v1/controllers/test_tasks.py b/mistral/tests/api/v1/controllers/test_tasks.py index bd3bb38f0..07a579567 100644 --- a/mistral/tests/api/v1/controllers/test_tasks.py +++ b/mistral/tests/api/v1/controllers/test_tasks.py @@ -18,7 +18,7 @@ import mock from mistral.tests.api import base from mistral.db import api as db_api -from mistral.engine import client +from mistral import engine # TODO: later we need additional tests verifying all the errors etc. @@ -48,7 +48,7 @@ class TestTasksController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertDictEqual(TASKS[0], resp.json) - @mock.patch.object(client.EngineClient, "convey_task_result", + @mock.patch.object(engine.EngineClient, "convey_task_result", mock.MagicMock(return_value=UPDATED_TASK)) def test_put(self): resp = self.app.put_json( diff --git a/mistral/tests/base.py b/mistral/tests/base.py index 9ade353f6..1469fb950 100644 --- a/mistral/tests/base.py +++ b/mistral/tests/base.py @@ -34,9 +34,8 @@ from mistral.db.sqlalchemy import api as db_api from mistral.openstack.common import log as logging from mistral.openstack.common.db.sqlalchemy import session from mistral import version -from mistral.engine import client -from mistral.engine.scalable import engine -from mistral.engine.scalable.executor import server +from mistral import engine +from mistral.engine import executor RESOURCES_PATH = 'tests/resources/' @@ -121,10 +120,11 @@ class DbTestCase(BaseTest): class EngineTestCase(DbTestCase): transport = get_fake_transport() + backend = engine.get_engine(cfg.CONF.engine.engine, transport) def __init__(self, *args, **kwargs): super(EngineTestCase, self).__init__(*args, **kwargs) - self.engine = client.EngineClient(self.transport) + self.engine = engine.EngineClient(self.transport) @classmethod def mock_task_result(cls, workbook_name, execution_id, @@ -133,8 +133,13 @@ class EngineTestCase(DbTestCase): Mock the engine convey_task_results to send request directly to the engine instead of going through the oslo.messaging transport. """ - return engine.ScalableEngine.convey_task_result( - workbook_name, execution_id, task_id, state, result) + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id, + 'task_id': task_id, + 'state': state, + 'result': result} + return cls.backend.convey_task_result(cntx, **kwargs) @classmethod def mock_start_workflow(cls, workbook_name, task_name, context=None): @@ -142,8 +147,11 @@ class EngineTestCase(DbTestCase): Mock the engine start_workflow_execution to send request directly to the engine instead of going through the oslo.messaging transport. """ - return engine.ScalableEngine.start_workflow_execution( - workbook_name, task_name, context) + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'task_name': task_name, + 'context': context} + return cls.backend.start_workflow_execution(cntx, **kwargs) @classmethod def mock_get_workflow_state(cls, workbook_name, execution_id): @@ -151,8 +159,10 @@ class EngineTestCase(DbTestCase): Mock the engine get_workflow_execution_state to send request directly to the engine instead of going through the oslo.messaging transport. """ - return engine.ScalableEngine.get_workflow_execution_state( - workbook_name, execution_id) + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id} + return cls.backend.get_workflow_execution_state(cntx, **kwargs) @classmethod def mock_run_tasks(cls, tasks): @@ -160,9 +170,9 @@ class EngineTestCase(DbTestCase): Mock the engine _run_tasks to send requests directly to the task executor instead of going through the oslo.messaging transport. """ - executor = server.Executor(transport=cls.transport) + exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport) for task in tasks: - executor.handle_task({}, task=task) + exctr.handle_task({}, task=task) @classmethod def mock_handle_task(cls, cntx, **kwargs): @@ -170,5 +180,5 @@ class EngineTestCase(DbTestCase): Mock the executor handle_task to send requests directory to the task executor instead of going through the oslo.messaging transport. """ - executor = server.Executor(transport=cls.transport) - return executor.handle_task(cntx, **kwargs) + exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport) + return exctr.handle_task(cntx, **kwargs) diff --git a/mistral/tests/unit/engine/scalable/__init__.py b/mistral/tests/unit/engine/default/__init__.py similarity index 100% rename from mistral/tests/unit/engine/scalable/__init__.py rename to mistral/tests/unit/engine/default/__init__.py diff --git a/mistral/tests/unit/engine/scalable/test_engine.py b/mistral/tests/unit/engine/default/test_engine.py similarity index 95% rename from mistral/tests/unit/engine/scalable/test_engine.py rename to mistral/tests/unit/engine/default/test_engine.py index 1d1b0c89e..3e814caf8 100644 --- a/mistral/tests/unit/engine/scalable/test_engine.py +++ b/mistral/tests/unit/engine/default/test_engine.py @@ -22,9 +22,9 @@ from mistral.openstack.common import log as logging from mistral.db import api as db_api from mistral.actions import std_actions from mistral import expressions -from mistral.engine.scalable import engine +from mistral import engine from mistral.engine import states -from mistral.engine import client +from mistral.engine.drivers.default import engine as concrete_engine LOG = logging.getLogger(__name__) @@ -40,10 +40,10 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') @mock.patch.object( - client.EngineClient, 'start_workflow_execution', + engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) @mock.patch.object( - client.EngineClient, 'convey_task_result', + engine.EngineClient, 'convey_task_result', mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) @mock.patch.object( db_api, 'workbook_get', @@ -54,7 +54,7 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') mock.MagicMock(return_value={'state': states.SUCCESS})) class TestScalableEngine(base.EngineTestCase): @mock.patch.object( - engine.ScalableEngine, "_notify_task_executors", + concrete_engine.DefaultEngine, "_notify_task_executors", mock.MagicMock(return_value="")) def test_engine_one_task(self): execution = self.engine.start_workflow_execution(WB_NAME, "create-vms", @@ -72,11 +72,11 @@ class TestScalableEngine(base.EngineTestCase): self.assertEqual(task['state'], states.SUCCESS) @mock.patch.object( - client.EngineClient, 'get_workflow_execution_state', + engine.EngineClient, 'get_workflow_execution_state', mock.MagicMock( side_effect=base.EngineTestCase.mock_get_workflow_state)) @mock.patch.object( - engine.ScalableEngine, "_notify_task_executors", + concrete_engine.DefaultEngine, "_notify_task_executors", mock.MagicMock(return_value="")) def test_engine_multiple_tasks(self): execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms", @@ -116,7 +116,7 @@ class TestScalableEngine(base.EngineTestCase): WB_NAME, execution['id'])) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) @mock.patch.object( states, "get_state_by_http_status_code", @@ -135,7 +135,7 @@ class TestScalableEngine(base.EngineTestCase): self.assertEqual(task['state'], states.SUCCESS) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) @mock.patch.object( expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) @@ -201,7 +201,7 @@ class TestScalableEngine(base.EngineTestCase): self._assert_multiple_items(tasks, 4, state=states.SUCCESS) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) @mock.patch.object( expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) diff --git a/mistral/tests/unit/engine/scalable/test_executor.py b/mistral/tests/unit/engine/default/test_executor.py similarity index 97% rename from mistral/tests/unit/engine/scalable/test_executor.py rename to mistral/tests/unit/engine/default/test_executor.py index ad044d024..9433e458f 100644 --- a/mistral/tests/unit/engine/scalable/test_executor.py +++ b/mistral/tests/unit/engine/default/test_executor.py @@ -28,8 +28,8 @@ from mistral.openstack.common import importutils from mistral.engine import states from mistral.db import api as db_api from mistral.actions import std_actions -from mistral.engine import client as engine -from mistral.engine.scalable.executor import client as executor +from mistral import engine +from mistral.engine import executor # We need to make sure that all configuration properties are registered. diff --git a/mistral/tests/unit/engine/test_data_flow.py b/mistral/tests/unit/engine/test_data_flow.py index 97748cd7e..c6ecc62f7 100644 --- a/mistral/tests/unit/engine/test_data_flow.py +++ b/mistral/tests/unit/engine/test_data_flow.py @@ -22,10 +22,10 @@ from oslo.config import cfg from mistral.openstack.common import log as logging from mistral.tests import base from mistral.db import api as db_api -from mistral.engine.scalable import engine from mistral.actions import std_actions +from mistral import engine from mistral.engine import states -from mistral.engine import client +from mistral.engine.drivers.default import engine as concrete_engine from mistral.utils.openstack import keystone @@ -62,13 +62,13 @@ def create_workbook(definition_path): @mock.patch.object( - client.EngineClient, 'start_workflow_execution', + engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) @mock.patch.object( - client.EngineClient, 'convey_task_result', + engine.EngineClient, 'convey_task_result', mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) class DataFlowTest(base.EngineTestCase): def _check_in_context_execution(self, task): diff --git a/mistral/tests/unit/engine/test_task_retry.py b/mistral/tests/unit/engine/test_task_retry.py index 8604213ae..80af9fbe8 100644 --- a/mistral/tests/unit/engine/test_task_retry.py +++ b/mistral/tests/unit/engine/test_task_retry.py @@ -24,10 +24,10 @@ from mistral import exceptions as exc from mistral.openstack.common import log as logging from mistral.tests import base from mistral.db import api as db_api -from mistral.engine import client -from mistral.engine.scalable import engine from mistral.actions import std_actions +from mistral import engine from mistral.engine import states +from mistral.engine.drivers.default import engine as concrete_engine from mistral import dsl_parser as parser @@ -59,13 +59,13 @@ class FailBeforeSuccessMocker(object): @mock.patch.object( - client.EngineClient, 'start_workflow_execution', + engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) @mock.patch.object( - client.EngineClient, 'convey_task_result', + engine.EngineClient, 'convey_task_result', mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) @mock.patch.object( db_api, 'workbook_get', diff --git a/setup.cfg b/setup.cfg index 55de8e902..0c948fbdc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,4 +25,8 @@ packages = console_scripts = mistral-server = mistral.cmd.launch:main +mistral.engine.drivers = + default = mistral.engine.drivers.default.engine:DefaultEngine +mistral.executor.drivers = + default = mistral.engine.drivers.default.executor:DefaultExecutor