Merge "Fix race in execution finishing"
This commit is contained in:
commit
14a3dfd84b
@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import mock
|
||||
|
||||
from osc_lib.tests import utils
|
||||
@ -89,6 +90,7 @@ class TestBaseWorkflows(utils.TestCommand):
|
||||
}
|
||||
|
||||
mistral = mock.Mock()
|
||||
mistral.executions.get.return_value.state = 'RUNNING'
|
||||
websocket = mock.Mock()
|
||||
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
|
||||
execution = mock.Mock()
|
||||
@ -139,6 +141,7 @@ class TestBaseWorkflows(utils.TestCommand):
|
||||
}
|
||||
|
||||
mistral = mock.Mock()
|
||||
mistral.executions.get.return_value.state = 'RUNNING'
|
||||
websocket = mock.Mock()
|
||||
websocket.wait_for_messages.return_value = iter([payload_a, payload_b])
|
||||
execution = mock.Mock()
|
||||
@ -172,3 +175,31 @@ class TestBaseWorkflows(utils.TestCommand):
|
||||
|
||||
self.assertRaises(ex.WorkflowActionError,
|
||||
base.call_action, mistral, action)
|
||||
|
||||
def test_wait_for_messages_execution_complete(self):
|
||||
payload_a = {
|
||||
'status': 'RUNNING',
|
||||
'execution_id': 'aaaa',
|
||||
'root_execution_id': 'aaaa'
|
||||
}
|
||||
payload_b = {
|
||||
'status': 'SUCCESS',
|
||||
'execution_id': 'aaaa',
|
||||
'root_execution_id': 'aaaa'
|
||||
}
|
||||
|
||||
mistral = mock.Mock()
|
||||
mistral.executions.get.return_value.state = 'SUCCESS'
|
||||
mistral.executions.get.return_value.output = json.dumps(payload_b)
|
||||
websocket = mock.Mock()
|
||||
websocket.wait_for_messages.return_value = iter([payload_a])
|
||||
execution = mock.Mock()
|
||||
execution.id = 'aaaa'
|
||||
|
||||
messages = list(base.wait_for_messages(mistral, websocket, execution))
|
||||
|
||||
# Assert only payload_b was returned
|
||||
self.assertEqual([payload_a, payload_b], messages)
|
||||
mistral.executions.get.assert_called_with('aaaa')
|
||||
|
||||
websocket.wait_for_messages.assert_called_with(timeout=None)
|
||||
|
@ -91,8 +91,12 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
|
||||
# default to running and assume it is just an "in progress"
|
||||
# message from the workflow.
|
||||
# Workflows should end with SUCCESS or ERROR statuses.
|
||||
if payload.get('status', 'RUNNING') != "RUNNING" or \
|
||||
mistral.executions.get(execution.id).state != "RUNNING":
|
||||
if payload.get('status', 'RUNNING') != "RUNNING":
|
||||
return
|
||||
execution = mistral.executions.get(execution.id)
|
||||
if execution.state != "RUNNING":
|
||||
# yield the output as the last payload which was missed
|
||||
yield json.loads(execution.output)
|
||||
return
|
||||
except (exceptions.WebSocketTimeout, exceptions.WebSocketConnectionClosed):
|
||||
check_execution_status(mistral, execution.id)
|
||||
|
Loading…
Reference in New Issue
Block a user