diff --git a/requirements.txt b/requirements.txt index 92890a85b..e5daf2b45 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,3 @@ six>=1.10.0 # MIT osc-lib>=1.8.0 # Apache-2.0 websocket-client<=0.40.0,>=0.33.0 # LGPLv2+ tripleo-common>=7.1.0 # Apache-2.0 -python-zaqarclient>=1.0.0 # Apache-2.0 diff --git a/tripleoclient/workflows/base.py b/tripleoclient/workflows/base.py index 7592a69fc..3a1514f94 100644 --- a/tripleoclient/workflows/base.py +++ b/tripleoclient/workflows/base.py @@ -69,7 +69,8 @@ def wait_for_messages(mistral, websocket, execution, timeout=None): # default to running and assume it is just an "in progress" # message from the workflow. # Workflows should end with SUCCESS or ERROR statuses. - if payload.get('status', 'RUNNING') != "RUNNING": + if payload.get('status', 'RUNNING') != "RUNNING" or \ + mistral.executions.get(execution.id).state != "RUNNING": raise StopIteration except exceptions.WebSocketTimeout: check_execution_status(mistral, execution.id) diff --git a/tripleoclient/workflows/package_update.py b/tripleoclient/workflows/package_update.py index 5ebbcd823..941ecbbd2 100644 --- a/tripleoclient/workflows/package_update.py +++ b/tripleoclient/workflows/package_update.py @@ -21,7 +21,6 @@ from tripleoclient import exceptions from tripleoclient import utils from tripleoclient.workflows import base -from zaqarclient.transport import errors as zaqar_errors def update(clients, **workflow_input): @@ -78,8 +77,7 @@ def get_config(clients, **workflow_input): def update_ansible(clients, **workflow_input): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient - zaqar = clients.messaging - queue = zaqar.queue(workflow_input['ansible_queue_name']) + ansible_queue = workflow_input['ansible_queue_name'] with tripleoclients.messaging_websocket() as ws: execution = base.start_workflow( @@ -87,22 +85,17 @@ def update_ansible(clients, **workflow_input): 'tripleo.package_update.v1.update_nodes', workflow_input=workflow_input ) - timeout = time.time() + 600 - # First we need to wait for the first item in the queue - while queue.stats['messages']['total'] == 0 or time.time() == timeout: - pass - # Then we can start to claim the queue - while workflow_client.executions.get(execution.id).state == 'RUNNING': - try: - claim = queue.claim(ttl=600, grace=600) - for message in claim: - pprint.pprint( - message.body['payload']['message'].splitlines()) - message.delete() - except zaqar_errors.ServiceUnavailableError: - pass - # clean the Queue - queue.delete() + + with tripleoclients.messaging_websocket(ansible_queue) as update_ws: + for payload in base.wait_for_messages(workflow_client, + update_ws, + execution): + # Need to sleep a little, to let the time for the execution + # to get the right status in between. It avoid to fall in the + # while True loop to get messages + time.sleep(5) + if payload.get('message'): + pprint.pprint(payload['message'].splitlines()) for payload in base.wait_for_messages(workflow_client, ws, execution): if payload.get('message'):