Start websocket client before workflows
When we start a workflow in the client, we need to create the websocket connection beforehand. If the workflow is very quick (like create_overcloudrc), it could be finished before we subscribe to the Zaqar queue properly, and thus we wouldn't get the message. Change-Id: I56c5f3a094094f7ba2158d8a434122ccb496f6b4 Closes-Bug: #1794418
This commit is contained in:
parent
4462be0aae
commit
2a26ef2cf1
@ -110,13 +110,13 @@ def create_overcloudrc(clients, **workflow_input):
|
|||||||
workflow_client = clients.workflow_engine
|
workflow_client = clients.workflow_engine
|
||||||
tripleoclients = clients.tripleoclient
|
tripleoclients = clients.tripleoclient
|
||||||
|
|
||||||
execution = base.start_workflow(
|
|
||||||
workflow_client,
|
|
||||||
'tripleo.deployment.v1.create_overcloudrc',
|
|
||||||
workflow_input=workflow_input
|
|
||||||
)
|
|
||||||
|
|
||||||
with tripleoclients.messaging_websocket() as ws:
|
with tripleoclients.messaging_websocket() as ws:
|
||||||
|
execution = base.start_workflow(
|
||||||
|
workflow_client,
|
||||||
|
'tripleo.deployment.v1.create_overcloudrc',
|
||||||
|
workflow_input=workflow_input
|
||||||
|
)
|
||||||
|
|
||||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||||
# the workflow will return the overcloudrc data, an error message
|
# the workflow will return the overcloudrc data, an error message
|
||||||
# or blank.
|
# or blank.
|
||||||
@ -281,13 +281,13 @@ def config_download_export(clients, **workflow_input):
|
|||||||
workflow_client = clients.workflow_engine
|
workflow_client = clients.workflow_engine
|
||||||
tripleoclients = clients.tripleoclient
|
tripleoclients = clients.tripleoclient
|
||||||
|
|
||||||
execution = base.start_workflow(
|
|
||||||
workflow_client,
|
|
||||||
'tripleo.deployment.v1.config_download_export',
|
|
||||||
workflow_input=workflow_input
|
|
||||||
)
|
|
||||||
|
|
||||||
with tripleoclients.messaging_websocket() as ws:
|
with tripleoclients.messaging_websocket() as ws:
|
||||||
|
execution = base.start_workflow(
|
||||||
|
workflow_client,
|
||||||
|
'tripleo.deployment.v1.config_download_export',
|
||||||
|
workflow_input=workflow_input
|
||||||
|
)
|
||||||
|
|
||||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||||
_WORKFLOW_TIMEOUT):
|
_WORKFLOW_TIMEOUT):
|
||||||
if 'message' in payload:
|
if 'message' in payload:
|
||||||
@ -323,13 +323,13 @@ def get_deployment_status(clients, **workflow_input):
|
|||||||
workflow_client = clients.workflow_engine
|
workflow_client = clients.workflow_engine
|
||||||
tripleoclients = clients.tripleoclient
|
tripleoclients = clients.tripleoclient
|
||||||
|
|
||||||
execution = base.start_workflow(
|
|
||||||
workflow_client,
|
|
||||||
'tripleo.deployment.v1.get_deployment_status',
|
|
||||||
workflow_input=workflow_input
|
|
||||||
)
|
|
||||||
|
|
||||||
with tripleoclients.messaging_websocket() as ws:
|
with tripleoclients.messaging_websocket() as ws:
|
||||||
|
execution = base.start_workflow(
|
||||||
|
workflow_client,
|
||||||
|
'tripleo.deployment.v1.get_deployment_status',
|
||||||
|
workflow_input=workflow_input
|
||||||
|
)
|
||||||
|
|
||||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||||
_WORKFLOW_TIMEOUT):
|
_WORKFLOW_TIMEOUT):
|
||||||
if 'message' in payload:
|
if 'message' in payload:
|
||||||
@ -347,13 +347,13 @@ def get_deployment_failures(clients, **workflow_input):
|
|||||||
workflow_client = clients.workflow_engine
|
workflow_client = clients.workflow_engine
|
||||||
tripleoclients = clients.tripleoclient
|
tripleoclients = clients.tripleoclient
|
||||||
|
|
||||||
execution = base.start_workflow(
|
|
||||||
workflow_client,
|
|
||||||
'tripleo.deployment.v1.get_deployment_failures',
|
|
||||||
workflow_input=workflow_input
|
|
||||||
)
|
|
||||||
|
|
||||||
with tripleoclients.messaging_websocket() as ws:
|
with tripleoclients.messaging_websocket() as ws:
|
||||||
|
execution = base.start_workflow(
|
||||||
|
workflow_client,
|
||||||
|
'tripleo.deployment.v1.get_deployment_failures',
|
||||||
|
workflow_input=workflow_input
|
||||||
|
)
|
||||||
|
|
||||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||||
_WORKFLOW_TIMEOUT):
|
_WORKFLOW_TIMEOUT):
|
||||||
if 'message' in payload:
|
if 'message' in payload:
|
||||||
|
@ -95,13 +95,13 @@ def delete_deployment_plan(clients, **workflow_input):
|
|||||||
workflow_client = clients.workflow_engine
|
workflow_client = clients.workflow_engine
|
||||||
tripleoclients = clients.tripleoclient
|
tripleoclients = clients.tripleoclient
|
||||||
|
|
||||||
execution = base.start_workflow(
|
|
||||||
workflow_client,
|
|
||||||
'tripleo.plan_management.v1.delete_deployment_plan',
|
|
||||||
workflow_input=workflow_input
|
|
||||||
)
|
|
||||||
|
|
||||||
with tripleoclients.messaging_websocket() as ws:
|
with tripleoclients.messaging_websocket() as ws:
|
||||||
|
execution = base.start_workflow(
|
||||||
|
workflow_client,
|
||||||
|
'tripleo.plan_management.v1.delete_deployment_plan',
|
||||||
|
workflow_input=workflow_input
|
||||||
|
)
|
||||||
|
|
||||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||||
_WORKFLOW_TIMEOUT):
|
_WORKFLOW_TIMEOUT):
|
||||||
if 'message' in payload:
|
if 'message' in payload:
|
||||||
@ -311,13 +311,13 @@ def export_deployment_plan(clients, **workflow_input):
|
|||||||
workflow_client = clients.workflow_engine
|
workflow_client = clients.workflow_engine
|
||||||
tripleoclients = clients.tripleoclient
|
tripleoclients = clients.tripleoclient
|
||||||
|
|
||||||
execution = base.start_workflow(
|
|
||||||
workflow_client,
|
|
||||||
'tripleo.plan_management.v1.export_deployment_plan',
|
|
||||||
workflow_input=workflow_input
|
|
||||||
)
|
|
||||||
|
|
||||||
with tripleoclients.messaging_websocket() as ws:
|
with tripleoclients.messaging_websocket() as ws:
|
||||||
|
execution = base.start_workflow(
|
||||||
|
workflow_client,
|
||||||
|
'tripleo.plan_management.v1.export_deployment_plan',
|
||||||
|
workflow_input=workflow_input
|
||||||
|
)
|
||||||
|
|
||||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||||
_WORKFLOW_TIMEOUT):
|
_WORKFLOW_TIMEOUT):
|
||||||
if 'message' in payload:
|
if 'message' in payload:
|
||||||
|
@ -83,23 +83,23 @@ def fetch_logs(clients, container, server_name, timeout=None,
|
|||||||
workflow_client = clients.workflow_engine
|
workflow_client = clients.workflow_engine
|
||||||
tripleoclients = clients.tripleoclient
|
tripleoclients = clients.tripleoclient
|
||||||
|
|
||||||
execution = base.start_workflow(
|
with tripleoclients.messaging_websocket() as ws:
|
||||||
workflow_client,
|
execution = base.start_workflow(
|
||||||
'tripleo.support.v1.fetch_logs',
|
workflow_client,
|
||||||
workflow_input=workflow_input
|
'tripleo.support.v1.fetch_logs',
|
||||||
)
|
workflow_input=workflow_input
|
||||||
|
)
|
||||||
|
|
||||||
websocket = tripleoclients.messaging_websocket()
|
messages = base.wait_for_messages(workflow_client,
|
||||||
messages = base.wait_for_messages(workflow_client,
|
ws,
|
||||||
websocket,
|
execution,
|
||||||
execution,
|
timeout)
|
||||||
timeout)
|
|
||||||
|
|
||||||
for message in messages:
|
for message in messages:
|
||||||
if message['status'] != 'SUCCESS':
|
if message['status'] != 'SUCCESS':
|
||||||
raise LogFetchError(message['message'])
|
raise LogFetchError(message['message'])
|
||||||
if message['message']:
|
if message['message']:
|
||||||
print('{}'.format(message['message']))
|
print('{}'.format(message['message']))
|
||||||
|
|
||||||
|
|
||||||
def delete_container(clients, container, timeout=None, concurrency=None):
|
def delete_container(clients, container, timeout=None, concurrency=None):
|
||||||
@ -122,20 +122,20 @@ def delete_container(clients, container, timeout=None, concurrency=None):
|
|||||||
workflow_client = clients.workflow_engine
|
workflow_client = clients.workflow_engine
|
||||||
tripleoclients = clients.tripleoclient
|
tripleoclients = clients.tripleoclient
|
||||||
|
|
||||||
execution = base.start_workflow(
|
with tripleoclients.messaging_websocket() as ws:
|
||||||
workflow_client,
|
execution = base.start_workflow(
|
||||||
'tripleo.support.v1.delete_container',
|
workflow_client,
|
||||||
workflow_input=workflow_input
|
'tripleo.support.v1.delete_container',
|
||||||
)
|
workflow_input=workflow_input
|
||||||
|
)
|
||||||
|
|
||||||
websocket = tripleoclients.messaging_websocket()
|
messages = base.wait_for_messages(workflow_client,
|
||||||
messages = base.wait_for_messages(workflow_client,
|
ws,
|
||||||
websocket,
|
execution,
|
||||||
execution,
|
timeout)
|
||||||
timeout)
|
|
||||||
|
|
||||||
for message in messages:
|
for message in messages:
|
||||||
if message['status'] != 'SUCCESS':
|
if message['status'] != 'SUCCESS':
|
||||||
raise ContainerDeleteFailed(message['message'])
|
raise ContainerDeleteFailed(message['message'])
|
||||||
if message['message']:
|
if message['message']:
|
||||||
print('{}'.format(message['message']))
|
print('{}'.format(message['message']))
|
||||||
|
Loading…
Reference in New Issue
Block a user