Remove execution from workflow message send

Serializing all the execution in a message can make the message too big.
This change was done in tripleo-common. this supports that change
This change still supports the old format and is backwards compatible.

Partial-Bug: #1812172
Change-Id: I40ee028366222f38f5ba1db58d171f98be75d009
(cherry picked from commit cad7916ce8)
This commit is contained in:
apetrich 2019-01-15 14:16:00 +01:00 committed by Dougal Matthews
parent 074bb6e3a0
commit 092449a9c6
10 changed files with 125 additions and 45 deletions

View File

@ -47,7 +47,7 @@ class FakeWebSocket(object):
def wait_for_messages(self, timeout=None): def wait_for_messages(self, timeout=None):
yield { yield {
'execution': {'id': 'IDID'}, 'execution_id': 'IDID',
'status': 'SUCCESS', 'status': 'SUCCESS',
} }

View File

@ -66,7 +66,7 @@ class TestPlugin(base.TestCase):
"payload": { "payload": {
"status": 200, "status": 200,
"message": "Result for IDID", "message": "Result for IDID",
"execution": {"id": "IDID"}, "execution_id": "IDID",
} }
} }
}, send_ack] }, send_ack]
@ -84,7 +84,7 @@ class TestPlugin(base.TestCase):
self.assertEqual(payload, { self.assertEqual(payload, {
"status": 200, "status": 200,
"message": "Result for IDID", "message": "Result for IDID",
"execution": {"id": "IDID"}, "execution_id": "IDID",
}) })
# We only want to test the first message, as there is only one. # We only want to test the first message, as there is only one.
# The last one, the send_ack will be used by the context # The last one, the send_ack will be used by the context

View File

@ -63,7 +63,7 @@ class TestDeleteNode(fakes.TestDeleteNode):
parsed_args = self.check_parser(self.cmd, argslist, verifylist) parsed_args = self.check_parser(self.cmd, argslist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS" "status": "SUCCESS"
}]) }])
@ -109,7 +109,7 @@ class TestDeleteNode(fakes.TestDeleteNode):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS" "status": "SUCCESS"
}]) }])
@ -136,7 +136,7 @@ class TestDeleteNode(fakes.TestDeleteNode):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED", "status": "FAILED",
"execution": {"id": "IDID"}, "execution_id": "IDID",
"message": """Failed to run action ERROR: Couldn't find \ "message": """Failed to run action ERROR: Couldn't find \
following instances in stack overcloud: wrong_instance""" following instances in stack overcloud: wrong_instance"""
}]) }])
@ -172,7 +172,7 @@ class TestProvideNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "SUCCESS", "status": "SUCCESS",
"message": "Success", "message": "Success",
"execution": {"id": "IDID"} "execution_id": "IDID"
}]) }])
def test_provide_all_manageable_nodes(self): def test_provide_all_manageable_nodes(self):
@ -250,7 +250,7 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
"status": "SUCCESS", "status": "SUCCESS",
"message": "Success", "message": "Success",
"introspected_nodes": {}, "introspected_nodes": {},
"execution": {"id": "IDID"} "execution_id": "IDID"
}] * 2) }] * 2)
self.cmd.take_action(parsed_args) self.cmd.take_action(parsed_args)
@ -274,7 +274,7 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = [{ self.websocket.wait_for_messages.return_value = [{
"status": "SUCCESS", "status": "SUCCESS",
"message": "Success", "message": "Success",
"execution": {"id": "IDID"}, "execution_id": "IDID",
}] }]
self.cmd.take_action(parsed_args) self.cmd.take_action(parsed_args)
@ -393,7 +393,7 @@ class TestImportNode(fakes.TestOvercloudNode):
"registered_nodes": [{ "registered_nodes": [{
"uuid": "MOCK_NODE_UUID" "uuid": "MOCK_NODE_UUID"
}], }],
"execution": {"id": "IDID"} "execution_id": "IDID"
}] }]
self.cmd.take_action(parsed_args) self.cmd.take_action(parsed_args)
@ -492,7 +492,7 @@ class TestConfigureNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "SUCCESS", "status": "SUCCESS",
"message": "", "message": "",
"execution": {"id": "IDID"} "execution_id": "IDID"
}]) }])
# Get the command object to test # Get the command object to test
@ -520,7 +520,7 @@ class TestConfigureNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED", "status": "FAILED",
"message": "Test failure.", "message": "Test failure.",
"execution": {"id": "IDID"} "execution_id": "IDID"
}]) }])
parsed_args = self.check_parser(self.cmd, ['--all-manageable'], []) parsed_args = self.check_parser(self.cmd, ['--all-manageable'], [])
@ -549,7 +549,7 @@ class TestConfigureNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED", "status": "FAILED",
"message": "Test failure.", "message": "Test failure.",
"execution": {"id": "IDID"} "execution_id": "IDID"
}]) }])
parsed_args = self.check_parser(self.cmd, ['node_uuid1'], []) parsed_args = self.check_parser(self.cmd, ['node_uuid1'], [])
@ -676,7 +676,7 @@ class TestDiscoverNode(fakes.TestOvercloudNode):
"registered_nodes": [{ "registered_nodes": [{
"uuid": "MOCK_NODE_UUID" "uuid": "MOCK_NODE_UUID"
}], }],
"execution": {"id": "IDID"} "execution_id": "IDID"
}] }]
def test_with_ip_range(self): def test_with_ip_range(self):

