Remove execution from workflow message send

Serializing all the execution in a message can make the message too big.
This change was done in tripleo-common. this supports that change
This change still supports the old format and is backwards compatible.

Partial-Bug: #1812172
Change-Id: I40ee028366222f38f5ba1db58d171f98be75d009
This commit is contained in:
apetrich 2019-01-15 14:16:00 +01:00
parent dcbd17c07a
commit cad7916ce8
12 changed files with 121 additions and 58 deletions

View File

@ -47,7 +47,7 @@ class FakeWebSocket(object):
def wait_for_messages(self, timeout=None):
yield {
'execution': {'id': 'IDID'},
'execution_id': 'IDID',
'status': 'SUCCESS',
}

View File

@ -38,7 +38,7 @@ class TestOvercloudCredentials(test_plugin.TestPluginV1):
self.app.client_manager.tripleoclient = self.tripleoclient
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"message": {
"overcloudrc": "OVERCLOUDRC CONTENTS",

View File

@ -65,7 +65,7 @@ class TestPlugin(base.TestCase):
"payload": {
"status": 200,
"message": "Result for IDID",
"execution": {"id": "IDID"},
"execution_id": "IDID",
}
}
}, send_ack]
@ -82,7 +82,7 @@ class TestPlugin(base.TestCase):
self.assertEqual(payload, {
"status": 200,
"message": "Result for IDID",
"execution": {"id": "IDID"},
"execution_id": "IDID",
})
# We only want to test the first message, as there is only one.
# The last one, the send_ack will be used by the context

View File

@ -63,7 +63,7 @@ class TestDeleteNode(fakes.TestDeleteNode):
parsed_args = self.check_parser(self.cmd, argslist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS"
}])
@ -109,7 +109,7 @@ class TestDeleteNode(fakes.TestDeleteNode):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS"
}])
@ -136,7 +136,7 @@ class TestDeleteNode(fakes.TestDeleteNode):
self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED",
"execution": {"id": "IDID"},
"execution_id": "IDID",
"message": """Failed to run action ERROR: Couldn't find \
following instances in stack overcloud: wrong_instance"""
}])
@ -172,7 +172,7 @@ class TestProvideNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{
"status": "SUCCESS",
"message": "Success",
"execution": {"id": "IDID"}
"execution_id": "IDID"
}])
def test_provide_all_manageable_nodes(self):
@ -250,7 +250,7 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
"status": "SUCCESS",
"message": "Success",
"introspected_nodes": {},
"execution": {"id": "IDID"}
"execution_id": "IDID"
}] * 2)
self.cmd.take_action(parsed_args)
@ -274,7 +274,7 @@ class TestIntrospectNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = [{
"status": "SUCCESS",
"message": "Success",
"execution": {"id": "IDID"},
"execution_id": "IDID",
}]
self.cmd.take_action(parsed_args)
@ -359,7 +359,7 @@ class TestCleanNode(fakes.TestOvercloudNode):
"status": "SUCCESS",
"message": "Success",
"cleaned_nodes": {},
"execution": {"id": "IDID"}
"execution_id": "IDID"
}] * 2)
self.cmd.take_action(parsed_args)
@ -383,7 +383,7 @@ class TestCleanNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = [{
"status": "SUCCESS",
"message": "Success",
"execution": {"id": "IDID"}
"execution_id": "IDID"
}]
self.cmd.take_action(parsed_args)
@ -488,7 +488,7 @@ class TestImportNode(fakes.TestOvercloudNode):
"registered_nodes": [{
"uuid": "MOCK_NODE_UUID"
}],
"execution": {"id": "IDID"}
"execution_id": "IDID"
}]
self.cmd.take_action(parsed_args)
@ -645,7 +645,7 @@ class TestImportNodeMultiArch(fakes.TestOvercloudNode):
"registered_nodes": [{
"uuid": "MOCK_NODE_UUID"
}],
"execution": {"id": "IDID"}
"execution_id": "IDID"
}]
self.cmd.take_action(parsed_args)
@ -751,7 +751,7 @@ class TestConfigureNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{
"status": "SUCCESS",
"message": "",
"execution": {"id": "IDID"}
"execution_id": "IDID"
}])
# Get the command object to test
@ -779,7 +779,7 @@ class TestConfigureNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED",
"message": "Test failure.",
"execution": {"id": "IDID"}
"execution_id": "IDID"
}])
parsed_args = self.check_parser(self.cmd, ['--all-manageable'], [])
@ -808,7 +808,7 @@ class TestConfigureNode(fakes.TestOvercloudNode):
self.websocket.wait_for_messages.return_value = iter([{
"status": "FAILED",
"message": "Test failure.",
"execution": {"id": "IDID"}
"execution_id": "IDID"
}])
parsed_args = self.check_parser(self.cmd, ['node_uuid1'], [])
@ -935,7 +935,7 @@ class TestDiscoverNode(fakes.TestOvercloudNode):
"registered_nodes": [{
"uuid": "MOCK_NODE_UUID"
}],
"execution": {"id": "IDID"}
"execution_id": "IDID"
}]
def test_with_ip_range(self):

