Raise proper exception at webscocket close
After fixing a bug with https://review.openstack.org/#/c/603802/ the
return introduced there is bogus, we have to raise a proper exception
and handle it like timeouts, to get all the mistral allright.
Change-Id: Idcdbd38129f5694c5452f3f8aca0388df80476b2
(cherry picked from commit fee9f8cf15
)
This commit is contained in:
parent
7f8d8999d3
commit
531c1504ab
|
@ -28,6 +28,10 @@ class WebSocketTimeout(Exception):
|
||||||
"""Timed out waiting for messages on the websocket"""
|
"""Timed out waiting for messages on the websocket"""
|
||||||
|
|
||||||
|
|
||||||
|
class WebSocketConnectionClosed(Exception):
|
||||||
|
"""Websocket connection is closed before wait for messages"""
|
||||||
|
|
||||||
|
|
||||||
class NotFound(Exception):
|
class NotFound(Exception):
|
||||||
"""Resource not found"""
|
"""Resource not found"""
|
||||||
|
|
||||||
|
|
|
@ -146,20 +146,21 @@ class WebsocketClient(object):
|
||||||
messages.
|
messages.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not self._ws.connected:
|
|
||||||
return
|
|
||||||
|
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
LOG.warning("Waiting for messages on queue '{}' with no timeout."
|
LOG.warning("Waiting for messages on queue '{}' with no timeout."
|
||||||
.format(self._queue_name))
|
.format(self._queue_name))
|
||||||
|
|
||||||
self._ws.settimeout(timeout)
|
try:
|
||||||
|
self._ws.settimeout(timeout)
|
||||||
|
while True:
|
||||||
|
message = self.recv()
|
||||||
|
LOG.debug(message)
|
||||||
|
yield message['body']['payload']
|
||||||
|
|
||||||
while True:
|
except websocket.WebSocketTimeoutException:
|
||||||
try:
|
raise exceptions.WebSocketTimeout()
|
||||||
yield self.recv()['body']['payload']
|
except websocket.WebSocketConnectionClosedException:
|
||||||
except websocket.WebSocketTimeoutException:
|
raise exceptions.WebSocketConnectionClosed()
|
||||||
raise exceptions.WebSocketTimeout()
|
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
"""Return self to allow usage as a context manager"""
|
"""Return self to allow usage as a context manager"""
|
||||||
|
|
|
@ -16,7 +16,7 @@ import mock
|
||||||
|
|
||||||
from osc_lib.tests import utils
|
from osc_lib.tests import utils
|
||||||
|
|
||||||
from tripleoclient import exceptions
|
from tripleoclient import exceptions as ex
|
||||||
from tripleoclient.workflows import base
|
from tripleoclient.workflows import base
|
||||||
|
|
||||||
|
|
||||||
|
@ -48,13 +48,28 @@ class TestBaseWorkflows(utils.TestCommand):
|
||||||
def test_wait_for_messages_timeout(self):
|
def test_wait_for_messages_timeout(self):
|
||||||
mistral = mock.Mock()
|
mistral = mock.Mock()
|
||||||
websocket = mock.Mock()
|
websocket = mock.Mock()
|
||||||
websocket.wait_for_messages.side_effect = exceptions.WebSocketTimeout
|
websocket.wait_for_messages.side_effect = ex.WebSocketTimeout
|
||||||
execution = mock.Mock()
|
execution = mock.Mock()
|
||||||
execution.id = 1
|
execution.id = 1
|
||||||
|
|
||||||
messages = base.wait_for_messages(mistral, websocket, execution)
|
messages = base.wait_for_messages(mistral, websocket, execution)
|
||||||
|
|
||||||
self.assertRaises(exceptions.WebSocketTimeout, list, messages)
|
self.assertRaises(ex.WebSocketTimeout, list, messages)
|
||||||
|
|
||||||
|
self.assertTrue(mistral.executions.get.called)
|
||||||
|
websocket.wait_for_messages.assert_called_with(timeout=None)
|
||||||
|
|
||||||
|
def test_wait_for_messages_connection_closed(self):
|
||||||
|
mistral = mock.Mock()
|
||||||
|
websocket = mock.Mock()
|
||||||
|
websocket.wait_for_messages.side_effect = ex.WebSocketConnectionClosed
|
||||||
|
|
||||||
|
execution = mock.Mock()
|
||||||
|
execution.id = 1
|
||||||
|
|
||||||
|
messages = base.wait_for_messages(mistral, websocket, execution)
|
||||||
|
|
||||||
|
self.assertRaises(ex.WebSocketConnectionClosed, list, messages)
|
||||||
|
|
||||||
self.assertTrue(mistral.executions.get.called)
|
self.assertTrue(mistral.executions.get.called)
|
||||||
websocket.wait_for_messages.assert_called_with(timeout=None)
|
websocket.wait_for_messages.assert_called_with(timeout=None)
|
||||||
|
@ -78,5 +93,5 @@ class TestBaseWorkflows(utils.TestCommand):
|
||||||
result.state = 'ERROR'
|
result.state = 'ERROR'
|
||||||
mistral.action_executions.create = mock.Mock(return_value=result)
|
mistral.action_executions.create = mock.Mock(return_value=result)
|
||||||
|
|
||||||
self.assertRaises(exceptions.WorkflowActionError,
|
self.assertRaises(ex.WorkflowActionError,
|
||||||
base.call_action, mistral, action)
|
base.call_action, mistral, action)
|
||||||
|
|
|
@ -71,8 +71,8 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
|
||||||
# Workflows should end with SUCCESS or ERROR statuses.
|
# Workflows should end with SUCCESS or ERROR statuses.
|
||||||
if payload.get('status', 'RUNNING') != "RUNNING" or \
|
if payload.get('status', 'RUNNING') != "RUNNING" or \
|
||||||
mistral.executions.get(execution.id).state != "RUNNING":
|
mistral.executions.get(execution.id).state != "RUNNING":
|
||||||
raise StopIteration
|
return
|
||||||
except exceptions.WebSocketTimeout:
|
except (exceptions.WebSocketTimeout, exceptions.WebSocketConnectionClosed):
|
||||||
check_execution_status(mistral, execution.id)
|
check_execution_status(mistral, execution.id)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue