Filter messages not from waiting execution

The convention is to use the same queue name ("tripleo") for all
workflows. This can lead to messages showing from other tripleoclient
triggered workflows showing up during message polling if multiple
workflows are running at the same time.

This patch adds a check that will filter out any messages that do not
belong to the execution that is being waited on by comparing the
execution id with the root_execution_id returned in the execution
payload.

Depends-On: Icbe80c338d69efc6ce8fceb0f73f833bec588536
Change-Id: Ie6473d6a1044cdf76552d62645b4d63da2df9398
Related-Bug: #1794277
(cherry picked from commit 339c1f334c)
This commit is contained in:
James Slagle 2018-09-26 16:08:10 -04:00 committed by Dougal Matthews
parent f7badfea52
commit 074bb6e3a0
15 changed files with 149 additions and 26 deletions

View File

@ -45,7 +45,8 @@ class TestOvercloudCredentials(test_plugin.TestPluginV1):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch("tripleoclient.utils.open", create=True) as m: with mock.patch("tripleoclient.utils.open", create=True) as m:
self.cmd.take_action(parsed_args) with mock.patch("json.loads"):
self.cmd.take_action(parsed_args)
self.assertIn(mock.call('./overcloudrc', 'w'), m.call_args_list) self.assertIn(mock.call('./overcloudrc', 'w'), m.call_args_list)
mock_chmod.assert_has_calls([ mock_chmod.assert_has_calls([
@ -69,7 +70,8 @@ class TestOvercloudCredentials(test_plugin.TestPluginV1):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch("tripleoclient.utils.open", create=True) as m: with mock.patch("tripleoclient.utils.open", create=True) as m:
self.cmd.take_action(parsed_args) with mock.patch("json.loads"):
self.cmd.take_action(parsed_args)
path = "{}/overcloudrc".format(temp) path = "{}/overcloudrc".format(temp)

View File

@ -48,7 +48,10 @@ class TestCephUpgrade(utils.TestCommand):
self.app.client_manager.baremetal = mock.Mock() self.app.client_manager.baremetal = mock.Mock()
self.app.client_manager.orchestration = mock.Mock() self.app.client_manager.orchestration = mock.Mock()
self.app.client_manager.tripleoclient = FakeClientWrapper() self.app.client_manager.tripleoclient = FakeClientWrapper()
self.app.client_manager.workflow_engine = mock.Mock() workflow = execution = mock.Mock()
execution.id = "IDID"
workflow.executions.create.return_value = execution
self.app.client_manager.workflow_engine = workflow
class TestCephUpgradeConverge(utils.TestCommand): class TestCephUpgradeConverge(utils.TestCommand):

View File

@ -140,5 +140,8 @@ class TestDeployOvercloud(utils.TestCommand):
self.app.client_manager.image = mock.Mock() self.app.client_manager.image = mock.Mock()
self.app.client_manager.network = mock.Mock() self.app.client_manager.network = mock.Mock()
self.app.client_manager.orchestration = mock.Mock() self.app.client_manager.orchestration = mock.Mock()
self.app.client_manager.workflow_engine = mock.Mock() workflow = execution = mock.Mock()
execution.id = "IDID"
workflow.executions.create.return_value = execution
self.app.client_manager.workflow_engine = workflow
self.app.client_manager.tripleoclient = FakeClientWrapper() self.app.client_manager.tripleoclient = FakeClientWrapper()

View File

@ -48,7 +48,10 @@ class TestFFWDUpgradePrepare(utils.TestCommand):
self.app.client_manager.baremetal = mock.Mock() self.app.client_manager.baremetal = mock.Mock()
self.app.client_manager.orchestration = mock.Mock() self.app.client_manager.orchestration = mock.Mock()
self.app.client_manager.tripleoclient = FakeClientWrapper() self.app.client_manager.tripleoclient = FakeClientWrapper()
self.app.client_manager.workflow_engine = mock.Mock() workflow = execution = mock.Mock()
execution.id = "IDID"
workflow.executions.create.return_value = execution
self.app.client_manager.workflow_engine = workflow
class TestFFWDUpgradeRun(utils.TestCommand): class TestFFWDUpgradeRun(utils.TestCommand):
@ -70,4 +73,7 @@ class TestFFWDUpgradeConverge(utils.TestCommand):
self.app.client_manager.baremetal = mock.Mock() self.app.client_manager.baremetal = mock.Mock()
self.app.client_manager.orchestration = mock.Mock() self.app.client_manager.orchestration = mock.Mock()
self.app.client_manager.tripleoclient = FakeClientWrapper() self.app.client_manager.tripleoclient = FakeClientWrapper()
self.app.client_manager.workflow_engine = mock.Mock() workflow = execution = mock.Mock()
execution.id = "IDID"
workflow.executions.create.return_value = execution
self.app.client_manager.workflow_engine = workflow

View File

@ -13,6 +13,7 @@
# under the License. # under the License.
# #
import collections
import copy import copy
import json import json
import mock import mock
@ -46,6 +47,9 @@ class TestDeleteNode(fakes.TestDeleteNode):
self.workflow = self.app.client_manager.workflow_engine self.workflow = self.app.client_manager.workflow_engine
self.stack_name = self.app.client_manager.orchestration.stacks.get self.stack_name = self.app.client_manager.orchestration.stacks.get
self.stack_name.return_value = mock.Mock(stack_name="overcloud") self.stack_name.return_value = mock.Mock(stack_name="overcloud")
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
# TODO(someone): This test does not pass with autospec=True, it should # TODO(someone): This test does not pass with autospec=True, it should
# probably be fixed so that it can pass with that. # probably be fixed so that it can pass with that.
@ -132,6 +136,7 @@ class TestDeleteNode(fakes.TestDeleteNode):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED", "status": "FAILED",
"execution": {"id": "IDID"},
"message": """Failed to run action ERROR: Couldn't find \ "message": """Failed to run action ERROR: Couldn't find \
following instances in stack overcloud: wrong_instance""" following instances in stack overcloud: wrong_instance"""
}]) }])
@ -155,6 +160,9 @@ class TestProvideNode(fakes.TestOvercloudNode):
super(TestProvideNode, self).setUp() super(TestProvideNode, self).setUp()
self.workflow = self.app.client_manager.workflow_engine self.workflow = self.app.client_manager.workflow_engine
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
client = self.app.client_manager.tripleoclient client = self.app.client_manager.tripleoclient
self.websocket = client.messaging_websocket() self.websocket = client.messaging_websocket()
@ -163,7 +171,8 @@ class TestProvideNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "SUCCESS", "status": "SUCCESS",
"message": "Success" "message": "Success",
"execution": {"id": "IDID"}
}]) }])
def test_provide_all_manageable_nodes(self): def test_provide_all_manageable_nodes(self):
@ -227,6 +236,9 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
super(TestIntrospectNode, self).setUp() super(TestIntrospectNode, self).setUp()
self.workflow = self.app.client_manager.workflow_engine self.workflow = self.app.client_manager.workflow_engine
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
client = self.app.client_manager.tripleoclient client = self.app.client_manager.tripleoclient
self.websocket = client.messaging_websocket() self.websocket = client.messaging_websocket()
@ -237,7 +249,8 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "SUCCESS", "status": "SUCCESS",
"message": "Success", "message": "Success",
"introspected_nodes": {} "introspected_nodes": {},
"execution": {"id": "IDID"}
}] * 2) }] * 2)
self.cmd.take_action(parsed_args) self.cmd.take_action(parsed_args)
@ -261,6 +274,7 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = [{ self.websocket.wait_for_messages.return_value = [{
"status": "SUCCESS", "status": "SUCCESS",
"message": "Success", "message": "Success",
"execution": {"id": "IDID"},
}] }]
self.cmd.take_action(parsed_args) self.cmd.take_action(parsed_args)
@ -347,7 +361,6 @@ class TestImportNode(fakes.TestOvercloudNode):
"00:0b:d0:69:7e:58" "00:0b:d0:69:7e:58"
] ]
}] }]
self.json_file = tempfile.NamedTemporaryFile( self.json_file = tempfile.NamedTemporaryFile(
mode='w', delete=False, suffix='.json') mode='w', delete=False, suffix='.json')
json.dump(self.nodes_list, self.json_file) json.dump(self.nodes_list, self.json_file)
@ -355,12 +368,23 @@ class TestImportNode(fakes.TestOvercloudNode):
self.addCleanup(os.unlink, self.json_file.name) self.addCleanup(os.unlink, self.json_file.name)
self.workflow = self.app.client_manager.workflow_engine self.workflow = self.app.client_manager.workflow_engine
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
client = self.app.client_manager.tripleoclient client = self.app.client_manager.tripleoclient
self.websocket = client.messaging_websocket() self.websocket = client.messaging_websocket()
# Get the command object to test # Get the command object to test
self.cmd = overcloud_node.ImportNode(self.app, None) self.cmd = overcloud_node.ImportNode(self.app, None)
image = collections.namedtuple('image', ['id', 'name'])
self.app.client_manager.image = mock.Mock()
self.app.client_manager.image.images.list.return_value = [
image(id=1, name='bm-deploy-kernel'),
image(id=2, name='bm-deploy-ramdisk'),
image(id=3, name='overcloud-full'),
]
def _check_workflow_call(self, parsed_args, introspect=False, def _check_workflow_call(self, parsed_args, introspect=False,
provide=False, local=True, no_deploy_image=False): provide=False, local=True, no_deploy_image=False):
self.websocket.wait_for_messages.return_value = [{ self.websocket.wait_for_messages.return_value = [{
@ -368,7 +392,8 @@ class TestImportNode(fakes.TestOvercloudNode):
"message": "Success", "message": "Success",
"registered_nodes": [{ "registered_nodes": [{
"uuid": "MOCK_NODE_UUID" "uuid": "MOCK_NODE_UUID"
}] }],
"execution": {"id": "IDID"}
}] }]
self.cmd.take_action(parsed_args) self.cmd.take_action(parsed_args)
@ -459,11 +484,15 @@ class TestConfigureNode(fakes.TestOvercloudNode):
super(TestConfigureNode, self).setUp() super(TestConfigureNode, self).setUp()
self.workflow = self.app.client_manager.workflow_engine self.workflow = self.app.client_manager.workflow_engine
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
client = self.app.client_manager.tripleoclient client = self.app.client_manager.tripleoclient
self.websocket = client.messaging_websocket() self.websocket = client.messaging_websocket()
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "SUCCESS", "status": "SUCCESS",
"message": "" "message": "",
"execution": {"id": "IDID"}
}]) }])
# Get the command object to test # Get the command object to test
@ -490,7 +519,8 @@ class TestConfigureNode(fakes.TestOvercloudNode):
def test_failed_to_configure_all_manageable_nodes(self): def test_failed_to_configure_all_manageable_nodes(self):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED", "status": "FAILED",
"message": "Test failure." "message": "Test failure.",
"execution": {"id": "IDID"}
}]) }])
parsed_args = self.check_parser(self.cmd, ['--all-manageable'], []) parsed_args = self.check_parser(self.cmd, ['--all-manageable'], [])
@ -518,7 +548,8 @@ class TestConfigureNode(fakes.TestOvercloudNode):
def test_failed_to_configure_specified_nodes(self): def test_failed_to_configure_specified_nodes(self):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED", "status": "FAILED",
"message": "Test failure." "message": "Test failure.",
"execution": {"id": "IDID"}
}]) }])
parsed_args = self.check_parser(self.cmd, ['node_uuid1'], []) parsed_args = self.check_parser(self.cmd, ['node_uuid1'], [])
@ -631,6 +662,9 @@ class TestDiscoverNode(fakes.TestOvercloudNode):
super(TestDiscoverNode, self).setUp() super(TestDiscoverNode, self).setUp()
self.workflow = self.app.client_manager.workflow_engine self.workflow = self.app.client_manager.workflow_engine
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
client = self.app.client_manager.tripleoclient client = self.app.client_manager.tripleoclient
self.websocket = client.messaging_websocket() self.websocket = client.messaging_websocket()
@ -641,7 +675,8 @@ class TestDiscoverNode(fakes.TestOvercloudNode):
"message": "Success", "message": "Success",
"registered_nodes": [{ "registered_nodes": [{
"uuid": "MOCK_NODE_UUID" "uuid": "MOCK_NODE_UUID"
}] }],
"execution": {"id": "IDID"}
}] }]
def test_with_ip_range(self): def test_with_ip_range(self):

