Changing publishing mechanism to allow referencing context variables

* All previously published variables can now be accessed using '$.'
  as in other places. E.g., if we had variable 'var1' in the context
  we can use '$.var1' to access it in 'publish'.
* Action/workflow result can be referenced using "$.taskName.".
E.g., if task name is 'task0' and its action/workflow returned {x: y}
  then 'x' can be referenced using '$.task0.x' in 'publish'.
* Fixed a lot of unit tests
* Fixed with-items implementation (part related to task output)

Change-Id: Ib7532572dd677b517efd40a812cd3633b5de756f
Closes-Bug: #1412635
This commit is contained in:
Renat Akhmerov 2015-01-23 19:36:31 +06:00
parent 4d088ee033
commit 93a33c169d
18 changed files with 198 additions and 206 deletions

View File

@ -1,12 +1,13 @@
---
version: "2.0"
name: "test"
version: 2.0
name: test
workflows:
test:
type: direct
tasks:
hello:
action: std.echo output="Hello"
publish:
result: $
result: {$.hello}

View File

@ -3,23 +3,26 @@ version: '2.0'
wf:
type: direct
tasks:
hello:
action: std.echo output="Hello"
policies:
wait-before: 1
publish:
result: $
result: {$.hello}
wf1:
type: reverse
input:
- farewell
tasks:
addressee:
action: std.echo output="John"
publish:
name: $
name: $.addressee
goodbye:
action: std.echo output="{$.farewell}, {$.name}"
requires: [addressee]

View File

