Avoid race condition when setting up websocket

There is a potential for a race condition if the websocket is not set
up before the call is made to Mistral. The websocket setup can take
sufficient time to set up, and if an error is thrown immediately from
a workflow, it has the potential of returning a message, and having
the websocket consume it before the websocket client is even set up.

Also, I think this has the benefit of keeping the call to Mistral
from even being made if there is something wrong with the websocket
setup.

Change-Id: Ib331037a7f5f4e59862d2b9646a83acdb18313eb
This commit is contained in:
Brad P. Crochet 2017-03-29 22:25:34 -04:00 committed by Dougal Matthews
parent 27d43fffff
commit c9b9e65059
7 changed files with 103 additions and 99 deletions

View File

@ -28,13 +28,13 @@ def register_or_update(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.register_or_update',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.register_or_update',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if 'message' in payload: if 'message' in payload:
print(payload['message']) print(payload['message'])
@ -74,14 +74,14 @@ def provide(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.provide',
workflow_input={'node_uuids': workflow_input['node_uuids'],
'queue_name': queue_name}
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.provide',
workflow_input={'node_uuids': workflow_input['node_uuids'],
'queue_name': queue_name}
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if 'message' in payload: if 'message' in payload:
print(payload['message']) print(payload['message'])
@ -105,17 +105,19 @@ def introspect(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.introspect',
workflow_input={'node_uuids': workflow_input['node_uuids'],
'run_validations': workflow_input['run_validations'],
'queue_name': queue_name}
)
print("Waiting for introspection to finish...") print("Waiting for introspection to finish...")
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.introspect',
workflow_input={
'node_uuids': workflow_input['node_uuids'],
'run_validations': workflow_input['run_validations'],
'queue_name': queue_name
}
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if 'message' in payload: if 'message' in payload:
print(payload['message']) print(payload['message'])
@ -140,19 +142,21 @@ def introspect_manageable_nodes(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.introspect_manageable_nodes',
workflow_input={'run_validations': workflow_input['run_validations'],
"queue_name": queue_name, }
)
print("Waiting for introspection to finish...") print("Waiting for introspection to finish...")
errors = [] errors = []
successful_node_uuids = set() successful_node_uuids = set()
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.introspect_manageable_nodes',
workflow_input={
'run_validations': workflow_input['run_validations'],
'queue_name': queue_name,
}
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if 'message' in payload: if 'message' in payload:
print(payload['message']) print(payload['message'])
@ -192,13 +196,13 @@ def provide_manageable_nodes(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.provide_manageable_nodes',
workflow_input={"queue_name": queue_name, }
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.provide_manageable_nodes',
workflow_input={"queue_name": queue_name, }
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if 'message' in payload: if 'message' in payload:
print(payload['message']) print(payload['message'])
@ -220,13 +224,13 @@ def configure(clients, **workflow_input):
ooo_client = clients.tripleoclient ooo_client = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.configure',
workflow_input=workflow_input
)
with ooo_client.messaging_websocket(queue_name) as ws: with ooo_client.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.configure',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if 'message' in payload: if 'message' in payload:
print(payload['message']) print(payload['message'])
@ -246,13 +250,13 @@ def configure_manageable_nodes(clients, **workflow_input):
ooo_client = clients.tripleoclient ooo_client = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.configure_manageable_nodes',
workflow_input=workflow_input
)
with ooo_client.messaging_websocket(queue_name) as ws: with ooo_client.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.configure_manageable_nodes',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if 'message' in payload: if 'message' in payload:
print(payload['message']) print(payload['message'])
@ -274,15 +278,15 @@ def create_raid_configuration(clients, **workflow_input):
ooo_client = clients.tripleoclient ooo_client = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.create_raid_configuration',
workflow_input=workflow_input
)
print('Creating RAID configuration for given nodes, this may take time') print('Creating RAID configuration for given nodes, this may take time')
with ooo_client.messaging_websocket(queue_name) as ws: with ooo_client.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.baremetal.v1.create_raid_configuration',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if 'message' in payload: if 'message' in payload:
print(payload['message']) print(payload['message'])

View File

@ -30,13 +30,13 @@ def deploy(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.deploy_plan',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.deploy_plan',
workflow_input=workflow_input
)
# The deploy workflow ends once the Heat create/update starts. This # The deploy workflow ends once the Heat create/update starts. This
# means that is shouldn't take very long. Wait for six minutes for # means that is shouldn't take very long. Wait for six minutes for
# messages from the workflow. # messages from the workflow.

View File

@ -25,13 +25,13 @@ def update(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.package_update.v1.package_update_plan',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.package_update.v1.package_update_plan',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
assert payload['status'] == "SUCCESS", pprint.pformat(payload) assert payload['status'] == "SUCCESS", pprint.pformat(payload)
@ -91,12 +91,12 @@ def clear_breakpoints(clients, **workflow_input):
workflow_input['queue_name'] = str(uuid.uuid4()) workflow_input['queue_name'] = str(uuid.uuid4())
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.package_update.v1.clear_breakpoints',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.package_update.v1.clear_breakpoints',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
assert payload['status'] == "SUCCESS", pprint.pformat(payload) assert payload['status'] == "SUCCESS", pprint.pformat(payload)

View File

@ -35,13 +35,13 @@ def get_overcloud_passwords(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.plan_management.v1.get_passwords',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.plan_management.v1.get_passwords',
workflow_input=workflow_input
)
# Getting the passwords is a quick operation, but to allow space for # Getting the passwords is a quick operation, but to allow space for
# delays or heavy loads, timeout after 60 seconds. # delays or heavy loads, timeout after 60 seconds.
for payload in base.wait_for_messages(workflow_client, ws, execution, for payload in base.wait_for_messages(workflow_client, ws, execution,

View File

@ -48,13 +48,13 @@ def create_default_plan(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.plan_management.v1.create_default_deployment_plan',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.plan_management.v1.create_default_deployment_plan',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution, for payload in base.wait_for_messages(workflow_client, ws, execution,
_WORKFLOW_TIMEOUT): _WORKFLOW_TIMEOUT):
if 'message' in payload: if 'message' in payload:
@ -72,12 +72,12 @@ def _create_update_deployment_plan(clients, workflow, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client, workflow,
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client, workflow,
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution, for payload in base.wait_for_messages(workflow_client, ws, execution,
_WORKFLOW_TIMEOUT): _WORKFLOW_TIMEOUT):
if 'message' in payload: if 'message' in payload:

View File

@ -26,13 +26,13 @@ def delete_node(clients, **workflow_input):
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.scale.v1.delete_node',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket(queue_name) as ws: with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.scale.v1.delete_node',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution, for payload in base.wait_for_messages(workflow_client, ws, execution,
360): 360):
if payload['status'] != "SUCCESS": if payload['status'] != "SUCCESS":

View File

@ -37,13 +37,13 @@ def delete_stack(clients, stack):
queue_name = workflow_input['queue_name'] queue_name = workflow_input['queue_name']
execution = base.start_workflow(
workflow_client,
'tripleo.stack.v1.delete_stack',
workflow_input=workflow_input
)
with tripleoclient.messaging_websocket(queue_name) as ws: with tripleoclient.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.stack.v1.delete_stack',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if payload['status'] != "SUCCESS": if payload['status'] != "SUCCESS":
raise InvalidConfiguration(payload['message']) raise InvalidConfiguration(payload['message'])