View File

@ -88,7 +88,7 @@ class TestOvercloudDeletePlan(utils.TestCommand):
[('plans', ['test-plan'])])
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS"
}])
@ -104,7 +104,7 @@ class TestOvercloudDeletePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, argslist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS"
}])
@ -151,7 +151,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS"
}])
@ -179,7 +179,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "ERROR", "message": "failed"
}])
@ -209,7 +209,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS"
}])
mock_result = mock.Mock(output='{"result": null}')
@ -243,7 +243,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "ERROR", "message": "failed"
}])
mock_result = mock.Mock(output='{"result": null}')
@ -310,7 +310,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS"
}])
mock_result = mock.Mock(output='{"result": null}')
@ -351,7 +351,7 @@ class TestOvercloudCreatePlan(utils.TestCommand):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS"
}])
@ -410,7 +410,7 @@ class TestOvercloudDeployPlan(utils.TestCommand):
self.orch.stacks.get.return_value = None
self.websocket.wait_for_messages.return_value = iter([{
'execution': {'id': 'IDID'},
'execution_id': 'IDID',
'status': 'SUCCESS'
}])

View File

@ -151,7 +151,7 @@ class TestOvercloudListRole(utils.TestCommand):
def test_list_empty(self):
self.websocket.wait_for_messages.return_value = [{
'execution': {'id': 'IDID'},
'execution_id': 'IDID',
'status': 'SUCCESS',
'available_roles': []
}]
@ -169,7 +169,7 @@ class TestOvercloudListRole(utils.TestCommand):
def test_list(self):
self.websocket.wait_for_messages.return_value = [{
'execution': {'id': 'IDID'},
'execution_id': 'IDID',
'status': 'SUCCESS',
'available_roles': [{'name': 'ObjectStorage'},
{'name': 'Compute'}]
@ -188,7 +188,7 @@ class TestOvercloudListRole(utils.TestCommand):
def test_list_with_details(self):
self.websocket.wait_for_messages.return_value = [{
'execution': {'id': 'IDID'},
'execution_id': 'IDID',
'status': 'SUCCESS',
'available_roles': [
{'name': 'Controller', 'description': 'Test description',
@ -233,7 +233,7 @@ class TestOvercloudShowRole(utils.TestCommand):
def test_role_not_found(self):
self.websocket.wait_for_messages.return_value = [{
'execution': {'id': 'IDID'},
'execution_id': 'IDID',
'status': 'SUCCESS',
'available_roles': []
}]
@ -248,7 +248,7 @@ class TestOvercloudShowRole(utils.TestCommand):
def test_role(self):
self.websocket.wait_for_messages.return_value = [{
'execution': {'id': 'IDID'},
'execution_id': 'IDID',
'status': 'SUCCESS',
'available_roles': [
{"name": "Test", "a": "b"},

View File

@ -38,7 +38,7 @@ class TestCreateRAID(fakes.TestBaremetal):
websocket = tripleoclient.messaging_websocket()
websocket.wait_for_messages.return_value = iter([
{'status': "SUCCESS",
'execution': {'id': 'IDID'}}
'execution_id': 'IDID'}
])
self.websocket = websocket

View File

@ -37,13 +37,13 @@ class TestBaremetalWorkflows(utils.TestCommand):
self.workflow.executions.create.return_value = execution
self.message_success = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"message": "Success.",
"registered_nodes": [],
}])
self.message_failed = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "FAIL",
"message": "Fail.",
}])
@ -127,7 +127,7 @@ class TestBaremetalWorkflows(utils.TestCommand):
def test_provide_error_with_format_message(self):
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "FAIL",
"message": ['Error1', 'Error2']
}])
@ -175,7 +175,7 @@ class TestBaremetalWorkflows(utils.TestCommand):
def test_introspect_manageable_nodes_success(self):
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"introspected_nodes": {},
}])
@ -209,7 +209,7 @@ class TestBaremetalWorkflows(utils.TestCommand):
def test_introspect_manageable_nodes_mixed_status(self):
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"introspected_nodes": {'node1': {'error': None},
'node2': {'error': 'Error'}}
@ -342,7 +342,7 @@ class TestBaremetalWorkflows(utils.TestCommand):
def test_clean_manageable_nodes_success(self):
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"cleaned_nodes": [],
}])

View File