@ -50,13 +50,13 @@ workflows:
- str2
output:
workflow_result: $.result # Access to execution context variables
concat_task_result: $.task.concat # Access to the same but via 'task'
concat_task_result: $.concat # Access to the same but via task name
tasks:
concat:
action: concat_twice s1={$.str1} s2={$.str2}
publish:
result: $
result: $.concat
"""
@ -76,6 +76,8 @@ class AdhocActionsTest(base.EngineTestCase):
exec_db = db_api.get_execution(exec_db.id)
self.maxDiff = None
self.assertDictEqual(
{
'workflow_result': 'a+b and a+b',

View File

@ -19,8 +19,8 @@ from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.openstack.common import log as logging
from mistral.services import workbooks as wb_service
from mistral.tests import base as testbase
from mistral.tests.unit.engine1 import base as testengine1
from mistral.tests import base as test_base
from mistral.tests.unit.engine1 import base as engine_test_base
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils
@ -43,27 +43,26 @@ workflows:
tasks:
task1:
action: std.echo output="Hi,"
action: std.echo output="Hi"
publish:
hi: $
hi: $.task1
on-success:
- task2
task2:
action: std.echo output="Morpheus"
publish:
username: $
to: $.task2
on-success:
- task3
task3:
action: std.echo output="{$.hi} {$.username}.Nebuchadnezzar!"
publish:
result: $
result: "{$.hi}, {$.to}! Sincerely, your {$.__env.from}."
"""
class DataFlowEngineTest(testengine1.EngineTestCase):
class DataFlowEngineTest(engine_test_base.EngineTestCase):
def setUp(self):
super(DataFlowEngineTest, self).setUp()
@ -71,12 +70,14 @@ class DataFlowEngineTest(testengine1.EngineTestCase):
def test_trivial_dataflow(self):
# Start workflow.
exec_db = self.engine.start_workflow('wb.wf1', {})
self._await(
lambda: self.is_execution_success(exec_db.id),
exec_db = self.engine.start_workflow(
'wb.wf1',
{},
environment={'from': 'Neo'}
)
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
@ -89,39 +90,15 @@ class DataFlowEngineTest(testengine1.EngineTestCase):
task3 = self._assert_single_item(tasks, name='task3')
self.assertEqual(states.SUCCESS, task3.state)
self.assertDictEqual({'hi': 'Hi'}, task1.output)
self.assertDictEqual({'to': 'Morpheus'}, task2.output)
self.assertDictEqual(
{
'task': {
'task1': {'hi': 'Hi,'},
},
'hi': 'Hi,',
},
task1.output
)
self.assertDictEqual(
{
'task': {
'task2': {'username': 'Morpheus'},
},
'username': 'Morpheus',
},
task2.output
)
self.assertDictEqual(
{
'task': {
'task3': {'result': 'Hi, Morpheus.Nebuchadnezzar!'},
},
'result': 'Hi, Morpheus.Nebuchadnezzar!',
},
{'result': 'Hi, Morpheus! Sincerely, your Neo.'},
task3.output
)
class DataFlowTest(testbase.BaseTest):
class DataFlowTest(test_base.BaseTest):
def test_evaluate_task_output_simple(self):
"""Test simplest green-path scenario:
action status is SUCCESS, action output is string
@ -131,16 +108,15 @@ class DataFlowTest(testbase.BaseTest):
Expected to get publish variables AS IS.
"""
publish_dict = {'foo': 'bar'}
action_output = "string data"
task_db = models.Task(name="task1")
action_output = 'string data'
task_db = models.Task(name='task1')
task_spec = mock.MagicMock()
task_spec.get_publish = mock.MagicMock(return_value=publish_dict)
raw_result = utils.TaskResult(data=action_output, error=None)
res = data_flow.evaluate_task_output(task_db, task_spec, raw_result)
self.assertEqual(res['foo'], "bar")
self.assertEqual(res['task']['task1'], publish_dict)
self.assertEqual(res['foo'], 'bar')
def test_evaluate_task_output(self):
"""Test green-path scenario with evaluations
@ -150,19 +126,39 @@ class DataFlowTest(testbase.BaseTest):
Expected to get resolved publish variables.
"""
publish_dict = {'a': '{$.akey}', 'e': "$.__env.ekey"}
action_output = {'akey': "adata"}
env = {'ekey': "edata"}
task_db = models.Task(name="task1")
task_db.in_context = {'__env': env}
in_context = {
'var': 'val',
'__env': {'ekey': 'edata'}
}
action_output = {'akey': 'adata'}
publish = {
'v': '{$.var}',
'e': '$.__env.ekey',
'a': '{$.task1.akey}'
}
task_db = models.Task(name='task1')
task_db.in_context = in_context
task_spec = mock.MagicMock()
task_spec.get_publish = mock.MagicMock(return_value=publish_dict)
task_spec.get_publish = mock.MagicMock(return_value=publish)
raw_result = utils.TaskResult(data=action_output, error=None)
res = data_flow.evaluate_task_output(task_db, task_spec, raw_result)
self.assertEqual(res['a'], "adata")
self.assertEqual(res['e'], "edata")
self.assertEqual(res['task']['task1'], {'a': "adata", 'e': 'edata'})
self.assertEqual(3, len(res))
# Resolved from inbound context.
self.assertEqual(res['v'], 'val')
# Resolved from environment.
self.assertEqual(res['e'], 'edata')
# Resolved from action output.
self.assertEqual(res['a'], 'adata')
def test_evaluate_task_output_with_error(self):
"""Test handling ERROR in action
@ -171,14 +167,22 @@ class DataFlowTest(testbase.BaseTest):
Expected to get action error.
"""
publish_dict = {'foo': '$.akey'}
action_output = "error data"
task_db = models.Task(name="task1")
publish = {'foo': '$.akey'}
action_output = 'error data'
task_db = models.Task(name='task1')
task_spec = mock.MagicMock()
task_spec.get_publish = mock.MagicMock(return_value=publish_dict)
task_spec.get_publish = mock.MagicMock(return_value=publish)
raw_result = utils.TaskResult(data=None, error=action_output)
res = data_flow.evaluate_task_output(task_db, task_spec, raw_result)
self.assertDictEqual(
res, {'error': action_output, 'task': {'task1': action_output}})
res,
{
'error': action_output,
'task': {'task1': action_output}
}
)

View File

@ -54,7 +54,7 @@ workflows:
task1:
action: std.echo output="{$.param1}"
publish:
result: $
result: $.task1
task2:
action: std.echo output="{$.param2}"
@ -250,13 +250,7 @@ class DefaultEngineTest(base.DbTestCase):
self._assert_dict_contains_subset(wf_input, task1_db.in_context)
self.assertIn('__execution', task_db.in_context)
self.assertDictEqual({'output': 'Hey'}, task1_db.input)
self.assertDictEqual(
{
'result': 'Hey',
'task': {'task1': {'result': 'Hey'}}
},
task1_db.output
)
self.assertDictEqual({'result': 'Hey'}, task1_db.output)
exec_db = db_api.get_execution(exec_db.id)
@ -290,7 +284,7 @@ class DefaultEngineTest(base.DbTestCase):
self._assert_dict_contains_subset(in_context, task2_db.in_context)
self.assertIn('__execution', task_db.in_context)
self.assertDictEqual({'output': 'Hi'}, task2_db.input)
self.assertDictEqual({'task': {'task2': None}}, task2_db.output)
self.assertDictEqual({}, task2_db.output)
self.assertEqual(2, len(exec_db.tasks))

