Fix with-items concurrency greater than the number of items
If the list provided to with-items has length less than the value provided in concurrency, the engine throws an exception index out of range. Change-Id: Ib9425f00a735c2fa09b685486aff02511094d991
This commit is contained in:
parent
c11ef5c435
commit
e858a26f50
@ -858,6 +858,41 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
def test_with_items_concurrency_gt_list_length(self):
|
||||
workflow_definition = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test:
|
||||
type: direct
|
||||
|
||||
input:
|
||||
- names: ["John", "Ivan"]
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
with-items: name in <% $.names %>
|
||||
action: std.echo output=<% $.name %>
|
||||
concurrency: 3
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(workflow_definition)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('concurrency_test', {})
|
||||
|
||||
self._await(
|
||||
lambda: self.is_execution_success(wf_ex.id),
|
||||
)
|
||||
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
|
||||
result = data_flow.get_task_execution_result(task_ex)
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
self.assertTrue(isinstance(result, list))
|
||||
self.assertIn('John', result)
|
||||
self.assertIn('Ivan', result)
|
||||
|
||||
def test_with_items_retry_policy(self):
|
||||
workflow = """---
|
||||
version: "2.0"
|
||||
|
@ -98,14 +98,11 @@ def get_indices_for_loop(task_ex):
|
||||
|
||||
if max(indices) < count - 1:
|
||||
indices += list(six.moves.range(max(indices) + 1, count))
|
||||
else:
|
||||
index = get_index(task_ex)
|
||||
indices = list(six.moves.range(index, count))
|
||||
|
||||
return indices[:capacity] if capacity else indices
|
||||
|
||||
index = get_index(task_ex)
|
||||
|
||||
number_to_execute = capacity if capacity else count - index
|
||||
|
||||
return list(six.moves.range(index, index + number_to_execute))
|
||||
return indices[:capacity]
|
||||
|
||||
|
||||
def decrease_capacity(task_ex, count):
|
||||
|
Loading…
Reference in New Issue
Block a user