@ -25,13 +25,13 @@ class TestBaseWorkflows(utils.TestCommand):
def test_wait_for_messages_success(self):
payload_a = {
'status': 'ERROR',
'execution': {'id': 2,
'root_execution_id': 1}
'execution_id': 2,
'root_execution_id': 1
}
payload_b = {
'status': 'ERROR',
'execution': {'id': 1,
'root_execution_id': 1}
'execution_id': 1,
'root_execution_id': 1
}
mistral = mock.Mock()
@ -77,6 +77,56 @@ class TestBaseWorkflows(utils.TestCommand):
websocket.wait_for_messages.assert_called_with(timeout=None)
def test_wait_for_messages_different_execution(self):
payload_a = {
'status': 'RUNNING',
'execution_id': 'aaaa',
'root_execution_id': 'aaaa'
}
payload_b = {
'status': 'RUNNING',
'execution_id': 'bbbb',
'root_execution_id': 'bbbb'
}
mistral = mock.Mock()
websocket = mock.Mock()
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
execution = mock.Mock()
execution.id = 'aaaa'
messages = list(base.wait_for_messages(mistral, websocket, execution))
# Assert only payload_a was returned
self.assertEqual([payload_a], messages)
websocket.wait_for_messages.assert_called_with(timeout=None)
def test_backwards_compat_wait_for_messages_success(self):
payload_a = {
'status': 'ERROR',
'execution': {'id': 2,
'root_execution_id': 1}
}
payload_b = {
'status': 'ERROR',
'execution': {'id': 1,
'root_execution_id': 1}
}
mistral = mock.Mock()
websocket = mock.Mock()
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
execution = mock.Mock()
execution.id = 1
messages = list(base.wait_for_messages(mistral, websocket, execution))
self.assertEqual([payload_a, payload_b], messages)
self.assertFalse(mistral.executions.get.called)
websocket.wait_for_messages.assert_called_with(timeout=None)
def test_backwards_compatible_call_with_different_execution(self):
payload_a = {
'status': 'RUNNING',
'execution': {'id': 'aaaa',

View File

@ -48,7 +48,7 @@ class TestParameterWorkflows(utils.TestCommand):
def test_get_overcloud_passwords(self):
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"message": "passwords",
}])
@ -77,7 +77,7 @@ class TestParameterWorkflows(utils.TestCommand):
mock_safe_load.return_value = plan_env_data
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"message": "",
"result": {}
@ -110,7 +110,7 @@ class TestParameterWorkflows(utils.TestCommand):
mock_safe_load.return_value = plan_env_data
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "FAILED",
"message": "workflow failure",
"result": ""
@ -158,7 +158,7 @@ class TestParameterWorkflows(utils.TestCommand):
def test_check_deprecated_params_no_output(self):
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
}])
@ -176,7 +176,7 @@ class TestParameterWorkflows(utils.TestCommand):
'deprecated': True,
'user_defined': True}]
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"deprecated": deprecated_params
}])
@ -197,7 +197,7 @@ class TestParameterWorkflows(utils.TestCommand):
deprecated_params = [{'parameter': 'TestParameter1',
'deprecated': True}]
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"deprecated": deprecated_params
}])
@ -215,7 +215,7 @@ class TestParameterWorkflows(utils.TestCommand):
def test_generate_fencing_parameters(self):
self.websocket.wait_for_messages.return_value = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
"fencing_parameters": "{}"
}])

View File

@ -35,7 +35,7 @@ class TestPlanCreationWorkflows(utils.TestCommand):
self.app.client_manager.tripleoclient = self.tripleoclient
self.message_success = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
}])
@ -245,7 +245,7 @@ class TestPlanUpdateWorkflows(base.TestCommand):
self.workflow.action_executions.create.return_value = output
self.workflow.executions.create.return_value = output
self.message_success = iter([{
"execution": {"id": "IDID"},
"execution_id": "IDID",
"status": "SUCCESS",
}])
self.websocket.wait_for_messages.return_value = self.message_success

View File

@ -60,19 +60,32 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
try:
for payload in websocket.wait_for_messages(timeout=timeout):
# Ignore messages whose root_execution_id does not match the
# id of the execution for which we are waiting.
if payload['execution']['id'] != execution.id and \
payload['execution'].get('root_execution_id', '') != \
execution.id:
# id of the execution for which we are waiting
# New versions of tripleo-common don't sent the execution anymore
# but keeping the old way ot getting it is important to keep
# backwards compatibility.
# TODO(apetrich) payload.execution is deprecated and will be
# removed from stein. We should keep this until payload.execution
# is removed from the LTS
payload_exec_id = payload.get('execution_id') or \
payload.get('execution', {}).get('id')
payload_root_exec_id = payload.get('root_execution_id', '') or \
payload.get('execution', {}).get('root_execution_id', '')
if payload_exec_id != execution.id and \
payload_root_exec_id != execution.id:
LOG.debug("Ignoring message from execution %s"
% payload['execution']['id'])
% payload_exec_id)
else:
yield payload
# If the message is from a sub-workflow, we just need to pass it
# on to be displayed. This should never be the last message - so
# continue and wait for the next.
if payload['execution']['id'] != execution.id:
if payload_exec_id != execution.id:
continue
# Check the status of the payload, if we are not given one
# default to running and assume it is just an "in progress"