Get rid of with_items.py module in favor of WithItemsTask class

Change-Id: I05ea770ebec98e50313c9c3fcc2f6732bcf8acae
This commit is contained in:
Renat Akhmerov 2016-12-14 14:27:45 +07:00
parent 73ee6ce915
commit bfe9122818
6 changed files with 216 additions and 255 deletions

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import abc import abc
import copy
from oslo_log import log as logging from oslo_log import log as logging
from osprofiler import profiler from osprofiler import profiler
import six import six
@ -29,7 +30,6 @@ from mistral.utils import wf_trace
from mistral.workflow import base as wf_base from mistral.workflow import base as wf_base
from mistral.workflow import data_flow from mistral.workflow import data_flow
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import with_items
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -414,6 +414,17 @@ class WithItemsTask(RegularTask):
Takes care of processing "with-items" tasks. Takes care of processing "with-items" tasks.
""" """
_CAPACITY = 'capacity'
_CONCURRENCY = 'concurrency'
_COUNT = 'count'
_WITH_ITEMS = 'with_items'
_DEFAULT_WITH_ITEMS = {
_COUNT: 0,
_CONCURRENCY: 0,
_CAPACITY: 0
}
@profiler.trace('with-items-task-on-action-complete') @profiler.trace('with-items-task-on-action-complete')
def on_action_complete(self, action_ex): def on_action_complete(self, action_ex):
assert self.task_ex assert self.task_ex
@ -430,32 +441,27 @@ class WithItemsTask(RegularTask):
states.CANCELLED: 'One or more action executions was cancelled.' states.CANCELLED: 'One or more action executions was cancelled.'
} }
with_items.increase_capacity(self.task_ex) self._increase_capacity()
if with_items.is_completed(self.task_ex): if self.is_completed():
state = with_items.get_final_state(self.task_ex) state = self._get_final_state()
self.complete(state, state_info[state]) self.complete(state, state_info[state])
return return
if (with_items.has_more_iterations(self.task_ex) if self._has_more_iterations() and self._get_concurrency():
and with_items.get_concurrency(self.task_ex)):
self._schedule_actions() self._schedule_actions()
def _schedule_actions(self): def _schedule_actions(self):
with_items_values = self._get_with_items_values() with_items_values = self._get_with_items_values()
if with_items.is_new(self.task_ex): if self._is_new():
with_items.validate_values(with_items_values) self._validate_values(with_items_values)
action_count = len(six.next(iter(with_items_values.values()))) action_count = len(six.next(iter(with_items_values.values())))
with_items.prepare_runtime_context( self._prepare_runtime_context(action_count)
self.task_ex,
self.task_spec,
action_count
)
input_dicts = self._get_input_dicts(with_items_values) input_dicts = self._get_input_dicts(with_items_values)
@ -478,7 +484,7 @@ class WithItemsTask(RegularTask):
safe_rerun=self.task_spec.get_safe_rerun() safe_rerun=self.task_spec.get_safe_rerun()
) )
with_items.decrease_capacity(self.task_ex, 1) self._decrease_capacity(1)
def _get_with_items_values(self): def _get_with_items_values(self):
"""Returns all values evaluated from 'with-items' expression. """Returns all values evaluated from 'with-items' expression.
@ -510,6 +516,24 @@ class WithItemsTask(RegularTask):
ctx_view ctx_view
) )
def _validate_values(self, with_items_values):
# Take only mapped values and check them.
values = list(with_items_values.values())
if not all([isinstance(v, list) for v in values]):
raise exc.InputException(
"Wrong input format for: %s. List type is"
" expected for each value." % with_items_values
)
required_len = len(values[0])
if not all(len(v) == required_len for v in values):
raise exc.InputException(
"Wrong input format for: %s. All arrays must"
" have the same length." % with_items_values
)
def _get_input_dicts(self, with_items_values): def _get_input_dicts(self, with_items_values):
"""Calculate input dictionaries for another portion of actions. """Calculate input dictionaries for another portion of actions.
@ -518,7 +542,7 @@ class WithItemsTask(RegularTask):
""" """
result = [] result = []
for i in with_items.get_next_indices(self.task_ex): for i in self._get_next_indexes():
ctx = {} ctx = {}
for k, v in with_items_values.items(): for k, v in with_items_values.items():
@ -529,3 +553,139 @@ class WithItemsTask(RegularTask):
result.append((i, self._get_action_input(ctx))) result.append((i, self._get_action_input(ctx)))
return result return result
def _get_with_items_context(self):
return self.task_ex.runtime_context.get(
self._WITH_ITEMS,
self._DEFAULT_WITH_ITEMS
)
def _get_with_items_count(self):
return self._get_with_items_context()[self._COUNT]
def _get_with_items_capacity(self):
return self._get_with_items_context()[self._CAPACITY]
def _get_concurrency(self):
return self.task_ex.runtime_context.get(self._CONCURRENCY)
def is_completed(self):
find_cancelled = lambda x: x.accepted and x.state == states.CANCELLED
if list(filter(find_cancelled, self.task_ex.executions)):
return True
execs = list(filter(lambda t: t.accepted, self.task_ex.executions))
count = self._get_with_items_count() or 1
return count == len(execs)
def _get_final_state(self):
find_error = lambda x: x.accepted and x.state == states.ERROR
find_cancelled = lambda x: x.accepted and x.state == states.CANCELLED
if list(filter(find_cancelled, self.task_ex.executions)):
return states.CANCELLED
elif list(filter(find_error, self.task_ex.executions)):
return states.ERROR
else:
return states.SUCCESS
def _get_accepted_executions(self):
# Choose only if not accepted but completed.
return list(
filter(
lambda x: x.accepted and states.is_completed(x.state),
self.task_ex.executions
)
)
def _get_unaccepted_executions(self):
# Choose only if not accepted but completed.
return list(
filter(
lambda x: not x.accepted and states.is_completed(x.state),
self.task_ex.executions
)
)
def _get_next_start_index(self):
f = lambda x: (
x.accepted or
states.is_running(x.state) or
states.is_idle(x.state)
)
return len(list(filter(f, self.task_ex.executions)))
def _get_next_indexes(self):
capacity = self._get_with_items_capacity()
count = self._get_with_items_count()
def _get_indexes(exs):
return sorted(set([ex.runtime_context['index'] for ex in exs]))
accepted = _get_indexes(self._get_accepted_executions())
unaccepted = _get_indexes(self._get_unaccepted_executions())
candidates = sorted(list(set(unaccepted) - set(accepted)))
if candidates:
indices = copy.copy(candidates)
if max(candidates) < count - 1:
indices += list(six.moves.range(max(candidates) + 1, count))
else:
i = self._get_next_start_index()
indices = list(six.moves.range(i, count))
return indices[:capacity]
def _increase_capacity(self):
ctx = self._get_with_items_context()
concurrency = self._get_concurrency()
if concurrency and ctx[self._CAPACITY] < concurrency:
ctx[self._CAPACITY] += 1
self.task_ex.runtime_context.update({self._WITH_ITEMS: ctx})
def _decrease_capacity(self, count):
ctx = self._get_with_items_context()
capacity = ctx[self._CAPACITY]
if capacity is not None:
if capacity >= count:
ctx[self._CAPACITY] -= count
else:
raise RuntimeError(
"Can't decrease with-items capacity"
" [capacity=%s, count=%s]" % (capacity, count)
)
self.task_ex.runtime_context.update({self._WITH_ITEMS: ctx})
def _is_new(self):
return not self.task_ex.runtime_context.get(self._WITH_ITEMS)
def _prepare_runtime_context(self, action_count):
runtime_ctx = self.task_ex.runtime_context
with_items_spec = self.task_spec.get_with_items()
if with_items_spec and not runtime_ctx.get(self._WITH_ITEMS):
# Prepare current indexes and parallel limitation.
runtime_ctx[self._WITH_ITEMS] = {
self._CAPACITY: self._get_concurrency(),
self._COUNT: action_count
}
def _has_more_iterations(self):
# See action executions which have been already
# accepted or are still running.
action_exs = list(filter(
lambda x: x.accepted or x.state == states.RUNNING,
self.task_ex.executions
))
return self._get_with_items_count() > len(action_exs)