View File

@ -72,6 +72,13 @@ class TestOvercloudDeletePlan(utils.TestCommand):
self.app.client_manager.workflow_engine = mock.Mock() self.app.client_manager.workflow_engine = mock.Mock()
self.workflow = self.app.client_manager.workflow_engine self.workflow = self.app.client_manager.workflow_engine
self.websocket = mock.Mock()
self.websocket.__enter__ = lambda s: self.websocket
self.websocket.__exit__ = lambda s, *exc: None
self.tripleoclient = mock.Mock()
self.tripleoclient.messaging_websocket.return_value = self.websocket
self.app.client_manager.tripleoclient = self.tripleoclient
@mock.patch( @mock.patch(
'tripleoclient.workflows.plan_management.delete_deployment_plan', 'tripleoclient.workflows.plan_management.delete_deployment_plan',
autospec=True) autospec=True)
@ -79,6 +86,11 @@ class TestOvercloudDeletePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, ['test-plan'], parsed_args = self.check_parser(self.cmd, ['test-plan'],
[('plans', ['test-plan'])]) [('plans', ['test-plan'])])
self.websocket.wait_for_messages.return_value = iter([{
"execution_id": "IDID",
"status": "SUCCESS"
}])
self.cmd.take_action(parsed_args) self.cmd.take_action(parsed_args)
delete_deployment_plan_mock.assert_called_once_with( delete_deployment_plan_mock.assert_called_once_with(
@ -93,6 +105,11 @@ class TestOvercloudDeletePlan(utils.TestCommand):
verifylist = [('plans', ['test-plan1', 'test-plan2'])] verifylist = [('plans', ['test-plan1', 'test-plan2'])]
parsed_args = self.check_parser(self.cmd, argslist, verifylist) parsed_args = self.check_parser(self.cmd, argslist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution_id": "IDID",
"status": "SUCCESS"
}])
self.cmd.take_action(parsed_args) self.cmd.take_action(parsed_args)
expected = [ expected = [
@ -136,7 +153,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS" "status": "SUCCESS"
}]) }])
@ -164,7 +181,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "ERROR", "message": "failed" "status": "ERROR", "message": "failed"
}]) }])
@ -194,7 +211,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS" "status": "SUCCESS"
}]) }])
mock_result = mock.Mock(output='{"result": null}') mock_result = mock.Mock(output='{"result": null}')
@ -229,7 +246,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "ERROR", "message": "failed" "status": "ERROR", "message": "failed"
}]) }])
mock_result = mock.Mock(output='{"result": null}') mock_result = mock.Mock(output='{"result": null}')
@ -297,7 +314,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS" "status": "SUCCESS"
}]) }])
mock_result = mock.Mock(output='{"result": null}') mock_result = mock.Mock(output='{"result": null}')
@ -339,7 +356,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS" "status": "SUCCESS"
}]) }])
@ -398,7 +415,7 @@ class TestOvercloudDeployPlan(utils.TestCommand):
self.orch.stacks.get.return_value = None self.orch.stacks.get.return_value = None
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
'execution': {'id': 'IDID'}, 'execution_id': 'IDID',
'status': 'SUCCESS' 'status': 'SUCCESS'
}]) }])

