Make executor able to work in isolated environment

The idea is to simplify executor as much as possible (first of all,
remove all db calls). As of now, all the work related to creating,
resolving and updating tasks, converting action params and results and
so on should be done by the engine. The executor here is merely an action
runner that receives an action name, params and some kind of session
identifier (task id in our case) and reports back the status and
results along with the session identifier to reference the task it
has done.

In order to keep the model simple, we had to make several assumptions:
 - Any task that was queued for execution is expected to be executed
 and eventually report back by putting its results into
 `convey_task_result` queue.
 - All executors are the same. Each of them should be able to execute
 the action and return the same result.
 - All executors should also be able to access any external resource
 without any additional params evaluation on the executor side.

Conflicts:
	mistral/engine/drivers/default/executor.py

Change-Id: I4f1f8fb08cd977ba90f69462108e15f9cfb26250
This commit is contained in:
Kirill Izotov 2014-07-09 15:33:27 +07:00
parent 4dcd1d76c8
commit 5566129d64
15 changed files with 429 additions and 271 deletions

View File

@ -20,6 +20,7 @@ from stevedore import extension
from mistral.actions import base
from mistral.actions import std_actions
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.openstack.common import log as logging
from mistral.workbook import actions
from mistral.workbook import tasks
@ -169,5 +170,43 @@ def create_action(db_task):
(db_task, e))
def resolve_adhoc_action_name(workbook, action_name):
    """Map an ad-hoc action name to the name of its base action class.

    :param workbook: workbook specification containing ad-hoc actions
    :param action_name: full name of the ad-hoc action
    :return: name of the registered base action class
    :raises ActionException: if either the ad-hoc action or its base
        action class is not registered.
    """
    spec = workbook.get_action(action_name)

    if not spec:
        raise exc.ActionException(
            'Ad-hoc action class is not registered '
            '[workbook=%s, action=%s, action_spec=%s]'
            % (workbook, action_name, spec))

    base_cls = get_action_class(spec.clazz)

    if not base_cls:
        raise exc.ActionException(
            'Ad-hoc action base class is not registered '
            '[workbook=%s, action=%s, base_class=%s]'
            % (workbook, action_name, base_cls))

    return spec.clazz
def convert_adhoc_action_params(workbook, action_name, params):
    """Evaluate the base-action parameters of an ad-hoc action.

    The ad-hoc action's own parameters serve as the evaluation context.

    :param workbook: workbook specification containing the ad-hoc action
    :param action_name: full name of the ad-hoc action
    :param params: parameters passed to the ad-hoc action
    :return: evaluated base action parameters (empty dict if the action
        declares no base parameters).
    """
    base = workbook.get_action(action_name).base_parameters

    return expr.evaluate_recursively(base, params) if base else {}
def convert_adhoc_action_result(workbook, action_name, result):
    """Transform a base action result into the ad-hoc action output.

    :param workbook: workbook specification containing the ad-hoc action
    :param action_name: full name of the ad-hoc action
    :param result: raw result produced by the base action
    :return: transformed result, or the raw result unchanged when the
        ad-hoc action defines no output transformer.
    """
    output_spec = workbook.get_action(action_name).output

    if output_spec:
        # Use base action result as a context for evaluating expressions.
        return expr.evaluate_recursively(output_spec, result)

    return result
# Registering actions on module load.
_register_action_classes()

View File

