Add an optional timeout when waiting for websocket messages
This patch adds a mechanism for setting a timeout when waiting for websocket messages. It then applies that timeout to workflow executions that are fairly predictable, meaning they always take roughly the same length of time. Other workflows, like baremetal introspection, can be much slower or quicker depending on the user's environment. Closes-Bug: #1618445 Change-Id: I656735d58b1b676148e6ceacfc9861b3c5f44e5d
This commit is contained in:
parent
6ffe1ca57b
commit
579d1b1318
|
@ -22,7 +22,10 @@ class Timeout(Exception):
|
|||
|
||||
class WorkflowServiceError(Exception):
|
||||
"""The service type is unknown"""
|
||||
pass
|
||||
|
||||
|
||||
class WebSocketTimeout(Exception):
|
||||
"""Timed out waiting for messages on the websocket"""
|
||||
|
||||
|
||||
class NotFound(Exception):
|
||||
|
|
|
@ -24,6 +24,8 @@ from osc_lib import utils
|
|||
from swiftclient import client as swift_client
|
||||
import websocket
|
||||
|
||||
from tripleoclient import exceptions
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_TRIPLEOCLIENT_API_VERSION = '1'
|
||||
|
@ -124,17 +126,29 @@ class WebsocketClient(object):
|
|||
def recv(self):
|
||||
return json.loads(self._ws.recv())
|
||||
|
||||
def wait_for_message(self, execution_id):
|
||||
def wait_for_message(self, execution_id, timeout=None):
|
||||
"""Wait for a message for a mistral execution ID
|
||||
|
||||
This blocks until a message is received on the provided queue name
|
||||
with the execution ID.
|
||||
This method blocks until a message is received on the message queue
|
||||
with the execution ID passed in.
|
||||
|
||||
TODO(d0ugal): Add a timeout/break for the case when a message is
|
||||
never arrives.
|
||||
A timeout can be provided in seconds, if no timeout is provided it
|
||||
will block forever until a message is received. If no message is
|
||||
received (for example, Zaqar is down) then it will block until manually
|
||||
killed.
|
||||
"""
|
||||
|
||||
if timeout is None:
|
||||
LOG.warning("Waiting for messages on queue '{}' with no timeout."
|
||||
.format(self._queue_name))
|
||||
|
||||
self._ws.settimeout(timeout)
|
||||
|
||||
while True:
|
||||
body = self.recv()['body']
|
||||
try:
|
||||
body = self.recv()['body']
|
||||
except websocket.WebSocketTimeoutException:
|
||||
raise exceptions.WebSocketTimeout()
|
||||
if body['payload']['execution']['id'] == execution_id:
|
||||
return body['payload']
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ class FakeClientManager(object):
|
|||
|
||||
class FakeWebSocket(object):
|
||||
|
||||
def wait_for_message(self, execution_id):
|
||||
def wait_for_message(self, execution_id, timeout=None):
|
||||
return {
|
||||
'status': 'SUCCESS'
|
||||
}
|
||||
|
|
|
@ -10,6 +10,11 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import json
|
||||
import logging
|
||||
|
||||
from tripleoclient import exceptions
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def call_action(workflow_client, action, **input_):
|
||||
|
@ -34,3 +39,45 @@ def start_workflow(workflow_client, identifier, workflow_input):
|
|||
identifier, execution.id))
|
||||
|
||||
return execution
|
||||
|
||||
|
||||
def wait_for_message(mistral, websocket, execution, timeout=None):
    """Return the websocket message that belongs to a Mistral execution.

    The call is delegated to ``websocket.wait_for_message`` with the ID of
    ``execution``; ``timeout`` is forwarded unchanged. With the default of
    ``None`` no timeout is requested from the websocket.

    If the websocket raises ``exceptions.WebSocketTimeout``,
    ``check_execution_status`` is called with ``mistral`` and the execution
    ID so the state of the execution gets logged, and the same exception is
    then re-raised to the caller.
    """
    execution_id = execution.id
    try:
        payload = websocket.wait_for_message(execution_id, timeout=timeout)
    except exceptions.WebSocketTimeout:
        # Log what Mistral reports about the execution before letting the
        # timeout propagate unchanged.
        check_execution_status(mistral, execution_id)
        raise
    return payload
|
||||
|
||||
|
||||
def check_execution_status(workflow_client, execution_id):
    """Log the state of an execution whose websocket wait timed out.

    The execution is fetched through ``workflow_client.executions.get`` and
    a single error-level record is logged containing the execution ID, its
    state and a short explanation of that state. Nothing is returned.
    """

    # Explanations for the states handled explicitly; any other state falls
    # back to a generic message.
    explanations = {
        'RUNNING': "The WebSocket timed out before the Workflow completed.",
        'SUCCESS': ("The Workflow finished successfully but no messages were "
                    "received before the WebSocket timed out."),
        'ERROR': "The Workflow errored and no messages were received.",
    }

    current_state = workflow_client.executions.get(execution_id).state
    explanation = explanations.get(current_state, "Unknown Execution state.")

    template = ("Timed out waiting for messages from Execution "
                "(ID: {}, State: {}). {}")
    LOG.error(template.format(execution_id, current_state, explanation))
|
||||
|
|
|
@ -42,6 +42,8 @@ def get_overcloud_passwords(clients, **workflow_input):
|
|||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
message = ws.wait_for_message(execution.id)
|
||||
assert message['status'] == "SUCCESS"
|
||||
return message['message']
|
||||
# Getting the passwords is a quick operation, but to allow space for
|
||||
# delays or heavy loads, timeout after 60 seconds.
|
||||
payload = base.wait_for_message(workflow_client, ws, execution, 60)
|
||||
assert payload['status'] == "SUCCESS"
|
||||
return payload['message']
|
||||
|
|
|
@ -19,6 +19,13 @@ from tripleoclient import exceptions
|
|||
from tripleoclient.workflows import base
|
||||
|
||||
|
||||
# Plan management workflows should generally be quick. However, the creation
|
||||
# of the default plan in instack has demonstrated that sometimes it can take
|
||||
# several minutes. This timeout value of 6 minutes is the same as the timeout
|
||||
# used in Instack.
|
||||
_WORKFLOW_TIMEOUT = 360 # 6 * 60 seconds
|
||||
|
||||
|
||||
def _upload_templates(swift_client, container_name, tht_root, roles_file=None):
|
||||
"""tarball up a given directory and upload it to Swift to be extracted"""
|
||||
|
||||
|
@ -47,7 +54,8 @@ def create_default_plan(clients, **workflow_input):
|
|||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
payload = ws.wait_for_message(execution.id)
|
||||
payload = base.wait_for_message(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT)
|
||||
|
||||
if payload['status'] == 'SUCCESS':
|
||||
print ("Default plan created")
|
||||
|
@ -67,7 +75,8 @@ def _create_update_deployment_plan(clients, workflow, **workflow_input):
|
|||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
return ws.wait_for_message(execution.id)
|
||||
return base.wait_for_message(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT)
|
||||
|
||||
|
||||
def create_deployment_plan(clients, **workflow_input):
|
||||
|
|
Loading…
Reference in New Issue