diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index 945feb908..784a849f1 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -23,6 +23,7 @@ from wsme import types as wtypes import wsmeext.pecan as wsme_pecan from mistral.api import access_control as acl +from mistral.api.controllers.v2 import execution_report from mistral.api.controllers.v2 import resources from mistral.api.controllers.v2 import task from mistral.api.controllers.v2 import types @@ -82,6 +83,7 @@ def _get_workflow_execution(id, must_exist=True): class ExecutionsController(rest.RestController): tasks = task.ExecutionTasksController() + report = execution_report.ExecutionReportController() @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Execution, wtypes.text) diff --git a/mistral/api/controllers/v2/execution_report.py b/mistral/api/controllers/v2/execution_report.py new file mode 100644 index 000000000..d1750577e --- /dev/null +++ b/mistral/api/controllers/v2/execution_report.py @@ -0,0 +1,164 @@ +# Copyright 2019 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from mistral.api.controllers.v2 import resources +from mistral.api.controllers.v2 import types +from mistral.db.v2 import api as db_api +from mistral.db.v2.sqlalchemy import models as db_models +from mistral.utils import rest_utils +from mistral.workflow import states + + +LOG = logging.getLogger(__name__) + + +def create_workflow_execution_entry(wf_ex): + return resources.WorkflowExecutionReportEntry.from_db_model(wf_ex) + + +def create_task_execution_entry(task_ex): + return resources.TaskExecutionReportEntry.from_db_model(task_ex) + + +def create_action_execution_entry(action_ex): + return resources.ActionExecutionReportEntry.from_db_model(action_ex) + + +def update_statistics_with_task(stat, task_ex): + if task_ex.state == states.RUNNING: + stat.increment_running() + elif task_ex.state == states.SUCCESS: + stat.increment_success() + elif task_ex.state == states.ERROR: + stat.increment_error() + elif task_ex.state == states.IDLE: + stat.increment_idle() + elif task_ex.state == states.PAUSED: + stat.increment_paused() + + +def analyse_task_execution(task_ex_id, stat, filters, cur_depth): + with db_api.transaction(): + task_ex = db_api.get_task_execution(task_ex_id) + + if filters['errors_only'] and task_ex.state != states.ERROR: + return None + + update_statistics_with_task(stat, task_ex) + + entry = create_task_execution_entry(task_ex) + + child_executions = task_ex.executions + + entry.action_executions = [] + entry.workflow_executions = [] + + for c_ex in child_executions: + if isinstance(c_ex, db_models.ActionExecution): + entry.action_executions.append( + create_action_execution_entry(c_ex) + ) + else: + entry.workflow_executions.append( + analyse_workflow_execution(c_ex.id, stat, filters, cur_depth) + ) + + return entry + + +def analyse_workflow_execution(wf_ex_id, stat, filters, cur_depth): + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex_id) + + entry = create_workflow_execution_entry(wf_ex) + + max_depth = filters['max_depth'] + + # Don't get deeper into the workflow task executions if + # maximum depth is defined and the current depth exceeds it. + if 0 <= max_depth < cur_depth: + return entry + + task_execs = wf_ex.task_executions + + entry.task_executions = [] + + for t_ex in task_execs: + task_exec_entry = analyse_task_execution( + t_ex.id, + stat, + filters, + cur_depth + 1 + ) + + if task_exec_entry: + entry.task_executions.append(task_exec_entry) + + return entry + + +def build_report(wf_ex_id, filters): + report = resources.ExecutionReport() + + stat = resources.ExecutionReportStatistics() + + report.statistics = stat + report.root_workflow_execution = analyse_workflow_execution( + wf_ex_id, + stat, + filters, + 0 + ) + + return report + + +class ExecutionReportController(rest.RestController): + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.ExecutionReport, types.uuid, bool, int) + def get(self, workflow_execution_id, errors_only=False, max_depth=-1): + """Return workflow execution report. + + :param workflow_execution_id: The ID of the workflow execution to + generate a report for. + :param errors_only: Optional. If True, only error paths of the + execution tree are included into the report. The root execution + (with the specified id) is always included, but its tasks may + or may not be included depending on this flag's value. + :param max_depth: Optional. Limits the depth of recursion while + obtaining the execution tree. That is, subworkflows of what + maximum depth will be included into the report. If a value of the + flag is a negative number then no limit is set. + The root execution has depth 0 so if the flag is 0 then only + the root execution, its tasks and their actions will be included. + If some of the tasks in turn run workflows then these subworkflows + will be also included but without their tasks. The algorithm will + fully analyse their tasks only if max_depth is greater than zero. + """ + + LOG.info( + "Fetch execution report [workflow_execution_id=%s]", + workflow_execution_id + ) + + filters = { + 'errors_only': errors_only, + 'max_depth': max_depth + } + + return build_report(workflow_execution_id, filters) diff --git a/mistral/api/controllers/v2/resources.py b/mistral/api/controllers/v2/resources.py index d19e62a0f..3d6eee051 100644 --- a/mistral/api/controllers/v2/resources.py +++ b/mistral/api/controllers/v2/resources.py @@ -721,3 +721,158 @@ class EventTriggers(resource.ResourceList): "marker=123e4567-e89b-12d3-a456-426655440000") return triggers_sample + + +class BaseExecutionReportEntry(resource.Resource): + """Execution report entry resource.""" + + id = wtypes.text + name = wtypes.text + created_at = wtypes.text + updated_at = wtypes.text + state = wtypes.text + state_info = wtypes.text + + @classmethod + def sample(cls): + # TODO(rakhmerov): complete + + return cls( + id='123e4567-e89b-12d3-a456-426655441414', + created_at='2019-01-30T00:00:00.000000', + updated_at='2019-01-30T00:00:00.000000', + state=states.SUCCESS + ) + + +class ActionExecutionReportEntry(BaseExecutionReportEntry): + """Action execution report entry resource.""" + + accepted = bool + last_heartbeat = wtypes.text + + @classmethod + def sample(cls): + sample = super(ActionExecutionReportEntry, cls).sample() + + sample.accepted = True + sample.last_heartbeat = '2019-01-30T00:00:00.000000' + + return sample + + +class WorkflowExecutionReportEntry(BaseExecutionReportEntry): + """Workflow execution report entry resource.""" + + # NOTE(rakhmerov): task_executions has to be declared below + # after we declare a class for task execution entry resource. + + @classmethod + def sample(cls): + sample = super(WorkflowExecutionReportEntry, cls).sample() + + # We can't define a non-empty list task executions here because + # the needed class is not defined yet. Since this is just a sample + # we can sacrifice it. + sample.task_executions = [] + + return sample + + +class TaskExecutionReportEntry(BaseExecutionReportEntry): + """Task execution report entity resource.""" + + action_executions = [ActionExecutionReportEntry] + workflow_executions = [WorkflowExecutionReportEntry] + + @classmethod + def sample(cls): + sample = super(TaskExecutionReportEntry, cls).sample() + + sample.action_executions = [ActionExecutionReportEntry.sample()] + sample.workflow_executions = [] + + return sample + + +# We have to declare this field later because of the dynamic binding. +# It can't be within WorkflowExecutionReportEntry before +# TaskExecutionReportEntry is declared. +WorkflowExecutionReportEntry.task_executions = [TaskExecutionReportEntry] +wtypes.registry.reregister(WorkflowExecutionReportEntry) + + +class ExecutionReportStatistics(resource.Resource): + """Execution report statistics. + + TODO(rakhmerov): There's much more we can add here. For example, + information about action, average (and also min and max) task execution + run time etc. + """ + + total_tasks_count = wtypes.IntegerType(minimum=0) + running_tasks_count = wtypes.IntegerType(minimum=0) + success_tasks_count = wtypes.IntegerType(minimum=0) + error_tasks_count = wtypes.IntegerType(minimum=0) + idle_tasks_count = wtypes.IntegerType(minimum=0) + paused_tasks_count = wtypes.IntegerType(minimum=0) + + def __init__(self, **kw): + self.total_tasks_count = 0 + self.running_tasks_count = 0 + self.success_tasks_count = 0 + self.error_tasks_count = 0 + self.idle_tasks_count = 0 + self.paused_tasks_count = 0 + + super(ExecutionReportStatistics, self).__init__(**kw) + + def increment_running(self): + self.running_tasks_count += 1 + self.total_tasks_count += 1 + + def increment_success(self): + self.success_tasks_count += 1 + self.total_tasks_count += 1 + + def increment_error(self): + self.error_tasks_count += 1 + self.total_tasks_count += 1 + + def increment_idle(self): + self.idle_tasks_count += 1 + self.total_tasks_count += 1 + + def increment_paused(self): + self.paused_tasks_count += 1 + self.total_tasks_count += 1 + + @classmethod + def sample(cls): + return cls( + total_tasks_count=10, + running_tasks_count=3, + success_tasks_count=5, + error_tasks_count=2, + idle_tasks_count=0, + paused_tasks_count=0 + ) + + +class ExecutionReport(resource.Resource): + """Execution report resource.""" + + statistics = ExecutionReportStatistics + """General statistics about the workflow execution hierarchy.""" + + root_workflow_execution = WorkflowExecutionReportEntry + """Root entry of the report associated with a workflow execution.""" + + @classmethod + def sample(cls): + sample = cls() + + sample.statistics = ExecutionReportStatistics.sample() + sample.root_workflow_execution = WorkflowExecutionReportEntry.sample() + + return sample diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 070b3b62d..5974da629 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -36,8 +36,15 @@ from mistral.workflow import states LOG = logging.getLogger(__name__) -STATE_TYPES = wtypes.Enum(str, states.IDLE, states.RUNNING, states.SUCCESS, - states.ERROR, states.RUNNING_DELAYED) + +STATE_TYPES = wtypes.Enum( + str, + states.IDLE, + states.RUNNING, + states.SUCCESS, + states.ERROR, + states.RUNNING_DELAYED +) def _get_task_resource_with_result(task_ex): diff --git a/mistral/tests/unit/api/v2/test_execution_report.py b/mistral/tests/unit/api/v2/test_execution_report.py new file mode 100644 index 000000000..295745ac2 --- /dev/null +++ b/mistral/tests/unit/api/v2/test_execution_report.py @@ -0,0 +1,406 @@ +# Copyright 2019 - Nokia Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.services import workbooks as wb_service +from mistral.services import workflows as wf_service +from mistral.tests.unit.api import base +from mistral.tests.unit.engine import base as engine_base +from mistral.workflow import states + + +class TestExecutionReportController(base.APITest, engine_base.EngineTestCase): + def test_simple_sequence_wf(self): + wf_text = """--- + version: '2.0' + + wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf') + + self.await_workflow_error(wf_ex.id) + + resp = self.app.get('/v2/executions/%s/report' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + # Now let's verify the response structure + + self.assertIn('root_workflow_execution', resp.json) + + root_wf_ex = resp.json['root_workflow_execution'] + + self.assertIsInstance(root_wf_ex, dict) + self.assertEqual(wf_ex.id, root_wf_ex['id']) + self.assertEqual(wf_ex.name, root_wf_ex['name']) + self.assertEqual(states.ERROR, root_wf_ex['state']) + self.assertGreater(len(root_wf_ex['state_info']), 0) + + tasks = root_wf_ex['task_executions'] + + self.assertIsInstance(tasks, list) + + self.assertEqual(2, len(tasks)) + + # Verify task1 info. + task1 = self._assert_single_item( + tasks, + name='task1', + state=states.SUCCESS + ) + + self.assertEqual(0, len(task1['workflow_executions'])) + self.assertEqual(1, len(task1['action_executions'])) + + task1_action = task1['action_executions'][0] + + self.assertEqual(states.SUCCESS, task1_action['state']) + self.assertEqual('std.noop', task1_action['name']) + + # Verify task2 info. + task2 = self._assert_single_item( + tasks, + name='task2', + state=states.ERROR + ) + + self.assertEqual(1, len(task2['action_executions'])) + + task2_action = task2['action_executions'][0] + + self.assertEqual(0, len(task2['workflow_executions'])) + self.assertEqual(states.ERROR, task2_action['state']) + + # Verify statistics. + stat = resp.json['statistics'] + + self.assertEqual(1, stat['error_tasks_count']) + self.assertEqual(0, stat['idle_tasks_count']) + self.assertEqual(0, stat['paused_tasks_count']) + self.assertEqual(0, stat['running_tasks_count']) + self.assertEqual(1, stat['success_tasks_count']) + self.assertEqual(2, stat['total_tasks_count']) + + def test_nested_wf(self): + wb_text = """--- + version: '2.0' + + name: wb + + workflows: + parent_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + workflow: sub_wf + on-success: task3 + + task3: + action: std.fail + + sub_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + """ + + wb_service.create_workbook_v2(wb_text) + + wf_ex = self.engine.start_workflow('wb.parent_wf') + + self.await_workflow_error(wf_ex.id) + + resp = self.app.get('/v2/executions/%s/report' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + # Now let's verify the response structure + + self.assertIn('root_workflow_execution', resp.json) + + root_wf_ex = resp.json['root_workflow_execution'] + + self.assertIsInstance(root_wf_ex, dict) + self.assertEqual('wb.parent_wf', root_wf_ex['name']) + self.assertEqual(states.ERROR, root_wf_ex['state']) + self.assertGreater(len(root_wf_ex['state_info']), 0) + + tasks = root_wf_ex['task_executions'] + + self.assertIsInstance(tasks, list) + + self.assertEqual(2, len(tasks)) + + # Verify task1 info. + task1 = self._assert_single_item(tasks, name='task1') + + self.assertEqual(states.SUCCESS, task1['state']) + self.assertEqual(0, len(task1['workflow_executions'])) + self.assertEqual(1, len(task1['action_executions'])) + + task1_action = task1['action_executions'][0] + self.assertEqual(states.SUCCESS, task1_action['state']) + self.assertEqual('std.noop', task1_action['name']) + + # Verify task2 info. + task2 = self._assert_single_item(tasks, name='task2') + + self.assertEqual(states.ERROR, task2['state']) + self.assertEqual(0, len(task2['action_executions'])) + self.assertEqual(1, len(task2['workflow_executions'])) + + sub_wf_entry = task2['workflow_executions'][0] + self.assertEqual(states.ERROR, sub_wf_entry['state']) + + sub_wf_tasks = sub_wf_entry['task_executions'] + + self.assertEqual(2, len(sub_wf_tasks)) + + sub_wf_task1 = self._assert_single_item( + sub_wf_tasks, + name='task1', + state=states.SUCCESS + ) + sub_wf_task2 = self._assert_single_item( + sub_wf_tasks, + name='task2', + state=states.ERROR + ) + + self.assertEqual(1, len(sub_wf_task1['action_executions'])) + self.assertEqual( + states.SUCCESS, + sub_wf_task1['action_executions'][0]['state'] + ) + + self.assertEqual(1, len(sub_wf_task2['action_executions'])) + self.assertEqual( + states.ERROR, + sub_wf_task2['action_executions'][0]['state'] + ) + + # Verify statistics. + stat = resp.json['statistics'] + + self.assertEqual(2, stat['error_tasks_count']) + self.assertEqual(0, stat['idle_tasks_count']) + self.assertEqual(0, stat['paused_tasks_count']) + self.assertEqual(0, stat['running_tasks_count']) + self.assertEqual(2, stat['success_tasks_count']) + self.assertEqual(4, stat['total_tasks_count']) + + def test_nested_wf_errors_only(self): + wb_text = """--- + version: '2.0' + + name: wb + + workflows: + parent_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + workflow: sub_wf + on-success: task3 + + task3: + action: std.fail + + sub_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + """ + + wb_service.create_workbook_v2(wb_text) + + wf_ex = self.engine.start_workflow('wb.parent_wf') + + self.await_workflow_error(wf_ex.id) + + resp = self.app.get( + '/v2/executions/%s/report?errors_only=true' % wf_ex.id + ) + + self.assertEqual(200, resp.status_int) + + # Now let's verify the response structure + + self.assertIn('root_workflow_execution', resp.json) + + root_wf_ex = resp.json['root_workflow_execution'] + + self.assertIsInstance(root_wf_ex, dict) + self.assertEqual('wb.parent_wf', root_wf_ex['name']) + self.assertEqual(states.ERROR, root_wf_ex['state']) + self.assertGreater(len(root_wf_ex['state_info']), 0) + + tasks = root_wf_ex['task_executions'] + + self.assertIsInstance(tasks, list) + + self.assertEqual(1, len(tasks)) + + # There must be only task2 in the response. + # Verify task2 info. + task2 = self._assert_single_item(tasks, name='task2') + + self.assertEqual(states.ERROR, task2['state']) + self.assertEqual(0, len(task2['action_executions'])) + self.assertEqual(1, len(task2['workflow_executions'])) + + sub_wf_entry = task2['workflow_executions'][0] + self.assertEqual(states.ERROR, sub_wf_entry['state']) + + sub_wf_tasks = sub_wf_entry['task_executions'] + + self.assertEqual(1, len(sub_wf_tasks)) + + sub_wf_task2 = self._assert_single_item( + sub_wf_tasks, + name='task2', + state=states.ERROR + ) + + self.assertEqual(1, len(sub_wf_task2['action_executions'])) + self.assertEqual( + states.ERROR, + sub_wf_task2['action_executions'][0]['state'] + ) + + # Verify statistics. + stat = resp.json['statistics'] + + self.assertEqual(2, stat['error_tasks_count']) + self.assertEqual(0, stat['idle_tasks_count']) + self.assertEqual(0, stat['paused_tasks_count']) + self.assertEqual(0, stat['running_tasks_count']) + self.assertEqual(0, stat['success_tasks_count']) + self.assertEqual(2, stat['total_tasks_count']) + + def test_nested_wf_max_depth(self): + wb_text = """--- + version: '2.0' + + name: wb + + workflows: + parent_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + workflow: sub_wf + on-success: task3 + + task3: + action: std.fail + + sub_wf: + tasks: + task1: + action: std.noop + on-success: task2 + + task2: + action: std.fail + """ + + wb_service.create_workbook_v2(wb_text) + + wf_ex = self.engine.start_workflow('wb.parent_wf') + + self.await_workflow_error(wf_ex.id) + + resp = self.app.get('/v2/executions/%s/report?max_depth=0' % wf_ex.id) + + self.assertEqual(200, resp.status_int) + + # Now let's verify the response structure + + self.assertIn('root_workflow_execution', resp.json) + + root_wf_ex = resp.json['root_workflow_execution'] + + self.assertIsInstance(root_wf_ex, dict) + self.assertEqual('wb.parent_wf', root_wf_ex['name']) + self.assertEqual(states.ERROR, root_wf_ex['state']) + self.assertGreater(len(root_wf_ex['state_info']), 0) + + tasks = root_wf_ex['task_executions'] + + self.assertIsInstance(tasks, list) + + self.assertEqual(2, len(tasks)) + + # Verify task1 info. + task1 = self._assert_single_item(tasks, name='task1') + + self.assertEqual(states.SUCCESS, task1['state']) + self.assertEqual(0, len(task1['workflow_executions'])) + self.assertEqual(1, len(task1['action_executions'])) + + task1_action = task1['action_executions'][0] + self.assertEqual(states.SUCCESS, task1_action['state']) + self.assertEqual('std.noop', task1_action['name']) + + # Verify task2 info. + task2 = self._assert_single_item(tasks, name='task2') + + self.assertEqual(states.ERROR, task2['state']) + self.assertEqual(0, len(task2['action_executions'])) + self.assertEqual(1, len(task2['workflow_executions'])) + + sub_wf_entry = task2['workflow_executions'][0] + self.assertEqual(states.ERROR, sub_wf_entry['state']) + + # We still must have an entry for the subworkflow itself + # but it must not have info about task executions because + # we've now limited max depth. + self.assertNotIn('task_executions', sub_wf_entry) + + # Verify statistics. + stat = resp.json['statistics'] + + self.assertEqual(1, stat['error_tasks_count']) + self.assertEqual(0, stat['idle_tasks_count']) + self.assertEqual(0, stat['paused_tasks_count']) + self.assertEqual(0, stat['running_tasks_count']) + self.assertEqual(1, stat['success_tasks_count']) + self.assertEqual(2, stat['total_tasks_count'])