@ -21,11 +21,12 @@ from oslo import messaging
import six
from stevedore import driver
# Submoules of mistral.engine will throw NoSuchOptError if configuration
# Submodules of mistral.engine will throw NoSuchOptError if configuration
# options required at top level of this __init__.py are not imported before
# the submodules are referenced.
cfg.CONF.import_opt('workflow_trace_log_name', 'mistral.config')
from mistral.actions import action_factory as a_f
from mistral import context as auth_context
from mistral.db import api as db_api
from mistral import dsl_parser as parser
@ -35,6 +36,7 @@ from mistral.engine import states
from mistral.engine import workflow
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.workbook import tasks as wb_task
LOG = logging.getLogger(__name__)
@ -42,7 +44,7 @@ WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
def get_transport(transport=None):
return (transport if transport else messaging.get_transport(cfg.CONF))
return transport if transport else messaging.get_transport(cfg.CONF)
def get_engine(name, transport):
@ -64,7 +66,7 @@ class Engine(object):
self.transport = get_transport(transport)
@abc.abstractmethod
def _run_tasks(cls, tasks):
def _run_task(cls, task_id, action_name, action_params):
raise NotImplementedError()
def start_workflow_execution(self, cntx, **kwargs):
@ -91,21 +93,31 @@ class Engine(object):
# Persist execution and tasks in DB.
try:
workbook = self._get_workbook(workbook_name)
execution = self._create_execution(workbook_name,
task_name,
execution = self._create_execution(workbook_name, task_name,
context)
# Create the whole tree of tasks required by target task, including
# target task itself.
tasks = self._create_tasks(
workflow.find_workflow_tasks(workbook, task_name),
workbook,
workbook_name, execution['id']
)
# Create a list of tasks that can be executed immediately (have
# their requirements satisfied, or, at that point, rather don't
# have them at all) along with the list of tasks that require some
# delay before they'll be executed.
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
# Populate context with special variables such as `openstack` and
# `__execution`.
self._add_variables_to_data_flow_context(context, execution)
data_flow.prepare_tasks(tasks_to_start, context)
# Update task with new context and params.
executables = data_flow.prepare_tasks(tasks_to_start,
context,
workbook)
db_api.commit_tx()
except Exception as e:
@ -118,7 +130,8 @@ class Engine(object):
for task in delayed_tasks:
self._schedule_run(workbook, task, context)
self._run_tasks(tasks_to_start)
for task_id, action_name, action_params in executables:
self._run_task(task_id, action_name, action_params)
return execution
@ -172,19 +185,33 @@ class Engine(object):
else ", result = %s]" % result
WORKFLOW_TRACE.info(wf_trace_msg)
action_name = wb_task.TaskSpec(task['task_spec'])\
.get_full_action_name()
if not a_f.get_action_class(action_name):
action = a_f.resolve_adhoc_action_name(workbook, action_name)
if not action:
msg = 'Unknown action [workbook=%s, action=%s]' % \
(workbook, action_name)
raise exc.ActionException(msg)
result = a_f.convert_adhoc_action_result(workbook,
action_name,
result)
task_output = data_flow.get_task_output(task, result)
# Update task state.
task, outbound_context = self._update_task(workbook, task, state,
task_output)
task, context = self._update_task(workbook, task, state,
task_output)
execution = db_api.execution_get(task['execution_id'])
self._create_next_tasks(task, workbook)
# Determine what tasks need to be started.
tasks = db_api.tasks_get(workbook_name=task['workbook_name'],
execution_id=task['execution_id'])
tasks = db_api.tasks_get(execution_id=task['execution_id'])
new_exec_state = self._determine_execution_state(execution, tasks)
@ -194,19 +221,25 @@ class Engine(object):
(execution['id'], execution['state'], new_exec_state)
WORKFLOW_TRACE.info(wf_trace_msg)
execution = \
db_api.execution_update(execution['id'], {
"state": new_exec_state
})
execution = db_api.execution_update(execution['id'], {
"state": new_exec_state
})
LOG.info("Changed execution state: %s" % execution)
# Create a list of tasks that can be executed immediately (have
# their requirements satisfied) along with the list of tasks that
# require some delay before they'll be executed.
tasks_to_start, delayed_tasks = workflow.find_resolved_tasks(tasks)
self._add_variables_to_data_flow_context(outbound_context,
execution)
# Populate context with special variables such as `openstack` and
# `__execution`.
self._add_variables_to_data_flow_context(context, execution)
data_flow.prepare_tasks(tasks_to_start, outbound_context)
# Update task with new context and params.
executables = data_flow.prepare_tasks(tasks_to_start,
context,
workbook)
db_api.commit_tx()
except Exception as e:
@ -216,14 +249,14 @@ class Engine(object):
finally:
db_api.end_tx()
if states.is_stopped_or_finished(execution["state"]):
if states.is_stopped_or_finished(execution['state']):
return task
for task in delayed_tasks:
self._schedule_run(workbook, task, outbound_context)
self._schedule_run(workbook, task, context)
if tasks_to_start:
self._run_tasks(tasks_to_start)
for task_id, action_name, action_params in executables:
self._run_task(task_id, action_name, action_params)
return task
@ -352,7 +385,7 @@ class Engine(object):
return task, outbound_context
def _schedule_run(cls, workbook, task, outbound_context):
def _schedule_run(self, workbook, task, outbound_context):
"""Schedules task to run after the delay defined in the task
specification. If no delay is specified this method is a no-op.
"""
@ -376,22 +409,23 @@ class Engine(object):
execution_id = task['execution_id']
execution = db_api.execution_get(execution_id)
# Change state from DELAYED to IDLE to unblock processing.
# Change state from DELAYED to RUNNING.
WORKFLOW_TRACE.info("Task '%s' [%s -> %s]"
% (task['name'],
task['state'], states.IDLE))
db_task = db_api.task_update(task['id'],
{"state": states.IDLE})
task_to_start = [db_task]
data_flow.prepare_tasks(task_to_start, outbound_context)
task['state'], states.RUNNING))
executables = data_flow.prepare_tasks([task],
outbound_context,
workbook)
db_api.commit_tx()
finally:
db_api.end_tx()
if not states.is_stopped_or_finished(execution["state"]):
cls._run_tasks(task_to_start)
if states.is_stopped_or_finished(execution['state']):
return
for task_id, action_name, action_params in executables:
self._run_task(task_id, action_name, action_params)
task_spec = workbook.tasks.get(task['name'])
retries, break_on, delay_sec = task_spec.get_retry_parameters()

