Get message from websocket instead from zaqarclient directly

Use the websocket api to get and print the real time
ansible execution for minor update instead of using the
zaqarclient call directly.
This is safer and avoid brutal claim of the messages.

Change-Id: I7e324b9e037197082c23a19b4e4b8832daaf5aee
(cherry picked from commit eaa2e9ce75)
This commit is contained in:
Mathieu Bultel 2017-12-06 08:39:00 +01:00 committed by mandreou
parent a187d18ddc
commit f31795fb7e
3 changed files with 14 additions and 21 deletions

View File

@ -17,4 +17,3 @@ six>=1.10.0 # MIT
osc-lib>=1.8.0 # Apache-2.0 osc-lib>=1.8.0 # Apache-2.0
websocket-client<=0.40.0,>=0.33.0 # LGPLv2+ websocket-client<=0.40.0,>=0.33.0 # LGPLv2+
tripleo-common>=7.1.0 # Apache-2.0 tripleo-common>=7.1.0 # Apache-2.0
python-zaqarclient>=1.0.0 # Apache-2.0

View File

@ -69,7 +69,8 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
# default to running and assume it is just an "in progress" # default to running and assume it is just an "in progress"
# message from the workflow. # message from the workflow.
# Workflows should end with SUCCESS or ERROR statuses. # Workflows should end with SUCCESS or ERROR statuses.
if payload.get('status', 'RUNNING') != "RUNNING": if payload.get('status', 'RUNNING') != "RUNNING" or \
mistral.executions.get(execution.id).state != "RUNNING":
raise StopIteration raise StopIteration
except exceptions.WebSocketTimeout: except exceptions.WebSocketTimeout:
check_execution_status(mistral, execution.id) check_execution_status(mistral, execution.id)

View File

@ -21,7 +21,6 @@ from tripleoclient import exceptions
from tripleoclient import utils from tripleoclient import utils
from tripleoclient.workflows import base from tripleoclient.workflows import base
from zaqarclient.transport import errors as zaqar_errors
def update(clients, **workflow_input): def update(clients, **workflow_input):
@ -78,8 +77,7 @@ def get_config(clients, **workflow_input):
def update_ansible(clients, **workflow_input): def update_ansible(clients, **workflow_input):
workflow_client = clients.workflow_engine workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient tripleoclients = clients.tripleoclient
zaqar = clients.messaging ansible_queue = workflow_input['ansible_queue_name']
queue = zaqar.queue(workflow_input['ansible_queue_name'])
with tripleoclients.messaging_websocket() as ws: with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow( execution = base.start_workflow(
@ -87,22 +85,17 @@ def update_ansible(clients, **workflow_input):
'tripleo.package_update.v1.update_nodes', 'tripleo.package_update.v1.update_nodes',
workflow_input=workflow_input workflow_input=workflow_input
) )
timeout = time.time() + 600
# First we need to wait for the first item in the queue with tripleoclients.messaging_websocket(ansible_queue) as update_ws:
while queue.stats['messages']['total'] == 0 or time.time() == timeout: for payload in base.wait_for_messages(workflow_client,
pass update_ws,
# Then we can start to claim the queue execution):
while workflow_client.executions.get(execution.id).state == 'RUNNING': # Need to sleep a little, to let the time for the execution
try: # to get the right status in between. It avoid to fall in the
claim = queue.claim(ttl=600, grace=600) # while True loop to get messages
for message in claim: time.sleep(5)
pprint.pprint( if payload.get('message'):
message.body['payload']['message'].splitlines()) pprint.pprint(payload['message'].splitlines())
message.delete()
except zaqar_errors.ServiceUnavailableError:
pass
# clean the Queue
queue.delete()
for payload in base.wait_for_messages(workflow_client, ws, execution): for payload in base.wait_for_messages(workflow_client, ws, execution):
if payload.get('message'): if payload.get('message'):