View File

@ -48,7 +48,10 @@ class TestOvercloudUpdatePrepare(utils.TestCommand):
self.app.client_manager.baremetal = mock.Mock() self.app.client_manager.baremetal = mock.Mock()
self.app.client_manager.orchestration = mock.Mock() self.app.client_manager.orchestration = mock.Mock()
self.app.client_manager.tripleoclient = FakeClientWrapper() self.app.client_manager.tripleoclient = FakeClientWrapper()
self.app.client_manager.workflow_engine = mock.Mock() workflow = execution = mock.Mock()
execution.id = "IDID"
workflow.executions.create.return_value = execution
self.app.client_manager.workflow_engine = workflow
class TestOvercloudUpdateRun(utils.TestCommand): class TestOvercloudUpdateRun(utils.TestCommand):

View File

@ -47,7 +47,7 @@ class TestOvercloudUpdatePrepare(fakes.TestOvercloudUpdatePrepare):
@mock.patch('shutil.copytree', autospec=True) @mock.patch('shutil.copytree', autospec=True)
@mock.patch('six.moves.builtins.open') @mock.patch('six.moves.builtins.open')
@mock.patch('tripleoclient.v1.overcloud_deploy.DeployOvercloud.' @mock.patch('tripleoclient.v1.overcloud_deploy.DeployOvercloud.'
'_deploy_tripleo_heat_templates', autospec=True) '_deploy_tripleo_heat_templates_tmpdir', autospec=True)
def test_update_out(self, mock_deploy, mock_open, mock_copy, mock_yaml, def test_update_out(self, mock_deploy, mock_open, mock_copy, mock_yaml,
mock_abspath, mock_update, mock_logger, mock_abspath, mock_update, mock_logger,
mock_get_stack): mock_get_stack):

