Stop sending workflow output to on_action_complete

In case of a subworkflow, the Result object sent via the queue
is the output of the execution, which can reach enormous sizes.
Transfer of huge messages through RPC can lead to slow work of
the queue, which in turn leads to the loss of messages.

Depends-On: Icb0163f553af690192bfab0e192a32d793db28a9
Change-Id: I1430d2989e0e1a20d0873538691be9d4c3b7158b
Closes-Bug: #1823092
Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
Oleg Ovcharuk 2019-04-04 02:10:48 +03:00 committed by Renat Akhmerov
parent 97f4fc2776
commit 9238909e08
3 changed files with 12 additions and 2 deletions

View File

@ -19,6 +19,8 @@ from oslo_config import cfg
from oslo_log import log as logging
from osprofiler import profiler
from mistral_lib import actions as ml_actions
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
@ -145,6 +147,12 @@ class DefaultEngine(base.Engine):
with db_api.transaction():
if wf_action:
action_ex = db_api.get_workflow_execution(action_ex_id)
# If result is None it means that it's a normal subworkflow
# output and we just need to fetch it from the model.
# This is just an optimization to not send data over RPC
if result is None:
result = ml_actions.Result(data=action_ex.output)
else:
action_ex = db_api.get_action_execution(action_ex_id)

View File

@ -153,7 +153,7 @@ class EngineServer(service_base.MistralService):
"Received RPC request 'on_action_complete'[action_ex_id=%s, "
"result=%s]",
action_ex_id,
result.cut_repr()
result.cut_repr() if result else '<unknown>'
)
return self.engine.on_action_complete(action_ex_id, result, wf_action)

View File

@ -551,7 +551,9 @@ class Workflow(object):
def _send_result_to_parent_workflow(self):
if self.wf_ex.state == states.SUCCESS:
result = ml_actions.Result(data=self.wf_ex.output)
# The result of the sub workflow is already saved
# so there's no need to send it over RPC.
result = None
elif self.wf_ex.state == states.ERROR:
err_msg = (
self.wf_ex.state_info or