Executor fails actions if they are redelivered

With this change executor will fail actions that are redelivered
and have flag safe_rerun set to false

Implements blueprint mistral-task-delivery-model

Change-Id: Ie0e728cf59af9fe44c8fd1d243439a82d9478ff4
This commit is contained in:
Dawid Deja 2016-07-13 16:42:49 +02:00
parent e4033c4290
commit 2197126489
8 changed files with 210 additions and 7 deletions

View File

@ -85,6 +85,7 @@ class MistralContext(BaseContext):
"roles",
"is_admin",
"is_trust_scoped",
"redelivered"
])
def __repr__(self):

View File

@ -128,13 +128,16 @@ class Executor(object):
@abc.abstractmethod
def run_action(self, action_ex_id, action_class_str, attributes,
action_params):
action_params, safe_rerun, redelivered=False):
"""Runs action.
:param action_ex_id: Corresponding action execution id.
:param action_class_str: Path to action class in dot notation.
:param attributes: Attributes of action class which will be set to.
:param action_params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:param redelivered: Tells if given action was run before on another
executor.
"""
raise NotImplementedError()

View File

@ -34,13 +34,16 @@ class DefaultExecutor(base.Executor, coordination.Service):
@profiler.trace('executor-run-action')
def run_action(self, action_ex_id, action_class_str, attributes,
action_params):
action_params, safe_rerun, redelivered=False):
"""Runs action.
:param action_ex_id: Action execution id.
:param action_class_str: Path to action class in dot notation.
:param attributes: Attributes of action class which will be set to.
:param action_params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:param redelivered: Tells if given action was run before on another
executor.
"""
def send_error_back(error_msg):
@ -56,6 +59,16 @@ class DefaultExecutor(base.Executor, coordination.Service):
return error_result
if redelivered and not safe_rerun:
msg = (
"Request to run action %s was redelivered, but action %s"
" cannot be re-run safely. The only safe thing to do is fail"
" action."
% (action_class_str, action_class_str)
)
return send_error_back(msg)
action_cls = a_f.construct_action_class(action_class_str, attributes)
# Instantiate action.

View File

@ -147,11 +147,15 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
is_async = request.get('async', False)
rpc_ctx = request.get('rpc_ctx')
redelivered = message.delivery_info.get('redelivered', None)
rpc_method_name = request.get('rpc_method')
arguments = request.get('arguments')
correlation_id = message.properties['correlation_id']
reply_to = message.properties['reply_to']
if redelivered is not None:
rpc_ctx['redelivered'] = redelivered
rpc_context = self._set_auth_ctx(rpc_ctx)
rpc_method = self._get_rpc_method(rpc_method_name)

View File

@ -480,7 +480,7 @@ class ExecutorServer(object):
self._executor = executor
def run_action(self, rpc_ctx, action_ex_id, action_class_str,
attributes, params):
attributes, params, safe_rerun):
"""Receives calls over RPC to run action on executor.
:param rpc_ctx: RPC request context dictionary.
@ -488,6 +488,7 @@ class ExecutorServer(object):
:param action_class_str: Action class name.
:param attributes: Action class attributes.
:param params: Action input parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:return: Action result.
"""
@ -497,11 +498,15 @@ class ExecutorServer(object):
% (rpc_ctx, action_ex_id, action_class_str, attributes, params)
)
redelivered = rpc_ctx.redelivered or False
return self._executor.run_action(
action_ex_id,
action_class_str,
attributes,
params
params,
safe_rerun,
redelivered
)
@ -538,6 +543,7 @@ class ExecutorClient(base.Executor):
'action_class_str': action_class_str,
'attributes': attributes,
'params': action_params,
'safe_rerun': safe_rerun
}
rpc_client_method = (self._client.async_call

View File

@ -174,9 +174,9 @@ class KombuServerTestCase(base.KombuTestCase):
)
def test__on_message_rpc_method_not_found(self):
request = {
'rpc_ctx': self.ctx,
'rpc_ctx': {},
'rpc_method': 'not_found_method',
'arguments': None
'arguments': {}
}
message = mock.MagicMock()
@ -213,6 +213,7 @@ class KombuServerTestCase(base.KombuTestCase):
'reply_to': None,
'correlation_id': None
}
message.delivery_info.get.return_value = False
rpc_method = mock.MagicMock(return_value=result)
get_rpc_method.return_value = rpc_method
@ -248,6 +249,7 @@ class KombuServerTestCase(base.KombuTestCase):
'reply_to': reply_to,
'correlation_id': correlation_id
}
message.delivery_info.get.return_value = False
rpc_method = mock.MagicMock(return_value=result)
get_rpc_method.return_value = rpc_method

View File

@ -86,7 +86,8 @@ def _run_at_target(action_ex_id, action_class_str, attributes,
action_ex_id,
action_class_str,
attributes,
action_params
action_params,
safe_rerun
)

View File

@ -0,0 +1,173 @@
# Copyright (c) 2016 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mistral.db.v2 import api as db_api
from mistral.engine import default_executor
from mistral.engine.rpc_backend import rpc
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
from mistral.workflow import data_flow
from mistral.workflow import states
def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, target=None, async=True, safe_rerun=False):
# We'll just call executor directly for testing purposes.
executor = default_executor.DefaultExecutor(rpc.get_engine_client())
executor.run_action(
action_ex_id,
action_class_str,
attributes,
action_params,
safe_rerun,
redelivered=True
)
MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target)
class TestSafeRerun(base.EngineTestCase):
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET)
def test_safe_rerun_true(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.noop
safe-rerun: true
on-success:
- task2
on-error:
- task3
task2:
action: std.noop
safe-rerun: true
task3:
action: std.noop
safe-rerun: true
"""
# Note: because every task have redelivered flag set to true in mock
# function (_run_at_target), task2 and task3 have to set safe-rerun
# to true.
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(len(tasks), 2)
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
self.assertEqual(task1.state, states.SUCCESS)
self.assertEqual(task2.state, states.SUCCESS)
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET)
def test_safe_rerun_false(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.noop
safe-rerun: false
on-success:
- task2
on-error:
- task3
task2:
action: std.noop
safe-rerun: true
task3:
action: std.noop
safe-rerun: true
"""
# Note: because every task have redelivered flag set to true in mock
# function (_run_at_target), task2 and task3 have to set safe-rerun
# to true.
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(len(tasks), 2)
task1 = self._assert_single_item(tasks, name='task1')
task3 = self._assert_single_item(tasks, name='task3')
self.assertEqual(task1.state, states.ERROR)
self.assertEqual(task3.state, states.SUCCESS)
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET)
def test_safe_rerun_with_items(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
with-items: i in [1, 2, 3]
action: std.echo output=<% $.i %>
safe-rerun: true
publish:
result: <% task(task1).result %>
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(len(tasks), 1)
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(task1.state, states.SUCCESS)
result = data_flow.get_task_execution_result(task1)
self.assertIn(1, result)
self.assertIn(2, result)
self.assertIn(3, result)