View File

@ -14,17 +14,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from oslo.config import cfg
from mistral.actions import action_factory as a_f
from mistral.db import api as db_api
from mistral.engine import states
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.openstack.common import log as logging
from mistral.services import trusts
from mistral.workbook import tasks as wb_task
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
_ACTION_CTX_PARAM = 'action_context'
def _has_action_context_param(action_cls):
arg_spec = inspect.getargspec(action_cls.__init__)
return _ACTION_CTX_PARAM in arg_spec.args
def _get_action_context(db_task, openstack_context):
result = {
'workbook_name': db_task['workbook_name'],
'execution_id': db_task['execution_id'],
'task_id': db_task['id'],
'task_name': db_task['name'],
'task_tags': db_task['tags'],
}
if openstack_context:
result.update({'openstack': openstack_context})
return result
def evaluate_task_parameters(task, context):
params = task['task_spec'].get('parameters', {})
@ -32,16 +60,51 @@ def evaluate_task_parameters(task, context):
return expr.evaluate_recursively(params, context)
def prepare_tasks(tasks, context):
def prepare_tasks(tasks, context, workbook):
    """Prepare tasks for execution and build the list of executables.

    For each task: evaluates its parameters against the data-flow
    context, marks it RUNNING in the DB, unwraps ad-hoc actions into
    their base action, and collects everything an executor needs.

    :param tasks: DB task dicts that are ready to start
    :param context: data-flow context used to evaluate task parameters
    :param workbook: workbook specification, used to resolve ad-hoc
        action definitions
    :return: list of (task_id, action_name, action_params) tuples
    """
    results = []

    for task in tasks:
        # TODO(rakhmerov): Inbound context should be a merge of
        # outbound contexts of task dependencies, if any.
        action_params = evaluate_task_parameters(task, context)

        # Persist the evaluated parameters and mark the task RUNNING
        # before handing it to an executor.
        db_api.task_update(task['id'],
                           {'state': states.RUNNING,
                            'in_context': context,
                            'parameters': action_params})

        # Get action name. Unwrap ad-hoc and reevaluate params if
        # necessary.
        action_name = wb_task.TaskSpec(task['task_spec'])\
            .get_full_action_name()

        openstack_ctx = context.get('openstack')

        if not a_f.get_action_class(action_name):
            # If action is not found in registered actions try to find
            # ad-hoc action definition.
            if openstack_ctx is not None:
                action_params.update({'openstack': openstack_ctx})

            action = a_f.resolve_adhoc_action_name(workbook, action_name)

            if not action:
                msg = 'Unknown action [workbook=%s, action=%s]' % \
                      (workbook, action_name)
                raise exc.ActionException(msg)

            # Re-evaluate the params against the ad-hoc action's
            # base-parameters spec.
            action_params = a_f.convert_adhoc_action_params(workbook,
                                                            action_name,
                                                            action_params)
            action_name = action

        # Context-aware actions receive task/execution identifiers.
        if _has_action_context_param(a_f.get_action_class(action_name)):
            action_params[_ACTION_CTX_PARAM] = \
                _get_action_context(task, openstack_ctx)

        results.append((task['id'], action_name, action_params))

    return results
def get_task_output(task, result):

View File

@ -12,9 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from oslo import messaging
from mistral import context as auth_context
from mistral import engine
from mistral.engine import executor
@ -25,16 +22,7 @@ LOG = logging.getLogger(__name__)
class DefaultEngine(engine.Engine):
def _notify_task_executors(self, tasks):
# TODO(m4dcoder): Use a pool for transport and client
if not self.transport:
self.transport = messaging.get_transport(cfg.CONF)
exctr = executor.ExecutorClient(self.transport)
for task in tasks:
LOG.info("Submitted task for execution: '%s'" % task)
exctr.handle_task(auth_context.ctx(), task=task)
def _run_tasks(self, tasks):
def _run_task(self, task_id, action_name, params):
# TODO(rakhmerov):
# This call outside of DB transaction creates a window
# when the engine may crash and DB will not be consistent with
@ -43,4 +31,12 @@ class DefaultEngine(engine.Engine):
# However, making this call in DB transaction is really bad
# since it makes transaction much longer in time and under load
# may overload DB with open transactions.
self._notify_task_executors(tasks)
# TODO(m4dcoder): Use a pool for transport and client
exctr = executor.ExecutorClient(self.transport)
LOG.info("Submitted task for execution: '%s'" % task_id)
exctr.handle_task(auth_context.ctx(),
task_id=task_id,
action_name=action_name,
params=params)

View File

@ -17,7 +17,6 @@
from oslo.config import cfg
from mistral.actions import action_factory as a_f
from mistral.db import api as db_api
from mistral.engine import executor
from mistral.engine import states
from mistral import exceptions as exc
@ -30,103 +29,43 @@ WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
class DefaultExecutor(executor.Executor):
def _log_action_exception(self, message, task, exc):
LOG.exception("%s [task_id=%s, action='%s', action_spec='%s']\n %s" %
(message, task['id'], task['task_spec']['action'],
task['action_spec'], exc))
def _log_action_exception(self, message, task_id, action, params, ex):
    """Log an action failure together with its execution details."""
    # Lazy %-style args; logging renders the identical message.
    LOG.exception("%s [task_id=%s, action='%s', params='%s']\n %s",
                  message, task_id, action, params, ex)