View File

@ -53,14 +53,14 @@ workflows:
action: std.echo output='{$.param1}'
target: $.__env.var1
publish:
result1: $
result1: $.task1
task2:
requires: [task1]
action: std.echo output="'{$.result1} & {$.param2}'"
target: $.__env.var1
publish:
final_result: $
final_result: $.task2
wf2:
type: direct
@ -76,7 +76,7 @@ workflows:
param2: $.__env.var3
task_name: task2
publish:
slogan: "{$.final_result} is a cool {$.__env.var4}!"
slogan: "{$.task1.final_result} is a cool {$.__env.var4}!"
"""

View File

@ -39,14 +39,14 @@ wf:
task1:
action: std.echo output=1
publish:
result1: $
result1: $.task1
on-complete:
- task3
task2:
action: std.echo output=2
publish:
result2: $
result2: $.task2
on-complete:
- task3
@ -54,7 +54,7 @@ wf:
join: all
action: std.echo output="{$.result1},{$.result2}"
publish:
result3: $
result3: $.task3
"""
@ -72,7 +72,7 @@ wf:
task1:
action: std.echo output=1
publish:
result1: $
result1: $.task1
on-complete:
- task3
@ -85,7 +85,7 @@ wf:
join: all
action: std.echo output="{$.result1}-{$.result1}"
publish:
result3: $
result3: $.task3
"""
WF_FULL_JOIN_WITH_CONDITIONS = """
@ -102,14 +102,14 @@ wf:
task1:
action: std.echo output=1
publish:
result1: $
result1: $.task1
on-complete:
- task3
task2:
action: std.echo output=2
publish:
result2: $
result2: $.task2
on-complete:
- task3: $.result2 = 11111
- task4: $.result2 = 2
@ -118,12 +118,12 @@ wf:
join: all
action: std.echo output="{$.result1}-{$.result1}"
publish:
result3: $
result3: $.task3
task4:
action: std.echo output=4
publish:
result4: $
result4: $.task4
"""
WF_PARTIAL_JOIN = """
@ -140,14 +140,14 @@ wf:
task1:
action: std.echo output=1
publish:
result1: $
result1: $.task1
on-complete:
- task4
task2:
action: std.echo output=2
publish:
result2: $
result2: $.task2
on-complete:
- task4
@ -167,7 +167,7 @@ wf:
join: 2
action: std.echo output="{$.result1},{$.result2}"
publish:
result4: $
result4: $.task4
"""
WF_PARTIAL_JOIN_TRIGGERS_ONCE = """
@ -213,7 +213,7 @@ wf:
join: 2
action: std.echo output="{$.result1},{$.result2},{$.result3},{$.result4}"
publish:
result5: $
result5: $.task5
"""
WF_DISCRIMINATOR = """
@ -252,7 +252,7 @@ wf:
join: one
action: std.echo output="{$.result1},{$.result2},{$.result3}"
publish:
result4: $
result4: $.task4
"""
@ -356,7 +356,6 @@ class JoinEngineTest(base.EngineTestCase):
self.assertDictEqual(
{
'result4': '1,2',
'task': {'task4': {'result4': '1,2'}}
},
task4.output
)

View File

@ -43,14 +43,14 @@ wf:
task1:
action: std.echo output={$.num1}
publish:
result1: $
result1: $.task1
on-complete:
- task3
task2:
action: std.echo output={$.num2}
publish:
result2: $
result2: $.task2
on-complete:
- task3
@ -66,12 +66,12 @@ wf:
task4:
action: std.echo output=4
publish:
result4: $
result4: $.task4
task5:
action: std.echo output=5
publish:
result5: $
result5: $.task5
"""

View File

