Remove the single-message interface to websockets
This is longer used and the multiple-message interface should be used. Change-Id: I84a2d593ae3bad2559de530de8678925a2109547 Closes-Bug: #1646887
This commit is contained in:
@@ -126,35 +126,6 @@ class WebsocketClient(object):
|
|||||||
def recv(self):
|
def recv(self):
|
||||||
return json.loads(self._ws.recv())
|
return json.loads(self._ws.recv())
|
||||||
|
|
||||||
def wait_for_message(self, execution_id, timeout=None):
|
|
||||||
"""Wait for a message for a mistral execution ID
|
|
||||||
|
|
||||||
This method blocks until a message is received on the message queue
|
|
||||||
with the execution ID passed in.
|
|
||||||
|
|
||||||
A timeout can be provided in seconds, if no timeout is provided it
|
|
||||||
will block forever until a message is received. If no message is
|
|
||||||
received (for example, Zaqar is down) then it will block until manually
|
|
||||||
killed.
|
|
||||||
|
|
||||||
DEPRECATED: Use wait_for_messages. This method will be removed when
|
|
||||||
all commands have been migrated.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if timeout is None:
|
|
||||||
LOG.warning("Waiting for messages on queue '{}' with no timeout."
|
|
||||||
.format(self._queue_name))
|
|
||||||
|
|
||||||
self._ws.settimeout(timeout)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
body = self.recv()['body']
|
|
||||||
except websocket.WebSocketTimeoutException:
|
|
||||||
raise exceptions.WebSocketTimeout()
|
|
||||||
if body['payload']['execution']['id'] == execution_id:
|
|
||||||
return body['payload']
|
|
||||||
|
|
||||||
def wait_for_messages(self, timeout=None):
|
def wait_for_messages(self, timeout=None):
|
||||||
"""Wait for messages on a Zaqar queue
|
"""Wait for messages on a Zaqar queue
|
||||||
|
|
||||||
|
@@ -43,11 +43,6 @@ class FakeClientManager(object):
|
|||||||
|
|
||||||
class FakeWebSocket(object):
|
class FakeWebSocket(object):
|
||||||
|
|
||||||
def wait_for_message(self, execution_id, timeout=None):
|
|
||||||
return {
|
|
||||||
'status': 'SUCCESS'
|
|
||||||
}
|
|
||||||
|
|
||||||
def wait_for_messages(self, timeout=None):
|
def wait_for_messages(self, timeout=None):
|
||||||
yield {
|
yield {
|
||||||
'execution': {'id': 'IDID'},
|
'execution': {'id': 'IDID'},
|
||||||
|
@@ -48,43 +48,6 @@ class TestPlugin(base.TestCase):
|
|||||||
self.assertEqual(clientmgr.get_endpoint_for_service_type.call_count, 2)
|
self.assertEqual(clientmgr.get_endpoint_for_service_type.call_count, 2)
|
||||||
ws_create_connection.assert_called_with("ws://0.0.0.0")
|
ws_create_connection.assert_called_with("ws://0.0.0.0")
|
||||||
|
|
||||||
@mock.patch.object(plugin.WebsocketClient, "recv")
|
|
||||||
@mock.patch("websocket.create_connection")
|
|
||||||
def test_handle_websocket(self, ws_create_connection, recv_mock):
|
|
||||||
|
|
||||||
send_ack = {
|
|
||||||
"headers": {
|
|
||||||
"status": 200
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Creating the websocket sends three messages and closing sends one.
|
|
||||||
# The one being tested is wrapped between these
|
|
||||||
recv_mock.side_effect = [send_ack, send_ack, send_ack, {
|
|
||||||
"body": {
|
|
||||||
"payload": {
|
|
||||||
"status": 200,
|
|
||||||
"message": "Result for IDID",
|
|
||||||
"execution": {"id": "IDID"},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, send_ack]
|
|
||||||
|
|
||||||
clientmgr = mock.MagicMock()
|
|
||||||
clientmgr.get_endpoint_for_service_type.return_value = fakes.WS_URL
|
|
||||||
clientmgr.auth.get_token.return_value = "TOKEN"
|
|
||||||
clientmgr.auth_ref.project_id = "ID"
|
|
||||||
|
|
||||||
client = plugin.make_client(clientmgr)
|
|
||||||
|
|
||||||
with client.messaging_websocket() as ws:
|
|
||||||
payload = ws.wait_for_message("IDID")
|
|
||||||
self.assertEqual(payload, {
|
|
||||||
"status": 200,
|
|
||||||
"message": "Result for IDID",
|
|
||||||
"execution": {"id": "IDID"},
|
|
||||||
})
|
|
||||||
|
|
||||||
@mock.patch.object(plugin.WebsocketClient, "recv")
|
@mock.patch.object(plugin.WebsocketClient, "recv")
|
||||||
@mock.patch("websocket.create_connection")
|
@mock.patch("websocket.create_connection")
|
||||||
def test_handle_websocket_multiple(self, ws_create_connection, recv_mock):
|
def test_handle_websocket_multiple(self, ws_create_connection, recv_mock):
|
||||||
|
@@ -352,7 +352,6 @@ pxe_ssh,192.168.122.2,stack,"KEY2",00:0b:d0:69:7e:58""")
|
|||||||
tripleoclient = self.app.client_manager.tripleoclient
|
tripleoclient = self.app.client_manager.tripleoclient
|
||||||
websocket = tripleoclient.messaging_websocket()
|
websocket = tripleoclient.messaging_websocket()
|
||||||
websocket.wait_for_messages.return_value = self.mock_websocket_success
|
websocket.wait_for_messages.return_value = self.mock_websocket_success
|
||||||
websocket.wait_for_message.side_effect = self.mock_websocket_success
|
|
||||||
self.websocket = websocket
|
self.websocket = websocket
|
||||||
|
|
||||||
uuid4_patcher = mock.patch('uuid.uuid4', return_value="UUID4")
|
uuid4_patcher = mock.patch('uuid.uuid4', return_value="UUID4")
|
||||||
|
@@ -90,10 +90,6 @@ class TestDeleteNode(fakes.TestDeleteNode):
|
|||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, argslist, verifylist)
|
parsed_args = self.check_parser(self.cmd, argslist, verifylist)
|
||||||
|
|
||||||
self.websocket.wait_for_message.return_value = {
|
|
||||||
"status": "SUCCESS"
|
|
||||||
}
|
|
||||||
|
|
||||||
self.stack_name.return_value = None
|
self.stack_name.return_value = None
|
||||||
|
|
||||||
self.assertRaises(exceptions.InvalidConfiguration,
|
self.assertRaises(exceptions.InvalidConfiguration,
|
||||||
@@ -170,10 +166,6 @@ class TestProvideNode(fakes.TestOvercloudNode):
|
|||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = overcloud_node.ProvideNode(self.app, None)
|
self.cmd = overcloud_node.ProvideNode(self.app, None)
|
||||||
|
|
||||||
self.websocket.wait_for_message.return_value = {
|
|
||||||
"status": "SUCCESS",
|
|
||||||
"message": "Success"
|
|
||||||
}
|
|
||||||
self.websocket.wait_for_messages.return_value = iter([{
|
self.websocket.wait_for_messages.return_value = iter([{
|
||||||
"status": "SUCCESS",
|
"status": "SUCCESS",
|
||||||
"message": "Success"
|
"message": "Success"
|
||||||
@@ -383,10 +375,6 @@ class TestImportNode(fakes.TestOvercloudNode):
|
|||||||
|
|
||||||
def _check_workflow_call(self, parsed_args, introspect=False,
|
def _check_workflow_call(self, parsed_args, introspect=False,
|
||||||
provide=False, local=True, no_deploy_image=False):
|
provide=False, local=True, no_deploy_image=False):
|
||||||
self.websocket.wait_for_message.return_value = {
|
|
||||||
"status": "SUCCESS",
|
|
||||||
"message": "Success",
|
|
||||||
}
|
|
||||||
self.websocket.wait_for_messages.return_value = [{
|
self.websocket.wait_for_messages.return_value = [{
|
||||||
"status": "SUCCESS",
|
"status": "SUCCESS",
|
||||||
"message": "Success",
|
"message": "Success",
|
||||||
@@ -488,10 +476,6 @@ class TestConfigureNode(fakes.TestOvercloudNode):
|
|||||||
self.workflow = self.app.client_manager.workflow_engine
|
self.workflow = self.app.client_manager.workflow_engine
|
||||||
client = self.app.client_manager.tripleoclient
|
client = self.app.client_manager.tripleoclient
|
||||||
self.websocket = client.messaging_websocket()
|
self.websocket = client.messaging_websocket()
|
||||||
self.websocket.wait_for_message.return_value = {
|
|
||||||
"status": "SUCCESS",
|
|
||||||
"message": ""
|
|
||||||
}
|
|
||||||
self.websocket.wait_for_messages.return_value = iter([{
|
self.websocket.wait_for_messages.return_value = iter([{
|
||||||
"status": "SUCCESS",
|
"status": "SUCCESS",
|
||||||
"message": ""
|
"message": ""
|
||||||
|
@@ -41,28 +41,6 @@ def start_workflow(workflow_client, identifier, workflow_input):
|
|||||||
return execution
|
return execution
|
||||||
|
|
||||||
|
|
||||||
def wait_for_message(mistral, websocket, execution, timeout=None):
|
|
||||||
"""Wait for messages on a websocket.
|
|
||||||
|
|
||||||
Given an instance of mistral client, a websocket and a Mistral execution
|
|
||||||
wait for messages on that websocket queue that match the execution ID until
|
|
||||||
the timeout is reached.
|
|
||||||
|
|
||||||
If no timeout is provided, this method will block forever.
|
|
||||||
|
|
||||||
If a timeout is reached, called check_execution_status which will look up
|
|
||||||
the execution on Mistral and log information about it.
|
|
||||||
|
|
||||||
DEPRECATED: Use wait_for_messages. This method will be removed when
|
|
||||||
all commands have been migrated.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return websocket.wait_for_message(execution.id, timeout=timeout)
|
|
||||||
except exceptions.WebSocketTimeout:
|
|
||||||
check_execution_status(mistral, execution.id)
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def wait_for_messages(mistral, websocket, execution, timeout=None):
|
def wait_for_messages(mistral, websocket, execution, timeout=None):
|
||||||
"""Wait for messages on a websocket.
|
"""Wait for messages on a websocket.
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user