View File

@ -687,7 +687,7 @@ class DataFlowTest(test_base.BaseTest):
'action': 'my_action' 'action': 'my_action'
}, },
runtime_context={ runtime_context={
'with_items_context': {'count': 1} 'with_items': {'count': 1}
} }
) )

View File

@ -40,10 +40,10 @@ WB = """
--- ---
version: "2.0" version: "2.0"
name: wb1 name: wb
workflows: workflows:
with_items: wf:
type: direct type: direct
input: input:
@ -62,10 +62,10 @@ WB_WITH_STATIC_VAR = """
--- ---
version: "2.0" version: "2.0"
name: wb1 name: wb
workflows: workflows:
with_items: wf:
type: direct type: direct
input: input:
@ -85,10 +85,10 @@ WB_MULTI_ARRAY = """
--- ---
version: "2.0" version: "2.0"
name: wb1 name: wb
workflows: workflows:
with_items: wf:
type: direct type: direct
input: input:
@ -111,10 +111,10 @@ WB_ACTION_CONTEXT = """
--- ---
version: "2.0" version: "2.0"
name: wb1 name: wb
workflows: workflows:
wf1_with_items: wf:
type: direct type: direct
input: input:
@ -170,7 +170,7 @@ class WithItemsEngineTest(base.EngineTestCase):
def _assert_capacity(self, capacity, task_ex): def _assert_capacity(self, capacity, task_ex):
self.assertEqual( self.assertEqual(
capacity, capacity,
task_ex.runtime_context['with_items_context']['capacity'] task_ex.runtime_context['with_items']['capacity']
) )
@staticmethod @staticmethod
@ -187,7 +187,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wb_service.create_workbook_v2(WB) wb_service.create_workbook_v2(WB)
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT) wf_ex = self.engine.start_workflow('wb.wf', WF_INPUT)
self.await_workflow_success(wf_ex.id) self.await_workflow_success(wf_ex.id)
@ -199,7 +199,7 @@ class WithItemsEngineTest(base.EngineTestCase):
task1_ex = self._assert_single_item(task_execs, name='task1') task1_ex = self._assert_single_item(task_execs, name='task1')
with_items_ctx = task1_ex.runtime_context['with_items_context'] with_items_ctx = task1_ex.runtime_context['with_items']
self.assertEqual(3, with_items_ctx['count']) self.assertEqual(3, with_items_ctx['count'])
@ -227,7 +227,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_text = """--- wf_text = """---
version: "2.0" version: "2.0"
with_items: wf:
type: direct type: direct
tasks: tasks:
@ -243,7 +243,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text) wf_service.create_workflows(wf_text)
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('with_items', {}) wf_ex = self.engine.start_workflow('wf', {})
self.await_workflow_success(wf_ex.id) self.await_workflow_success(wf_ex.id)
@ -257,7 +257,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_text = """--- wf_text = """---
version: "2.0" version: "2.0"
with_items: wf:
type: direct type: direct
tasks: tasks:
@ -269,7 +269,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text) wf_service.create_workflows(wf_text)
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('with_items', {}) wf_ex = self.engine.start_workflow('wf', {})
self.await_workflow_error(wf_ex.id) self.await_workflow_error(wf_ex.id)
@ -294,19 +294,19 @@ class WithItemsEngineTest(base.EngineTestCase):
name: wb1 name: wb1
workflows: workflows:
with_items: wf:
type: direct type: direct
tasks: tasks:
task1: task1:
with-items: i in [1, 2, 3] with-items: i in [1, 2, 3]
workflow: subworkflow workflow: subwf
on-error: task2 on-error: task2
task2: task2:
action: std.echo output="With-items failed" action: std.echo output="With-items failed"
subworkflow: subwf:
type: direct type: direct
tasks: tasks:
@ -317,7 +317,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wb_service.create_workbook_v2(wb_text) wb_service.create_workbook_v2(wb_text)
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {}) wf_ex = self.engine.start_workflow('wb1.wf', {})
self.await_workflow_success(wf_ex.id) self.await_workflow_success(wf_ex.id)
@ -334,7 +334,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_input.update({'greeting': 'Hello'}) wf_input.update({'greeting': 'Hello'})
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input) wf_ex = self.engine.start_workflow('wb.wf', wf_input)
self.await_workflow_success(wf_ex.id) self.await_workflow_success(wf_ex.id)
@ -362,7 +362,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_input = {'arrayI': ['a', 'b', 'c'], 'arrayJ': [1, 2, 3]} wf_input = {'arrayI': ['a', 'b', 'c'], 'arrayJ': [1, 2, 3]}
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input) wf_ex = self.engine.start_workflow('wb.wf', wf_input)
self.await_workflow_success(wf_ex.id) self.await_workflow_success(wf_ex.id)
@ -391,7 +391,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wb_service.create_workbook_v2(WB_ACTION_CONTEXT) wb_service.create_workbook_v2(WB_ACTION_CONTEXT)
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('wb1.wf1_with_items', WF_INPUT_URLS) wf_ex = self.engine.start_workflow('wb.wf', WF_INPUT_URLS)
with db_api.transaction(): with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -580,7 +580,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wb_service.create_workbook_v2(WB) wb_service.create_workbook_v2(WB)
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT_ONE_ITEM) wf_ex = self.engine.start_workflow('wb.wf', WF_INPUT_ONE_ITEM)
self.await_workflow_success(wf_ex.id) self.await_workflow_success(wf_ex.id)
@ -770,7 +770,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_with_concurrency_2 = """--- wf_with_concurrency_2 = """---
version: "2.0" version: "2.0"
concurrency_test: wf:
type: direct type: direct
input: input:
@ -786,7 +786,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_with_concurrency_2) wf_service.create_workflows(wf_with_concurrency_2)
# Start workflow. # Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {}) wf_ex = self.engine.start_workflow('wf', {})
with db_api.transaction(): with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -14,12 +14,16 @@
from mistral.db.v2.sqlalchemy import models from mistral.db.v2.sqlalchemy import models
from mistral.engine import tasks
from mistral.tests.unit import base from mistral.tests.unit import base
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import with_items
class WithItemsTest(base.BaseTest): # TODO(rakhmerov): This test is a legacy of the previous 'with-items'
# implementation when most of its logic was in with_items.py module.
# It makes sense to add more test for various methods of WithItemsTask.
class WithItemsTaskTest(base.BaseTest):
@staticmethod @staticmethod
def get_action_ex(accepted, state, index): def get_action_ex(accepted, state, index):
return models.ActionExecution( return models.ActionExecution(
@ -35,7 +39,7 @@ class WithItemsTest(base.BaseTest):
'action': 'myaction' 'action': 'myaction'
}, },
runtime_context={ runtime_context={
'with_items_context': { 'with_items': {
'capacity': 3, 'capacity': 3,
'count': 6 'count': 6
} }
@ -44,6 +48,8 @@ class WithItemsTest(base.BaseTest):
workflow_executions=[] workflow_executions=[]
) )
task = tasks.WithItemsTask(None, None, None, {}, task_ex)
# Set 3 items: 2 success and 1 error unaccepted. # Set 3 items: 2 success and 1 error unaccepted.
task_ex.action_executions += [ task_ex.action_executions += [
self.get_action_ex(True, states.SUCCESS, 0), self.get_action_ex(True, states.SUCCESS, 0),
@ -52,6 +58,6 @@ class WithItemsTest(base.BaseTest):
] ]
# Then call get_indices and expect [2, 3, 4]. # Then call get_indices and expect [2, 3, 4].
indices = with_items.get_next_indices(task_ex) indexes = task._get_next_indexes()
self.assertListEqual([2, 3, 4], indices) self.assertListEqual([2, 3, 4], indexes)

