Files
python-mistralclient/mistralclient/tests/unit/v2/test_cli_executions.py
Michal Gershenzon 817a237267 Filter workflow executions by creating task execution id
This will enable the mistral UI to go from task execution screen
to the workflow execution screen of the workflow execution created
by the task.

Change-Id: Ia2ac765a26acc8dcd27134f5f467cddef03fea95
Implements: blueprint filter-workflow-executions-by-creating-task-id
2016-12-06 09:41:26 +00:00

290 lines
7.3 KiB
Python

# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# Copyright 2016 - Brocade Communications Systems, Inc.
#
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
import pkg_resources as pkg
import six
import sys
from mistralclient.api.v2 import executions
from mistralclient.commands.v2 import executions as execution_cmd
from mistralclient.tests.unit import base
EXEC = executions.Execution(
mock,
{
'id': '123',
'workflow_id': '123e4567-e89b-12d3-a456-426655440000',
'workflow_name': 'some',
'description': '',
'state': 'RUNNING',
'state_info': None,
'created_at': '1',
'updated_at': '1',
'task_execution_id': None
}
)
SUB_WF_EXEC = executions.Execution(
mock,
{
'id': '456',
'workflow_id': '123e4567-e89b-12d3-a456-426655440000',
'workflow_name': 'some_sub_wf',
'description': '',
'state': 'RUNNING',
'state_info': None,
'created_at': '1',
'updated_at': '1',
'task_execution_id': 'abc'
}
)
EX_RESULT = (
'123',
'123e4567-e89b-12d3-a456-426655440000',
'some',
'',
'<none>',
'RUNNING',
None,
'1',
'1'
)
SUB_WF_EX_RESULT = (
'456',
'123e4567-e89b-12d3-a456-426655440000',
'some_sub_wf',
'',
'abc',
'RUNNING',
None,
'1',
'1'
)
class TestCLIExecutionsV2(base.BaseCommandTest):
stdout = six.moves.StringIO()
stderr = six.moves.StringIO()
def setUp(self):
super(TestCLIExecutionsV2, self).setUp()
# Redirect stdout and stderr so it doesn't pollute the test result.
sys.stdout = self.stdout
sys.stderr = self.stderr
def tearDown(self):
super(TestCLIExecutionsV2, self).tearDown()
# Reset to original stdout and stderr.
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
def test_create_wf_input_string(self):
self.client.executions.create.return_value = EXEC
result = self.call(
execution_cmd.Create,
app_args=['id', '{ "context": true }']
)
self.assertEqual(
EX_RESULT,
result[1]
)
def test_create_wf_input_file(self):
self.client.executions.create.return_value = EXEC
path = pkg.resource_filename(
'mistralclient',
'tests/unit/resources/ctx.json'
)
result = self.call(
execution_cmd.Create,
app_args=['id', path]
)
self.assertEqual(
EX_RESULT,
result[1]
)
def test_create_with_description(self):
self.client.executions.create.return_value = EXEC
result = self.call(
execution_cmd.Create,
app_args=['id', '{ "context": true }', '-d', '']
)
self.assertEqual(
EX_RESULT,
result[1]
)
def test_update_state(self):
states = ['RUNNING', 'SUCCESS', 'PAUSED', 'ERROR', 'CANCELLED']
for state in states:
self.client.executions.update.return_value = executions.Execution(
mock,
{
'id': '123',
'workflow_id': '123e4567-e89b-12d3-a456-426655440000',
'workflow_name': 'some',
'description': '',
'state': state,
'state_info': None,
'created_at': '1',
'updated_at': '1',
'task_execution_id': None
}
)
ex_result = list(EX_RESULT)
ex_result[5] = state
ex_result = tuple(ex_result)
result = self.call(
execution_cmd.Update,
app_args=['id', '-s', state]
)
self.assertEqual(
ex_result,
result[1]
)
def test_update_invalid_state(self):
states = ['IDLE', 'WAITING', 'DELAYED']
for state in states:
self.assertRaises(
SystemExit,
self.call,
execution_cmd.Update,
app_args=['id', '-s', state]
)
def test_resume_update_env(self):
self.client.executions.update.return_value = EXEC
result = self.call(
execution_cmd.Update,
app_args=['id', '-s', 'RUNNING', '--env', '{"k1": "foobar"}']
)
self.assertEqual(
EX_RESULT,
result[1]
)
def test_update_description(self):
self.client.executions.update.return_value = EXEC
result = self.call(
execution_cmd.Update,
app_args=['id', '-d', 'foobar']
)
self.assertEqual(
EX_RESULT,
result[1]
)
def test_list(self):
self.client.executions.list.return_value = [EXEC, SUB_WF_EXEC]
result = self.call(execution_cmd.List)
self.assertEqual(
[EX_RESULT, SUB_WF_EX_RESULT],
result[1]
)
def test_list_with_pagination(self):
self.client.executions.list.return_value = [EXEC]
self.call(execution_cmd.List)
self.client.executions.list.assert_called_once_with(
limit=None,
marker='',
sort_dirs='asc',
sort_keys='created_at',
task=None
)
self.call(
execution_cmd.List,
app_args=[
'--limit', '5',
'--sort_dirs', 'id, Workflow',
'--sort_keys', 'desc',
'--marker', 'abc'
]
)
self.client.executions.list.assert_called_with(
limit=5,
marker='abc',
sort_dirs='id, Workflow',
sort_keys='desc',
task=None
)
def test_get(self):
self.client.executions.get.return_value = EXEC
result = self.call(execution_cmd.Get, app_args=['id'])
self.assertEqual(
EX_RESULT,
result[1]
)
def test_get_sub_wf_ex(self):
self.client.executions.get.return_value = SUB_WF_EXEC
result = self.call(execution_cmd.Get, app_args=['id'])
self.assertEqual(
SUB_WF_EX_RESULT,
result[1]
)
def test_delete(self):
self.call(execution_cmd.Delete, app_args=['id'])
self.client.executions.delete.assert_called_once_with('id')
def test_delete_with_multi_names(self):
self.call(execution_cmd.Delete, app_args=['id1', 'id2'])
self.assertEqual(2, self.client.executions.delete.call_count)
self.assertEqual(
[mock.call('id1'), mock.call('id2')],
self.client.executions.delete.call_args_list
)