@ -49,7 +49,7 @@ wf:
task1:
action: std.block
publish:
result: $
result: $.task1
"""
WF_SHORT_ACTION = """
@ -79,7 +79,7 @@ wf:
task1:
action: std.echo output=1
publish:
result1: $
result1: $.task1
task2:
action: std.block
@ -202,7 +202,6 @@ class LongActionTest(base.EngineTestCase):
self.assertDictEqual(
{
'result1': 1,
'task': {'task1': {'result1': 1}}
},
task1.output
)

View File

@ -44,12 +44,12 @@ workflows:
task1:
action: std.echo output='{$.param1}'
publish:
result1: $
result1: $.task1
task2:
action: std.echo output="{$.result1} & {$.param2}"
publish:
result2: $
result2: $.task2
requires: [task1]
"""
@ -88,7 +88,7 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
self.assertEqual('a', exec_db.output['task']['task1']['result1'])
self.assertEqual('a', exec_db.output['task1']['result1'])
self._assert_dict_contains_subset({'result1': 'a'}, exec_db.output)
def test_start_task2(self):
@ -125,7 +125,7 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
self.assertEqual('a', exec_db.output['task']['task1']['result1'])
self.assertEqual('a & b', exec_db.output['task']['task2']['result2'])
self.assertEqual('a', exec_db.output['task1']['result1'])
self.assertEqual('a & b', exec_db.output['task2']['result2'])
self._assert_dict_contains_subset({'result1': 'a'}, exec_db.output)
self._assert_dict_contains_subset({'result2': 'a & b'}, exec_db.output)

View File

@ -50,12 +50,12 @@ workflows:
task1:
action: std.echo output='{$.param1}'
publish:
result1: $
result1: $.task1
task2:
action: std.echo output="'{$.param1} & {$.param2}'"
publish:
final_result: $
final_result: $.task2
requires: [task1]
wf2:
@ -67,7 +67,7 @@ workflows:
task1:
workflow: wf1 param1='Bonnie' param2='Clyde' task_name='task2'
publish:
slogan: "{$.final_result} is a cool movie!"
slogan: "{$.task1.final_result} is a cool movie!"
"""

View File