View File

@ -48,7 +48,10 @@ class TestOvercloudUpgradePrepare(utils.TestCommand):
self.app.client_manager.baremetal = mock.Mock() self.app.client_manager.baremetal = mock.Mock()
self.app.client_manager.orchestration = mock.Mock() self.app.client_manager.orchestration = mock.Mock()
self.app.client_manager.tripleoclient = FakeClientWrapper() self.app.client_manager.tripleoclient = FakeClientWrapper()
self.app.client_manager.workflow_engine = mock.Mock() workflow = execution = mock.Mock()
execution.id = "IDID"
workflow.executions.create.return_value = execution
self.app.client_manager.workflow_engine = workflow
class TestOvercloudUpgradeRun(utils.TestCommand): class TestOvercloudUpgradeRun(utils.TestCommand):

View File

@ -120,6 +120,9 @@ class TestOvercloudCreatePlan(utils.TestCommand):
self.app.client_manager.tripleoclient = self.tripleoclient self.app.client_manager.tripleoclient = self.tripleoclient
self.workflow = self.app.client_manager.workflow_engine self.workflow = self.app.client_manager.workflow_engine
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
self.swift = self.app.client_manager.tripleoclient.object_store self.swift = self.app.client_manager.tripleoclient.object_store
def test_create_default_plan(self): def test_create_default_plan(self):
@ -364,6 +367,9 @@ class TestOvercloudDeployPlan(utils.TestCommand):
self.cmd = overcloud_plan.DeployPlan(self.app, app_args) self.cmd = overcloud_plan.DeployPlan(self.app, app_args)
self.workflow = self.app.client_manager.workflow_engine = mock.Mock() self.workflow = self.app.client_manager.workflow_engine = mock.Mock()
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
self.orch = self.app.client_manager.orchestration = mock.Mock() self.orch = self.app.client_manager.orchestration = mock.Mock()
self.websocket = mock.Mock() self.websocket = mock.Mock()