View File

@ -38,7 +38,7 @@ class TestCreateRAID(fakes.TestBaremetal):
websocket = tripleoclient.messaging_websocket() websocket = tripleoclient.messaging_websocket()
websocket.wait_for_messages.return_value = iter([ websocket.wait_for_messages.return_value = iter([
{'status': "SUCCESS", {'status': "SUCCESS",
'execution': {'id': 'IDID'}} 'execution_id': 'IDID'}
]) ])
self.websocket = websocket self.websocket = websocket

View File

@ -37,13 +37,13 @@ class TestBaremetalWorkflows(utils.TestCommand):
self.workflow.executions.create.return_value = execution self.workflow.executions.create.return_value = execution
self.message_success = iter([{ self.message_success = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
"message": "Success.", "message": "Success.",
"registered_nodes": [], "registered_nodes": [],
}]) }])
self.message_failed = iter([{ self.message_failed = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "FAIL", "status": "FAIL",
"message": "Fail.", "message": "Fail.",
}]) }])
@ -127,7 +127,7 @@ class TestBaremetalWorkflows(utils.TestCommand):
def test_provide_error_with_format_message(self): def test_provide_error_with_format_message(self):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "FAIL", "status": "FAIL",
"message": ['Error1', 'Error2'] "message": ['Error1', 'Error2']
}]) }])
@ -175,7 +175,7 @@ class TestBaremetalWorkflows(utils.TestCommand):
def test_introspect_manageable_nodes_success(self): def test_introspect_manageable_nodes_success(self):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
"introspected_nodes": {}, "introspected_nodes": {},
}]) }])
@ -209,7 +209,7 @@ class TestBaremetalWorkflows(utils.TestCommand):
def test_introspect_manageable_nodes_mixed_status(self): def test_introspect_manageable_nodes_mixed_status(self):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
"introspected_nodes": {'node1': {'error': None}, "introspected_nodes": {'node1': {'error': None},
'node2': {'error': 'Error'}} 'node2': {'error': 'Error'}}

View File

@ -25,13 +25,13 @@ class TestBaseWorkflows(utils.TestCommand):
def test_wait_for_messages_success(self): def test_wait_for_messages_success(self):
payload_a = { payload_a = {
'status': 'ERROR', 'status': 'ERROR',
'execution': {'id': 2, 'execution_id': 2,
'root_execution_id': 1} 'root_execution_id': 1
} }
payload_b = { payload_b = {
'status': 'ERROR', 'status': 'ERROR',
'execution': {'id': 1, 'execution_id': 1,
'root_execution_id': 1} 'root_execution_id': 1
} }
mistral = mock.Mock() mistral = mock.Mock()
@ -77,6 +77,56 @@ class TestBaseWorkflows(utils.TestCommand):
websocket.wait_for_messages.assert_called_with(timeout=None) websocket.wait_for_messages.assert_called_with(timeout=None)
def test_wait_for_messages_different_execution(self): def test_wait_for_messages_different_execution(self):
payload_a = {
'status': 'RUNNING',
'execution_id': 'aaaa',
'root_execution_id': 'aaaa'
}
payload_b = {
'status': 'RUNNING',
'execution_id': 'bbbb',
'root_execution_id': 'bbbb'
}
mistral = mock.Mock()
websocket = mock.Mock()
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
execution = mock.Mock()
execution.id = 'aaaa'
messages = list(base.wait_for_messages(mistral, websocket, execution))
# Assert only payload_a was returned
self.assertEqual([payload_a], messages)
websocket.wait_for_messages.assert_called_with(timeout=None)
def test_backwards_compat_wait_for_messages_success(self):
payload_a = {
'status': 'ERROR',
'execution': {'id': 2,
'root_execution_id': 1}
}
payload_b = {
'status': 'ERROR',
'execution': {'id': 1,
'root_execution_id': 1}
}
mistral = mock.Mock()
websocket = mock.Mock()
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
execution = mock.Mock()
execution.id = 1
messages = list(base.wait_for_messages(mistral, websocket, execution))
self.assertEqual([payload_a, payload_b], messages)
self.assertFalse(mistral.executions.get.called)
websocket.wait_for_messages.assert_called_with(timeout=None)
def test_backwards_compatible_call_with_different_execution(self):
payload_a = { payload_a = {
'status': 'RUNNING', 'status': 'RUNNING',
'execution': {'id': 'aaaa', 'execution': {'id': 'aaaa',

View File

@ -48,7 +48,7 @@ class TestParameterWorkflows(utils.TestCommand):
def test_get_overcloud_passwords(self): def test_get_overcloud_passwords(self):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
"message": "passwords", "message": "passwords",
}]) }])
@ -77,7 +77,7 @@ class TestParameterWorkflows(utils.TestCommand):
mock_safe_load.return_value = plan_env_data mock_safe_load.return_value = plan_env_data
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
"message": "", "message": "",
"result": {} "result": {}
@ -110,7 +110,7 @@ class TestParameterWorkflows(utils.TestCommand):
mock_safe_load.return_value = plan_env_data mock_safe_load.return_value = plan_env_data
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "FAILED", "status": "FAILED",
"message": "workflow failure", "message": "workflow failure",
"result": "" "result": ""
@ -158,7 +158,7 @@ class TestParameterWorkflows(utils.TestCommand):
def test_check_deprecated_params_no_output(self): def test_check_deprecated_params_no_output(self):
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
}]) }])
@ -176,7 +176,7 @@ class TestParameterWorkflows(utils.TestCommand):
'deprecated': True, 'deprecated': True,
'user_defined': True}] 'user_defined': True}]
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
"deprecated": deprecated_params "deprecated": deprecated_params
}]) }])
@ -197,7 +197,7 @@ class TestParameterWorkflows(utils.TestCommand):
deprecated_params = [{'parameter': 'TestParameter1', deprecated_params = [{'parameter': 'TestParameter1',
'deprecated': True}] 'deprecated': True}]
self.websocket.wait_for_messages.return_value = iter([{ self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
"deprecated": deprecated_params "deprecated": deprecated_params
}]) }])

