Handle multiple messages from workflow executions

Currently, tripleoclient will ignore messages from workflows if they
don't match the execution ID of the worflow it starts. This can happen
if the workflow starts sub-workflows, which run under their own
execution ID. tripleoclient will also stop listening for messages after
it recieves the first message from the workflow it started. Meaning, it
assumes the workflow to send one, and only one, message.

This change tackles the first limitation, by turning the message
consumption into a generator which yields each message one at a time.
It then starts a convention of assuming the workflow has finished if it
recieves a message with the top level execution ID and it contains a
status that isn't "RUNNING". For example: {"status": "SUCCESS"}.

Partial-Bug: #1646887
Change-Id: Ida07718b4fab41e0a6088f2db8f9b42b6fb09f72
This commit is contained in:
Dougal Matthews 2017-03-08 09:24:13 +00:00
parent cacaea1c93
commit 0747748743
4 changed files with 165 additions and 2 deletions

View File

@ -136,6 +136,9 @@ class WebsocketClient(object):
will block forever until a message is received. If no message is
received (for example, Zaqar is down) then it will block until manually
killed.
DEPRECATED: Use wait_for_messages. This method will be removed when
all commands have been migrated.
"""
if timeout is None:
@ -152,6 +155,31 @@ class WebsocketClient(object):
if body['payload']['execution']['id'] == execution_id:
return body['payload']
def wait_for_messages(self, timeout=None):
"""Wait for messages on a Zaqar queue
A timeout can be provided in seconds, if no timeout is provided it
will block forever until a message is received. If no message is
received (for example, Zaqar is down) then it will block until manually
killed.
If no timeout is provided this method will never stop waiting for new
messages. It is the responsibility of the consumer to stop consuming
messages.
"""
if timeout is None:
LOG.warning("Waiting for messages on queue '{}' with no timeout."
.format(self._queue_name))
self._ws.settimeout(timeout)
while True:
try:
yield self.recv()['body']['payload']
except websocket.WebSocketTimeoutException:
raise exceptions.WebSocketTimeout()
def __enter__(self):
"""Return self to allow usage as a context manager"""
return self

View File

@ -85,6 +85,47 @@ class TestPlugin(base.TestCase):
"execution": {"id": "IDID"},
})
@mock.patch.object(plugin.WebsocketClient, "recv")
@mock.patch("websocket.create_connection")
def test_handle_websocket_multiple(self, ws_create_connection, recv_mock):
send_ack = {
"headers": {
"status": 200
}
}
# Creating the websocket sends three messages and closing sends one.
# The one being tested is wrapped between these
recv_mock.side_effect = [send_ack, send_ack, send_ack, {
"body": {
"payload": {
"status": 200,
"message": "Result for IDID",
"execution": {"id": "IDID"},
}
}
}, send_ack]
clientmgr = mock.MagicMock()
clientmgr.get_endpoint_for_service_type.return_value = fakes.WS_URL
clientmgr.auth.get_token.return_value = "TOKEN"
clientmgr.auth_ref.project_id = "ID"
client = plugin.make_client(clientmgr)
with client.messaging_websocket() as ws:
for payload in ws.wait_for_messages():
self.assertEqual(payload, {
"status": 200,
"message": "Result for IDID",
"execution": {"id": "IDID"},
})
# We only want to test the first message, as there is only one.
# The last one, the send_ack will be used by the context
# manager when it is closed.
break
@mock.patch("websocket.create_connection")
def test_websocket_creation_error(self, ws_create_connection):

View File

@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from osc_lib.tests import utils
from tripleoclient import exceptions
from tripleoclient.workflows import base
class TestBaseWorkflows(utils.TestCommand):
def test_wait_for_messages_success(self):
payload_a = {
'status': 'ERROR',
'execution': {'id': 2}
}
payload_b = {
'status': 'ERROR',
'execution': {'id': 1}
}
mistral = mock.Mock()
websocket = mock.Mock()
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
execution = mock.Mock()
execution.id = 1
messages = list(base.wait_for_messages(mistral, websocket, execution))
self.assertEqual([payload_a, payload_b], messages)
self.assertFalse(mistral.executions.get.called)
websocket.wait_for_messages.assert_called_with(timeout=None)
def test_wait_for_messages_timeout(self):
mistral = mock.Mock()
websocket = mock.Mock()
websocket.wait_for_messages.side_effect = exceptions.WebSocketTimeout
execution = mock.Mock()
execution.id = 1
messages = base.wait_for_messages(mistral, websocket, execution)
self.assertRaises(exceptions.WebSocketTimeout, list, messages)
self.assertTrue(mistral.executions.get.called)
websocket.wait_for_messages.assert_called_with(timeout=None)

View File

@ -52,6 +52,9 @@ def wait_for_message(mistral, websocket, execution, timeout=None):
If a timeout is reached, called check_execution_status which will look up
the execution on Mistral and log information about it.
DEPRECATED: Use wait_for_messages. This method will be removed when
all commands have been migrated.
"""
try:
return websocket.wait_for_message(execution.id, timeout=timeout)
@ -60,6 +63,37 @@ def wait_for_message(mistral, websocket, execution, timeout=None):
raise
def wait_for_messages(mistral, websocket, execution, timeout=None):
"""Wait for messages on a websocket.
Given an instance of mistral client, a websocket and a Mistral execution
wait for messages on that websocket queue that match the execution ID until
the timeout is reached.
If no timeout is provided, this method will block forever.
If a timeout is reached, called check_execution_status which will look up
the execution on Mistral and log information about it.
"""
try:
for payload in websocket.wait_for_messages(timeout=timeout):
yield payload
# If the message is from a sub-workflow, we just need to pass it
# on to be displayed. This should never be the last message - so
# continue and wait for the next.
if payload['execution']['id'] != execution.id:
continue
# Check the status of the payload, if we are not given one
# default to running and assume it is just an "in progress"
# message from the workflow.
# Workflows should end with SUCCESS or ERROR statuses.
if payload.get('status', 'RUNNING') != "RUNNING":
raise StopIteration
except exceptions.WebSocketTimeout:
check_execution_status(mistral, execution.id)
raise
def check_execution_status(workflow_client, execution_id):
"""Check the status of a workflow that timeout when waiting for messages
@ -70,12 +104,12 @@ def check_execution_status(workflow_client, execution_id):
state = execution.state
if state == 'RUNNING':
message = ("The WebSocket timed out before the Workflow completed.")
message = "The WebSocket timed out before the Workflow completed."
elif state == 'SUCCESS':
message = ("The Workflow finished successfully but no messages were "
"received before the WebSocket timed out.")
elif state == 'ERROR':
message = ("The Workflow errored and no messages were received.")
message = "The Workflow errored and no messages were received."
else:
message = "Unknown Execution state."