Merge "Refactor and improve 'with-items' algorithms"
This commit is contained in:
commit
c6dcfe92c0
@ -14,7 +14,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import operator
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
import six
|
||||
@ -409,11 +408,6 @@ class RegularTask(Task):
|
||||
return actions.PythonAction(action_def, task_ex=self.task_ex)
|
||||
|
||||
|
||||
# TODO(rakhmerov): Concurrency support is currently dropped since it doesn't
|
||||
# fit into non-locking transactional model. It needs to be restored later on.
|
||||
# A possible solution should be able to read and write a number of currently
|
||||
# running actions atomically which is now impossible w/o locks with JSON
|
||||
# field "runtime_context".
|
||||
class WithItemsTask(RegularTask):
|
||||
"""With-items task.
|
||||
|
||||
@ -450,14 +444,27 @@ class WithItemsTask(RegularTask):
|
||||
self._schedule_actions()
|
||||
|
||||
def _schedule_actions(self):
|
||||
input_dicts = self._get_with_items_input()
|
||||
with_items_values = self._get_with_items_values()
|
||||
|
||||
if with_items.is_new(self.task_ex):
|
||||
with_items.validate_values(with_items_values)
|
||||
|
||||
action_count = len(six.next(iter(with_items_values.values())))
|
||||
|
||||
with_items.prepare_runtime_context(
|
||||
self.task_ex,
|
||||
self.task_spec,
|
||||
action_count
|
||||
)
|
||||
|
||||
input_dicts = self._get_input_dicts(with_items_values)
|
||||
|
||||
if not input_dicts:
|
||||
self.complete(states.SUCCESS)
|
||||
|
||||
return
|
||||
|
||||
for idx, input_dict in input_dicts:
|
||||
for i, input_dict in input_dicts:
|
||||
target = self._get_target(input_dict)
|
||||
|
||||
action = self._build_action()
|
||||
@ -467,33 +474,30 @@ class WithItemsTask(RegularTask):
|
||||
action.schedule(
|
||||
input_dict,
|
||||
target,
|
||||
index=idx,
|
||||
index=i,
|
||||
safe_rerun=self.task_spec.get_safe_rerun()
|
||||
)
|
||||
|
||||
def _get_with_items_input(self):
|
||||
"""Calculate input array for separating each action input.
|
||||
with_items.decrease_capacity(self.task_ex, 1)
|
||||
|
||||
def _get_with_items_values(self):
|
||||
"""Returns all values evaluated from 'with-items' expression.
|
||||
|
||||
Example:
|
||||
DSL:
|
||||
with_items:
|
||||
- itemX in <% $.arrayI %>
|
||||
- itemY in <% $.arrayJ %>
|
||||
with-items:
|
||||
- var1 in <% $.arrayI %>
|
||||
- var2 in <% $.arrayJ %>
|
||||
|
||||
Assume arrayI = [1, 2], arrayJ = ['a', 'b'].
|
||||
with_items_input = {
|
||||
"itemX": [1, 2],
|
||||
"itemY": ['a', 'b']
|
||||
}
|
||||
where arrayI = [1,2,3] and arrayJ = [a,b,c]
|
||||
|
||||
Then we get separated input:
|
||||
inputs_per_item = [
|
||||
{'itemX': 1, 'itemY': 'a'},
|
||||
{'itemX': 2, 'itemY': 'b'}
|
||||
]
|
||||
The result of the method in this case will be:
|
||||
{
|
||||
'var1': [1,2,3],
|
||||
'var2': [a,b,c]
|
||||
}
|
||||
|
||||
:return: the list of tuples containing indexes
|
||||
and the corresponding input dict.
|
||||
:return: Evaluated 'with-items' expression values.
|
||||
"""
|
||||
ctx_view = data_flow.ContextView(
|
||||
self.ctx,
|
||||
@ -501,48 +505,27 @@ class WithItemsTask(RegularTask):
|
||||
self.wf_ex.input
|
||||
)
|
||||
|
||||
with_items_inputs = expr.evaluate_recursively(
|
||||
return expr.evaluate_recursively(
|
||||
self.task_spec.get_with_items(),
|
||||
ctx_view
|
||||
)
|
||||
|
||||
with_items.validate_input(with_items_inputs)
|
||||
def _get_input_dicts(self, with_items_values):
|
||||
"""Calculate input dictionaries for another portion of actions.
|
||||
|
||||
inputs_per_item = []
|
||||
:return: a list of tuples containing indexes and
|
||||
corresponding input dicts.
|
||||
"""
|
||||
result = []
|
||||
|
||||
for key, value in with_items_inputs.items():
|
||||
for index, item in enumerate(value):
|
||||
iter_context = {key: item}
|
||||
for i in with_items.get_next_indices(self.task_ex):
|
||||
ctx = {}
|
||||
|
||||
if index >= len(inputs_per_item):
|
||||
inputs_per_item.append(iter_context)
|
||||
else:
|
||||
inputs_per_item[index].update(iter_context)
|
||||
for k, v in with_items_values.items():
|
||||
ctx.update({k: v[i]})
|
||||
|
||||
action_inputs = []
|
||||
ctx = utils.merge_dicts(ctx, self.ctx)
|
||||
|
||||
for item_input in inputs_per_item:
|
||||
new_ctx = utils.merge_dicts(item_input, self.ctx)
|
||||
result.append((i, self._get_action_input(ctx)))
|
||||
|
||||
action_inputs.append(self._get_action_input(new_ctx))
|
||||
|
||||
with_items.prepare_runtime_context(
|
||||
self.task_ex,
|
||||
self.task_spec,
|
||||
action_inputs
|
||||
)
|
||||
|
||||
indices = with_items.get_indices_for_loop(self.task_ex)
|
||||
|
||||
with_items.decrease_capacity(self.task_ex, len(indices))
|
||||
|
||||
if indices:
|
||||
current_inputs = operator.itemgetter(*indices)(action_inputs)
|
||||
|
||||
return zip(
|
||||
indices,
|
||||
current_inputs if isinstance(current_inputs, tuple)
|
||||
else [current_inputs]
|
||||
)
|
||||
|
||||
return []
|
||||
return result
|
||||
|
@ -605,14 +605,11 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertIn(task1_ex.published['result'], ['Guy'])
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_1(self):
|
||||
wf_with_concurrency_1 = """---
|
||||
version: "2.0"
|
||||
|
||||
concurrency_test:
|
||||
type: direct
|
||||
|
||||
wf:
|
||||
input:
|
||||
- names: ["John", "Ivan", "Mistral"]
|
||||
|
||||
@ -626,12 +623,12 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_service.create_workflows(wf_with_concurrency_1)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('concurrency_test', {})
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
# Also initialize a lazy collections.
|
||||
# Also initialize lazy collections.
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
|
||||
self._assert_capacity(0, task_ex)
|
||||
@ -643,6 +640,10 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_utils.Result("John")
|
||||
)
|
||||
|
||||
# Wait till the delayed on_action_complete is processed.
|
||||
# 1 is always there to periodically check WF completion.
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
@ -655,6 +656,8 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_utils.Result("Ivan")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
@ -667,6 +670,8 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_utils.Result("Mistral")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self._assert_capacity(1, task_ex)
|
||||
@ -761,7 +766,6 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
)
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_2(self):
|
||||
wf_with_concurrency_2 = """---
|
||||
version: "2.0"
|
||||
@ -800,6 +804,10 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_utils.Result("John")
|
||||
)
|
||||
|
||||
# Wait till the delayed on_action_complete is processed.
|
||||
# 1 is always there to periodically check WF completion.
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
@ -814,6 +822,8 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_utils.Result("Ivan")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
@ -828,17 +838,23 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_utils.Result("Mistral")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self._assert_capacity(1, task_ex)
|
||||
|
||||
incomplete_action = self._get_incomplete_action(task_ex)
|
||||
|
||||
# 4th iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
incomplete_action.id,
|
||||
wf_utils.Result("Hello")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self._assert_capacity(2, task_ex)
|
||||
@ -861,7 +877,6 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_2_fail(self):
|
||||
wf_with_concurrency_2_fail = """---
|
||||
version: "2.0"
|
||||
@ -903,7 +918,6 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual('With-items failed', result)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_3(self):
|
||||
wf_with_concurrency_3 = """---
|
||||
version: "2.0"
|
||||
@ -942,26 +956,40 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_utils.Result("John")
|
||||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
# Wait till the delayed on_action_complete is processed.
|
||||
# 1 is always there to periodically check WF completion.
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
self._assert_capacity(1, task_ex)
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self._assert_capacity(1, task_ex)
|
||||
|
||||
incomplete_action = self._get_incomplete_action(task_ex)
|
||||
|
||||
# 2nd iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
incomplete_action.id,
|
||||
wf_utils.Result("Ivan")
|
||||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
self._assert_capacity(2, task_ex)
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self._assert_capacity(2, task_ex)
|
||||
|
||||
incomplete_action = self._get_incomplete_action(task_ex)
|
||||
|
||||
# 3rd iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
incomplete_action.id,
|
||||
wf_utils.Result("Mistral")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
|
||||
task_ex = db_api.get_task_execution(task_ex.id)
|
||||
|
||||
self._assert_capacity(3, task_ex)
|
||||
@ -1070,12 +1098,12 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
self.assertEqual(12, len(task1_executions))
|
||||
self._assert_multiple_items(task1_executions, 3, accepted=True)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
@testtools.skip('Repair with-items concurrency')
|
||||
def test_with_items_retry_policy_concurrency(self):
|
||||
wf_text = """---
|
||||
version: "2.0"
|
||||
|
||||
with_items_retry_concurrency:
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
with-items: i in [1, 2, 3, 4]
|
||||
@ -1093,7 +1121,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('with_items_retry_concurrency', {})
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
@ -1118,7 +1146,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_text = """---
|
||||
version: "2.0"
|
||||
|
||||
with_items_env:
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
with-items: i in [1, 2, 3, 4]
|
||||
@ -1128,11 +1156,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'with_items_env',
|
||||
{},
|
||||
env={'name': 'Mistral'}
|
||||
)
|
||||
wf_ex = self.engine.start_workflow('wf', {}, env={'name': 'Mistral'})
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
@ -1217,7 +1241,6 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
self.assertIn(3, result_task2)
|
||||
self.assertIn(4, result_task2)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_subflow_concurrency_gt_list_length(self):
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
|
@ -28,7 +28,7 @@ class WithItemsTest(base.BaseTest):
|
||||
runtime_context={'index': index}
|
||||
)
|
||||
|
||||
def test_get_indices(self):
|
||||
def test_get_next_indices(self):
|
||||
# Task execution for running 6 items with concurrency=3.
|
||||
task_ex = models.TaskExecution(
|
||||
spec={
|
||||
@ -52,9 +52,6 @@ class WithItemsTest(base.BaseTest):
|
||||
]
|
||||
|
||||
# Then call get_indices and expect [2, 3, 4].
|
||||
indices = with_items.get_indices_for_loop(task_ex)
|
||||
indices = with_items.get_next_indices(task_ex)
|
||||
|
||||
# TODO(rakhmerov): Restore concurrency support.
|
||||
# With disabled 'concurrency' support we expect indices 2,3,4,5
|
||||
# because overall count is 6 and two indices were already processed.
|
||||
self.assertListEqual([2, 3, 4, 5], indices)
|
||||
self.assertListEqual([2, 3, 4], indices)
|
||||
|
@ -43,10 +43,18 @@ def get_count(task_ex):
|
||||
return _get_context(task_ex)[_COUNT]
|
||||
|
||||
|
||||
def is_completed(task_ex):
|
||||
find_cancel = lambda x: x.accepted and x.state == states.CANCELLED
|
||||
def get_capacity(task_ex):
|
||||
return _get_context(task_ex)[_CAPACITY]
|
||||
|
||||
if list(filter(find_cancel, task_ex.executions)):
|
||||
|
||||
def get_concurrency(task_ex):
|
||||
return task_ex.runtime_context.get(_CONCURRENCY)
|
||||
|
||||
|
||||
def is_completed(task_ex):
|
||||
find_cancelled = lambda x: x.accepted and x.state == states.CANCELLED
|
||||
|
||||
if list(filter(find_cancelled, task_ex.executions)):
|
||||
return True
|
||||
|
||||
execs = list(filter(lambda t: t.accepted, task_ex.executions))
|
||||
@ -65,15 +73,11 @@ def get_index(task_ex):
|
||||
return len(list(filter(f, task_ex.executions)))
|
||||
|
||||
|
||||
def get_concurrency(task_ex):
|
||||
return task_ex.runtime_context.get(_CONCURRENCY)
|
||||
|
||||
|
||||
def get_final_state(task_ex):
|
||||
find_error = lambda x: x.accepted and x.state == states.ERROR
|
||||
find_cancel = lambda x: x.accepted and x.state == states.CANCELLED
|
||||
find_cancelled = lambda x: x.accepted and x.state == states.CANCELLED
|
||||
|
||||
if list(filter(find_cancel, task_ex.executions)):
|
||||
if list(filter(find_cancelled, task_ex.executions)):
|
||||
return states.CANCELLED
|
||||
elif list(filter(find_error, task_ex.executions)):
|
||||
return states.ERROR
|
||||
@ -110,11 +114,8 @@ def _get_unaccepted_executions(task_ex):
|
||||
)
|
||||
|
||||
|
||||
def get_indices_for_loop(task_ex):
|
||||
# TODO(rakhmerov): For now we assume that capacity is unlimited.
|
||||
# TODO(rakhmerov): We need to re-implement 'concurrency' completely.
|
||||
# capacity = _get_context(task_ex)[_CAPACITY]
|
||||
capacity = get_concurrency(task_ex)
|
||||
def get_next_indices(task_ex):
|
||||
capacity = get_capacity(task_ex)
|
||||
count = get_count(task_ex)
|
||||
|
||||
accepted = _get_with_item_indices(_get_accepted_executions(task_ex))
|
||||
@ -133,49 +134,57 @@ def get_indices_for_loop(task_ex):
|
||||
return indices[:capacity]
|
||||
|
||||
|
||||
def decrease_capacity(task_ex, count):
|
||||
with_items_context = _get_context(task_ex)
|
||||
def increase_capacity(task_ex):
|
||||
ctx = _get_context(task_ex)
|
||||
concurrency = get_concurrency(task_ex)
|
||||
|
||||
if with_items_context[_CAPACITY] is not None:
|
||||
if with_items_context[_CAPACITY] >= count:
|
||||
with_items_context[_CAPACITY] -= count
|
||||
if concurrency and ctx[_CAPACITY] < concurrency:
|
||||
ctx[_CAPACITY] += 1
|
||||
|
||||
task_ex.runtime_context.update({_WITH_ITEMS: ctx})
|
||||
|
||||
|
||||
def decrease_capacity(task_ex, count):
|
||||
ctx = _get_context(task_ex)
|
||||
|
||||
capacity = ctx[_CAPACITY]
|
||||
|
||||
if capacity is not None:
|
||||
if capacity >= count:
|
||||
ctx[_CAPACITY] -= count
|
||||
else:
|
||||
raise exc.WorkflowException(
|
||||
"Impossible to apply current with-items concurrency."
|
||||
raise RuntimeError(
|
||||
"Can't decrease with-items capacity [capacity=%s, count=%s]"
|
||||
% (capacity, count)
|
||||
)
|
||||
|
||||
task_ex.runtime_context.update({_WITH_ITEMS: with_items_context})
|
||||
task_ex.runtime_context.update({_WITH_ITEMS: ctx})
|
||||
|
||||
|
||||
def increase_capacity(task_ex):
|
||||
with_items_context = _get_context(task_ex)
|
||||
max_concurrency = get_concurrency(task_ex)
|
||||
|
||||
if max_concurrency and with_items_context[_CAPACITY] < max_concurrency:
|
||||
with_items_context[_CAPACITY] += 1
|
||||
task_ex.runtime_context.update({_WITH_ITEMS: with_items_context})
|
||||
def is_new(task_ex):
|
||||
return not task_ex.runtime_context.get(_WITH_ITEMS)
|
||||
|
||||
|
||||
def prepare_runtime_context(task_ex, task_spec, input_dicts):
|
||||
runtime_context = task_ex.runtime_context
|
||||
def prepare_runtime_context(task_ex, task_spec, action_count):
|
||||
runtime_ctx = task_ex.runtime_context
|
||||
with_items_spec = task_spec.get_with_items()
|
||||
|
||||
if with_items_spec and not runtime_context.get(_WITH_ITEMS):
|
||||
if with_items_spec and not runtime_ctx.get(_WITH_ITEMS):
|
||||
# Prepare current indexes and parallel limitation.
|
||||
runtime_context[_WITH_ITEMS] = {
|
||||
runtime_ctx[_WITH_ITEMS] = {
|
||||
_CAPACITY: get_concurrency(task_ex),
|
||||
_COUNT: len(input_dicts)
|
||||
_COUNT: action_count
|
||||
}
|
||||
|
||||
|
||||
def validate_input(with_items_input):
|
||||
def validate_values(with_items_values):
|
||||
# Take only mapped values and check them.
|
||||
values = list(with_items_input.values())
|
||||
values = list(with_items_values.values())
|
||||
|
||||
if not all([isinstance(v, list) for v in values]):
|
||||
raise exc.InputException(
|
||||
"Wrong input format for: %s. List type is"
|
||||
" expected for each value." % with_items_input
|
||||
" expected for each value." % with_items_values
|
||||
)
|
||||
|
||||
required_len = len(values[0])
|
||||
@ -183,7 +192,7 @@ def validate_input(with_items_input):
|
||||
if not all(len(v) == required_len for v in values):
|
||||
raise exc.InputException(
|
||||
"Wrong input format for: %s. All arrays must"
|
||||
" have the same length." % with_items_input
|
||||
" have the same length." % with_items_values
|
||||
)
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user