View File

@ -26,7 +26,6 @@ from mistral import utils
from mistral.utils import inspect_utils from mistral.utils import inspect_utils
from mistral.workbook import parser as spec_parser from mistral.workbook import parser as spec_parser
from mistral.workflow import states from mistral.workflow import states
from mistral.workflow import with_items
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
@ -174,7 +173,10 @@ def get_task_execution_result(task_ex):
task_spec = spec_parser.get_task_spec(task_ex.spec) task_spec = spec_parser.get_task_spec(task_ex.spec)
if task_spec.get_with_items(): if task_spec.get_with_items():
if with_items.get_count(task_ex) > 0: # TODO(rakhmerov): Smell: violation of 'with-items' encapsulation.
with_items_ctx = task_ex.runtime_context.get('with_items')
if with_items_ctx and with_items_ctx.get('count') > 0:
return results return results
else: else:
return [] return []

View File

@ -1,207 +0,0 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2016 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import six
from mistral import exceptions as exc
from mistral.workflow import states
# TODO(rakhmerov): Seems like it makes sense to get rid of this module in favor
# of implementing all the needed logic in engine.tasks.WithItemsTask directly.
_CAPACITY = 'capacity'
_CONCURRENCY = 'concurrency'
_COUNT = 'count'
_WITH_ITEMS = 'with_items_context'
_DEFAULT_WITH_ITEMS = {
_COUNT: 0,
_CONCURRENCY: 0,
_CAPACITY: 0
}
def _get_context(task_ex):
return task_ex.runtime_context.get(_WITH_ITEMS, _DEFAULT_WITH_ITEMS)
def get_count(task_ex):
return _get_context(task_ex)[_COUNT]
def get_capacity(task_ex):
return _get_context(task_ex)[_CAPACITY]
def get_concurrency(task_ex):
return task_ex.runtime_context.get(_CONCURRENCY)
def is_completed(task_ex):
find_cancelled = lambda x: x.accepted and x.state == states.CANCELLED
if list(filter(find_cancelled, task_ex.executions)):
return True
execs = list(filter(lambda t: t.accepted, task_ex.executions))
count = get_count(task_ex) or 1
return count == len(execs)
def get_index(task_ex):
f = lambda x: (
x.accepted or
states.is_running(x.state) or
states.is_idle(x.state)
)
return len(list(filter(f, task_ex.executions)))
def get_final_state(task_ex):
find_error = lambda x: x.accepted and x.state == states.ERROR
find_cancelled = lambda x: x.accepted and x.state == states.CANCELLED
if list(filter(find_cancelled, task_ex.executions)):
return states.CANCELLED
elif list(filter(find_error, task_ex.executions)):
return states.ERROR
else:
return states.SUCCESS
def _get_with_item_indices(exs):
"""Returns a list of indices in case of re-running with-items.
:param exs: List of executions.
:return: a list of numbers.
"""
return sorted(set([ex.runtime_context['index'] for ex in exs]))
def _get_accepted_executions(task_ex):
# Choose only if not accepted but completed.
return list(
filter(
lambda x: x.accepted and states.is_completed(x.state),
task_ex.executions
)
)
def _get_unaccepted_executions(task_ex):
# Choose only if not accepted but completed.
return list(
filter(
lambda x: not x.accepted and states.is_completed(x.state),
task_ex.executions
)
)
def get_next_indices(task_ex):
capacity = get_capacity(task_ex)
count = get_count(task_ex)
accepted = _get_with_item_indices(_get_accepted_executions(task_ex))
unaccepted = _get_with_item_indices(_get_unaccepted_executions(task_ex))
candidates = sorted(list(set(unaccepted) - set(accepted)))
if candidates:
indices = copy.copy(candidates)
if max(candidates) < count - 1:
indices += list(six.moves.range(max(candidates) + 1, count))
else:
index = get_index(task_ex)
indices = list(six.moves.range(index, count))
return indices[:capacity]
def increase_capacity(task_ex):
ctx = _get_context(task_ex)
concurrency = get_concurrency(task_ex)
if concurrency and ctx[_CAPACITY] < concurrency:
ctx[_CAPACITY] += 1
task_ex.runtime_context.update({_WITH_ITEMS: ctx})
def decrease_capacity(task_ex, count):
ctx = _get_context(task_ex)
capacity = ctx[_CAPACITY]
if capacity is not None:
if capacity >= count:
ctx[_CAPACITY] -= count
else:
raise RuntimeError(
"Can't decrease with-items capacity [capacity=%s, count=%s]"
% (capacity, count)
)
task_ex.runtime_context.update({_WITH_ITEMS: ctx})
def is_new(task_ex):
return not task_ex.runtime_context.get(_WITH_ITEMS)
def prepare_runtime_context(task_ex, task_spec, action_count):
runtime_ctx = task_ex.runtime_context
with_items_spec = task_spec.get_with_items()
if with_items_spec and not runtime_ctx.get(_WITH_ITEMS):
# Prepare current indexes and parallel limitation.
runtime_ctx[_WITH_ITEMS] = {
_CAPACITY: get_concurrency(task_ex),
_COUNT: action_count
}
def validate_values(with_items_values):
# Take only mapped values and check them.
values = list(with_items_values.values())
if not all([isinstance(v, list) for v in values]):
raise exc.InputException(
"Wrong input format for: %s. List type is"
" expected for each value." % with_items_values
)
required_len = len(values[0])
if not all(len(v) == required_len for v in values):
raise exc.InputException(
"Wrong input format for: %s. All arrays must"
" have the same length." % with_items_values
)
def has_more_iterations(task_ex):
# See action executions which have been already
# accepted or are still running.
action_exs = list(filter(
lambda x: x.accepted or x.state == states.RUNNING,
task_ex.executions
))
return get_count(task_ex) > len(action_exs)