Add a method to wait for messages on the websocket
This also adds the ability to use the websocket client as a context manager which simplifies the requirement of calling cleanup. Co-Authored-By: Dougal Matthews <dougal@redhat.com> Change-Id: I80ce73a4fdf6d1f12495960f2c4bf460f1875e4e
This commit is contained in:
parent
c20e88ace3
commit
2f652b44ca
|
@ -121,6 +121,28 @@ class WebsocketClient(object):
|
|||
def recv(self):
|
||||
return json.loads(self._ws.recv())
|
||||
|
||||
def wait_for_message(self, execution_id):
|
||||
"""Wait for a message for a mistral execution ID
|
||||
|
||||
This blocks until a message is received on the provided queue name
|
||||
with the execution ID.
|
||||
|
||||
TODO(d0ugal): Add a timeout/break for the case when a message is
|
||||
never arrives.
|
||||
"""
|
||||
while True:
|
||||
body = self.recv()['body']
|
||||
if body['payload']['execution']['id'] == execution_id:
|
||||
return body['payload']
|
||||
|
||||
def __enter__(self):
|
||||
"""Return self to allow usage as a context manager"""
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc):
|
||||
"""Call cleanup when exiting the context manager"""
|
||||
self.cleanup()
|
||||
|
||||
|
||||
class ClientWrapper(object):
|
||||
|
||||
|
@ -130,7 +152,6 @@ class ClientWrapper(object):
|
|||
|
||||
def messaging_websocket(self, queue_name='tripleo'):
|
||||
"""Returns a websocket for the messaging service"""
|
||||
|
||||
if self._messaging_websocket is not None:
|
||||
return self._messaging_websocket
|
||||
self._messaging_websocket = WebsocketClient(self._instance, queue_name)
|
||||
|
|
|
@ -47,3 +47,41 @@ class TestPlugin(base.TestCase):
|
|||
self.assertEqual(clientmgr.auth.get_token.call_count, 1)
|
||||
self.assertEqual(clientmgr.get_endpoint_for_service_type.call_count, 1)
|
||||
ws_create_connection.assert_called_once_with("ws://0.0.0.0")
|
||||
|
||||
@mock.patch.object(plugin.WebsocketClient, "recv")
|
||||
@mock.patch("websocket.create_connection")
|
||||
def test_handle_websocket(self, ws_create_connection, recv_mock):
|
||||
|
||||
send_ack = {
|
||||
"headers": {
|
||||
"status": 200
|
||||
}
|
||||
}
|
||||
|
||||
# Creating the websocket sends three messages and closing sends one.
|
||||
# The one being tested is wrapped between these
|
||||
recv_mock.side_effect = [send_ack, send_ack, send_ack, {
|
||||
"body": {
|
||||
"payload": {
|
||||
"status": 200,
|
||||
"message": "Result for IDID",
|
||||
"execution": {"id": "IDID"},
|
||||
}
|
||||
}
|
||||
}, send_ack]
|
||||
|
||||
clientmgr = mock.MagicMock()
|
||||
clientmgr._api_version.__getitem__.return_value = '1'
|
||||
clientmgr.get_endpoint_for_service_type.return_value = fakes.AUTH_URL
|
||||
clientmgr.auth.get_token.return_value = "TOKEN"
|
||||
clientmgr.identity.projects.get.return_value = mock.MagicMock(id="ID")
|
||||
|
||||
client = plugin.make_client(clientmgr)
|
||||
|
||||
with client.messaging_websocket() as ws:
|
||||
payload = ws.wait_for_message("IDID")
|
||||
self.assertEqual(payload, {
|
||||
"status": 200,
|
||||
"message": "Result for IDID",
|
||||
"execution": {"id": "IDID"},
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue