Filter messages not from waiting execution
The convention is to use the same queue name ("tripleo") for all workflows. This can lead to messages showing from other tripleoclient triggered workflows showing up during message polling if multiple workflows are running at the same time. This patch adds a check that will filter out any messages that do not belong to the execution that is being waited on by comparing the execution id with the root_execution_id returned in the execution payload. Depends-On: Icbe80c338d69efc6ce8fceb0f73f833bec588536 Change-Id: Ie6473d6a1044cdf76552d62645b4d63da2df9398 Related-Bug: #1794277
This commit is contained in:
parent
ec2e018457
commit
339c1f334c
|
@ -25,7 +25,11 @@ class TestOvercloudCredentials(test_plugin.TestPluginV1):
|
|||
super(TestOvercloudCredentials, self).setUp()
|
||||
|
||||
self.cmd = overcloud_credentials.OvercloudCredentials(self.app, None)
|
||||
self.app.client_manager.workflow_engine = self.workflow = mock.Mock()
|
||||
workflow = execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
workflow.executions.create.return_value = execution
|
||||
self.app.client_manager.workflow_engine = workflow
|
||||
self.app.client_manager.workflow_engine = self.workflow = workflow
|
||||
self.tripleoclient = mock.Mock()
|
||||
self.websocket = mock.Mock()
|
||||
self.websocket.__enter__ = lambda s: self.websocket
|
||||
|
|
|
@ -139,5 +139,8 @@ class TestDeployOvercloud(utils.TestCommand):
|
|||
self.app.client_manager.image = mock.Mock()
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.orchestration = mock.Mock()
|
||||
self.app.client_manager.workflow_engine = mock.Mock()
|
||||
workflow = execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
workflow.executions.create.return_value = execution
|
||||
self.app.client_manager.workflow_engine = workflow
|
||||
self.app.client_manager.tripleoclient = FakeClientWrapper()
|
||||
|
|
|
@ -48,7 +48,10 @@ class TestFFWDUpgradePrepare(utils.TestCommand):
|
|||
self.app.client_manager.baremetal = mock.Mock()
|
||||
self.app.client_manager.orchestration = mock.Mock()
|
||||
self.app.client_manager.tripleoclient = FakeClientWrapper()
|
||||
self.app.client_manager.workflow_engine = mock.Mock()
|
||||
workflow = execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
workflow.executions.create.return_value = execution
|
||||
self.app.client_manager.workflow_engine = workflow
|
||||
|
||||
|
||||
class TestFFWDUpgradeRun(utils.TestCommand):
|
||||
|
@ -70,4 +73,7 @@ class TestFFWDUpgradeConverge(utils.TestCommand):
|
|||
self.app.client_manager.baremetal = mock.Mock()
|
||||
self.app.client_manager.orchestration = mock.Mock()
|
||||
self.app.client_manager.tripleoclient = FakeClientWrapper()
|
||||
self.app.client_manager.workflow_engine = mock.Mock()
|
||||
workflow = execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
workflow.executions.create.return_value = execution
|
||||
self.app.client_manager.workflow_engine = workflow
|
||||
|
|
|
@ -47,6 +47,9 @@ class TestDeleteNode(fakes.TestDeleteNode):
|
|||
self.workflow = self.app.client_manager.workflow_engine
|
||||
self.stack_name = self.app.client_manager.orchestration.stacks.get
|
||||
self.stack_name.return_value = mock.Mock(stack_name="overcloud")
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
|
||||
# TODO(someone): This test does not pass with autospec=True, it should
|
||||
# probably be fixed so that it can pass with that.
|
||||
|
@ -133,6 +136,7 @@ class TestDeleteNode(fakes.TestDeleteNode):
|
|||
|
||||
self.websocket.wait_for_messages.return_value = iter([{
|
||||
"status": "FAILED",
|
||||
"execution": {"id": "IDID"},
|
||||
"message": """Failed to run action ERROR: Couldn't find \
|
||||
following instances in stack overcloud: wrong_instance"""
|
||||
}])
|
||||
|
@ -156,6 +160,9 @@ class TestProvideNode(fakes.TestOvercloudNode):
|
|||
super(TestProvideNode, self).setUp()
|
||||
|
||||
self.workflow = self.app.client_manager.workflow_engine
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
client = self.app.client_manager.tripleoclient
|
||||
self.websocket = client.messaging_websocket()
|
||||
|
||||
|
@ -164,7 +171,8 @@ class TestProvideNode(fakes.TestOvercloudNode):
|
|||
|
||||
self.websocket.wait_for_messages.return_value = iter([{
|
||||
"status": "SUCCESS",
|
||||
"message": "Success"
|
||||
"message": "Success",
|
||||
"execution": {"id": "IDID"}
|
||||
}])
|
||||
|
||||
def test_provide_all_manageable_nodes(self):
|
||||
|
@ -228,6 +236,9 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
|
|||
super(TestIntrospectNode, self).setUp()
|
||||
|
||||
self.workflow = self.app.client_manager.workflow_engine
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
client = self.app.client_manager.tripleoclient
|
||||
self.websocket = client.messaging_websocket()
|
||||
|
||||
|
@ -238,7 +249,8 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
|
|||
self.websocket.wait_for_messages.return_value = iter([{
|
||||
"status": "SUCCESS",
|
||||
"message": "Success",
|
||||
"introspected_nodes": {}
|
||||
"introspected_nodes": {},
|
||||
"execution": {"id": "IDID"}
|
||||
}] * 2)
|
||||
|
||||
self.cmd.take_action(parsed_args)
|
||||
|
@ -262,6 +274,7 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
|
|||
self.websocket.wait_for_messages.return_value = [{
|
||||
"status": "SUCCESS",
|
||||
"message": "Success",
|
||||
"execution": {"id": "IDID"},
|
||||
}]
|
||||
|
||||
self.cmd.take_action(parsed_args)
|
||||
|
@ -355,6 +368,9 @@ class TestImportNode(fakes.TestOvercloudNode):
|
|||
self.addCleanup(os.unlink, self.json_file.name)
|
||||
|
||||
self.workflow = self.app.client_manager.workflow_engine
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
client = self.app.client_manager.tripleoclient
|
||||
self.websocket = client.messaging_websocket()
|
||||
|
||||
|
@ -376,7 +392,8 @@ class TestImportNode(fakes.TestOvercloudNode):
|
|||
"message": "Success",
|
||||
"registered_nodes": [{
|
||||
"uuid": "MOCK_NODE_UUID"
|
||||
}]
|
||||
}],
|
||||
"execution": {"id": "IDID"}
|
||||
}]
|
||||
|
||||
self.cmd.take_action(parsed_args)
|
||||
|
@ -501,6 +518,9 @@ class TestImportNodeMultiArch(fakes.TestOvercloudNode):
|
|||
self.addCleanup(os.unlink, self.json_file.name)
|
||||
|
||||
self.workflow = self.app.client_manager.workflow_engine
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
client = self.app.client_manager.tripleoclient
|
||||
self.websocket = client.messaging_websocket()
|
||||
|
||||
|
@ -528,7 +548,8 @@ class TestImportNodeMultiArch(fakes.TestOvercloudNode):
|
|||
"message": "Success",
|
||||
"registered_nodes": [{
|
||||
"uuid": "MOCK_NODE_UUID"
|
||||
}]
|
||||
}],
|
||||
"execution": {"id": "IDID"}
|
||||
}]
|
||||
|
||||
self.cmd.take_action(parsed_args)
|
||||
|
@ -625,11 +646,15 @@ class TestConfigureNode(fakes.TestOvercloudNode):
|
|||
super(TestConfigureNode, self).setUp()
|
||||
|
||||
self.workflow = self.app.client_manager.workflow_engine
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
client = self.app.client_manager.tripleoclient
|
||||
self.websocket = client.messaging_websocket()
|
||||
self.websocket.wait_for_messages.return_value = iter([{
|
||||
"status": "SUCCESS",
|
||||
"message": ""
|
||||
"message": "",
|
||||
"execution": {"id": "IDID"}
|
||||
}])
|
||||
|
||||
# Get the command object to test
|
||||
|
@ -656,7 +681,8 @@ class TestConfigureNode(fakes.TestOvercloudNode):
|
|||
def test_failed_to_configure_all_manageable_nodes(self):
|
||||
self.websocket.wait_for_messages.return_value = iter([{
|
||||
"status": "FAILED",
|
||||
"message": "Test failure."
|
||||
"message": "Test failure.",
|
||||
"execution": {"id": "IDID"}
|
||||
}])
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, ['--all-manageable'], [])
|
||||
|
@ -684,7 +710,8 @@ class TestConfigureNode(fakes.TestOvercloudNode):
|
|||
def test_failed_to_configure_specified_nodes(self):
|
||||
self.websocket.wait_for_messages.return_value = iter([{
|
||||
"status": "FAILED",
|
||||
"message": "Test failure."
|
||||
"message": "Test failure.",
|
||||
"execution": {"id": "IDID"}
|
||||
}])
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, ['node_uuid1'], [])
|
||||
|
@ -797,6 +824,9 @@ class TestDiscoverNode(fakes.TestOvercloudNode):
|
|||
super(TestDiscoverNode, self).setUp()
|
||||
|
||||
self.workflow = self.app.client_manager.workflow_engine
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
client = self.app.client_manager.tripleoclient
|
||||
self.websocket = client.messaging_websocket()
|
||||
|
||||
|
@ -807,7 +837,8 @@ class TestDiscoverNode(fakes.TestOvercloudNode):
|
|||
"message": "Success",
|
||||
"registered_nodes": [{
|
||||
"uuid": "MOCK_NODE_UUID"
|
||||
}]
|
||||
}],
|
||||
"execution": {"id": "IDID"}
|
||||
}]
|
||||
|
||||
def test_with_ip_range(self):
|
||||
|
|
|
@ -48,7 +48,10 @@ class TestOvercloudUpdatePrepare(utils.TestCommand):
|
|||
self.app.client_manager.baremetal = mock.Mock()
|
||||
self.app.client_manager.orchestration = mock.Mock()
|
||||
self.app.client_manager.tripleoclient = FakeClientWrapper()
|
||||
self.app.client_manager.workflow_engine = mock.Mock()
|
||||
workflow = execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
workflow.executions.create.return_value = execution
|
||||
self.app.client_manager.workflow_engine = workflow
|
||||
|
||||
|
||||
class TestOvercloudUpdateRun(utils.TestCommand):
|
||||
|
|
|
@ -47,7 +47,7 @@ class TestOvercloudUpdatePrepare(fakes.TestOvercloudUpdatePrepare):
|
|||
@mock.patch('shutil.copytree', autospec=True)
|
||||
@mock.patch('six.moves.builtins.open')
|
||||
@mock.patch('tripleoclient.v1.overcloud_deploy.DeployOvercloud.'
|
||||
'_deploy_tripleo_heat_templates', autospec=True)
|
||||
'_deploy_tripleo_heat_templates_tmpdir', autospec=True)
|
||||
def test_update_out(self, mock_deploy, mock_open, mock_copy, mock_yaml,
|
||||
mock_abspath, mock_update, mock_logger,
|
||||
mock_get_stack):
|
||||
|
|
|
@ -48,7 +48,10 @@ class TestOvercloudUpgradePrepare(utils.TestCommand):
|
|||
self.app.client_manager.baremetal = mock.Mock()
|
||||
self.app.client_manager.orchestration = mock.Mock()
|
||||
self.app.client_manager.tripleoclient = FakeClientWrapper()
|
||||
self.app.client_manager.workflow_engine = mock.Mock()
|
||||
workflow = execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
workflow.executions.create.return_value = execution
|
||||
self.app.client_manager.workflow_engine = workflow
|
||||
|
||||
|
||||
class TestOvercloudUpgradeRun(utils.TestCommand):
|
||||
|
|
|
@ -135,6 +135,9 @@ class TestOvercloudCreatePlan(utils.TestCommand):
|
|||
self.app.client_manager.tripleoclient = self.tripleoclient
|
||||
|
||||
self.workflow = self.app.client_manager.workflow_engine
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
self.swift = self.app.client_manager.tripleoclient.object_store
|
||||
|
||||
def test_create_default_plan(self):
|
||||
|
@ -376,6 +379,9 @@ class TestOvercloudDeployPlan(utils.TestCommand):
|
|||
self.cmd = overcloud_plan.DeployPlan(self.app, app_args)
|
||||
|
||||
self.workflow = self.app.client_manager.workflow_engine = mock.Mock()
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
self.orch = self.app.client_manager.orchestration = mock.Mock()
|
||||
|
||||
self.websocket = mock.Mock()
|
||||
|
|
|
@ -145,6 +145,9 @@ class TestOvercloudListRole(utils.TestCommand):
|
|||
self.tripleoclient = mock.Mock()
|
||||
self.tripleoclient.messaging_websocket.return_value = self.websocket
|
||||
self.app.client_manager.tripleoclient = self.tripleoclient
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
|
||||
def test_list_empty(self):
|
||||
self.websocket.wait_for_messages.return_value = [{
|
||||
|
@ -224,6 +227,9 @@ class TestOvercloudShowRole(utils.TestCommand):
|
|||
self.tripleoclient = mock.Mock()
|
||||
self.tripleoclient.messaging_websocket.return_value = self.websocket
|
||||
self.app.client_manager.tripleoclient = self.tripleoclient
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
|
||||
def test_role_not_found(self):
|
||||
self.websocket.wait_for_messages.return_value = [{
|
||||
|
|
|
@ -37,15 +37,18 @@ class TestCreateRAID(fakes.TestBaremetal):
|
|||
tripleoclient = self.app.client_manager.tripleoclient
|
||||
websocket = tripleoclient.messaging_websocket()
|
||||
websocket.wait_for_messages.return_value = iter([
|
||||
{'status': "SUCCESS"}
|
||||
{'status': "SUCCESS",
|
||||
'execution': {'id': 'IDID'}}
|
||||
])
|
||||
self.websocket = websocket
|
||||
|
||||
self.workflow.executions.create.return_value = mock.MagicMock(
|
||||
execution = mock.MagicMock(
|
||||
output=json.dumps({
|
||||
"result": None
|
||||
})
|
||||
)
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
|
||||
def test_ok(self):
|
||||
conf = json.dumps(self.conf)
|
||||
|
|
|
@ -32,6 +32,9 @@ class TestBaremetalWorkflows(utils.TestCommand):
|
|||
self.websocket.__exit__ = lambda s, *exc: None
|
||||
self.tripleoclient.messaging_websocket.return_value = self.websocket
|
||||
self.app.client_manager.tripleoclient = self.tripleoclient
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
|
||||
self.message_success = iter([{
|
||||
"execution": {"id": "IDID"},
|
||||
|
|
|
@ -25,11 +25,13 @@ class TestBaseWorkflows(utils.TestCommand):
|
|||
def test_wait_for_messages_success(self):
|
||||
payload_a = {
|
||||
'status': 'ERROR',
|
||||
'execution': {'id': 2}
|
||||
'execution': {'id': 2,
|
||||
'root_execution_id': 1}
|
||||
}
|
||||
payload_b = {
|
||||
'status': 'ERROR',
|
||||
'execution': {'id': 1}
|
||||
'execution': {'id': 1,
|
||||
'root_execution_id': 1}
|
||||
}
|
||||
|
||||
mistral = mock.Mock()
|
||||
|
@ -59,6 +61,31 @@ class TestBaseWorkflows(utils.TestCommand):
|
|||
self.assertTrue(mistral.executions.get.called)
|
||||
websocket.wait_for_messages.assert_called_with(timeout=None)
|
||||
|
||||
def test_wait_for_messages_different_execution(self):
|
||||
payload_a = {
|
||||
'status': 'RUNNING',
|
||||
'execution': {'id': 'aaaa',
|
||||
'root_execution_id': 'aaaa'}
|
||||
}
|
||||
payload_b = {
|
||||
'status': 'RUNNING',
|
||||
'execution': {'id': 'bbbb',
|
||||
'root_execution_id': 'bbbb'}
|
||||
}
|
||||
|
||||
mistral = mock.Mock()
|
||||
websocket = mock.Mock()
|
||||
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
|
||||
execution = mock.Mock()
|
||||
execution.id = 'aaaa'
|
||||
|
||||
messages = list(base.wait_for_messages(mistral, websocket, execution))
|
||||
|
||||
# Assert only payload_a was returned
|
||||
self.assertEqual([payload_a], messages)
|
||||
|
||||
websocket.wait_for_messages.assert_called_with(timeout=None)
|
||||
|
||||
def test_call_action_success(self):
|
||||
mistral = mock.Mock()
|
||||
action = 'test-action'
|
||||
|
|
|
@ -42,6 +42,9 @@ class TestParameterWorkflows(utils.TestCommand):
|
|||
self.websocket.__exit__ = lambda s, *exc: None
|
||||
self.tripleoclient.messaging_websocket.return_value = self.websocket
|
||||
self.app.client_manager.tripleoclient = self.tripleoclient
|
||||
execution = mock.Mock()
|
||||
execution.id = "IDID"
|
||||
self.workflow.executions.create.return_value = execution
|
||||
|
||||
def test_get_overcloud_passwords(self):
|
||||
self.websocket.wait_for_messages.return_value = iter([{
|
||||
|
|
|
@ -43,7 +43,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
|
|||
autospec=True)
|
||||
def test_create_plan_from_templates_success(self, mock_tarball):
|
||||
output = mock.Mock(output='{"result": ""}')
|
||||
output.id = "IDID"
|
||||
self.workflow.action_executions.create.return_value = output
|
||||
self.workflow.executions.create.return_value = output
|
||||
self.websocket.wait_for_messages.return_value = self.message_success
|
||||
|
||||
plan_management.create_plan_from_templates(
|
||||
|
@ -65,7 +67,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
|
|||
autospec=True)
|
||||
def test_create_plan_from_templates_container_error(self, mock_tarball):
|
||||
error = mock.Mock(output='{"result": "Error"}')
|
||||
error.id = "IDID"
|
||||
self.workflow.action_executions.create.return_value = error
|
||||
self.workflow.executions.create.return_value = error
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.PlanCreationError,
|
||||
|
@ -87,7 +91,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
|
|||
def test_create_plan_from_templates_roles_data(self, mock_tarball,
|
||||
mock_norm_path):
|
||||
output = mock.Mock(output='{"result": ""}')
|
||||
output.id = "IDID"
|
||||
self.workflow.action_executions.create.return_value = output
|
||||
self.workflow.executions.create.return_value = output
|
||||
self.websocket.wait_for_messages.return_value = self.message_success
|
||||
|
||||
mock_open_context = mock.mock_open()
|
||||
|
@ -118,7 +124,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
|
|||
autospec=True)
|
||||
def test_create_plan_from_templates_plan_env_data(self, mock_tarball):
|
||||
output = mock.Mock(output='{"result": ""}')
|
||||
output.id = "IDID"
|
||||
self.workflow.action_executions.create.return_value = output
|
||||
self.workflow.executions.create.return_value = output
|
||||
self.websocket.wait_for_messages.return_value = self.message_success
|
||||
|
||||
mock_open_context = mock.mock_open()
|
||||
|
@ -149,7 +157,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
|
|||
autospec=True)
|
||||
def test_create_plan_from_templates_networks_data(self, mock_tarball):
|
||||
output = mock.Mock(output='{"result": ""}')
|
||||
output.id = "IDID"
|
||||
self.workflow.action_executions.create.return_value = output
|
||||
self.workflow.executions.create.return_value = output
|
||||
self.websocket.wait_for_messages.return_value = self.message_success
|
||||
|
||||
mock_open_context = mock.mock_open()
|
||||
|
@ -178,7 +188,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
|
|||
|
||||
def test_delete_plan(self):
|
||||
output = mock.Mock(output='{"result": ""}')
|
||||
output.id = "IDID"
|
||||
self.workflow.action_executions.create.return_value = output
|
||||
self.workflow.executions.create.return_value = output
|
||||
self.websocket.wait_for_messages.return_value = self.message_success
|
||||
|
||||
plan_management.delete_deployment_plan(
|
||||
|
@ -193,7 +205,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
|
|||
autospec=True)
|
||||
def test_create_plan_with_password_gen_disabled(self, mock_tarball):
|
||||
output = mock.Mock(output='{"result": ""}')
|
||||
output.id = "IDID"
|
||||
self.workflow.action_executions.create.return_value = output
|
||||
self.workflow.executions.create.return_value = output
|
||||
self.websocket.wait_for_messages.return_value = self.message_success
|
||||
|
||||
plan_management.create_plan_from_templates(
|
||||
|
@ -226,8 +240,10 @@ class TestPlanUpdateWorkflows(base.TestCommand):
|
|||
self.websocket.__enter__ = lambda s: self.websocket
|
||||
self.websocket.__exit__ = lambda s, *exc: None
|
||||
self.tripleoclient.messaging_websocket.return_value = self.websocket
|
||||
self.workflow.action_executions.create.return_value = mock.Mock(
|
||||
output='{"result": ""}')
|
||||
output = mock.Mock(output='{"result": ""}')
|
||||
output.id = "IDID"
|
||||
self.workflow.action_executions.create.return_value = output
|
||||
self.workflow.executions.create.return_value = output
|
||||
self.message_success = iter([{
|
||||
"execution": {"id": "IDID"},
|
||||
"status": "SUCCESS",
|
||||
|
|
|
@ -59,7 +59,16 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
|
|||
"""
|
||||
try:
|
||||
for payload in websocket.wait_for_messages(timeout=timeout):
|
||||
yield payload
|
||||
# Ignore messages whose root_execution_id does not match the
|
||||
# id of the execution for which we are waiting.
|
||||
if payload['execution']['id'] != execution.id and \
|
||||
payload['execution'].get('root_execution_id', '') != \
|
||||
execution.id:
|
||||
|
||||
LOG.debug("Ignoring message from execution %s"
|
||||
% payload['execution']['id'])
|
||||
else:
|
||||
yield payload
|
||||
# If the message is from a sub-workflow, we just need to pass it
|
||||
# on to be displayed. This should never be the last message - so
|
||||
# continue and wait for the next.
|
||||
|
|
Loading…
Reference in New Issue