@ -47,7 +47,7 @@ workflows:
with-items: name_info in $.names_info
action: std.echo output={$.name_info.name}
publish:
result: $
result: $.task1
"""
@ -70,7 +70,7 @@ workflows:
with-items: name_info in $.names_info
action: std.echo output="{$.greeting}, {$.name_info.name}!"
publish:
result: $
result: $.task1
"""
@ -95,7 +95,7 @@ workflows:
- itemY in $.arrayJ
action: std.echo output="{$.itemX} {$.itemY}"
publish:
result: $
result: $.task1
"""
@ -133,6 +133,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = task1.output['result']
self.assertTrue(isinstance(result, list))
self.assertIn('John', result)

View File

@ -38,7 +38,7 @@ workflows:
task1:
action: std.echo output="Hey"
publish:
res1: $
res1: $.task1
on-complete:
- task2: $.res1 = 'Hey'
- task3: $.res1 = 'Not Hey'

View File

@ -23,19 +23,21 @@ from mistral.workflow import with_items
TASK_DICT = {
"name": "task1",
"version": "2.0",
"action": "std.echo",
"with-items": [
"item in $.array"
'name': 'task1',
'version': '2.0',
'action': 'std.echo',
'with-items': [
'item in $.array'
],
"input": {
"array": "$.my_array"
'input': {
'array': '$.my_array'
}
}
TASK_SPEC = tasks.TaskSpec(TASK_DICT)
TASK_DB = models.Task(
name="task1",
name='task1',
output=None,
)
@ -43,36 +45,21 @@ TASK_DB = models.Task(
class WithItemsCalculationsTest(base.BaseTest):
def test_calculate_output_with_key(self):
task_dict = TASK_DICT.copy()
task_dict['publish'] = {"result": "$"}
task_dict['publish'] = {'result': '$.task1'}
task_spec = tasks.TaskSpec(task_dict)
raw_result = utils.TaskResult(data="output!")
raw_result = utils.TaskResult(data='output!')
output = with_items.get_output(TASK_DB, task_spec, raw_result)
self.assertDictEqual(
{
'task': {
'task1': {
'result': ['output!']
}
},
'result': ['output!']
}, output
)
self.assertDictEqual({'result': ['output!']}, output)
def test_calculate_output_without_key(self):
raw_result = utils.TaskResult(data="output!")
raw_result = utils.TaskResult(data='output!')
output = with_items.get_output(TASK_DB, TASK_SPEC, raw_result)
self.assertDictEqual(
{
'task': {
'task1': [None]
}
},
output
)
# TODO(rakhmerov): Fix during result/output refactoring.
self.assertDictEqual({}, output)
def test_calculate_input(self):
with_items_input = {
@ -106,6 +93,7 @@ class WithItemsCalculationsTest(base.BaseTest):
'server3'
]
}
action_input_collection = with_items.calc_input(with_items_input)
self.assertListEqual(
@ -135,7 +123,7 @@ class WithItemsCalculationsTest(base.BaseTest):
with_items_input
)
self.assertIn("the same length", exception.message)
self.assertIn('the same length', exception.message)
def test_calculate_input_not_list(self):
with_items_input = {
@ -152,4 +140,4 @@ class WithItemsCalculationsTest(base.BaseTest):
with_items_input
)
self.assertIn("List type", exception.message)
self.assertIn('List type', exception.message)

View File

@ -120,11 +120,9 @@ class WorkflowHandler(object):
if not self.is_paused_or_completed():
self._set_execution_state(states.SUCCESS)
task_out_ctx = data_flow.evaluate_outbound_context(task_db)
self.exec_db.output = data_flow.evaluate_workflow_output(
self.wf_spec,
task_out_ctx
data_flow.evaluate_task_outbound_context(task_db)
)
return cmds

View File

@ -54,7 +54,7 @@ def prepare_db_task(task_db, task_spec, upstream_task_specs, exec_db,
# TODO(rakhmerov): Think if Data Flow should be a part of wf handler.
task_db.in_context = utils.merge_dicts(
task_db.in_context,
evaluate_outbound_context(cause_task_db)
evaluate_task_outbound_context(cause_task_db)
)
task_db.input = evaluate_task_input(
@ -86,7 +86,7 @@ def _evaluate_upstream_context(upstream_db_tasks):
ctx = {}
for t_db in upstream_db_tasks:
utils.merge_dicts(ctx, evaluate_outbound_context(t_db))
utils.merge_dicts(ctx, evaluate_task_outbound_context(t_db))
return ctx
@ -99,32 +99,51 @@ def evaluate_task_output(task_db, task_spec, raw_result):
:param raw_result: Raw task result that comes from action/workflow
(before publisher). Instance of mistral.workflow.base.TaskResult
:return: Complete task output that goes to Data Flow context for SUCCESS
or raw error for ERRROR
or raw error for ERROR
"""
if raw_result.is_error():
return {'error': raw_result.error,
'task': {task_db.name: raw_result.error}}
return {
'error': raw_result.error,
'task': {task_db.name: raw_result.error}
}
publish_dict = task_spec.get_publish()
# Expression context is task inbound context + action/workflow result
# accessible under key task name key.
expr_ctx = copy.deepcopy(task_db.in_context) or {}
# Combine the raw result with the environment variables as the context
# for evaluating the 'publish' clause.
context = copy.deepcopy(raw_result.data) or {}
if (task_db.in_context
and '__env' in task_db.in_context
and isinstance(context, dict)):
context['__env'] = task_db.in_context['__env']
if task_db.name in expr_ctx:
LOG.warning(
'Shadowing context variable with task name while publishing: %s' %
task_db.name
)
output = expr.evaluate_recursively(publish_dict, context)
expr_ctx[task_db.name] = copy.deepcopy(raw_result.data) or {}
# Add same result to task output under key 'task'.
# TODO(dzimine): Move this transformation to evaluate_outbound_context
output['task'] = {
task_db.name: copy.copy(output) or None
}
return expr.evaluate_recursively(task_spec.get_publish(), expr_ctx)
return output
def evaluate_task_outbound_context(task_db):
    """Evaluates task outbound Data Flow context.

    This method assumes that complete task output (after publisher etc.)
    has already been evaluated.

    :param task_db: DB task.
    :return: Outbound task Data Flow context.
    """
    # Start from a defensive copy of the inbound context so callers'
    # state is never mutated.
    if task_db.in_context is None:
        out_ctx = {}
    else:
        out_ctx = copy.deepcopy(dict(task_db.in_context))

    # Published task variables become part of the outbound context.
    out_ctx = utils.merge_dicts(out_ctx, task_db.output)

    # Also expose the task output under its task name so downstream
    # tasks can reference it via '$.<task_name>'.
    return utils.merge_dicts(
        out_ctx,
        {task_db.name: copy.deepcopy(task_db.output) or None}
    )