View File

@ -37,15 +37,18 @@ class TestCreateRAID(fakes.TestBaremetal):
tripleoclient = self.app.client_manager.tripleoclient tripleoclient = self.app.client_manager.tripleoclient
websocket = tripleoclient.messaging_websocket() websocket = tripleoclient.messaging_websocket()
websocket.wait_for_messages.return_value = iter([ websocket.wait_for_messages.return_value = iter([
{'status': "SUCCESS"} {'status': "SUCCESS",
'execution': {'id': 'IDID'}}
]) ])
self.websocket = websocket self.websocket = websocket
self.workflow.executions.create.return_value = mock.MagicMock( execution = mock.MagicMock(
output=json.dumps({ output=json.dumps({
"result": None "result": None
}) })
) )
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
def test_ok(self): def test_ok(self):
conf = json.dumps(self.conf) conf = json.dumps(self.conf)

View File

@ -32,6 +32,9 @@ class TestBaremetalWorkflows(utils.TestCommand):
self.websocket.__exit__ = lambda s, *exc: None self.websocket.__exit__ = lambda s, *exc: None
self.tripleoclient.messaging_websocket.return_value = self.websocket self.tripleoclient.messaging_websocket.return_value = self.websocket
self.app.client_manager.tripleoclient = self.tripleoclient self.app.client_manager.tripleoclient = self.tripleoclient
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
self.message_success = iter([{ self.message_success = iter([{
"execution": {"id": "IDID"}, "execution": {"id": "IDID"},

View File

@ -25,11 +25,13 @@ class TestBaseWorkflows(utils.TestCommand):
def test_wait_for_messages_success(self): def test_wait_for_messages_success(self):
payload_a = { payload_a = {
'status': 'ERROR', 'status': 'ERROR',
'execution': {'id': 2} 'execution': {'id': 2,
'root_execution_id': 1}
} }
payload_b = { payload_b = {
'status': 'ERROR', 'status': 'ERROR',
'execution': {'id': 1} 'execution': {'id': 1,
'root_execution_id': 1}
} }
mistral = mock.Mock() mistral = mock.Mock()
@ -74,6 +76,31 @@ class TestBaseWorkflows(utils.TestCommand):
self.assertTrue(mistral.executions.get.called) self.assertTrue(mistral.executions.get.called)
websocket.wait_for_messages.assert_called_with(timeout=None) websocket.wait_for_messages.assert_called_with(timeout=None)
def test_wait_for_messages_different_execution(self):
payload_a = {
'status': 'RUNNING',
'execution': {'id': 'aaaa',
'root_execution_id': 'aaaa'}
}
payload_b = {
'status': 'RUNNING',
'execution': {'id': 'bbbb',
'root_execution_id': 'bbbb'}
}
mistral = mock.Mock()
websocket = mock.Mock()
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
execution = mock.Mock()
execution.id = 'aaaa'
messages = list(base.wait_for_messages(mistral, websocket, execution))
# Assert only payload_a was returned
self.assertEqual([payload_a], messages)
websocket.wait_for_messages.assert_called_with(timeout=None)
def test_call_action_success(self): def test_call_action_success(self):
mistral = mock.Mock() mistral = mock.Mock()
action = 'test-action' action = 'test-action'

View File

@ -42,6 +42,9 @@ class TestParameterWorkflows(utils.TestCommand):
self.websocket.__exit__ = lambda s, *exc: None self.websocket.__exit__ = lambda s, *exc: None
self.tripleoclient.messaging_websocket.return_value = self.websocket self.tripleoclient.messaging_websocket.return_value = self.websocket
self.app.client_manager.tripleoclient = self.tripleoclient self.app.client_manager.tripleoclient = self.tripleoclient
execution = mock.Mock()
execution.id = "IDID"
self.workflow.executions.create.return_value = execution
def test_get_overcloud_passwords(self): def test_get_overcloud_passwords(self):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{

View File

@ -43,7 +43,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
autospec=True) autospec=True)
def test_create_plan_from_templates_success(self, mock_tarball): def test_create_plan_from_templates_success(self, mock_tarball):
output = mock.Mock(output='{"result": ""}') output = mock.Mock(output='{"result": ""}')
output.id = "IDID"
self.workflow.action_executions.create.return_value = output self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output
self.websocket.wait_for_messages.return_value = self.message_success self.websocket.wait_for_messages.return_value = self.message_success
plan_management.create_plan_from_templates( plan_management.create_plan_from_templates(
@ -67,7 +69,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
autospec=True) autospec=True)
def test_create_plan_from_templates_container_error(self, mock_tarball): def test_create_plan_from_templates_container_error(self, mock_tarball):
error = mock.Mock(output='{"result": "Error"}') error = mock.Mock(output='{"result": "Error"}')
error.id = "IDID"
self.workflow.action_executions.create.return_value = error self.workflow.action_executions.create.return_value = error
self.workflow.executions.create.return_value = error
self.assertRaises( self.assertRaises(
exceptions.PlanCreationError, exceptions.PlanCreationError,
@ -88,7 +92,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
autospec=True) autospec=True)
def test_create_plan_from_templates_roles_data(self, mock_tarball): def test_create_plan_from_templates_roles_data(self, mock_tarball):
output = mock.Mock(output='{"result": ""}') output = mock.Mock(output='{"result": ""}')
output.id = "IDID"
self.workflow.action_executions.create.return_value = output self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output
self.websocket.wait_for_messages.return_value = self.message_success self.websocket.wait_for_messages.return_value = self.message_success
mock_open_context = mock.mock_open() mock_open_context = mock.mock_open()
@ -121,7 +127,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
autospec=True) autospec=True)
def test_create_plan_from_templates_plan_env_data(self, mock_tarball): def test_create_plan_from_templates_plan_env_data(self, mock_tarball):
output = mock.Mock(output='{"result": ""}') output = mock.Mock(output='{"result": ""}')
output.id = "IDID"
self.workflow.action_executions.create.return_value = output self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output
self.websocket.wait_for_messages.return_value = self.message_success self.websocket.wait_for_messages.return_value = self.message_success
mock_open_context = mock.mock_open() mock_open_context = mock.mock_open()
@ -154,7 +162,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
autospec=True) autospec=True)
def test_create_plan_from_templates_networks_data(self, mock_tarball): def test_create_plan_from_templates_networks_data(self, mock_tarball):
output = mock.Mock(output='{"result": ""}') output = mock.Mock(output='{"result": ""}')
output.id = "IDID"
self.workflow.action_executions.create.return_value = output self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output
self.websocket.wait_for_messages.return_value = self.message_success self.websocket.wait_for_messages.return_value = self.message_success
mock_open_context = mock.mock_open() mock_open_context = mock.mock_open()
@ -184,8 +194,11 @@ class TestPlanCreationWorkflows(utils.TestCommand):
'test-overcloud', 'network_data.yaml', mock_open_context()) 'test-overcloud', 'network_data.yaml', mock_open_context())
def test_delete_plan(self): def test_delete_plan(self):
self.workflow.action_executions.create.return_value = ( output = mock.Mock(output='{"result": ""}')
mock.Mock(output='{"result": null}')) output.id = "IDID"
self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output
self.websocket.wait_for_messages.return_value = self.message_success
plan_management.delete_deployment_plan( plan_management.delete_deployment_plan(
self.workflow, self.workflow,
@ -200,7 +213,9 @@ class TestPlanCreationWorkflows(utils.TestCommand):
autospec=True) autospec=True)
def test_create_plan_with_password_gen_disabled(self, mock_tarball): def test_create_plan_with_password_gen_disabled(self, mock_tarball):
output = mock.Mock(output='{"result": ""}') output = mock.Mock(output='{"result": ""}')
output.id = "IDID"
self.workflow.action_executions.create.return_value = output self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output
self.websocket.wait_for_messages.return_value = self.message_success self.websocket.wait_for_messages.return_value = self.message_success
plan_management.create_plan_from_templates( plan_management.create_plan_from_templates(
@ -235,8 +250,10 @@ class TestPlanUpdateWorkflows(base.TestCommand):
self.websocket.__enter__ = lambda s: self.websocket self.websocket.__enter__ = lambda s: self.websocket
self.websocket.__exit__ = lambda s, *exc: None self.websocket.__exit__ = lambda s, *exc: None
self.tripleoclient.messaging_websocket.return_value = self.websocket self.tripleoclient.messaging_websocket.return_value = self.websocket
self.workflow.action_executions.create.return_value = mock.Mock( output = mock.Mock(output='{"result": ""}')
output='{"result": ""}') output.id = "IDID"
self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output
self.message_success = iter([{ self.message_success = iter([{
"execution": {"id": "IDID"}, "execution": {"id": "IDID"},
"status": "SUCCESS", "status": "SUCCESS",

View File

@ -59,7 +59,16 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
""" """
try: try:
for payload in websocket.wait_for_messages(timeout=timeout): for payload in websocket.wait_for_messages(timeout=timeout):
yield payload # Ignore messages whose root_execution_id does not match the
# id of the execution for which we are waiting.
if payload['execution']['id'] != execution.id and \
payload['execution'].get('root_execution_id', '') != \
execution.id:
LOG.debug("Ignoring message from execution %s"
% payload['execution']['id'])
else:
yield payload
# If the message is from a sub-workflow, we just need to pass it # If the message is from a sub-workflow, we just need to pass it
# on to be displayed. This should never be the last message - so # on to be displayed. This should never be the last message - so
# continue and wait for the next. # continue and wait for the next.