View File

@ -35,7 +35,7 @@ class TestPlanCreationWorkflows(utils.TestCommand):
self.app.client_manager.tripleoclient = self.tripleoclient self.app.client_manager.tripleoclient = self.tripleoclient
self.message_success = iter([{ self.message_success = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
}]) }])
@ -255,7 +255,7 @@ class TestPlanUpdateWorkflows(base.TestCommand):
self.workflow.action_executions.create.return_value = output self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output self.workflow.executions.create.return_value = output
self.message_success = iter([{ self.message_success = iter([{
"execution": {"id": "IDID"}, "execution_id": "IDID",
"status": "SUCCESS", "status": "SUCCESS",
}]) }])
self.websocket.wait_for_messages.return_value = self.message_success self.websocket.wait_for_messages.return_value = self.message_success

View File

@ -60,19 +60,32 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
try: try:
for payload in websocket.wait_for_messages(timeout=timeout): for payload in websocket.wait_for_messages(timeout=timeout):
# Ignore messages whose root_execution_id does not match the # Ignore messages whose root_execution_id does not match the
# id of the execution for which we are waiting. # id of the execution for which we are waiting
if payload['execution']['id'] != execution.id and \
payload['execution'].get('root_execution_id', '') != \ # New versions of tripleo-common don't sent the execution anymore
execution.id: # but keeping the old way ot getting it is important to keep
# backwards compatibility.
# TODO(apetrich) payload.execution is deprecated and will be
# removed from stein. We should keep this until payload.execution
# is removed from the LTS
payload_exec_id = payload.get('execution_id') or \
payload.get('execution', {}).get('id')
payload_root_exec_id = payload.get('root_execution_id', '') or \
payload.get('execution', {}).get('root_execution_id', '')
if payload_exec_id != execution.id and \
payload_root_exec_id != execution.id:
LOG.debug("Ignoring message from execution %s" LOG.debug("Ignoring message from execution %s"
% payload['execution']['id']) % payload_exec_id)
else: else:
yield payload yield payload
# If the message is from a sub-workflow, we just need to pass it # If the message is from a sub-workflow, we just need to pass it
# on to be displayed. This should never be the last message - so # on to be displayed. This should never be the last message - so
# continue and wait for the next. # continue and wait for the next.
if payload['execution']['id'] != execution.id: if payload_exec_id != execution.id:
continue continue
# Check the status of the payload, if we are not given one # Check the status of the payload, if we are not given one
# default to running and assume it is just an "in progress" # default to running and assume it is just an "in progress"