def evaluate_workflow_output(wf_spec, context):
@ -141,24 +160,6 @@ def evaluate_workflow_output(wf_spec, context):
return output or context
def evaluate_outbound_context(task_db):
    """Evaluates task outbound Data Flow context.

    This method assumes that complete task output (after publisher etc.)
    has already been evaluated.

    :param task_db: DB task.
    :return: Outbound task Data Flow context.
    """
    # Copy the inbound context defensively; fall back to an empty
    # dict when the task has no inbound context yet.
    if task_db.in_context is None:
        ctx = {}
    else:
        ctx = copy.deepcopy(dict(task_db.in_context))

    # Published task variables override inbound context values.
    return utils.merge_dicts(ctx, task_db.output)
def add_openstack_data_to_context(context):
if context is None:
context = {}

View File

@ -97,7 +97,7 @@ class DirectWorkflowHandler(base.WorkflowHandler):
t_name = task_db.name
t_state = task_db.state
ctx = data_flow.evaluate_outbound_context(task_db)
ctx = data_flow.evaluate_task_outbound_context(task_db)
if states.is_completed(t_state):
on_complete = self.get_on_complete_clause(t_name)

View File

@ -18,6 +18,7 @@ from mistral import exceptions as exc
from mistral import expressions as expr
# TODO(rakhmerov): Partially duplicates data_flow.evaluate_task_output
def get_output(task_db, task_spec, raw_result):
"""Returns output from a task marked with 'with-items'
@ -50,14 +51,14 @@ def get_output(task_db, task_spec, raw_result):
}
}
"""
t_name = task_db.name
e_data = raw_result.error
expr_ctx = copy.deepcopy(task_db.in_context) or {}
expr_ctx[task_db.name] = copy.deepcopy(raw_result.data) or {}
# Calc output for with-items (only list form is used).
output = expr.evaluate_recursively(
task_spec.get_publish(),
raw_result.data or {}
)
output = expr.evaluate_recursively(task_spec.get_publish(), expr_ctx)
if not task_db.output:
task_db.output = {}
@ -75,16 +76,16 @@ def get_output(task_db, task_spec, raw_result):
task_output[out_key] = [output.get(out_key) or e_data]
# Add same result to task output under key 'task'.
task_output['task'] = {
t_name: {
out_key: task_output[out_key]
}
}
else:
if 'task' not in task_output:
task_output.update({'task': {t_name: [None or e_data]}})
else:
task_output['task'][t_name].append(None or e_data)
# TODO(rakhmerov): Fix this during output/result refactoring.
# task_output[t_name] =
# {
# out_key: task_output[out_key]
# }
# else:
# if 'task' not in task_output:
# task_output.update({'task': {t_name: [None or e_data]}})
# else:
# task_output['task'][t_name].append(None or e_data)
return task_output
@ -104,8 +105,7 @@ def get_index(task_db):
def get_concurrency_spec(task_spec):
policies = task_spec.get_policies()
if policies:
return policies.get_concurrency()
return policies.get_concurrency() if policies else None
def get_indexes_for_loop(task_db, task_spec):
@ -124,6 +124,7 @@ def do_step(task_db):
if with_items_context['capacity'] > 0:
with_items_context['capacity'] -= 1
with_items_context['index'] += 1
task_db.runtime_context.update({'with_items': with_items_context})
@ -189,13 +190,16 @@ def validate_input(with_items_input):
if not all([isinstance(v, list) for v in values]):
raise exc.InputException(
"Wrong input format for: %s. List type is"
" expected for each value." % with_items_input)
" expected for each value." % with_items_input
)
required_len = len(values[0])
if not all(len(v) == required_len for v in values):
raise exc.InputException(
"Wrong input format for: %s. All arrays must"
" have the same length." % with_items_input)
" have the same length." % with_items_input
)
def _get_output_key(task_spec):
@ -204,6 +208,4 @@ def _get_output_key(task_spec):
def is_iterations_incomplete(task_db):
if get_index(task_db) < get_count(task_db):
return True
return False
return get_index(task_db) < get_count(task_db)