Refactor engine to use plugins

Refactor the engine process to instantiate the concrete engine via
stevedore instead of importutils. Since the executor is tightly coupled
to the engine, it is refactored to use stevedore as well. ScalableEngine
is renamed to DefaultEngine and moved to the
mistral.engine.drivers.default namespace. The setup.cfg file is updated
to include entry points for the engine and executor plugins.

Change-Id: Ia9d15b25ca96387e5ac22c1a86cffc7a816e92fd
Implements: blueprint mistral-engine-plugin
Winson Chan 2014-04-29 16:39:34 -07:00
parent 62d47c6d82
commit 6c5ca50b4c
22 changed files with 514 additions and 538 deletions
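For context, stevedore resolves a plugin by looking up a name in an entry-point namespace and instantiating whatever it finds there. A minimal sketch of the mechanism this change relies on, mirroring the get_engine() helper introduced below:

from stevedore import driver

def load_engine(name, transport):
    # Look up `name` under the 'mistral.engine.drivers' entry-point
    # namespace and instantiate the plugin class with the transport.
    mgr = driver.DriverManager(
        namespace='mistral.engine.drivers',
        name=name,
        invoke_on_load=True,
        invoke_kwds={'transport': transport})
    return mgr.driver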

View File

@@ -35,8 +35,8 @@ host = 0.0.0.0
port = 8989
[engine]
# Mistral engine class (string value)
#engine=mistral.engine.scalable.engine
# Mistral engine plugin (string value)
#engine=default
# Name of the engine node. This can be an opaque identifier.
# It is not necessarily a hostname, FQDN, or IP address. (string value)
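With this change, switching engines becomes a one-line config edit. For example, pointing Mistral at a hypothetical third-party plugin (name illustrative) would look like:

[engine]
# 'my_engine' must be registered by its package under the
# mistral.engine.drivers entry-point namespace (hypothetical name).
engine = my_engine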

View File

@@ -15,7 +15,6 @@
from pecan.hooks import PecanHook
from mistral import engine
from mistral.engine import client
from mistral.openstack.common import log as logging
@@ -26,7 +25,7 @@ class EngineHook(PecanHook):
def __init__(self, transport=None):
self.transport = engine.get_transport(transport)
self.engine = client.EngineClient(self.transport)
self.engine = engine.EngineClient(self.transport)
def before(self, state):
state.request.context['engine'] = self.engine

View File

@@ -38,7 +38,7 @@ from oslo.config import cfg
from mistral import config
from mistral import engine
from mistral.engine.scalable.executor import server
from mistral.engine import executor
from mistral.api import app
from wsgiref import simple_server
from mistral.openstack.common import log as logging
@@ -50,7 +50,9 @@ LOG = logging.getLogger(__name__)
def launch_executor(transport):
target = messaging.Target(topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host)
endpoints = [server.Executor(transport)]
# Since engine and executor are tightly coupled, use the engine
# configuration to decide which executor to get.
endpoints = [executor.get_executor(cfg.CONF.engine.engine, transport)]
ex_server = messaging.get_rpc_server(
transport, target, endpoints, executor='eventlet')
ex_server.start()
@@ -60,7 +62,7 @@ def launch_executor(transport):
def launch_engine(transport):
target = messaging.Target(topic=cfg.CONF.engine.topic,
server=cfg.CONF.engine.host)
endpoints = [engine.Engine(transport)]
endpoints = [engine.get_engine(cfg.CONF.engine.engine, transport)]
en_server = messaging.get_rpc_server(
transport, target, endpoints, executor='eventlet')
en_server.start()

View File

@@ -30,8 +30,8 @@ api_opts = [
]
engine_opts = [
cfg.StrOpt('engine', default='mistral.engine.scalable.engine',
help='Mistral engine class'),
cfg.StrOpt('engine', default='default',
help='Mistral engine plugin'),
cfg.StrOpt('host', default='0.0.0.0',
help='Name of the engine node. This can be an opaque '
'identifier. It is not necessarily a hostname, '

View File

@@ -12,28 +12,62 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import copy
import eventlet
from oslo import messaging
from oslo.config import cfg
from stevedore import driver
# If mistral.config is not imported here, nosetests will fail on import
# because workflow_trace_log_name is not registered. The use of importutils
# to import mistral.config instead of simply "from mistral import config" is
# to avoid pep8 error on module referenced but not used.
# TODO(m4dcoder): Refactor and clean up configuration registration.
from mistral.openstack.common import importutils
importutils.import_module("mistral.config")
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral import dsl_parser as parser
from mistral import exceptions as exc
from mistral.engine import states
from mistral.engine import workflow
from mistral.engine import data_flow
from mistral.engine import retry
LOG = logging.getLogger(__name__)
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
def get_transport(transport=None):
return (transport if transport else messaging.get_transport(cfg.CONF))
def get_engine(name, transport):
mgr = driver.DriverManager(
namespace='mistral.engine.drivers',
name=name,
invoke_on_load=True,
invoke_kwds={'transport': transport})
return mgr.driver
class Engine(object):
"""Abstract engine for workflow execution."""
__metaclass__ = abc.ABCMeta
transport = None
def __init__(self, transport=None):
module_name = cfg.CONF.engine.engine
module = importutils.import_module(module_name)
self.transport = get_transport(transport)
self.backend = module.get_engine()
self.backend.transport = self.transport
@abc.abstractmethod
def _run_tasks(self, tasks):
raise NotImplementedError()
def start_workflow_execution(self, cntx, **kwargs):
"""Starts a workflow execution based on the specified workbook name
@@ -48,8 +82,47 @@ class Engine(object):
workbook_name = kwargs.get('workbook_name')
task_name = kwargs.get('task_name')
context = kwargs.get('context', None)
return self.backend.start_workflow_execution(
workbook_name, task_name, context)
context = copy.copy(context) if context else {}
db_api.start_tx()
WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', "
"task_name = '%s']" % (workbook_name, task_name))
# Persist execution and tasks in DB.
try:
workbook = self._get_workbook(workbook_name)
execution = self._create_execution(workbook_name,
task_name,
context)
tasks = self._create_tasks(
workflow.find_workflow_tasks(workbook, task_name),
workbook,
workbook_name, execution['id']
)
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
self._add_variables_to_data_flow_context(context, execution)
data_flow.prepare_tasks(tasks_to_start, context)
db_api.commit_tx()
except Exception as e:
LOG.exception("Failed to create necessary DB objects.")
raise exc.EngineException("Failed to create necessary DB objects:"
" %s" % e)
finally:
db_api.end_tx()
for task in delayed_tasks:
self._schedule_run(workbook, task, context)
self._run_tasks(tasks_to_start)
return execution
def stop_workflow_execution(self, cntx, **kwargs):
"""Stops the workflow execution with the given id.
@@ -62,8 +135,9 @@
"""
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
return self.backend.stop_workflow_execution(
workbook_name, execution_id)
return db_api.execution_update(workbook_name, execution_id,
{"state": states.STOPPED})
def convey_task_result(self, cntx, **kwargs):
"""Conveys task result to Mistral Engine.
@@ -88,8 +162,74 @@
task_id = kwargs.get('task_id')
state = kwargs.get('state')
result = kwargs.get('result')
return self.backend.convey_task_result(
workbook_name, execution_id, task_id, state, result)
db_api.start_tx()
try:
workbook = self._get_workbook(workbook_name)
#TODO(rakhmerov): validate state transition
task = db_api.task_get(workbook_name, execution_id, task_id)
wf_trace_msg = "Task '%s' [%s -> %s" % \
(task['name'], task['state'], state)
wf_trace_msg += ']' if state == states.ERROR \
else ", result = %s]" % result
WORKFLOW_TRACE.info(wf_trace_msg)
task_output = data_flow.get_task_output(task, result)
# Update task state.
task, outbound_context = self._update_task(workbook, task, state,
task_output)
execution = db_api.execution_get(workbook_name, execution_id)
self._create_next_tasks(task, workbook)
# Determine what tasks need to be started.
tasks = db_api.tasks_get(workbook_name, execution_id)
new_exec_state = self._determine_execution_state(execution, tasks)
if execution['state'] != new_exec_state:
wf_trace_msg = \
"Execution '%s' [%s -> %s]" % \
(execution_id, execution['state'], new_exec_state)
WORKFLOW_TRACE.info(wf_trace_msg)
execution = \
db_api.execution_update(workbook_name, execution_id, {
"state": new_exec_state
})
LOG.info("Changed execution state: %s" % execution)
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
self._add_variables_to_data_flow_context(outbound_context,
execution)
data_flow.prepare_tasks(tasks_to_start, outbound_context)
db_api.commit_tx()
except Exception as e:
LOG.exception("Failed to create necessary DB objects.")
raise exc.EngineException("Failed to create necessary DB objects:"
" %s" % e)
finally:
db_api.end_tx()
if states.is_stopped_or_finished(execution["state"]):
return task
for task in delayed_tasks:
self._schedule_run(workbook, task, outbound_context)
if tasks_to_start:
self._run_tasks(tasks_to_start)
return task
def get_workflow_execution_state(self, cntx, **kwargs):
"""Gets the workflow execution state.
@@ -102,8 +242,15 @@
"""
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
return self.backend.get_workflow_execution_state(
workbook_name, execution_id)
execution = db_api.execution_get(workbook_name, execution_id)
if not execution:
raise exc.EngineException("Workflow execution not found "
"[workbook_name=%s, execution_id=%s]"
% (workbook_name, execution_id))
return execution["state"]
def get_task_state(self, cntx, **kwargs):
"""Gets task state.
@@ -117,5 +264,252 @@
workbook_name = kwargs.get('workbook_name')
execution_id = kwargs.get('execution_id')
task_id = kwargs.get('task_id')
return self.backend.get_task_state(
workbook_name, execution_id, task_id)
task = db_api.task_get(workbook_name, execution_id, task_id)
if not task:
raise exc.EngineException("Task not found.")
return task["state"]
@classmethod
def _create_execution(cls, workbook_name, task_name, context):
return db_api.execution_create(workbook_name, {
"workbook_name": workbook_name,
"task": task_name,
"state": states.RUNNING,
"context": context
})
@classmethod
def _add_variables_to_data_flow_context(cls, context, execution):
db_workbook = db_api.workbook_get(execution['workbook_name'])
data_flow.add_token_to_context(context, db_workbook)
data_flow.add_execution_to_context(context, execution)
@classmethod
def _create_next_tasks(cls, task, workbook):
tasks = workflow.find_tasks_after_completion(task, workbook)
db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'],
task['execution_id'])
return workflow.find_resolved_tasks(db_tasks)
@classmethod
def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):
tasks = []
for task in task_list:
state, task_runtime_context = retry.get_task_runtime(task)
action_ns = workbook.namespaces.get(task.get_action_namespace())
action_spec = None
if action_ns:
action_spec = \
action_ns.actions.get(task.get_action_name())
db_task = db_api.task_create(workbook_name, execution_id, {
"name": task.name,
"requires": task.requires,
"task_spec": task.to_dict(),
"action_spec": {} if not action_spec
else action_spec.to_dict(),
"state": state,
"tags": task.get_property("tags", None),
"task_runtime_context": task_runtime_context
})
tasks.append(db_task)
return tasks
@classmethod
def _get_workbook(cls, workbook_name):
wb = db_api.workbook_get(workbook_name)
return parser.get_workbook(wb["definition"])
@classmethod
def _determine_execution_state(cls, execution, tasks):
if workflow.is_error(tasks):
return states.ERROR
if workflow.is_success(tasks) or workflow.is_finished(tasks):
return states.SUCCESS
return execution['state']
@classmethod
def _update_task(cls, workbook, task, state, task_output):
"""
Update the task with the runtime information. The outbound_context
for this task is also calculated.
:return: task, outbound_context. task is the updated task and
computed outbound context.
"""
workbook_name = task['workbook_name']
execution_id = task['execution_id']
task_spec = workbook.tasks.get(task["name"])
task_runtime_context = task["task_runtime_context"]
# Compute the outbound_context, state and exec_flow_context.
outbound_context = data_flow.get_outbound_context(task, task_output)
state, task_runtime_context = retry.get_task_runtime(
task_spec, state, outbound_context, task_runtime_context)
# Update the task.
update_values = {"state": state,
"output": task_output,
"task_runtime_context": task_runtime_context}
task = db_api.task_update(workbook_name, execution_id, task["id"],
update_values)
return task, outbound_context
def _schedule_run(self, workbook, task, outbound_context):
"""
Schedules task to run after the delay defined in the task
specification. If no delay is specified this method is a no-op.
"""
def run_delayed_task():
"""
Runs the delayed task. Performs all the steps required to setup
a task to run which are not already done. This is mostly code
copied over from convey_task_result.
"""
db_api.start_tx()
try:
workbook_name = task['workbook_name']
execution_id = task['execution_id']
execution = db_api.execution_get(workbook_name, execution_id)
# Change state from DELAYED to IDLE to unblock processing.
WORKFLOW_TRACE.info("Task '%s' [%s -> %s]"
% (task['name'],
task['state'], states.IDLE))
db_task = db_api.task_update(workbook_name,
execution_id,
task['id'],
{"state": states.IDLE})
task_to_start = [db_task]
data_flow.prepare_tasks(task_to_start, outbound_context)
db_api.commit_tx()
finally:
db_api.end_tx()
if not states.is_stopped_or_finished(execution["state"]):
self._run_tasks(task_to_start)
task_spec = workbook.tasks.get(task['name'])
retries, break_on, delay_sec = task_spec.get_retry_parameters()
if delay_sec > 0:
# Run the task after the specified delay.
eventlet.spawn_after(delay_sec, run_delayed_task)
else:
LOG.warn("No delay specified for task(id=%s) name=%s. Not "
"scheduling for execution." % (task['id'], task['name']))
class EngineClient(object):
"""
RPC client for the Engine.
"""
def __init__(self, transport):
"""Construct an RPC client for the Engine.
:param transport: a messaging transport handle
:type transport: Transport
"""
target = messaging.Target(topic=cfg.CONF.engine.topic)
self._client = messaging.RPCClient(transport, target)
def start_workflow_execution(self, workbook_name, task_name, context=None):
"""Starts a workflow execution based on the specified workbook name
and target task.
:param workbook_name: Workbook name
:param task_name: Target task name
:param context: Execution context which defines a workflow input
:return: Workflow execution.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'task_name': task_name,
'context': context}
return self._client.call(cntx, 'start_workflow_execution', **kwargs)
def stop_workflow_execution(self, workbook_name, execution_id):
"""Stops the workflow execution with the given id.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id}
return self._client.call(cntx, 'stop_workflow_execution', **kwargs)
def convey_task_result(self, workbook_name, execution_id,
task_id, state, result):
"""Conveys task result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
state of a task once task action has been performed. One of the
clients of this method is Mistral REST API server that receives
task result from the outside action handlers.
Note: calling this method serves an event notifying Mistral that
it possibly needs to move the workflow on, i.e. run other workflow
tasks for which all dependencies are satisfied.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:param task_id: Task id.
:param state: New task state.
:param result: Task result data.
:return: Task.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id,
'task_id': task_id,
'state': state,
'result': result}
return self._client.call(cntx, 'convey_task_result', **kwargs)
def get_workflow_execution_state(self, workbook_name, execution_id):
"""Gets the workflow execution state.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Current workflow state.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id}
return self._client.call(
cntx, 'get_workflow_execution_state', **kwargs)
def get_task_state(self, workbook_name, execution_id, task_id):
"""Gets task state.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:param task_id: Task id.
:return: Current task state.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id,
'task_id': task_id}
return self._client.call(cntx, 'get_task_state', **kwargs)
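Putting the pieces together, a minimal client-side sketch of this module; the workbook and task names are hypothetical, and a loaded config plus a running engine server are assumed:

from mistral import engine

transport = engine.get_transport()       # oslo.messaging transport from config
client = engine.EngineClient(transport)

# Start the 'create-vms' task of workbook 'my_workbook', then poll the
# execution state over RPC.
execution = client.start_workflow_execution('my_workbook', 'create-vms',
                                            context={})
state = client.get_workflow_execution_state('my_workbook', execution['id'])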

View File

@@ -1,323 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import copy
import eventlet
from oslo.config import cfg
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral import dsl_parser as parser
from mistral import exceptions as exc
from mistral.engine import states
from mistral.engine import workflow
from mistral.engine import data_flow
from mistral.engine import retry
LOG = logging.getLogger(__name__)
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
class AbstractEngine(object):
transport = None
@classmethod
@abc.abstractmethod
def _run_tasks(cls, tasks):
pass
@classmethod
def start_workflow_execution(cls, workbook_name, task_name, context):
context = copy.copy(context) if context else {}
db_api.start_tx()
WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', "
"task_name = '%s']" % (workbook_name, task_name))
# Persist execution and tasks in DB.
try:
workbook = cls._get_workbook(workbook_name)
execution = cls._create_execution(workbook_name,
task_name,
context)
tasks = cls._create_tasks(
workflow.find_workflow_tasks(workbook, task_name),
workbook,
workbook_name, execution['id']
)
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
cls._add_variables_to_data_flow_context(context, execution)
data_flow.prepare_tasks(tasks_to_start, context)
db_api.commit_tx()
except Exception as e:
LOG.exception("Failed to create necessary DB objects.")
raise exc.EngineException("Failed to create necessary DB objects:"
" %s" % e)
finally:
db_api.end_tx()
for task in delayed_tasks:
cls._schedule_run(workbook, task, context)
cls._run_tasks(tasks_to_start)
return execution
@classmethod
def convey_task_result(cls, workbook_name, execution_id,
task_id, state, result):
db_api.start_tx()
try:
workbook = cls._get_workbook(workbook_name)
#TODO(rakhmerov): validate state transition
task = db_api.task_get(workbook_name, execution_id, task_id)
wf_trace_msg = "Task '%s' [%s -> %s" % \
(task['name'], task['state'], state)
wf_trace_msg += ']' if state == states.ERROR \
else ", result = %s]" % result
WORKFLOW_TRACE.info(wf_trace_msg)
task_output = data_flow.get_task_output(task, result)
# Update task state.
task, outbound_context = cls._update_task(workbook, task, state,
task_output)
execution = db_api.execution_get(workbook_name, execution_id)
cls._create_next_tasks(task, workbook)
# Determine what tasks need to be started.
tasks = db_api.tasks_get(workbook_name, execution_id)
new_exec_state = cls._determine_execution_state(execution, tasks)
if execution['state'] != new_exec_state:
wf_trace_msg = \
"Execution '%s' [%s -> %s]" % \
(execution_id, execution['state'], new_exec_state)
WORKFLOW_TRACE.info(wf_trace_msg)
execution = \
db_api.execution_update(workbook_name, execution_id, {
"state": new_exec_state
})
LOG.info("Changed execution state: %s" % execution)
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
cls._add_variables_to_data_flow_context(outbound_context,
execution)
data_flow.prepare_tasks(tasks_to_start, outbound_context)
db_api.commit_tx()
except Exception as e:
LOG.exception("Failed to create necessary DB objects.")
raise exc.EngineException("Failed to create necessary DB objects:"
" %s" % e)
finally:
db_api.end_tx()
if states.is_stopped_or_finished(execution["state"]):
return task
for task in delayed_tasks:
cls._schedule_run(workbook, task, outbound_context)
if tasks_to_start:
cls._run_tasks(tasks_to_start)
return task
@classmethod
def stop_workflow_execution(cls, workbook_name, execution_id):
return db_api.execution_update(workbook_name, execution_id,
{"state": states.STOPPED})
@classmethod
def get_workflow_execution_state(cls, workbook_name, execution_id):
execution = db_api.execution_get(workbook_name, execution_id)
if not execution:
raise exc.EngineException("Workflow execution not found "
"[workbook_name=%s, execution_id=%s]"
% (workbook_name, execution_id))
return execution["state"]
@classmethod
def get_task_state(cls, workbook_name, execution_id, task_id):
task = db_api.task_get(workbook_name, execution_id, task_id)
if not task:
raise exc.EngineException("Task not found.")
return task["state"]
@classmethod
def _create_execution(cls, workbook_name, task_name, context):
return db_api.execution_create(workbook_name, {
"workbook_name": workbook_name,
"task": task_name,
"state": states.RUNNING,
"context": context
})
@classmethod
def _add_variables_to_data_flow_context(cls, context, execution):
db_workbook = db_api.workbook_get(execution['workbook_name'])
data_flow.add_token_to_context(context, db_workbook)
data_flow.add_execution_to_context(context, execution)
@classmethod
def _create_next_tasks(cls, task, workbook):
tasks = workflow.find_tasks_after_completion(task, workbook)
db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'],
task['execution_id'])
return workflow.find_resolved_tasks(db_tasks)
@classmethod
def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):
tasks = []
for task in task_list:
state, task_runtime_context = retry.get_task_runtime(task)
action_ns = workbook.namespaces.get(task.get_action_namespace())
action_spec = None
if action_ns:
action_spec = \
action_ns.actions.get(task.get_action_name())
db_task = db_api.task_create(workbook_name, execution_id, {
"name": task.name,
"requires": task.requires,
"task_spec": task.to_dict(),
"action_spec": {} if not action_spec
else action_spec.to_dict(),
"state": state,
"tags": task.get_property("tags", None),
"task_runtime_context": task_runtime_context
})
tasks.append(db_task)
return tasks
@classmethod
def _get_workbook(cls, workbook_name):
wb = db_api.workbook_get(workbook_name)
return parser.get_workbook(wb["definition"])
@classmethod
def _determine_execution_state(cls, execution, tasks):
if workflow.is_error(tasks):
return states.ERROR
if workflow.is_success(tasks) or workflow.is_finished(tasks):
return states.SUCCESS
return execution['state']
@classmethod
def _update_task(cls, workbook, task, state, task_output):
"""
Update the task with the runtime information. The outbound_context
for this task is also calculated.
:return: task, outbound_context. task is the updated task and
computed outbound context.
"""
workbook_name = task['workbook_name']
execution_id = task['execution_id']
task_spec = workbook.tasks.get(task["name"])
task_runtime_context = task["task_runtime_context"]
# Compute the outbound_context, state and exec_flow_context.
outbound_context = data_flow.get_outbound_context(task, task_output)
state, task_runtime_context = retry.get_task_runtime(
task_spec, state, outbound_context, task_runtime_context)
# Update the task.
update_values = {"state": state,
"output": task_output,
"task_runtime_context": task_runtime_context}
task = db_api.task_update(workbook_name, execution_id, task["id"],
update_values)
return task, outbound_context
@classmethod
def _schedule_run(cls, workbook, task, outbound_context):
"""
Schedules task to run after the delay defined in the task
specification. If no delay is specified this method is a no-op.
"""
def run_delayed_task():
"""
Runs the delayed task. Performs all the steps required to setup
a task to run which are not already done. This is mostly code
copied over from convey_task_result.
"""
db_api.start_tx()
try:
workbook_name = task['workbook_name']
execution_id = task['execution_id']
execution = db_api.execution_get(workbook_name, execution_id)
# Change state from DELAYED to IDLE to unblock processing.
WORKFLOW_TRACE.info("Task '%s' [%s -> %s]"
% (task['name'],
task['state'], states.IDLE))
db_task = db_api.task_update(workbook_name,
execution_id,
task['id'],
{"state": states.IDLE})
task_to_start = [db_task]
data_flow.prepare_tasks(task_to_start, outbound_context)
db_api.commit_tx()
finally:
db_api.end_tx()
if not states.is_stopped_or_finished(execution["state"]):
cls._run_tasks(task_to_start)
task_spec = workbook.tasks.get(task['name'])
retries, break_on, delay_sec = task_spec.get_retry_parameters()
if delay_sec > 0:
# Run the task after the specified delay.
eventlet.spawn_after(delay_sec, run_delayed_task)
else:
LOG.warn("No delay specified for task(id=%s) name=%s. Not "
"scheduling for execution." % (task['id'], task['name']))

View File

@@ -1,123 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo import messaging
from oslo.config import cfg
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class EngineClient(object):
"""
RPC client for the Engine.
"""
def __init__(self, transport):
"""Construct an RPC client for the Engine.
:param transport: a messaging transport handle
:type transport: Transport
"""
target = messaging.Target(topic=cfg.CONF.engine.topic)
self._client = messaging.RPCClient(transport, target)
def start_workflow_execution(self, workbook_name, task_name, context=None):
"""Starts a workflow execution based on the specified workbook name
and target task.
:param workbook_name: Workbook name
:param task_name: Target task name
:param context: Execution context which defines a workflow input
:return: Workflow execution.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'task_name': task_name,
'context': context}
return self._client.call(cntx, 'start_workflow_execution', **kwargs)
def stop_workflow_execution(self, workbook_name, execution_id):
"""Stops the workflow execution with the given id.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Workflow execution.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id}
return self._client.call(cntx, 'stop_workflow_execution', **kwargs)
def convey_task_result(self, workbook_name, execution_id,
task_id, state, result):
"""Conveys task result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
state of a task once task action has been performed. One of the
clients of this method is Mistral REST API server that receives
task result from the outside action handlers.
Note: calling this method serves an event notifying Mistral that
it possibly needs to move the workflow on, i.e. run other workflow
tasks for which all dependencies are satisfied.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:param task_id: Task id.
:param state: New task state.
:param result: Task result data.
:return: Task.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id,
'task_id': task_id,
'state': state,
'result': result}
return self._client.call(cntx, 'convey_task_result', **kwargs)
def get_workflow_execution_state(self, workbook_name, execution_id):
"""Gets the workflow execution state.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:return: Current workflow state.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id}
return self._client.call(
cntx, 'get_workflow_execution_state', **kwargs)
def get_task_state(self, workbook_name, execution_id, task_id):
"""Gets task state.
:param workbook_name: Workbook name.
:param execution_id: Workflow execution id.
:param task_id: Task id.
:return: Current task state.
"""
# TODO(m4dcoder): refactor auth context
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id,
'task_id': task_id}
return self._client.call(cntx, 'get_task_state', **kwargs)

View File

@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -16,29 +14,28 @@
from oslo import messaging
from oslo.config import cfg
from mistral.openstack.common import log as logging
from mistral.engine.scalable.executor import client
from mistral.engine import abstract_engine as abs_eng
from mistral import engine
from mistral.engine import executor
LOG = logging.getLogger(__name__)
class ScalableEngine(abs_eng.AbstractEngine):
@classmethod
def _notify_task_executors(cls, tasks):
class DefaultEngine(engine.Engine):
def _notify_task_executors(self, tasks):
# TODO(m4dcoder): Use a pool for transport and client
if not cls.transport:
cls.transport = messaging.get_transport(cfg.CONF)
ex_client = client.ExecutorClient(cls.transport)
if not self.transport:
self.transport = messaging.get_transport(cfg.CONF)
exctr = executor.ExecutorClient(self.transport)
for task in tasks:
# TODO(m4dcoder): Fill request context argument with auth info
context = {}
ex_client.handle_task(context, task=task)
exctr.handle_task(context, task=task)
LOG.info("Submitted task for execution: '%s'" % task)
@classmethod
def _run_tasks(cls, tasks):
def _run_tasks(self, tasks):
# TODO(rakhmerov):
# This call outside of DB transaction creates a window
# when the engine may crash and DB will not be consistent with
@@ -47,8 +44,4 @@ class ScalableEngine(abs_eng.AbstractEngine):
# However, making this call in DB transaction is really bad
# since it makes transaction much longer in time and under load
# may overload DB with open transactions.
cls._notify_task_executors(tasks)
def get_engine():
return ScalableEngine
self._notify_task_executors(tasks)
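Under the plugin model, an alternative engine is just another driver: subclass engine.Engine, implement _run_tasks, and register an entry point. A hedged sketch with hypothetical package and class names:

from mistral import engine

class NoopEngine(engine.Engine):
    """Illustrative engine that drops tasks instead of dispatching them."""

    def _run_tasks(self, tasks):
        # A real driver would hand these tasks off to an executor here.
        pass

# Registered in the plugin package's own setup.cfg:
# [entry_points]
# mistral.engine.drivers =
#     noop = my_plugin.engine:NoopEngine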

View File

@@ -19,9 +19,8 @@ from oslo.config import cfg
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral import exceptions as exc
from mistral import engine
from mistral.engine import client
from mistral.engine import states
from mistral.engine import executor
from mistral.actions import action_factory as a_f
@@ -29,10 +28,7 @@
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
class Executor(object):
def __init__(self, transport=None):
self.transport = engine.get_transport(transport)
self.engine = client.EngineClient(self.transport)
class DefaultExecutor(executor.Executor):
def _do_task_action(self, task):
"""Executes the action defined by the task and return result.

View File

@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -14,15 +12,42 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from oslo import messaging
from oslo.config import cfg
from stevedore import driver
from mistral.openstack.common import log as logging
from mistral import engine
LOG = logging.getLogger(__name__)
def get_executor(name, transport):
mgr = driver.DriverManager(
namespace='mistral.executor.drivers',
name=name,
invoke_on_load=True,
invoke_kwds={'transport': transport})
return mgr.driver
class Executor(object):
"""Abstract class for task execution."""
__metaclass__ = abc.ABCMeta
def __init__(self, transport=None):
self.transport = engine.get_transport(transport)
self.engine = engine.EngineClient(self.transport)
@abc.abstractmethod
def handle_task(self, cntx, **kwargs):
raise NotImplementedError()
class ExecutorClient(object):
"""
RPC client for the Executor.
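The executor side mirrors the engine: a driver subclasses Executor, implements handle_task, and is registered under mistral.executor.drivers. A sketch with hypothetical names:

from mistral.engine import executor
from mistral.openstack.common import log as logging

LOG = logging.getLogger(__name__)

class LoggingExecutor(executor.Executor):
    """Illustrative executor that only logs the tasks it receives."""

    def handle_task(self, cntx, **kwargs):
        # A real driver would run the task's action and convey the result
        # back through self.engine (the EngineClient created by the base
        # class constructor).
        LOG.info("Received task: %s" % kwargs.get('task'))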

View File

@@ -16,7 +16,6 @@
from mistral.db import api as db_api
from mistral import engine
from mistral.engine import client
from mistral.openstack.common import log
from mistral.openstack.common import periodic_task
from mistral.openstack.common import threadgroup
@@ -34,7 +33,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self, transport=None):
super(MistralPeriodicTasks, self).__init__()
self.transport = engine.get_transport(transport)
self.engine = client.EngineClient(self.transport)
self.engine = engine.EngineClient(self.transport)
@periodic_task.periodic_task(spacing=1, run_immediately=True)
def scheduler_triggers(self, ctx):

View File

@@ -21,7 +21,7 @@ from mistral import exceptions as ex
from webtest.app import AppError
from mistral.tests.api import base
from mistral.db import api as db_api
from mistral.engine import client
from mistral import engine
# TODO: later we need additional tests verifying all the errors etc.
@@ -75,7 +75,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_EXEC, canonize(resp.json))
@mock.patch.object(client.EngineClient, 'start_workflow_execution',
@mock.patch.object(engine.EngineClient, 'start_workflow_execution',
mock.MagicMock(return_value=EXECS[0]))
def test_post(self):
new_exec = EXECS[0].copy()
@@ -86,7 +86,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 201)
self.assertDictEqual(EXECS[0], canonize(resp.json))
@mock.patch.object(client.EngineClient, 'start_workflow_execution',
@mock.patch.object(engine.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=ex.MistralException))
def test_post_throws_exception(self):
with self.assertRaises(AppError) as context:

View File

@@ -18,7 +18,7 @@ import mock
from mistral.tests.api import base
from mistral.db import api as db_api
from mistral.engine import client
from mistral import engine
# TODO: later we need additional tests verifying all the errors etc.
@@ -48,7 +48,7 @@ class TestTasksController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(TASKS[0], resp.json)
@mock.patch.object(client.EngineClient, "convey_task_result",
@mock.patch.object(engine.EngineClient, "convey_task_result",
mock.MagicMock(return_value=UPDATED_TASK))
def test_put(self):
resp = self.app.put_json(

View File

@@ -34,9 +34,8 @@ from mistral.db.sqlalchemy import api as db_api
from mistral.openstack.common import log as logging
from mistral.openstack.common.db.sqlalchemy import session
from mistral import version
from mistral.engine import client
from mistral.engine.scalable import engine
from mistral.engine.scalable.executor import server
from mistral import engine
from mistral.engine import executor
RESOURCES_PATH = 'tests/resources/'
@@ -121,10 +120,11 @@ class DbTestCase(BaseTest):
class EngineTestCase(DbTestCase):
transport = get_fake_transport()
backend = engine.get_engine(cfg.CONF.engine.engine, transport)
def __init__(self, *args, **kwargs):
super(EngineTestCase, self).__init__(*args, **kwargs)
self.engine = client.EngineClient(self.transport)
self.engine = engine.EngineClient(self.transport)
@classmethod
def mock_task_result(cls, workbook_name, execution_id,
@@ -133,8 +133,13 @@ class EngineTestCase(DbTestCase):
Mock the engine convey_task_results to send request directly
to the engine instead of going through the oslo.messaging transport.
"""
return engine.ScalableEngine.convey_task_result(
workbook_name, execution_id, task_id, state, result)
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id,
'task_id': task_id,
'state': state,
'result': result}
return cls.backend.convey_task_result(cntx, **kwargs)
@classmethod
def mock_start_workflow(cls, workbook_name, task_name, context=None):
@@ -142,8 +147,11 @@
Mock the engine start_workflow_execution to send request directly
to the engine instead of going through the oslo.messaging transport.
"""
return engine.ScalableEngine.start_workflow_execution(
workbook_name, task_name, context)
cntx = {}
kwargs = {'workbook_name': workbook_name,
'task_name': task_name,
'context': context}
return cls.backend.start_workflow_execution(cntx, **kwargs)
@classmethod
def mock_get_workflow_state(cls, workbook_name, execution_id):
@@ -151,8 +159,10 @@
Mock the engine get_workflow_execution_state to send request directly
to the engine instead of going through the oslo.messaging transport.
"""
return engine.ScalableEngine.get_workflow_execution_state(
workbook_name, execution_id)
cntx = {}
kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id}
return cls.backend.get_workflow_execution_state(cntx, **kwargs)
@classmethod
def mock_run_tasks(cls, tasks):
@@ -160,9 +170,9 @@
Mock the engine _run_tasks to send requests directly to the task
executor instead of going through the oslo.messaging transport.
"""
executor = server.Executor(transport=cls.transport)
exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport)
for task in tasks:
executor.handle_task({}, task=task)
exctr.handle_task({}, task=task)
@classmethod
def mock_handle_task(cls, cntx, **kwargs):
@@ -170,5 +180,5 @@
Mock the executor handle_task to send requests directory to the task
executor instead of going through the oslo.messaging transport.
"""
executor = server.Executor(transport=cls.transport)
return executor.handle_task(cntx, **kwargs)
exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport)
return exctr.handle_task(cntx, **kwargs)

View File

@@ -22,9 +22,9 @@ from mistral.openstack.common import log as logging
from mistral.db import api as db_api
from mistral.actions import std_actions
from mistral import expressions
from mistral.engine.scalable import engine
from mistral import engine
from mistral.engine import states
from mistral.engine import client
from mistral.engine.drivers.default import engine as concrete_engine
LOG = logging.getLogger(__name__)
@@ -40,10 +40,10 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
@mock.patch.object(
client.EngineClient, 'start_workflow_execution',
engine.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@mock.patch.object(
client.EngineClient, 'convey_task_result',
engine.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
@mock.patch.object(
db_api, 'workbook_get',
@@ -54,7 +54,7 @@
mock.MagicMock(return_value={'state': states.SUCCESS}))
class TestScalableEngine(base.EngineTestCase):
@mock.patch.object(
engine.ScalableEngine, "_notify_task_executors",
concrete_engine.DefaultEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
def test_engine_one_task(self):
execution = self.engine.start_workflow_execution(WB_NAME, "create-vms",
@@ -72,11 +72,11 @@ class TestScalableEngine(base.EngineTestCase):
self.assertEqual(task['state'], states.SUCCESS)
@mock.patch.object(
client.EngineClient, 'get_workflow_execution_state',
engine.EngineClient, 'get_workflow_execution_state',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_get_workflow_state))
@mock.patch.object(
engine.ScalableEngine, "_notify_task_executors",
concrete_engine.DefaultEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
def test_engine_multiple_tasks(self):
execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms",
@@ -116,7 +116,7 @@
WB_NAME, execution['id']))
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
states, "get_state_by_http_status_code",
@@ -135,7 +135,7 @@
self.assertEqual(task['state'], states.SUCCESS)
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
@@ -201,7 +201,7 @@
self._assert_multiple_items(tasks, 4, state=states.SUCCESS)
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))

View File

@@ -28,8 +28,8 @@ from mistral.openstack.common import importutils
from mistral.engine import states
from mistral.db import api as db_api
from mistral.actions import std_actions
from mistral.engine import client as engine
from mistral.engine.scalable.executor import client as executor
from mistral import engine
from mistral.engine import executor
# We need to make sure that all configuration properties are registered.

View File

@@ -22,10 +22,10 @@ from oslo.config import cfg
from mistral.openstack.common import log as logging
from mistral.tests import base
from mistral.db import api as db_api
from mistral.engine.scalable import engine
from mistral.actions import std_actions
from mistral import engine
from mistral.engine import states
from mistral.engine import client
from mistral.engine.drivers.default import engine as concrete_engine
from mistral.utils.openstack import keystone
@@ -62,13 +62,13 @@ def create_workbook(definition_path):
@mock.patch.object(
client.EngineClient, 'start_workflow_execution',
engine.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@mock.patch.object(
client.EngineClient, 'convey_task_result',
engine.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
class DataFlowTest(base.EngineTestCase):
def _check_in_context_execution(self, task):

View File

@@ -24,10 +24,10 @@ from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.tests import base
from mistral.db import api as db_api
from mistral.engine import client
from mistral.engine.scalable import engine
from mistral.actions import std_actions
from mistral import engine
from mistral.engine import states
from mistral.engine.drivers.default import engine as concrete_engine
from mistral import dsl_parser as parser
@@ -59,13 +59,13 @@ class FailBeforeSuccessMocker(object):
@mock.patch.object(
client.EngineClient, 'start_workflow_execution',
engine.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@mock.patch.object(
client.EngineClient, 'convey_task_result',
engine.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
@mock.patch.object(
engine.ScalableEngine, '_run_tasks',
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
@mock.patch.object(
db_api, 'workbook_get',

View File

@@ -26,4 +26,8 @@ packages =
console_scripts =
mistral-server = mistral.cmd.launch:main
mistral.engine.drivers =
default = mistral.engine.drivers.default.engine:DefaultEngine
mistral.executor.drivers =
default = mistral.engine.drivers.default.executor:DefaultExecutor
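Once a package is installed, the wiring can be sanity-checked by asking stevedore what it sees in each namespace; a quick illustrative check, not part of this change:

from stevedore import extension

for ns in ('mistral.engine.drivers', 'mistral.executor.drivers'):
    mgr = extension.ExtensionManager(namespace=ns)
    print('%s -> %s' % (ns, mgr.names()))   # expect ['default'] here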