From ac486f2d20bdcaf0629d01d27e484d897c80ee0f Mon Sep 17 00:00:00 2001 From: Dawid Deja Date: Tue, 27 Sep 2016 18:15:37 +0200 Subject: [PATCH] Make deafult executor use async messaging when returning action results Change-Id: I9f504ef0d14e4a5fa503ff792a8aa45760037008 Partial-Bug: #1624284 --- mistral/engine/base.py | 5 ++++- mistral/engine/default_executor.py | 6 +++++- mistral/engine/rpc_backend/rpc.py | 9 +++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/mistral/engine/base.py b/mistral/engine/base.py index ac9e8000..8859dc9f 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -56,7 +56,8 @@ class Engine(object): raise NotImplementedError @abc.abstractmethod - def on_action_complete(self, action_ex_id, result, wf_action=False): + def on_action_complete(self, action_ex_id, result, wf_action=False, + async=False): """Accepts action result and continues the workflow. Action execution result here is a result which comes from an @@ -68,6 +69,8 @@ class Engine(object): a workflow execution rather than action execution. It happens when a nested workflow execution sends its result to a parent workflow. + :param async: If True, run action in asynchronous mode (w/o waiting + for completion). :return: Action(or workflow if wf_action=True) execution object. """ raise NotImplementedError diff --git a/mistral/engine/default_executor.py b/mistral/engine/default_executor.py index 7b34a077..210c965a 100644 --- a/mistral/engine/default_executor.py +++ b/mistral/engine/default_executor.py @@ -107,7 +107,11 @@ class DefaultExecutor(base.Executor, coordination.Service): try: if action_ex_id and (action.is_sync() or result.is_error()): - self._engine_client.on_action_complete(action_ex_id, result) + self._engine_client.on_action_complete( + action_ex_id, + result, + async=True + ) except Exception as e: msg = ("Exception occurred when calling engine on_action_complete" diff --git a/mistral/engine/rpc_backend/rpc.py b/mistral/engine/rpc_backend/rpc.py index 62c6475e..64dd09fb 100644 --- a/mistral/engine/rpc_backend/rpc.py +++ b/mistral/engine/rpc_backend/rpc.py @@ -373,7 +373,8 @@ class EngineClient(base.Engine): ) @wrap_messaging_exception - def on_action_complete(self, action_ex_id, result, wf_action=False): + def on_action_complete(self, action_ex_id, result, wf_action=False, + async=False): """Conveys action result to Mistral Engine. This method should be used by clients of Mistral Engine to update @@ -391,10 +392,14 @@ class EngineClient(base.Engine): a workflow execution rather than action execution. It happens when a nested workflow execution sends its result to a parent workflow. + :param async: If True, run action in asynchronous mode (w/o waiting + for completion). :return: Action(or workflow if wf_action=True) execution object. """ - return self._client.sync_call( + call = self._client.async_call if async else self._client.sync_call + + return call( auth_ctx.ctx(), 'on_action_complete', action_ex_id=action_ex_id,