From 6c5ca50b4c36a7565f433236947dc2bf0bdf5929 Mon Sep 17 00:00:00 2001 From: Winson Chan Date: Tue, 29 Apr 2014 16:39:34 -0700 Subject: [PATCH] Refactor engine to use plugins Refactor current engine process to instantiate the concrete engine using stevedore. The current engine uses importutils to load the concrete engine. Since the executor is tightly coupled to the engine, it is also refactored to use stevedore. The ScalableEngine is renamed to DefaultEngine and moved to the mistral.engine.drivers.default namespace. The setup.cfg file is updated to include entry points for the engine and executor plugins. Change-Id: Ia9d15b25ca96387e5ac22c1a86cffc7a816e92fd Implements: blueprint mistral-engine-plugin --- etc/mistral.conf.example | 4 +- mistral/api/hooks/engine.py | 3 +- mistral/cmd/launch.py | 8 +- mistral/config.py | 4 +- mistral/engine/__init__.py | 422 +++++++++++++++++- mistral/engine/abstract_engine.py | 323 -------------- mistral/engine/client.py | 123 ----- .../engine/{scalable => drivers}/__init__.py | 0 .../executor => drivers/default}/__init__.py | 0 .../{scalable => drivers/default}/engine.py | 29 +- .../server.py => drivers/default/executor.py} | 8 +- .../executor/client.py => executor.py} | 29 +- mistral/services/periodic.py | 3 +- .../api/v1/controllers/test_executions.py | 6 +- .../tests/api/v1/controllers/test_tasks.py | 4 +- mistral/tests/base.py | 38 +- .../engine/{scalable => default}/__init__.py | 0 .../{scalable => default}/test_engine.py | 20 +- .../{scalable => default}/test_executor.py | 4 +- mistral/tests/unit/engine/test_data_flow.py | 10 +- mistral/tests/unit/engine/test_task_retry.py | 10 +- setup.cfg | 4 + 22 files changed, 514 insertions(+), 538 deletions(-) delete mode 100644 mistral/engine/abstract_engine.py delete mode 100644 mistral/engine/client.py rename mistral/engine/{scalable => drivers}/__init__.py (100%) rename mistral/engine/{scalable/executor => drivers/default}/__init__.py (100%) rename mistral/engine/{scalable => drivers/default}/engine.py (71%) rename mistral/engine/{scalable/executor/server.py => drivers/default/executor.py} (95%) rename mistral/engine/{scalable/executor/client.py => executor.py} (68%) rename mistral/tests/unit/engine/{scalable => default}/__init__.py (100%) rename mistral/tests/unit/engine/{scalable => default}/test_engine.py (95%) rename mistral/tests/unit/engine/{scalable => default}/test_executor.py (97%) diff --git a/etc/mistral.conf.example b/etc/mistral.conf.example index fdcf2740..2b1861fb 100644 --- a/etc/mistral.conf.example +++ b/etc/mistral.conf.example @@ -35,8 +35,8 @@ host = 0.0.0.0 port = 8989 [engine] -# Mistral engine class (string value) -#engine=mistral.engine.scalable.engine +# Mistral engine plugin (string value) +#engine=default # Name of the engine node. This can be an opaque identifier. # It is not necessarily a hostname, FQDN, or IP address. (string value) diff --git a/mistral/api/hooks/engine.py b/mistral/api/hooks/engine.py index cf13980b..8605313a 100644 --- a/mistral/api/hooks/engine.py +++ b/mistral/api/hooks/engine.py @@ -15,7 +15,6 @@ from pecan.hooks import PecanHook from mistral import engine -from mistral.engine import client from mistral.openstack.common import log as logging @@ -26,7 +25,7 @@ class EngineHook(PecanHook): def __init__(self, transport=None): self.transport = engine.get_transport(transport) - self.engine = client.EngineClient(self.transport) + self.engine = engine.EngineClient(self.transport) def before(self, state): state.request.context['engine'] = self.engine diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 7e72ec8e..223dbd4d 100755 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -38,7 +38,7 @@ from oslo.config import cfg from mistral import config from mistral import engine -from mistral.engine.scalable.executor import server +from mistral.engine import executor from mistral.api import app from wsgiref import simple_server from mistral.openstack.common import log as logging @@ -50,7 +50,9 @@ LOG = logging.getLogger(__name__) def launch_executor(transport): target = messaging.Target(topic=cfg.CONF.executor.topic, server=cfg.CONF.executor.host) - endpoints = [server.Executor(transport)] + # Since engine and executor are tightly coupled, use the engine + # configuration to decide which executor to get. + endpoints = [executor.get_executor(cfg.CONF.engine.engine, transport)] ex_server = messaging.get_rpc_server( transport, target, endpoints, executor='eventlet') ex_server.start() @@ -60,7 +62,7 @@ def launch_executor(transport): def launch_engine(transport): target = messaging.Target(topic=cfg.CONF.engine.topic, server=cfg.CONF.engine.host) - endpoints = [engine.Engine(transport)] + endpoints = [engine.get_engine(cfg.CONF.engine.engine, transport)] en_server = messaging.get_rpc_server( transport, target, endpoints, executor='eventlet') en_server.start() diff --git a/mistral/config.py b/mistral/config.py index 99ac50a0..80ccb000 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -30,8 +30,8 @@ api_opts = [ ] engine_opts = [ - cfg.StrOpt('engine', default='mistral.engine.scalable.engine', - help='Mistral engine class'), + cfg.StrOpt('engine', default='default', + help='Mistral engine plugin'), cfg.StrOpt('host', default='0.0.0.0', help='Name of the engine node. This can be an opaque ' 'identifier. It is not necessarily a hostname, ' diff --git a/mistral/engine/__init__.py b/mistral/engine/__init__.py index 07fe7411..94ddaf9b 100644 --- a/mistral/engine/__init__.py +++ b/mistral/engine/__init__.py @@ -12,28 +12,62 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc +import copy +import eventlet + from oslo import messaging from oslo.config import cfg +from stevedore import driver +# If mistral.config is not imported here, nosetests will fail on import +# because workflow_trace_log_name is not registered. The use of importutils +# to import mistral.config instead of simply "from mistral import config" is +# to avoid pep8 error on module referenced but not used. +# TODO(m4dcoder): Refactor and clean up configuration registration. from mistral.openstack.common import importutils +importutils.import_module("mistral.config") + from mistral.openstack.common import log as logging +from mistral.db import api as db_api +from mistral import dsl_parser as parser +from mistral import exceptions as exc +from mistral.engine import states +from mistral.engine import workflow +from mistral.engine import data_flow +from mistral.engine import retry LOG = logging.getLogger(__name__) +WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) def get_transport(transport=None): return (transport if transport else messaging.get_transport(cfg.CONF)) +def get_engine(name, transport): + mgr = driver.DriverManager( + namespace='mistral.engine.drivers', + name=name, + invoke_on_load=True, + invoke_kwds={'transport': transport}) + return mgr.driver + + class Engine(object): + """Abstract engine for workflow execution.""" + + __metaclass__ = abc.ABCMeta + + transport = None def __init__(self, transport=None): - module_name = cfg.CONF.engine.engine - module = importutils.import_module(module_name) self.transport = get_transport(transport) - self.backend = module.get_engine() - self.backend.transport = self.transport + + @abc.abstractmethod + def _run_tasks(cls, tasks): + raise NotImplementedError() def start_workflow_execution(self, cntx, **kwargs): """Starts a workflow execution based on the specified workbook name @@ -48,8 +82,47 @@ class Engine(object): workbook_name = kwargs.get('workbook_name') task_name = kwargs.get('task_name') context = kwargs.get('context', None) - return self.backend.start_workflow_execution( - workbook_name, task_name, context) + + context = copy.copy(context) if context else {} + + db_api.start_tx() + + WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', " + "task_name = '%s']" % (workbook_name, task_name)) + + # Persist execution and tasks in DB. + try: + workbook = self._get_workbook(workbook_name) + execution = self._create_execution(workbook_name, + task_name, + context) + + tasks = self._create_tasks( + workflow.find_workflow_tasks(workbook, task_name), + workbook, + workbook_name, execution['id'] + ) + + tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) + + self._add_variables_to_data_flow_context(context, execution) + + data_flow.prepare_tasks(tasks_to_start, context) + + db_api.commit_tx() + except Exception as e: + LOG.exception("Failed to create necessary DB objects.") + raise exc.EngineException("Failed to create necessary DB objects:" + " %s" % e) + finally: + db_api.end_tx() + + for task in delayed_tasks: + self._schedule_run(workbook, task, context) + + self._run_tasks(tasks_to_start) + + return execution def stop_workflow_execution(self, cntx, **kwargs): """Stops the workflow execution with the given id. @@ -62,8 +135,9 @@ class Engine(object): """ workbook_name = kwargs.get('workbook_name') execution_id = kwargs.get('execution_id') - return self.backend.stop_workflow_execution( - workbook_name, execution_id) + + return db_api.execution_update(workbook_name, execution_id, + {"state": states.STOPPED}) def convey_task_result(self, cntx, **kwargs): """Conveys task result to Mistral Engine. @@ -88,8 +162,74 @@ class Engine(object): task_id = kwargs.get('task_id') state = kwargs.get('state') result = kwargs.get('result') - return self.backend.convey_task_result( - workbook_name, execution_id, task_id, state, result) + + db_api.start_tx() + + try: + workbook = self._get_workbook(workbook_name) + #TODO(rakhmerov): validate state transition + task = db_api.task_get(workbook_name, execution_id, task_id) + + wf_trace_msg = "Task '%s' [%s -> %s" % \ + (task['name'], task['state'], state) + + wf_trace_msg += ']' if state == states.ERROR \ + else ", result = %s]" % result + WORKFLOW_TRACE.info(wf_trace_msg) + + task_output = data_flow.get_task_output(task, result) + + # Update task state. + task, outbound_context = self._update_task(workbook, task, state, + task_output) + + execution = db_api.execution_get(workbook_name, execution_id) + + self._create_next_tasks(task, workbook) + + # Determine what tasks need to be started. + tasks = db_api.tasks_get(workbook_name, execution_id) + + new_exec_state = self._determine_execution_state(execution, tasks) + + if execution['state'] != new_exec_state: + wf_trace_msg = \ + "Execution '%s' [%s -> %s]" % \ + (execution_id, execution['state'], new_exec_state) + WORKFLOW_TRACE.info(wf_trace_msg) + + execution = \ + db_api.execution_update(workbook_name, execution_id, { + "state": new_exec_state + }) + + LOG.info("Changed execution state: %s" % execution) + + tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) + + self._add_variables_to_data_flow_context(outbound_context, + execution) + + data_flow.prepare_tasks(tasks_to_start, outbound_context) + + db_api.commit_tx() + except Exception as e: + LOG.exception("Failed to create necessary DB objects.") + raise exc.EngineException("Failed to create necessary DB objects:" + " %s" % e) + finally: + db_api.end_tx() + + if states.is_stopped_or_finished(execution["state"]): + return task + + for task in delayed_tasks: + self._schedule_run(workbook, task, outbound_context) + + if tasks_to_start: + self._run_tasks(tasks_to_start) + + return task def get_workflow_execution_state(self, cntx, **kwargs): """Gets the workflow execution state. @@ -102,8 +242,15 @@ class Engine(object): """ workbook_name = kwargs.get('workbook_name') execution_id = kwargs.get('execution_id') - return self.backend.get_workflow_execution_state( - workbook_name, execution_id) + + execution = db_api.execution_get(workbook_name, execution_id) + + if not execution: + raise exc.EngineException("Workflow execution not found " + "[workbook_name=%s, execution_id=%s]" + % (workbook_name, execution_id)) + + return execution["state"] def get_task_state(self, cntx, **kwargs): """Gets task state. @@ -117,5 +264,252 @@ class Engine(object): workbook_name = kwargs.get('workbook_name') execution_id = kwargs.get('execution_id') task_id = kwargs.get('task_id') - return self.backend.get_task_state( - workbook_name, execution_id, task_id) + + task = db_api.task_get(workbook_name, execution_id, task_id) + + if not task: + raise exc.EngineException("Task not found.") + + return task["state"] + + @classmethod + def _create_execution(cls, workbook_name, task_name, context): + return db_api.execution_create(workbook_name, { + "workbook_name": workbook_name, + "task": task_name, + "state": states.RUNNING, + "context": context + }) + + @classmethod + def _add_variables_to_data_flow_context(cls, context, execution): + db_workbook = db_api.workbook_get(execution['workbook_name']) + + data_flow.add_token_to_context(context, db_workbook) + data_flow.add_execution_to_context(context, execution) + + @classmethod + def _create_next_tasks(cls, task, workbook): + tasks = workflow.find_tasks_after_completion(task, workbook) + + db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'], + task['execution_id']) + return workflow.find_resolved_tasks(db_tasks) + + @classmethod + def _create_tasks(cls, task_list, workbook, workbook_name, execution_id): + tasks = [] + + for task in task_list: + state, task_runtime_context = retry.get_task_runtime(task) + action_ns = workbook.namespaces.get(task.get_action_namespace()) + + action_spec = None + if action_ns: + action_spec = \ + action_ns.actions.get(task.get_action_name()) + + db_task = db_api.task_create(workbook_name, execution_id, { + "name": task.name, + "requires": task.requires, + "task_spec": task.to_dict(), + "action_spec": {} if not action_spec + else action_spec.to_dict(), + "state": state, + "tags": task.get_property("tags", None), + "task_runtime_context": task_runtime_context + }) + + tasks.append(db_task) + + return tasks + + @classmethod + def _get_workbook(cls, workbook_name): + wb = db_api.workbook_get(workbook_name) + return parser.get_workbook(wb["definition"]) + + @classmethod + def _determine_execution_state(cls, execution, tasks): + if workflow.is_error(tasks): + return states.ERROR + + if workflow.is_success(tasks) or workflow.is_finished(tasks): + return states.SUCCESS + + return execution['state'] + + @classmethod + def _update_task(cls, workbook, task, state, task_output): + """ + Update the task with the runtime information. The outbound_context + for this task is also calculated. + :return: task, outbound_context. task is the updated task and + computed outbound context. + """ + workbook_name = task['workbook_name'] + execution_id = task['execution_id'] + task_spec = workbook.tasks.get(task["name"]) + task_runtime_context = task["task_runtime_context"] + + # Compute the outbound_context, state and exec_flow_context. + outbound_context = data_flow.get_outbound_context(task, task_output) + state, task_runtime_context = retry.get_task_runtime( + task_spec, state, outbound_context, task_runtime_context) + + # Update the task. + update_values = {"state": state, + "output": task_output, + "task_runtime_context": task_runtime_context} + task = db_api.task_update(workbook_name, execution_id, task["id"], + update_values) + + return task, outbound_context + + def _schedule_run(cls, workbook, task, outbound_context): + """ + Schedules task to run after the delay defined in the task + specification. If no delay is specified this method is a no-op. + """ + + def run_delayed_task(): + """ + Runs the delayed task. Performs all the steps required to setup + a task to run which are not already done. This is mostly code + copied over from convey_task_result. + """ + db_api.start_tx() + try: + workbook_name = task['workbook_name'] + execution_id = task['execution_id'] + execution = db_api.execution_get(workbook_name, execution_id) + + # Change state from DELAYED to IDLE to unblock processing. + + WORKFLOW_TRACE.info("Task '%s' [%s -> %s]" + % (task['name'], + task['state'], states.IDLE)) + + db_task = db_api.task_update(workbook_name, + execution_id, + task['id'], + {"state": states.IDLE}) + task_to_start = [db_task] + data_flow.prepare_tasks(task_to_start, outbound_context) + db_api.commit_tx() + finally: + db_api.end_tx() + + if not states.is_stopped_or_finished(execution["state"]): + cls._run_tasks(task_to_start) + + task_spec = workbook.tasks.get(task['name']) + retries, break_on, delay_sec = task_spec.get_retry_parameters() + if delay_sec > 0: + # Run the task after the specified delay. + eventlet.spawn_after(delay_sec, run_delayed_task) + else: + LOG.warn("No delay specified for task(id=%s) name=%s. Not " + "scheduling for execution." % (task['id'], task['name'])) + + +class EngineClient(object): + """ + RPC client for the Engine. + """ + + def __init__(self, transport): + """Construct an RPC client for the Engine. + + :param transport: a messaging transport handle + :type transport: Transport + """ + target = messaging.Target(topic=cfg.CONF.engine.topic) + self._client = messaging.RPCClient(transport, target) + + def start_workflow_execution(self, workbook_name, task_name, context=None): + """Starts a workflow execution based on the specified workbook name + and target task. + + :param workbook_name: Workbook name + :param task_name: Target task name + :param context: Execution context which defines a workflow input + :return: Workflow execution. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'task_name': task_name, + 'context': context} + return self._client.call(cntx, 'start_workflow_execution', **kwargs) + + def stop_workflow_execution(self, workbook_name, execution_id): + """Stops the workflow execution with the given id. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :return: Workflow execution. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id} + return self._client.call(cntx, 'stop_workflow_execution', **kwargs) + + def convey_task_result(self, workbook_name, execution_id, + task_id, state, result): + """Conveys task result to Mistral Engine. + + This method should be used by clients of Mistral Engine to update + state of a task once task action has been performed. One of the + clients of this method is Mistral REST API server that receives + task result from the outside action handlers. + + Note: calling this method serves an event notifying Mistral that + it possibly needs to move the workflow on, i.e. run other workflow + tasks for which all dependencies are satisfied. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :param task_id: Task id. + :param state: New task state. + :param result: Task result data. + :return: Task. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id, + 'task_id': task_id, + 'state': state, + 'result': result} + return self._client.call(cntx, 'convey_task_result', **kwargs) + + def get_workflow_execution_state(self, workbook_name, execution_id): + """Gets the workflow execution state. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :return: Current workflow state. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id} + return self._client.call( + cntx, 'get_workflow_execution_state', **kwargs) + + def get_task_state(self, workbook_name, execution_id, task_id): + """Gets task state. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :param task_id: Task id. + :return: Current task state. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'executioin_id': execution_id, + 'task_id': task_id} + return self._client.call(cntx, 'get_task_state', **kwargs) diff --git a/mistral/engine/abstract_engine.py b/mistral/engine/abstract_engine.py deleted file mode 100644 index d30d5390..00000000 --- a/mistral/engine/abstract_engine.py +++ /dev/null @@ -1,323 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2013 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import abc -import copy -import eventlet - -from oslo.config import cfg - -from mistral.openstack.common import log as logging -from mistral.db import api as db_api -from mistral import dsl_parser as parser -from mistral import exceptions as exc -from mistral.engine import states -from mistral.engine import workflow -from mistral.engine import data_flow -from mistral.engine import retry - - -LOG = logging.getLogger(__name__) -WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) - - -class AbstractEngine(object): - transport = None - - @classmethod - @abc.abstractmethod - def _run_tasks(cls, tasks): - pass - - @classmethod - def start_workflow_execution(cls, workbook_name, task_name, context): - context = copy.copy(context) if context else {} - - db_api.start_tx() - - WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', " - "task_name = '%s']" % (workbook_name, task_name)) - - # Persist execution and tasks in DB. - try: - workbook = cls._get_workbook(workbook_name) - execution = cls._create_execution(workbook_name, - task_name, - context) - - tasks = cls._create_tasks( - workflow.find_workflow_tasks(workbook, task_name), - workbook, - workbook_name, execution['id'] - ) - - tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) - - cls._add_variables_to_data_flow_context(context, execution) - - data_flow.prepare_tasks(tasks_to_start, context) - - db_api.commit_tx() - except Exception as e: - LOG.exception("Failed to create necessary DB objects.") - raise exc.EngineException("Failed to create necessary DB objects:" - " %s" % e) - finally: - db_api.end_tx() - - for task in delayed_tasks: - cls._schedule_run(workbook, task, context) - - cls._run_tasks(tasks_to_start) - - return execution - - @classmethod - def convey_task_result(cls, workbook_name, execution_id, - task_id, state, result): - db_api.start_tx() - - try: - workbook = cls._get_workbook(workbook_name) - #TODO(rakhmerov): validate state transition - task = db_api.task_get(workbook_name, execution_id, task_id) - - wf_trace_msg = "Task '%s' [%s -> %s" % \ - (task['name'], task['state'], state) - - wf_trace_msg += ']' if state == states.ERROR \ - else ", result = %s]" % result - WORKFLOW_TRACE.info(wf_trace_msg) - - task_output = data_flow.get_task_output(task, result) - - # Update task state. - task, outbound_context = cls._update_task(workbook, task, state, - task_output) - - execution = db_api.execution_get(workbook_name, execution_id) - - cls._create_next_tasks(task, workbook) - - # Determine what tasks need to be started. - tasks = db_api.tasks_get(workbook_name, execution_id) - - new_exec_state = cls._determine_execution_state(execution, tasks) - - if execution['state'] != new_exec_state: - wf_trace_msg = \ - "Execution '%s' [%s -> %s]" % \ - (execution_id, execution['state'], new_exec_state) - WORKFLOW_TRACE.info(wf_trace_msg) - - execution = \ - db_api.execution_update(workbook_name, execution_id, { - "state": new_exec_state - }) - - LOG.info("Changed execution state: %s" % execution) - - tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks) - - cls._add_variables_to_data_flow_context(outbound_context, - execution) - - data_flow.prepare_tasks(tasks_to_start, outbound_context) - - db_api.commit_tx() - except Exception as e: - LOG.exception("Failed to create necessary DB objects.") - raise exc.EngineException("Failed to create necessary DB objects:" - " %s" % e) - finally: - db_api.end_tx() - - if states.is_stopped_or_finished(execution["state"]): - return task - - for task in delayed_tasks: - cls._schedule_run(workbook, task, outbound_context) - - if tasks_to_start: - cls._run_tasks(tasks_to_start) - - return task - - @classmethod - def stop_workflow_execution(cls, workbook_name, execution_id): - return db_api.execution_update(workbook_name, execution_id, - {"state": states.STOPPED}) - - @classmethod - def get_workflow_execution_state(cls, workbook_name, execution_id): - execution = db_api.execution_get(workbook_name, execution_id) - - if not execution: - raise exc.EngineException("Workflow execution not found " - "[workbook_name=%s, execution_id=%s]" - % (workbook_name, execution_id)) - - return execution["state"] - - @classmethod - def get_task_state(cls, workbook_name, execution_id, task_id): - task = db_api.task_get(workbook_name, execution_id, task_id) - - if not task: - raise exc.EngineException("Task not found.") - - return task["state"] - - @classmethod - def _create_execution(cls, workbook_name, task_name, context): - return db_api.execution_create(workbook_name, { - "workbook_name": workbook_name, - "task": task_name, - "state": states.RUNNING, - "context": context - }) - - @classmethod - def _add_variables_to_data_flow_context(cls, context, execution): - db_workbook = db_api.workbook_get(execution['workbook_name']) - - data_flow.add_token_to_context(context, db_workbook) - data_flow.add_execution_to_context(context, execution) - - @classmethod - def _create_next_tasks(cls, task, workbook): - tasks = workflow.find_tasks_after_completion(task, workbook) - - db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'], - task['execution_id']) - return workflow.find_resolved_tasks(db_tasks) - - @classmethod - def _create_tasks(cls, task_list, workbook, workbook_name, execution_id): - tasks = [] - - for task in task_list: - state, task_runtime_context = retry.get_task_runtime(task) - action_ns = workbook.namespaces.get(task.get_action_namespace()) - - action_spec = None - if action_ns: - action_spec = \ - action_ns.actions.get(task.get_action_name()) - - db_task = db_api.task_create(workbook_name, execution_id, { - "name": task.name, - "requires": task.requires, - "task_spec": task.to_dict(), - "action_spec": {} if not action_spec - else action_spec.to_dict(), - "state": state, - "tags": task.get_property("tags", None), - "task_runtime_context": task_runtime_context - }) - - tasks.append(db_task) - - return tasks - - @classmethod - def _get_workbook(cls, workbook_name): - wb = db_api.workbook_get(workbook_name) - return parser.get_workbook(wb["definition"]) - - @classmethod - def _determine_execution_state(cls, execution, tasks): - if workflow.is_error(tasks): - return states.ERROR - - if workflow.is_success(tasks) or workflow.is_finished(tasks): - return states.SUCCESS - - return execution['state'] - - @classmethod - def _update_task(cls, workbook, task, state, task_output): - """ - Update the task with the runtime information. The outbound_context - for this task is also calculated. - :return: task, outbound_context. task is the updated task and - computed outbound context. - """ - workbook_name = task['workbook_name'] - execution_id = task['execution_id'] - task_spec = workbook.tasks.get(task["name"]) - task_runtime_context = task["task_runtime_context"] - - # Compute the outbound_context, state and exec_flow_context. - outbound_context = data_flow.get_outbound_context(task, task_output) - state, task_runtime_context = retry.get_task_runtime( - task_spec, state, outbound_context, task_runtime_context) - - # Update the task. - update_values = {"state": state, - "output": task_output, - "task_runtime_context": task_runtime_context} - task = db_api.task_update(workbook_name, execution_id, task["id"], - update_values) - - return task, outbound_context - - @classmethod - def _schedule_run(cls, workbook, task, outbound_context): - """ - Schedules task to run after the delay defined in the task - specification. If no delay is specified this method is a no-op. - """ - - def run_delayed_task(): - """ - Runs the delayed task. Performs all the steps required to setup - a task to run which are not already done. This is mostly code - copied over from convey_task_result. - """ - db_api.start_tx() - try: - workbook_name = task['workbook_name'] - execution_id = task['execution_id'] - execution = db_api.execution_get(workbook_name, execution_id) - - # Change state from DELAYED to IDLE to unblock processing. - - WORKFLOW_TRACE.info("Task '%s' [%s -> %s]" - % (task['name'], - task['state'], states.IDLE)) - - db_task = db_api.task_update(workbook_name, - execution_id, - task['id'], - {"state": states.IDLE}) - task_to_start = [db_task] - data_flow.prepare_tasks(task_to_start, outbound_context) - db_api.commit_tx() - finally: - db_api.end_tx() - - if not states.is_stopped_or_finished(execution["state"]): - cls._run_tasks(task_to_start) - - task_spec = workbook.tasks.get(task['name']) - retries, break_on, delay_sec = task_spec.get_retry_parameters() - if delay_sec > 0: - # Run the task after the specified delay. - eventlet.spawn_after(delay_sec, run_delayed_task) - else: - LOG.warn("No delay specified for task(id=%s) name=%s. Not " - "scheduling for execution." % (task['id'], task['name'])) diff --git a/mistral/engine/client.py b/mistral/engine/client.py deleted file mode 100644 index bd5a29e6..00000000 --- a/mistral/engine/client.py +++ /dev/null @@ -1,123 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from oslo import messaging -from oslo.config import cfg - -from mistral.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) - - -class EngineClient(object): - """ - RPC client for the Engine. - """ - - def __init__(self, transport): - """Construct an RPC client for the Engine. - - :param transport: a messaging transport handle - :type transport: Transport - """ - target = messaging.Target(topic=cfg.CONF.engine.topic) - self._client = messaging.RPCClient(transport, target) - - def start_workflow_execution(self, workbook_name, task_name, context=None): - """Starts a workflow execution based on the specified workbook name - and target task. - - :param workbook_name: Workbook name - :param task_name: Target task name - :param context: Execution context which defines a workflow input - :return: Workflow execution. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'task_name': task_name, - 'context': context} - return self._client.call(cntx, 'start_workflow_execution', **kwargs) - - def stop_workflow_execution(self, workbook_name, execution_id): - """Stops the workflow execution with the given id. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :return: Workflow execution. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'execution_id': execution_id} - return self._client.call(cntx, 'stop_workflow_execution', **kwargs) - - def convey_task_result(self, workbook_name, execution_id, - task_id, state, result): - """Conveys task result to Mistral Engine. - - This method should be used by clients of Mistral Engine to update - state of a task once task action has been performed. One of the - clients of this method is Mistral REST API server that receives - task result from the outside action handlers. - - Note: calling this method serves an event notifying Mistral that - it possibly needs to move the workflow on, i.e. run other workflow - tasks for which all dependencies are satisfied. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :param task_id: Task id. - :param state: New task state. - :param result: Task result data. - :return: Task. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'execution_id': execution_id, - 'task_id': task_id, - 'state': state, - 'result': result} - return self._client.call(cntx, 'convey_task_result', **kwargs) - - def get_workflow_execution_state(self, workbook_name, execution_id): - """Gets the workflow execution state. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :return: Current workflow state. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'execution_id': execution_id} - return self._client.call( - cntx, 'get_workflow_execution_state', **kwargs) - - def get_task_state(self, workbook_name, execution_id, task_id): - """Gets task state. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :param task_id: Task id. - :return: Current task state. - """ - # TODO(m4dcoder): refactor auth context - cntx = {} - kwargs = {'workbook_name': workbook_name, - 'executioin_id': execution_id, - 'task_id': task_id} - return self._client.call(cntx, 'get_task_state', **kwargs) diff --git a/mistral/engine/scalable/__init__.py b/mistral/engine/drivers/__init__.py similarity index 100% rename from mistral/engine/scalable/__init__.py rename to mistral/engine/drivers/__init__.py diff --git a/mistral/engine/scalable/executor/__init__.py b/mistral/engine/drivers/default/__init__.py similarity index 100% rename from mistral/engine/scalable/executor/__init__.py rename to mistral/engine/drivers/default/__init__.py diff --git a/mistral/engine/scalable/engine.py b/mistral/engine/drivers/default/engine.py similarity index 71% rename from mistral/engine/scalable/engine.py rename to mistral/engine/drivers/default/engine.py index bd265389..42820e6d 100644 --- a/mistral/engine/scalable/engine.py +++ b/mistral/engine/drivers/default/engine.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- # -# Copyright 2013 - Mirantis, Inc. -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -16,29 +14,28 @@ from oslo import messaging from oslo.config import cfg + from mistral.openstack.common import log as logging -from mistral.engine.scalable.executor import client -from mistral.engine import abstract_engine as abs_eng +from mistral import engine +from mistral.engine import executor LOG = logging.getLogger(__name__) -class ScalableEngine(abs_eng.AbstractEngine): - @classmethod - def _notify_task_executors(cls, tasks): +class DefaultEngine(engine.Engine): + def _notify_task_executors(self, tasks): # TODO(m4dcoder): Use a pool for transport and client - if not cls.transport: - cls.transport = messaging.get_transport(cfg.CONF) - ex_client = client.ExecutorClient(cls.transport) + if not self.transport: + self.transport = messaging.get_transport(cfg.CONF) + exctr = executor.ExecutorClient(self.transport) for task in tasks: # TODO(m4dcoder): Fill request context argument with auth info context = {} - ex_client.handle_task(context, task=task) + exctr.handle_task(context, task=task) LOG.info("Submitted task for execution: '%s'" % task) - @classmethod - def _run_tasks(cls, tasks): + def _run_tasks(self, tasks): # TODO(rakhmerov): # This call outside of DB transaction creates a window # when the engine may crash and DB will not be consistent with @@ -47,8 +44,4 @@ class ScalableEngine(abs_eng.AbstractEngine): # However, making this call in DB transaction is really bad # since it makes transaction much longer in time and under load # may overload DB with open transactions. - cls._notify_task_executors(tasks) - - -def get_engine(): - return ScalableEngine + self._notify_task_executors(tasks) diff --git a/mistral/engine/scalable/executor/server.py b/mistral/engine/drivers/default/executor.py similarity index 95% rename from mistral/engine/scalable/executor/server.py rename to mistral/engine/drivers/default/executor.py index 2dad685f..90751118 100644 --- a/mistral/engine/scalable/executor/server.py +++ b/mistral/engine/drivers/default/executor.py @@ -19,9 +19,8 @@ from oslo.config import cfg from mistral.openstack.common import log as logging from mistral.db import api as db_api from mistral import exceptions as exc -from mistral import engine -from mistral.engine import client from mistral.engine import states +from mistral.engine import executor from mistral.actions import action_factory as a_f @@ -29,10 +28,7 @@ LOG = logging.getLogger(__name__) WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) -class Executor(object): - def __init__(self, transport=None): - self.transport = engine.get_transport(transport) - self.engine = client.EngineClient(self.transport) +class DefaultExecutor(executor.Executor): def _do_task_action(self, task): """Executes the action defined by the task and return result. diff --git a/mistral/engine/scalable/executor/client.py b/mistral/engine/executor.py similarity index 68% rename from mistral/engine/scalable/executor/client.py rename to mistral/engine/executor.py index f189caa2..5e89389c 100644 --- a/mistral/engine/scalable/executor/client.py +++ b/mistral/engine/executor.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- # -# Copyright 2013 - Mirantis, Inc. -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -14,15 +12,42 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc + from oslo import messaging from oslo.config import cfg +from stevedore import driver from mistral.openstack.common import log as logging +from mistral import engine LOG = logging.getLogger(__name__) +def get_executor(name, transport): + mgr = driver.DriverManager( + namespace='mistral.executor.drivers', + name=name, + invoke_on_load=True, + invoke_kwds={'transport': transport}) + return mgr.driver + + +class Executor(object): + """Abstract class for task execution.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, transport=None): + self.transport = engine.get_transport(transport) + self.engine = engine.EngineClient(self.transport) + + @abc.abstractmethod + def handle_task(self, cntx, **kwargs): + raise NotImplementedError() + + class ExecutorClient(object): """ RPC client for the Executor. diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index 2925bf49..7a1a8504 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -16,7 +16,6 @@ from mistral.db import api as db_api from mistral import engine -from mistral.engine import client from mistral.openstack.common import log from mistral.openstack.common import periodic_task from mistral.openstack.common import threadgroup @@ -34,7 +33,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks): def __init__(self, transport=None): super(MistralPeriodicTasks, self).__init__() self.transport = engine.get_transport(transport) - self.engine = client.EngineClient(self.transport) + self.engine = engine.EngineClient(self.transport) @periodic_task.periodic_task(spacing=1, run_immediately=True) def scheduler_triggers(self, ctx): diff --git a/mistral/tests/api/v1/controllers/test_executions.py b/mistral/tests/api/v1/controllers/test_executions.py index fafaf57c..03e65274 100644 --- a/mistral/tests/api/v1/controllers/test_executions.py +++ b/mistral/tests/api/v1/controllers/test_executions.py @@ -21,7 +21,7 @@ from mistral import exceptions as ex from webtest.app import AppError from mistral.tests.api import base from mistral.db import api as db_api -from mistral.engine import client +from mistral import engine # TODO: later we need additional tests verifying all the errors etc. @@ -75,7 +75,7 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertDictEqual(UPDATED_EXEC, canonize(resp.json)) - @mock.patch.object(client.EngineClient, 'start_workflow_execution', + @mock.patch.object(engine.EngineClient, 'start_workflow_execution', mock.MagicMock(return_value=EXECS[0])) def test_post(self): new_exec = EXECS[0].copy() @@ -86,7 +86,7 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 201) self.assertDictEqual(EXECS[0], canonize(resp.json)) - @mock.patch.object(client.EngineClient, 'start_workflow_execution', + @mock.patch.object(engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=ex.MistralException)) def test_post_throws_exception(self): with self.assertRaises(AppError) as context: diff --git a/mistral/tests/api/v1/controllers/test_tasks.py b/mistral/tests/api/v1/controllers/test_tasks.py index bd3bb38f..07a57956 100644 --- a/mistral/tests/api/v1/controllers/test_tasks.py +++ b/mistral/tests/api/v1/controllers/test_tasks.py @@ -18,7 +18,7 @@ import mock from mistral.tests.api import base from mistral.db import api as db_api -from mistral.engine import client +from mistral import engine # TODO: later we need additional tests verifying all the errors etc. @@ -48,7 +48,7 @@ class TestTasksController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertDictEqual(TASKS[0], resp.json) - @mock.patch.object(client.EngineClient, "convey_task_result", + @mock.patch.object(engine.EngineClient, "convey_task_result", mock.MagicMock(return_value=UPDATED_TASK)) def test_put(self): resp = self.app.put_json( diff --git a/mistral/tests/base.py b/mistral/tests/base.py index 9ade353f..1469fb95 100644 --- a/mistral/tests/base.py +++ b/mistral/tests/base.py @@ -34,9 +34,8 @@ from mistral.db.sqlalchemy import api as db_api from mistral.openstack.common import log as logging from mistral.openstack.common.db.sqlalchemy import session from mistral import version -from mistral.engine import client -from mistral.engine.scalable import engine -from mistral.engine.scalable.executor import server +from mistral import engine +from mistral.engine import executor RESOURCES_PATH = 'tests/resources/' @@ -121,10 +120,11 @@ class DbTestCase(BaseTest): class EngineTestCase(DbTestCase): transport = get_fake_transport() + backend = engine.get_engine(cfg.CONF.engine.engine, transport) def __init__(self, *args, **kwargs): super(EngineTestCase, self).__init__(*args, **kwargs) - self.engine = client.EngineClient(self.transport) + self.engine = engine.EngineClient(self.transport) @classmethod def mock_task_result(cls, workbook_name, execution_id, @@ -133,8 +133,13 @@ class EngineTestCase(DbTestCase): Mock the engine convey_task_results to send request directly to the engine instead of going through the oslo.messaging transport. """ - return engine.ScalableEngine.convey_task_result( - workbook_name, execution_id, task_id, state, result) + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id, + 'task_id': task_id, + 'state': state, + 'result': result} + return cls.backend.convey_task_result(cntx, **kwargs) @classmethod def mock_start_workflow(cls, workbook_name, task_name, context=None): @@ -142,8 +147,11 @@ class EngineTestCase(DbTestCase): Mock the engine start_workflow_execution to send request directly to the engine instead of going through the oslo.messaging transport. """ - return engine.ScalableEngine.start_workflow_execution( - workbook_name, task_name, context) + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'task_name': task_name, + 'context': context} + return cls.backend.start_workflow_execution(cntx, **kwargs) @classmethod def mock_get_workflow_state(cls, workbook_name, execution_id): @@ -151,8 +159,10 @@ class EngineTestCase(DbTestCase): Mock the engine get_workflow_execution_state to send request directly to the engine instead of going through the oslo.messaging transport. """ - return engine.ScalableEngine.get_workflow_execution_state( - workbook_name, execution_id) + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id} + return cls.backend.get_workflow_execution_state(cntx, **kwargs) @classmethod def mock_run_tasks(cls, tasks): @@ -160,9 +170,9 @@ class EngineTestCase(DbTestCase): Mock the engine _run_tasks to send requests directly to the task executor instead of going through the oslo.messaging transport. """ - executor = server.Executor(transport=cls.transport) + exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport) for task in tasks: - executor.handle_task({}, task=task) + exctr.handle_task({}, task=task) @classmethod def mock_handle_task(cls, cntx, **kwargs): @@ -170,5 +180,5 @@ class EngineTestCase(DbTestCase): Mock the executor handle_task to send requests directory to the task executor instead of going through the oslo.messaging transport. """ - executor = server.Executor(transport=cls.transport) - return executor.handle_task(cntx, **kwargs) + exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport) + return exctr.handle_task(cntx, **kwargs) diff --git a/mistral/tests/unit/engine/scalable/__init__.py b/mistral/tests/unit/engine/default/__init__.py similarity index 100% rename from mistral/tests/unit/engine/scalable/__init__.py rename to mistral/tests/unit/engine/default/__init__.py diff --git a/mistral/tests/unit/engine/scalable/test_engine.py b/mistral/tests/unit/engine/default/test_engine.py similarity index 95% rename from mistral/tests/unit/engine/scalable/test_engine.py rename to mistral/tests/unit/engine/default/test_engine.py index 1d1b0c89..3e814caf 100644 --- a/mistral/tests/unit/engine/scalable/test_engine.py +++ b/mistral/tests/unit/engine/default/test_engine.py @@ -22,9 +22,9 @@ from mistral.openstack.common import log as logging from mistral.db import api as db_api from mistral.actions import std_actions from mistral import expressions -from mistral.engine.scalable import engine +from mistral import engine from mistral.engine import states -from mistral.engine import client +from mistral.engine.drivers.default import engine as concrete_engine LOG = logging.getLogger(__name__) @@ -40,10 +40,10 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') @mock.patch.object( - client.EngineClient, 'start_workflow_execution', + engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) @mock.patch.object( - client.EngineClient, 'convey_task_result', + engine.EngineClient, 'convey_task_result', mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) @mock.patch.object( db_api, 'workbook_get', @@ -54,7 +54,7 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') mock.MagicMock(return_value={'state': states.SUCCESS})) class TestScalableEngine(base.EngineTestCase): @mock.patch.object( - engine.ScalableEngine, "_notify_task_executors", + concrete_engine.DefaultEngine, "_notify_task_executors", mock.MagicMock(return_value="")) def test_engine_one_task(self): execution = self.engine.start_workflow_execution(WB_NAME, "create-vms", @@ -72,11 +72,11 @@ class TestScalableEngine(base.EngineTestCase): self.assertEqual(task['state'], states.SUCCESS) @mock.patch.object( - client.EngineClient, 'get_workflow_execution_state', + engine.EngineClient, 'get_workflow_execution_state', mock.MagicMock( side_effect=base.EngineTestCase.mock_get_workflow_state)) @mock.patch.object( - engine.ScalableEngine, "_notify_task_executors", + concrete_engine.DefaultEngine, "_notify_task_executors", mock.MagicMock(return_value="")) def test_engine_multiple_tasks(self): execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms", @@ -116,7 +116,7 @@ class TestScalableEngine(base.EngineTestCase): WB_NAME, execution['id'])) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) @mock.patch.object( states, "get_state_by_http_status_code", @@ -135,7 +135,7 @@ class TestScalableEngine(base.EngineTestCase): self.assertEqual(task['state'], states.SUCCESS) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) @mock.patch.object( expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) @@ -201,7 +201,7 @@ class TestScalableEngine(base.EngineTestCase): self._assert_multiple_items(tasks, 4, state=states.SUCCESS) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) @mock.patch.object( expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) diff --git a/mistral/tests/unit/engine/scalable/test_executor.py b/mistral/tests/unit/engine/default/test_executor.py similarity index 97% rename from mistral/tests/unit/engine/scalable/test_executor.py rename to mistral/tests/unit/engine/default/test_executor.py index ad044d02..9433e458 100644 --- a/mistral/tests/unit/engine/scalable/test_executor.py +++ b/mistral/tests/unit/engine/default/test_executor.py @@ -28,8 +28,8 @@ from mistral.openstack.common import importutils from mistral.engine import states from mistral.db import api as db_api from mistral.actions import std_actions -from mistral.engine import client as engine -from mistral.engine.scalable.executor import client as executor +from mistral import engine +from mistral.engine import executor # We need to make sure that all configuration properties are registered. diff --git a/mistral/tests/unit/engine/test_data_flow.py b/mistral/tests/unit/engine/test_data_flow.py index e83d0f43..a331ae95 100644 --- a/mistral/tests/unit/engine/test_data_flow.py +++ b/mistral/tests/unit/engine/test_data_flow.py @@ -22,10 +22,10 @@ from oslo.config import cfg from mistral.openstack.common import log as logging from mistral.tests import base from mistral.db import api as db_api -from mistral.engine.scalable import engine from mistral.actions import std_actions +from mistral import engine from mistral.engine import states -from mistral.engine import client +from mistral.engine.drivers.default import engine as concrete_engine from mistral.utils.openstack import keystone @@ -62,13 +62,13 @@ def create_workbook(definition_path): @mock.patch.object( - client.EngineClient, 'start_workflow_execution', + engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) @mock.patch.object( - client.EngineClient, 'convey_task_result', + engine.EngineClient, 'convey_task_result', mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) class DataFlowTest(base.EngineTestCase): def _check_in_context_execution(self, task): diff --git a/mistral/tests/unit/engine/test_task_retry.py b/mistral/tests/unit/engine/test_task_retry.py index 8604213a..80af9fbe 100644 --- a/mistral/tests/unit/engine/test_task_retry.py +++ b/mistral/tests/unit/engine/test_task_retry.py @@ -24,10 +24,10 @@ from mistral import exceptions as exc from mistral.openstack.common import log as logging from mistral.tests import base from mistral.db import api as db_api -from mistral.engine import client -from mistral.engine.scalable import engine from mistral.actions import std_actions +from mistral import engine from mistral.engine import states +from mistral.engine.drivers.default import engine as concrete_engine from mistral import dsl_parser as parser @@ -59,13 +59,13 @@ class FailBeforeSuccessMocker(object): @mock.patch.object( - client.EngineClient, 'start_workflow_execution', + engine.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) @mock.patch.object( - client.EngineClient, 'convey_task_result', + engine.EngineClient, 'convey_task_result', mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) @mock.patch.object( - engine.ScalableEngine, '_run_tasks', + concrete_engine.DefaultEngine, '_run_tasks', mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) @mock.patch.object( db_api, 'workbook_get', diff --git a/setup.cfg b/setup.cfg index eb0f4571..0c5c765f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,4 +26,8 @@ packages = console_scripts = mistral-server = mistral.cmd.launch:main +mistral.engine.drivers = + default = mistral.engine.drivers.default.engine:DefaultEngine +mistral.executor.drivers = + default = mistral.engine.drivers.default.executor:DefaultExecutor