Make deafult executor use async messaging when returning action results

Change-Id: I9f504ef0d14e4a5fa503ff792a8aa45760037008
Partial-Bug: #1624284
This commit is contained in:
Dawid Deja 2016-09-27 18:15:37 +02:00
parent a5f6facf09
commit ac486f2d20
3 changed files with 16 additions and 4 deletions

View File

@ -56,7 +56,8 @@ class Engine(object):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def on_action_complete(self, action_ex_id, result, wf_action=False): def on_action_complete(self, action_ex_id, result, wf_action=False,
async=False):
"""Accepts action result and continues the workflow. """Accepts action result and continues the workflow.
Action execution result here is a result which comes from an Action execution result here is a result which comes from an
@ -68,6 +69,8 @@ class Engine(object):
a workflow execution rather than action execution. It happens a workflow execution rather than action execution. It happens
when a nested workflow execution sends its result to a parent when a nested workflow execution sends its result to a parent
workflow. workflow.
:param async: If True, run action in asynchronous mode (w/o waiting
for completion).
:return: Action(or workflow if wf_action=True) execution object. :return: Action(or workflow if wf_action=True) execution object.
""" """
raise NotImplementedError raise NotImplementedError

View File

@ -107,7 +107,11 @@ class DefaultExecutor(base.Executor, coordination.Service):
try: try:
if action_ex_id and (action.is_sync() or result.is_error()): if action_ex_id and (action.is_sync() or result.is_error()):
self._engine_client.on_action_complete(action_ex_id, result) self._engine_client.on_action_complete(
action_ex_id,
result,
async=True
)
except Exception as e: except Exception as e:
msg = ("Exception occurred when calling engine on_action_complete" msg = ("Exception occurred when calling engine on_action_complete"

View File

@ -373,7 +373,8 @@ class EngineClient(base.Engine):
) )
@wrap_messaging_exception @wrap_messaging_exception
def on_action_complete(self, action_ex_id, result, wf_action=False): def on_action_complete(self, action_ex_id, result, wf_action=False,
async=False):
"""Conveys action result to Mistral Engine. """Conveys action result to Mistral Engine.
This method should be used by clients of Mistral Engine to update This method should be used by clients of Mistral Engine to update
@ -391,10 +392,14 @@ class EngineClient(base.Engine):
a workflow execution rather than action execution. It happens a workflow execution rather than action execution. It happens
when a nested workflow execution sends its result to a parent when a nested workflow execution sends its result to a parent
workflow. workflow.
:param async: If True, run action in asynchronous mode (w/o waiting
for completion).
:return: Action(or workflow if wf_action=True) execution object. :return: Action(or workflow if wf_action=True) execution object.
""" """
return self._client.sync_call( call = self._client.async_call if async else self._client.sync_call
return call(
auth_ctx.ctx(), auth_ctx.ctx(),
'on_action_complete', 'on_action_complete',
action_ex_id=action_ex_id, action_ex_id=action_ex_id,