diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 6ae29516..58780559 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -858,6 +858,41 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_ex.state) + def test_with_items_concurrency_gt_list_length(self): + workflow_definition = """--- + version: "2.0" + + concurrency_test: + type: direct + + input: + - names: ["John", "Ivan"] + + tasks: + task1: + with-items: name in <% $.names %> + action: std.echo output=<% $.name %> + concurrency: 3 + """ + + wf_service.create_workflows(workflow_definition) + + # Start workflow. + wf_ex = self.engine.start_workflow('concurrency_test', {}) + + self._await( + lambda: self.is_execution_success(wf_ex.id), + ) + + wf_ex = db_api.get_execution(wf_ex.id) + task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') + result = data_flow.get_task_execution_result(task_ex) + + self.assertEqual(states.SUCCESS, task_ex.state) + self.assertTrue(isinstance(result, list)) + self.assertIn('John', result) + self.assertIn('Ivan', result) + def test_with_items_retry_policy(self): workflow = """--- version: "2.0" diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index f80749e4..b769b988 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -98,14 +98,11 @@ def get_indices_for_loop(task_ex): if max(indices) < count - 1: indices += list(six.moves.range(max(indices) + 1, count)) + else: + index = get_index(task_ex) + indices = list(six.moves.range(index, count)) - return indices[:capacity] if capacity else indices - - index = get_index(task_ex) - - number_to_execute = capacity if capacity else count - index - - return list(six.moves.range(index, index + number_to_execute)) + return indices[:capacity] def decrease_capacity(task_ex, count):