python-tripleoclient/tripleoclient/workflows/base.py
James Slagle 026cb4a76e Handle failed actions
The result of synchronously called Mistral actions wasn't being checked
to see if the action passed or failed. The result is now checked and if
the action has failed, an exception will be raised.

Change-Id: I95ae8c98fec94cf91f3f209b593f6c1815729fd4
Closes-Bug: #1686811
2017-05-04 14:47:21 -04:00

100 lines
3.5 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
from tripleoclient import exceptions
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
def call_action(workflow_client, action, **input_):
    """Run a Mistral action synchronously and return its decoded result.

    :param workflow_client: client object exposing
        ``action_executions.create``.
    :param action: name of the action to execute.
    :param input_: keyword arguments forwarded as the action's input dict.
    :returns: the value stored under the ``result`` key of the JSON output.
    :raises exceptions.WorkflowActionError: if the execution state is
        ``ERROR``; the decoded result is attached to the exception.
    """
    action_execution = workflow_client.action_executions.create(
        action, input_, save_result=True, run_sync=True)

    # The output comes back as a JSON string rather than a parsed object,
    # so decode it here and pull out the payload stored under "result".
    decoded = json.loads(action_execution.output)
    payload = decoded['result']

    if action_execution.state != 'ERROR':
        return payload
    raise exceptions.WorkflowActionError(action, payload)
def start_workflow(workflow_client, identifier, workflow_input):
    """Create a Mistral workflow execution and announce it on stdout.

    :param workflow_client: client object exposing ``executions.create``.
    :param identifier: identifier of the workflow to start.
    :param workflow_input: input passed to the workflow as
        ``workflow_input``.
    :returns: the execution object returned by the client.
    """
    new_execution = workflow_client.executions.create(
        identifier, workflow_input=workflow_input)

    announcement = "Started Mistral Workflow {}. Execution ID: {}".format(
        identifier, new_execution.id)
    print(announcement)

    return new_execution
def wait_for_messages(mistral, websocket, execution, timeout=None):
    """Wait for messages on a websocket.

    Given an instance of mistral client, a websocket and a Mistral execution
    wait for messages on that websocket queue that match the execution ID
    until the timeout is reached.

    If no timeout is provided, this method will block forever.

    If a timeout is reached, calls check_execution_status which will look up
    the execution on Mistral and log information about it.

    This is a generator: every payload received is yielded to the caller,
    including payloads from sub-workflows. Iteration ends after the first
    payload that belongs to ``execution`` and carries a status other than
    ``RUNNING``.

    :param mistral: Mistral client, only used to look up the execution
        when the websocket times out.
    :param websocket: object exposing ``wait_for_messages(timeout=...)``.
    :param execution: the execution being followed; its ``id`` is compared
        with ``payload['execution']['id']``.
    :param timeout: passed through to ``websocket.wait_for_messages``.
    :raises exceptions.WebSocketTimeout: re-raised after the execution
        status has been logged.
    """
    try:
        for payload in websocket.wait_for_messages(timeout=timeout):
            yield payload

            # If the message is from a sub-workflow, we just need to pass it
            # on to be displayed. This should never be the last message - so
            # continue and wait for the next.
            if payload['execution']['id'] != execution.id:
                continue

            # Check the status of the payload, if we are not given one
            # default to running and assume it is just an "in progress"
            # message from the workflow.
            # Workflows should end with SUCCESS or ERROR statuses.
            if payload.get('status', 'RUNNING') != "RUNNING":
                # End the generator with a plain return: raising
                # StopIteration inside a generator body is converted to
                # RuntimeError under PEP 479 (Python 3.7+).
                return
    except exceptions.WebSocketTimeout:
        check_execution_status(mistral, execution.id)
        raise
def check_execution_status(workflow_client, execution_id):
    """Log the state of a workflow whose message wait timed out.

    The execution is fetched through ``workflow_client.executions.get`` and
    a single error-level log line is emitted containing the execution ID,
    its state and an explanation for that state. Nothing is returned.

    :param workflow_client: client object exposing ``executions.get``.
    :param execution_id: ID of the execution to look up.
    """
    # Explanations for the states handled explicitly; any other state falls
    # back to a generic message.
    explanations = {
        'RUNNING': "The WebSocket timed out before the Workflow completed.",
        'SUCCESS': ("The Workflow finished successfully but no messages were "
                    "received before the WebSocket timed out."),
        'ERROR': "The Workflow errored and no messages were received.",
    }

    current_state = workflow_client.executions.get(execution_id).state
    detail = explanations.get(current_state, "Unknown Execution state.")

    template = ("Timed out waiting for messages from Execution "
                "(ID: {}, State: {}). {}")
    LOG.error(template.format(execution_id, current_state, detail))