From 2a26ef2cf19571e39fc3071b19500808c6d14fcc Mon Sep 17 00:00:00 2001 From: Thomas Herve Date: Wed, 26 Sep 2018 11:32:08 +0200 Subject: [PATCH] Start websocket client before workflows When we start a workflow in the client, we need to create the websocket connection beforehand. If the workflow is very quick (like create_overcloudrc), it could be finished before we subscribe to the Zaqar queue properly, and thus we wouldn't get the message. Change-Id: I56c5f3a094094f7ba2158d8a434122ccb496f6b4 Closes-Bug: #1794418 --- tripleoclient/workflows/deployment.py | 48 ++++++++--------- tripleoclient/workflows/plan_management.py | 24 ++++----- tripleoclient/workflows/support.py | 60 +++++++++++----------- 3 files changed, 66 insertions(+), 66 deletions(-) diff --git a/tripleoclient/workflows/deployment.py b/tripleoclient/workflows/deployment.py index d1fda50cd..f07a1f407 100644 --- a/tripleoclient/workflows/deployment.py +++ b/tripleoclient/workflows/deployment.py @@ -110,13 +110,13 @@ def create_overcloudrc(clients, **workflow_input): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - execution = base.start_workflow( - workflow_client, - 'tripleo.deployment.v1.create_overcloudrc', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket() as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.deployment.v1.create_overcloudrc', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): # the workflow will return the overcloudrc data, an error message # or blank. @@ -281,13 +281,13 @@ def config_download_export(clients, **workflow_input): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - execution = base.start_workflow( - workflow_client, - 'tripleo.deployment.v1.config_download_export', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket() as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.deployment.v1.config_download_export', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution, _WORKFLOW_TIMEOUT): if 'message' in payload: @@ -323,13 +323,13 @@ def get_deployment_status(clients, **workflow_input): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - execution = base.start_workflow( - workflow_client, - 'tripleo.deployment.v1.get_deployment_status', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket() as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.deployment.v1.get_deployment_status', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution, _WORKFLOW_TIMEOUT): if 'message' in payload: @@ -347,13 +347,13 @@ def get_deployment_failures(clients, **workflow_input): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - execution = base.start_workflow( - workflow_client, - 'tripleo.deployment.v1.get_deployment_failures', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket() as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.deployment.v1.get_deployment_failures', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution, _WORKFLOW_TIMEOUT): if 'message' in payload: diff --git a/tripleoclient/workflows/plan_management.py b/tripleoclient/workflows/plan_management.py index f94d6052e..146e9fdef 100644 --- a/tripleoclient/workflows/plan_management.py +++ b/tripleoclient/workflows/plan_management.py @@ -95,13 +95,13 @@ def delete_deployment_plan(clients, **workflow_input): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - execution = base.start_workflow( - workflow_client, - 'tripleo.plan_management.v1.delete_deployment_plan', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket() as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.plan_management.v1.delete_deployment_plan', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution, _WORKFLOW_TIMEOUT): if 'message' in payload: @@ -311,13 +311,13 @@ def export_deployment_plan(clients, **workflow_input): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - execution = base.start_workflow( - workflow_client, - 'tripleo.plan_management.v1.export_deployment_plan', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket() as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.plan_management.v1.export_deployment_plan', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution, _WORKFLOW_TIMEOUT): if 'message' in payload: diff --git a/tripleoclient/workflows/support.py b/tripleoclient/workflows/support.py index e8b7f2009..2f9dab342 100644 --- a/tripleoclient/workflows/support.py +++ b/tripleoclient/workflows/support.py @@ -83,23 +83,23 @@ def fetch_logs(clients, container, server_name, timeout=None, workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - execution = base.start_workflow( - workflow_client, - 'tripleo.support.v1.fetch_logs', - workflow_input=workflow_input - ) + with tripleoclients.messaging_websocket() as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.support.v1.fetch_logs', + workflow_input=workflow_input + ) - websocket = tripleoclients.messaging_websocket() - messages = base.wait_for_messages(workflow_client, - websocket, - execution, - timeout) + messages = base.wait_for_messages(workflow_client, + ws, + execution, + timeout) - for message in messages: - if message['status'] != 'SUCCESS': - raise LogFetchError(message['message']) - if message['message']: - print('{}'.format(message['message'])) + for message in messages: + if message['status'] != 'SUCCESS': + raise LogFetchError(message['message']) + if message['message']: + print('{}'.format(message['message'])) def delete_container(clients, container, timeout=None, concurrency=None): @@ -122,20 +122,20 @@ def delete_container(clients, container, timeout=None, concurrency=None): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - execution = base.start_workflow( - workflow_client, - 'tripleo.support.v1.delete_container', - workflow_input=workflow_input - ) + with tripleoclients.messaging_websocket() as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.support.v1.delete_container', + workflow_input=workflow_input + ) - websocket = tripleoclients.messaging_websocket() - messages = base.wait_for_messages(workflow_client, - websocket, - execution, - timeout) + messages = base.wait_for_messages(workflow_client, + ws, + execution, + timeout) - for message in messages: - if message['status'] != 'SUCCESS': - raise ContainerDeleteFailed(message['message']) - if message['message']: - print('{}'.format(message['message'])) + for message in messages: + if message['status'] != 'SUCCESS': + raise ContainerDeleteFailed(message['message']) + if message['message']: + print('{}'.format(message['message']))