def _do_task_action(self, task):
"""Executes the action defined by the task and return result.
def handle_task(self, cntx, task_id, action_name, params=None):
    """Handle the execution of the workbook task.

    Runs the named action with the given params and conveys the result
    (or error) back to the engine, referencing the task by its id.

    :param cntx: a request context dict
    :type cntx: MistralContext
    :param task_id: task identifier
    :type task_id: str
    :param action_name: a name of the action to run
    :type action_name: str
    :param params: a dict of action parameters
    :raises ActionException: if the action instance cannot be created
    """
    # NOTE: default changed from a mutable `{}` to None to avoid the
    # shared-mutable-default pitfall; behavior for callers is unchanged.
    params = params or {}

    action_cls = a_f.get_action_class(action_name)

    # TODO(dzimine): on failure, convey failure details back
    try:
        action = action_cls(**params)
    except Exception as e:
        raise exc.ActionException("Failed to create action "
                                  "[action_name=%s, params=%s]: %s" %
                                  (action_name, params, e))

    if action.is_sync():
        try:
            state, result = states.SUCCESS, action.run()
        except exc.ActionException as ex:
            self._log_action_exception("Action failed", task_id,
                                       action_name, params, ex)
            state, result = states.ERROR, None

        self.engine.convey_task_result(task_id, state, result)
    else:
        # Async action: only report back if the run itself failed;
        # otherwise the action reports its own result later.
        try:
            action.run()
        except exc.ActionException as ex:
            self._log_action_exception("Action failed", task_id,
                                       action_name, params, ex)
            self.engine.convey_task_result(task_id, states.ERROR, None)

View File

@ -165,13 +165,15 @@ class EngineTestCase(DbTestCase):
return cls.backend.get_workflow_execution_state(cntx, **kwargs)
@classmethod
def mock_run_tasks(cls, tasks):
def mock_run_task(cls, task_id, action_name, params):
"""Mock the engine _run_tasks to send requests directly to the task
executor instead of going through the oslo.messaging transport.
"""
exctr = executor.get_executor(cfg.CONF.engine.engine, cls.transport)
for task in tasks:
exctr.handle_task({}, task=task)
exctr.handle_task(auth_context.ctx(),
task_id=task_id,
action_name=action_name,
params=params)
@classmethod
def mock_handle_task(cls, cntx, **kwargs):

View File

@ -1,19 +1,20 @@
Namespaces:
Nova:
MyActions:
actions:
create-vm:
class: std.http
concat:
class: std.echo
base-parameters:
url: http://path_to_nova/url_for_create
output: '{$.left} {$.right}'
parameters:
- image_id
- flavor_id
- left
- right
output:
vm_id: $.base_output.server_id
string: $.output
Workflow:
tasks:
create-vm-nova:
action: Nova.create-vm
build_name:
action: MyActions.concat
parameters:
image_id: 1234
flavor_id: 2
left: Stormin
right: Stanley

View File

@ -1,65 +1,26 @@
Namespaces:
MyRest:
class: std.mistral_http
base-parameters:
method: GET
headers:
X-Auth-Token: $.auth_token
MyActions:
actions:
create-vm:
concat:
class: std.echo
base-parameters:
url: http://some_host/service/action/execute
headers:
Content-Type: 'application/json'
output: '{$.left} {$.right}'
parameters:
- image_id
- flavor_id
- left
- right
output:
backup-vm:
base-parameters:
url: http://some_host/url_for_backup
parameters:
- server_id
attach-volume:
base-parameters:
url: /url_for_attach
parameters:
- size
- mnt_path
format-volume:
base-parameters:
url: /url_for_format
parameters:
- server_id
string: $.output
Workflow:
tasks:
create-vms:
action: MyRest.create-vm
build_name:
action: MyActions.concat
parameters:
image_id: 1234
flavor_id: 42
attach-volumes:
requires: create-vms
action: MyRest.attach-volume
left: Stormin
right: Stanley
greet:
requires: [build_name]
action: MyActions.concat
parameters:
size: 1234
mnt_path: /path/to/volume
format-volumes:
requires: [attach-volumes]
action: MyRest.format-volume
parameters:
server_id: 123
backup-vms:
requires:
- create-vms
action: MyRest.backup-vm
parameters:
server_id: 123
left: Greetings
right: {$.string}

View File

@ -54,6 +54,7 @@ Workflow:
f_name: $.full_name
build_greeting:
requires: [build_full_name]
action: MyService.build_greeting
publish:
greet_msg: $.greeting

View File

@ -19,6 +19,7 @@ import json
from mistral.actions import action_factory as a_f
from mistral.actions import std_actions as std
from mistral import dsl_parser as parser
from mistral.engine import data_flow
from mistral import exceptions
from mistral.openstack.common import log as logging
@ -283,3 +284,39 @@ class ActionFactoryTest(base.BaseTest):
action = a_f.create_action(db_task)
self.assertEqual("'Tango and Cash' is a cool movie!", action.run())
def test_resolve_adhoc_action_name(self):
    # The ad-hoc action 'MyActions.concat' in the fixture workbook is
    # declared with class 'std.echo', so resolution must return that
    # base action class name.
    workbook = parser.get_workbook(
        base.get_resource('control_flow/one_sync_task.yaml'))
    action_name = 'MyActions.concat'

    action = a_f.resolve_adhoc_action_name(workbook, action_name)

    self.assertEqual('std.echo', action)
def test_convert_adhoc_action_params(self):
    # 'MyActions.concat' declares base-parameters
    # output: '{$.left} {$.right}', so the ad-hoc params must be folded
    # into the single 'output' base parameter.
    workbook = parser.get_workbook(
        base.get_resource('control_flow/one_sync_task.yaml'))
    action_name = 'MyActions.concat'

    params = {
        'left': 'Stormin',
        'right': 'Stanley'
    }

    parameters = a_f.convert_adhoc_action_params(workbook,
                                                 action_name,
                                                 params)

    self.assertEqual({'output': 'Stormin Stanley'}, parameters)
def test_convert_adhoc_action_result(self):
    # 'MyActions.concat' declares output 'string: $.output', so the
    # base action result must be transformed into the 'string' key.
    workbook = parser.get_workbook(
        base.get_resource('control_flow/one_sync_task.yaml'))
    action_name = 'MyActions.concat'

    result = {'output': 'Stormin Stanley'}

    parameters = a_f.convert_adhoc_action_result(workbook,
                                                 action_name,
                                                 result)

    self.assertEqual({'string': 'Stormin Stanley'}, parameters)

View File

@ -17,9 +17,11 @@ import mock
from oslo.config import cfg
from mistral.actions import std_actions
from mistral import context as auth_context
from mistral.db import api as db_api
from mistral import engine
from mistral.engine.drivers.default import engine as concrete_engine
from mistral.engine import executor
from mistral.engine import states
from mistral import expressions
from mistral.openstack.common import log as logging
@ -38,6 +40,7 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
# TODO(rakhmerov): add more tests for errors, execution stop etc.
@mock.patch.object(auth_context, 'ctx', mock.MagicMock())
@mock.patch.object(
engine.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@ -48,21 +51,28 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
std_actions.HTTPAction, 'run',
mock.MagicMock(return_value={'state': states.SUCCESS}))
class TestEngine(base.EngineTestCase):
@mock.patch.object(
concrete_engine.DefaultEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
@mock.patch.object(executor.ExecutorClient, "handle_task",
mock.MagicMock())
@mock.patch.object(
db_api, 'workbook_get',
mock.MagicMock(return_value={'definition': base.get_resource(
'control_flow/one_async_task.yaml')}))
'control_flow/one_sync_task.yaml')}))
def test_with_one_task(self):
execution = self.engine.start_workflow_execution(WB_NAME, "create-vms",
execution = self.engine.start_workflow_execution(WB_NAME, "build_name",
CONTEXT)
task = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])[0]
self.engine.convey_task_result(task['id'], states.SUCCESS, None)
executor.ExecutorClient.handle_task\
.assert_called_once_with(auth_context.ctx(),
params={'output': 'Stormin Stanley'},
task_id=task['id'],
action_name='std.echo')
self.engine.convey_task_result(task['id'],
states.SUCCESS,
{'output': 'Stormin Stanley'})
task = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])[0]
@ -70,20 +80,22 @@ class TestEngine(base.EngineTestCase):
self.assertEqual(execution['state'], states.SUCCESS)
self.assertEqual(task['state'], states.SUCCESS)
self.assertEqual(
task['output'],
{'task': {'build_name': {'string': 'Stormin Stanley'}}})
@mock.patch.object(
engine.EngineClient, 'get_workflow_execution_state',
mock.MagicMock(
side_effect=base.EngineTestCase.mock_get_workflow_state))
@mock.patch.object(
concrete_engine.DefaultEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
@mock.patch.object(executor.ExecutorClient, "handle_task",
mock.MagicMock())
@mock.patch.object(
db_api, 'workbook_get',
mock.MagicMock(return_value={'definition': base.get_resource(
'control_flow/require_flow.yaml')}))
def test_require_flow(self):
execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms",
execution = self.engine.start_workflow_execution(WB_NAME, "greet",
CONTEXT)
tasks = db_api.tasks_get(workbook_name=WB_NAME,
@ -99,9 +111,7 @@ class TestEngine(base.EngineTestCase):
self.assertEqual(2, len(tasks))
self.assertEqual(tasks[0]['state'], states.SUCCESS)
# Since we mocked out executor notification we expect IDLE
# for the second task.
self.assertEqual(tasks[1]['state'], states.IDLE)
self.assertEqual(tasks[1]['state'], states.RUNNING)
self.assertEqual(states.RUNNING,
self.engine.get_workflow_execution_state(
WB_NAME, execution['id']))
@ -121,8 +131,8 @@ class TestEngine(base.EngineTestCase):
WB_NAME, execution['id']))
@mock.patch.object(
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
concrete_engine.DefaultEngine, '_run_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task))
@mock.patch.object(
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
@mock.patch.object(
@ -130,8 +140,7 @@ class TestEngine(base.EngineTestCase):
mock.MagicMock(return_value={'definition': base.get_resource(
'control_flow/one_sync_task.yaml')}))
def test_with_one_sync_task(self):
execution = self.engine.start_workflow_execution(WB_NAME,
"create-vm-nova",
execution = self.engine.start_workflow_execution(WB_NAME, "build_name",
CONTEXT)
task = db_api.tasks_get(workbook_name=WB_NAME,
@ -142,8 +151,8 @@ class TestEngine(base.EngineTestCase):
self.assertEqual(task['state'], states.SUCCESS)
@mock.patch.object(
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
concrete_engine.DefaultEngine, '_run_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task))
@mock.patch.object(
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
@mock.patch.object(
@ -205,8 +214,8 @@ class TestEngine(base.EngineTestCase):
self.assertEqual(execution['state'], states.SUCCESS)
@mock.patch.object(
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
concrete_engine.DefaultEngine, '_run_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task))
@mock.patch.object(
expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x))
@mock.patch.object(
@ -259,16 +268,13 @@ class TestEngine(base.EngineTestCase):
self._assert_single_item(tasks, state=states.ERROR)
self.assertEqual(execution['state'], states.SUCCESS)
@mock.patch.object(
concrete_engine.DefaultEngine, "_notify_task_executors",
mock.MagicMock(return_value=""))
@mock.patch.object(
db_api, 'workbook_get',
mock.MagicMock(return_value={'definition': base.get_resource(
'control_flow/no_namespaces.yaml')}))
@mock.patch.object(
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
concrete_engine.DefaultEngine, '_run_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task))
def test_engine_with_no_namespaces(self):
execution = self.engine.start_workflow_execution(WB_NAME, "task1", {})
@ -286,8 +292,8 @@ class TestEngine(base.EngineTestCase):
mock.MagicMock(return_value={'definition': base.get_resource(
'control_flow/one_std_task.yaml')}))
@mock.patch.object(
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
concrete_engine.DefaultEngine, '_run_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task))
def test_engine_task_std_action_with_namespaces(self):
execution = self.engine.start_workflow_execution(WB_NAME,
"std_http_task", {})

View File

@ -95,9 +95,6 @@ SAMPLE_CONTEXT = {
@mock.patch.object(
executor.ExecutorClient, 'handle_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_handle_task))
@mock.patch.object(
engine.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
class TestExecutor(base.DbTestCase):
def __init__(self, *args, **kwargs):
super(TestExecutor, self).__init__(*args, **kwargs)
@ -122,30 +119,69 @@ class TestExecutor(base.DbTestCase):
self.assertIsInstance(self.task, dict)
self.assertIn('id', self.task)
@mock.patch.object(
std_actions.HTTPAction, 'run', mock.MagicMock(return_value={}))
def test_handle_task(self):
@mock.patch.object(std_actions.EchoAction, 'run')
@mock.patch.object(engine.EngineClient, 'convey_task_result',
mock.MagicMock())
def test_handle_task(self, action):
task_id = '12345'
action_name = 'std.echo'
params = {
'output': 'some'
}
# Send the task request to the Executor.
ex_client = executor.ExecutorClient(self.transport)
ex_client.handle_task(SAMPLE_CONTEXT, task=self.task)
ex_client.handle_task(SAMPLE_CONTEXT,
task_id=task_id,
action_name=action_name,
params=params)
# Check task execution state.
db_task = db_api.task_get(self.task['id'])
self.assertEqual(db_task['state'], states.SUCCESS)
engine.EngineClient.convey_task_result\
.assert_called_once_with(task_id,
states.SUCCESS,
action.return_value)
@mock.patch.object(
std_actions.HTTPAction, 'run',
mock.MagicMock(side_effect=exceptions.ActionException))
@mock.patch.object(engine.EngineClient, 'convey_task_result',
mock.MagicMock())
def test_handle_task_not_registered(self):
task_id = '12345'
action_name = 'not.registered'
params = {
'output': 'some'
}
# Send the task request to the Executor.
ex_client = executor.ExecutorClient(self.transport)
self.assertRaises(exceptions.ActionException, ex_client.handle_task,
SAMPLE_CONTEXT,
task_id=task_id,
action_name=action_name,
params=params)
self.assertFalse(engine.EngineClient.convey_task_result.called)
@mock.patch.object(std_actions.EchoAction, 'run',
mock.MagicMock(side_effect=exceptions.ActionException))
@mock.patch.object(engine.EngineClient, 'convey_task_result',
mock.MagicMock())
def test_handle_task_action_exception(self):
task_id = '12345'
action_name = 'std.echo'
params = {
'output': 'some'
}
# Send the task request to the Executor.
ex_client = executor.ExecutorClient(self.transport)
with mock.patch('mistral.engine.drivers.default.executor.'
'DefaultExecutor._log_action_exception') as log:
ex_client.handle_task(SAMPLE_CONTEXT, task=self.task)
ex_client.handle_task(SAMPLE_CONTEXT,
task_id=task_id,
action_name=action_name,
params=params)
self.assertTrue(log.called, "Exception must be logged")
# Check task execution state.
db_task = db_api.task_get(self.task['id'])
self.assertEqual(db_task['state'], states.ERROR)
engine.EngineClient.convey_task_result\
.assert_called_once_with(task_id,
states.ERROR,
None)

View File

@ -68,8 +68,8 @@ def create_workbook(definition_path):
engine.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
@mock.patch.object(
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
concrete_engine.DefaultEngine, '_run_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task))
class DataFlowTest(base.EngineTestCase):
def _check_in_context_execution(self, task):
self.assertIn('__execution', task['in_context'])

View File

@ -14,10 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from mistral.db import api as db_api
from mistral.engine import data_flow
from mistral.engine import states
from mistral.openstack.common import log as logging
from mistral.tests import base
from mistral.workbook import workbook
LOG = logging.getLogger(__name__)
@ -38,6 +42,7 @@ TASK = {
'execution_id': EXEC_ID,
'name': 'my_task',
'task_spec': {
'action': 'std.echo',
'parameters': {
'p1': 'My string',
'p2': '$.param3.param32',
@ -47,17 +52,31 @@ TASK = {
'new_key11': '$.new_key1'
}
},
'service_spec': {
'actions': {
'action': {
'output': {
# This one should not be evaluated.
'server_id': '$.server.id'
'in_context': CONTEXT
}
TASK2 = copy.deepcopy(TASK)
TASK2['task_spec']['action'] = 'some.thing'
WORKBOOK = {
'Namespaces': {
'some': {
'actions': {
'thing': {
'class': 'std.echo',
'base-parameters': {
'output': '{$.p1} {$.p2}'
}
}
}
}
},
'in_context': CONTEXT
'Workflow': {
'tasks': {
'first_task': TASK['task_spec'],
'second_task': TASK2['task_spec']
}
}
}
@ -70,18 +89,36 @@ class DataFlowModuleTest(base.DbTestCase):
self.assertEqual('val32', parameters['p2'])
def test_prepare_tasks(self):
task = db_api.task_create(EXEC_ID, TASK.copy())
tasks = [task]
wb = workbook.WorkbookSpec(WORKBOOK)
data_flow.prepare_tasks(tasks, CONTEXT)
tasks = [
db_api.task_create(EXEC_ID, TASK.copy()),
db_api.task_create(EXEC_ID, TASK2.copy())
]
db_task = db_api.task_get(tasks[0]['id'])
executables = data_flow.prepare_tasks(tasks, CONTEXT, wb)
self.assertDictEqual(CONTEXT, db_task['in_context'])
self.assertDictEqual({'p1': 'My string',
'p2': 'val32',
'p3': ''},
db_task['parameters'])
self.assertEqual(2, len(executables))
self.assertEqual(tasks[0]['id'], executables[0][0])
self.assertEqual('std.echo', executables[0][1])
self.assertDictEqual({'p2': 'val32', 'p3': '', 'p1': 'My string'},
executables[0][2])
self.assertEqual(tasks[1]['id'], executables[1][0])
self.assertEqual('std.echo', executables[1][1])
self.assertDictEqual({'output': 'My string val32'},
executables[1][2])
for task in tasks:
db_task = db_api.task_get(task['id'])
self.assertDictEqual(CONTEXT, db_task['in_context'])
self.assertDictEqual({'p1': 'My string',
'p2': 'val32',
'p3': ''},
db_task['parameters'])
self.assertEqual(states.RUNNING, db_task['state'])
def test_get_outbound_context(self):
output = data_flow.get_task_output(TASK, {'new_key1': 'new_val1'})

View File

@ -63,8 +63,8 @@ class FailBeforeSuccessMocker(object):
engine.EngineClient, 'convey_task_result',
mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result))
@mock.patch.object(
concrete_engine.DefaultEngine, '_run_tasks',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks))
concrete_engine.DefaultEngine, '_run_task',
mock.MagicMock(side_effect=base.EngineTestCase.mock_run_task))
@mock.patch.object(
std_actions.HTTPAction, 'run',
mock.MagicMock(return_value='result'))
@ -160,6 +160,12 @@ class TaskRetryTest(base.EngineTestCase):
retry_count, _, delay = task_spec.get_retry_parameters()
for x in xrange(0, retry_count):
tasks = db_api.tasks_get(workbook_name=WB_NAME,
execution_id=execution['id'])
self._assert_single_item(tasks, name=task_name)
self._assert_single_item(tasks, state=states.RUNNING)
self.engine.convey_task_result(tasks[0]['id'], states.ERROR,
{'output': 'result'})