Merge "Make more JSON fields in execution objects lazy-loaded"

This commit is contained in:
Zuul 2019-06-18 09:58:02 +00:00 committed by Gerrit Code Review
commit 74b2fffec2
12 changed files with 249 additions and 210 deletions

View File

@ -51,17 +51,26 @@ STATE_TYPES = wtypes.Enum(
)
def _load_deferred_output_field(ex):
if ex:
# We need to refer to this lazy-load field explicitly in
# order to make sure that it is correctly loaded.
hasattr(ex, 'output')
def _load_deferred_fields(ex, fields):
if not ex:
return ex
# We need to refer lazy-loaded fields explicitly in
# order to make sure that they are correctly loaded.
for f in fields:
hasattr(ex, f)
return ex
def _get_workflow_execution_resource_with_output(wf_ex):
_load_deferred_fields(wf_ex, ['params', 'output'])
return resources.Execution.from_db_model(wf_ex)
def _get_workflow_execution_resource(wf_ex):
_load_deferred_output_field(wf_ex)
_load_deferred_fields(wf_ex, ['params'])
return resources.Execution.from_db_model(wf_ex)
@ -75,7 +84,7 @@ def _get_workflow_execution(id, must_exist=True):
else:
wf_ex = db_api.load_workflow_execution(id)
return _load_deferred_output_field(wf_ex)
return _load_deferred_fields(wf_ex, ['params', 'output'])
# TODO(rakhmerov): Make sure to make all needed renaming on public API.
@ -398,9 +407,9 @@ class ExecutionsController(rest.RestController):
)
if include_output:
resource_function = _get_workflow_execution_resource
resource_function = _get_workflow_execution_resource_with_output
else:
resource_function = None
resource_function = _get_workflow_execution_resource
return rest_utils.get_all(
resources.Executions,

View File

@ -177,7 +177,6 @@ class Execution(mb.MistralSecureModelBase):
workflow_name = sa.Column(sa.String(255))
workflow_namespace = sa.Column(sa.String(255))
workflow_id = sa.Column(sa.String(80))
spec = sa.Column(st.JsonMediumDictType())
state = sa.Column(sa.String(20))
state_info = sa.Column(sa.Text(), nullable=True)
tags = sa.Column(st.JsonListType())
@ -199,6 +198,7 @@ class ActionExecution(Execution):
)
# Main properties.
spec = sa.Column(st.JsonMediumDictType())
accepted = sa.Column(sa.Boolean(), default=False)
input = sa.Column(st.JsonLongDictType(), nullable=True)
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
@ -224,10 +224,11 @@ class WorkflowExecution(Execution):
)
# Main properties.
spec = sa.orm.deferred(sa.Column(st.JsonMediumDictType()))
accepted = sa.Column(sa.Boolean(), default=False)
input = sa.Column(st.JsonLongDictType(), nullable=True)
input = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
params = sa.Column(st.JsonLongDictType())
params = sa.orm.deferred(sa.Column(st.JsonLongDictType()))
# Initial workflow context containing workflow variables, environment,
# openstack security context etc.
@ -235,7 +236,7 @@ class WorkflowExecution(Execution):
# * Data stored in this structure should not be copied into inbound
# contexts of tasks. No need to duplicate it.
# * This structure does not contain workflow input.
context = sa.Column(st.JsonLongDictType())
context = sa.orm.deferred(sa.Column(st.JsonLongDictType()))
class TaskExecution(Execution):
@ -252,6 +253,7 @@ class TaskExecution(Execution):
)
# Main properties.
spec = sa.orm.deferred(sa.Column(st.JsonMediumDictType()))
action_spec = sa.Column(st.JsonLongDictType())
unique_key = sa.Column(sa.String(255), nullable=True)
type = sa.Column(sa.String(10))

View File

@ -898,7 +898,7 @@ class TestExecutionsController(base.APITest):
resource_function = kwargs['resource_function']
self.assertEqual(
execution._get_workflow_execution_resource,
execution._get_workflow_execution_resource_with_output,
resource_function
)
@ -912,7 +912,10 @@ class TestExecutionsController(base.APITest):
args, kwargs = mock_get_all.call_args
resource_function = kwargs['resource_function']
self.assertIsNone(resource_function)
self.assertEqual(
execution._get_workflow_execution_resource,
resource_function
)
@mock.patch('mistral.db.v2.api.get_workflow_executions')
@mock.patch('mistral.context.MistralContext.from_environ')

View File

@ -2112,17 +2112,18 @@ class TaskExecutionTest(SQLAlchemyTest):
self.assertIsNone(created.updated_at)
updated = db_api.update_task_execution(
created.id,
{'workflow_name': 'new_wf'}
)
with db_api.transaction():
updated = db_api.update_task_execution(
created.id,
{'workflow_name': 'new_wf'}
)
self.assertEqual('new_wf', updated.workflow_name)
self.assertEqual('new_wf', updated.workflow_name)
fetched = db_api.get_task_execution(created.id)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
def test_create_or_update_task_execution(self):
id = 'not-existing-id'
@ -2139,20 +2140,21 @@ class TaskExecutionTest(SQLAlchemyTest):
self.assertIsNotNone(created)
self.assertIsNotNone(created.id)
updated = db_api.create_or_update_task_execution(
created.id,
{'state': 'RUNNING'}
)
with db_api.transaction():
updated = db_api.create_or_update_task_execution(
created.id,
{'state': 'RUNNING'}
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_task_execution(updated.id).state
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_task_execution(updated.id).state
)
fetched = db_api.get_task_execution(created.id)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(updated, fetched)
self.assertEqual(updated, fetched)
def test_get_task_executions(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
@ -2183,10 +2185,12 @@ class TaskExecutionTest(SQLAlchemyTest):
created.name,
'eq'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
def test_filter_task_execution_by_not_equal_value(self):
created0, created1 = self._create_task_executions()
@ -2197,10 +2201,11 @@ class TaskExecutionTest(SQLAlchemyTest):
'neq'
)
fetched = db_api.get_task_executions(**_filter)
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_greater_than_value(self):
created0, created1 = self._create_task_executions()
@ -2210,10 +2215,12 @@ class TaskExecutionTest(SQLAlchemyTest):
created0['created_at'],
'gt'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_greater_than_equal_value(self):
created0, created1 = self._create_task_executions()
@ -2223,6 +2230,7 @@ class TaskExecutionTest(SQLAlchemyTest):
created0['created_at'],
'gte'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(2, len(fetched))
@ -2237,10 +2245,12 @@ class TaskExecutionTest(SQLAlchemyTest):
created1['created_at'],
'lt'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
def test_filter_task_execution_by_less_than_equal_value(self):
created0, created1 = self._create_task_executions()
@ -2250,6 +2260,7 @@ class TaskExecutionTest(SQLAlchemyTest):
created1['created_at'],
'lte'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(2, len(fetched))
@ -2264,12 +2275,14 @@ class TaskExecutionTest(SQLAlchemyTest):
[created['created_at']],
'in'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
def test_filter_task_execution_by_values_notin_list(self):
self.assertEqual(1, len(fetched))
self.assertEqual(created, fetched[0])
def test_filter_task_execution_by_values_not_in_list(self):
created0, created1 = self._create_task_executions()
_filter = filter_utils.create_or_update_filter(
@ -2277,10 +2290,12 @@ class TaskExecutionTest(SQLAlchemyTest):
[created0['created_at']],
'nin'
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_filter_task_execution_by_multiple_columns(self):
created0, created1 = self._create_task_executions()
@ -2296,10 +2311,12 @@ class TaskExecutionTest(SQLAlchemyTest):
'eq',
_filter
)
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
with db_api.transaction():
fetched = db_api.get_task_executions(**_filter)
self.assertEqual(1, len(fetched))
self.assertEqual(created1, fetched[0])
def test_delete_task_execution(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
@ -2307,13 +2324,14 @@ class TaskExecutionTest(SQLAlchemyTest):
values = copy.deepcopy(TASK_EXECS[0])
values.update({'workflow_execution_id': wf_ex.id})
created = db_api.create_task_execution(values)
with db_api.transaction():
created = db_api.create_task_execution(values)
fetched = db_api.get_task_execution(created.id)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(created, fetched)
self.assertEqual(created, fetched)
db_api.delete_task_execution(created.id)
db_api.delete_task_execution(created.id)
self.assertRaises(
exc.DBEntityNotFoundError,
@ -2328,43 +2346,44 @@ class TaskExecutionTest(SQLAlchemyTest):
values.update({'workflow_execution_id': wf_ex.id})
values['state'] = 'RUNNING'
task_ex1 = db_api.create_task_execution(values)
with db_api.transaction():
task_ex1 = db_api.create_task_execution(values)
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
)
# Add one more task.
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
workflow_execution_id=wf_ex.id
)
)
values = copy.deepcopy(TASK_EXECS[1])
values.update({'workflow_execution_id': wf_ex.id})
values['state'] = 'SUCCESS'
# Add one more task.
db_api.create_task_execution(values)
values = copy.deepcopy(TASK_EXECS[1])
values.update({'workflow_execution_id': wf_ex.id})
values['state'] = 'SUCCESS'
# It should be still one incompleted task.
db_api.create_task_execution(values)
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
# It should be still one incompleted task.
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
task_execs = db_api.get_incomplete_task_executions(
workflow_execution_id=wf_ex.id
)
)
self.assertEqual(1, len(task_execs))
self.assertEqual(task_ex1, task_execs[0])
self.assertEqual(
1,
db_api.get_incomplete_task_executions_count(
workflow_execution_id=wf_ex.id
)
)
def test_task_execution_repr(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])

View File

@ -127,15 +127,15 @@ class DefaultEngineTest(base.DbTestCase):
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self.assertEqual(1, len(task_execs))
task_ex = task_execs[0]
task_ex = task_execs[0]
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
# Data Flow properties.
action_execs = db_api.get_action_executions(
@ -196,15 +196,15 @@ class DefaultEngineTest(base.DbTestCase):
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self.assertEqual(1, len(task_execs))
task_ex = task_execs[0]
task_ex = task_execs[0]
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
self.assertEqual('wb.wf', task_ex.workflow_name)
self.assertEqual('task1', task_ex.name)
self.assertEqual(states.RUNNING, task_ex.state)
self.assertIsNotNone(task_ex.spec)
self.assertDictEqual({}, task_ex.runtime_context)
# Data Flow properties.
action_execs = db_api.get_action_executions(
@ -234,9 +234,10 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIsNotNone(wf_ex)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual(wf_ex.params.get('env', {}), env)
self.assertDictEqual(wf_ex.params.get('env', {}), env)
@mock.patch.object(db_api, "load_environment", MOCK_ENVIRONMENT)
def test_start_workflow_with_saved_env(self):
@ -256,9 +257,10 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIsNotNone(wf_ex)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual(wf_ex.params.get('env', {}), env)
self.assertDictEqual(wf_ex.params.get('env', {}), env)
@mock.patch.object(db_api, "get_environment", MOCK_NOT_FOUND)
def test_start_workflow_env_not_found(self):
@ -463,15 +465,15 @@ class DefaultEngineTest(base.DbTestCase):
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
self.assertEqual(1, len(task_execs))
task1_ex = task_execs[0]
task1_ex = task_execs[0]
self.assertEqual('task1', task1_ex.name)
self.assertEqual(states.RUNNING, task1_ex.state)
self.assertIsNotNone(task1_ex.spec)
self.assertDictEqual({}, task1_ex.runtime_context)
self.assertNotIn('__execution', task1_ex.in_context)
self.assertEqual('task1', task1_ex.name)
self.assertEqual(states.RUNNING, task1_ex.state)
self.assertIsNotNone(task1_ex.spec)
self.assertDictEqual({}, task1_ex.runtime_context)
self.assertNotIn('__execution', task1_ex.in_context)
action_execs = db_api.get_action_executions(
task_execution_id=task1_ex.id

View File

@ -335,10 +335,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
task_10_ex = self._assert_single_item(task_execs, name='t10')
task_21_ex = self._assert_single_item(task_execs, name='t21')

View File

@ -116,22 +116,23 @@ class EnvironmentTest(base.EngineTestCase):
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
wf_execs = db_api.get_workflow_executions()
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(wf_execs))
self.assertEqual(2, len(wf_execs))
# Execution of 'wf1'.
# Execution of 'wf1'.
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
expected_wf1_input = {
'param1': 'Bonnie',
'param2': 'Clyde'
}
expected_wf1_input = {
'param1': 'Bonnie',
'param2': 'Clyde'
}
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictEqual(wf1_ex.input, expected_wf1_input)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictEqual(wf1_ex.input, expected_wf1_input)
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)
@ -389,9 +390,9 @@ class EnvironmentTest(base.EngineTestCase):
name='task1'
)
self.assertDictEqual({'result': 'val1'}, t.published)
self.assertDictEqual({'result': 'val1'}, t.published)
self.assertNotIn('__env', wf_ex.context)
self.assertNotIn('__env', wf_ex.context)
def test_subworkflow_env_no_duplicate(self):
wf_text = """---
@ -444,8 +445,8 @@ class EnvironmentTest(base.EngineTestCase):
sub_wf_ex.output
)
# The environment of the subworkflow must be empty.
# To evaluate expressions it should be taken from the
# parent workflow execution.
self.assertDictEqual({}, sub_wf_ex.params['env'])
self.assertNotIn('__env', sub_wf_ex.context)
# The environment of the subworkflow must be empty.
# To evaluate expressions it should be taken from the
# parent workflow execution.
self.assertDictEqual({}, sub_wf_ex.params['env'])
self.assertNotIn('__env', sub_wf_ex.context)

View File

@ -211,10 +211,10 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
task_execs = wf_ex.task_executions
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
@ -232,11 +232,12 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(task_2_ex.id, env=updated_env)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertDictEqual(updated_env, wf_ex.params['env'])
# Wait for the workflow to succeed.
self.await_workflow_success(wf_ex.id)

View File

@ -238,30 +238,31 @@ class SubworkflowsTest(base.EngineTestCase):
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
wf_execs = db_api.get_workflow_executions()
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(wf_execs))
self.assertEqual(2, len(wf_execs))
# Execution of 'wf2'.
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
# Execution of 'wf2'.
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
self.assertEqual(project_id, wf1_ex.project_id)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset(
{
'task_name': 'task2',
'task_execution_id': wf1_ex.task_execution_id
},
wf1_ex.params
)
self.assertDictEqual(
{
'param1': 'Bonnie',
'param2': 'Clyde'
},
wf1_ex.input
)
self.assertEqual(project_id, wf1_ex.project_id)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset(
{
'task_name': 'task2',
'task_execution_id': wf1_ex.task_execution_id
},
wf1_ex.params
)
self.assertDictEqual(
{
'param1': 'Bonnie',
'param2': 'Clyde'
},
wf1_ex.input
)
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)
@ -360,16 +361,17 @@ class SubworkflowsTest(base.EngineTestCase):
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
wf_execs = db_api.get_workflow_executions()
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(wf_execs))
self.assertEqual(2, len(wf_execs))
# Execution of 'wf1'.
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
# Execution of 'wf1'.
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset({}, wf1_ex.params)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset({}, wf1_ex.params)
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)

View File

@ -421,14 +421,14 @@ class WorkflowResumeTest(base.EngineTestCase):
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='task1')
task_2_ex = self._assert_single_item(task_execs, name='task2')
task_1_ex = self._assert_single_item(task_execs, name='task1')
task_2_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.IDLE, task_2_ex.state)
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.IDLE, task_2_ex.state)
# Update env in workflow execution with the following.
updated_env = {
@ -446,13 +446,13 @@ class WorkflowResumeTest(base.EngineTestCase):
task_execs = wf_ex.task_executions
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertEqual(3, len(task_execs))
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertEqual(3, len(task_execs))
# Check result of task2.
task_2_ex = self._assert_single_item(task_execs, name='task2')
# Check result of task2.
task_2_ex = self._assert_single_item(task_execs, name='task2')
self.assertEqual(states.SUCCESS, task_2_ex.state)
self.assertEqual(states.SUCCESS, task_2_ex.state)
# Re-read task execution, otherwise lazy loading of action executions
# may not work.
@ -461,15 +461,15 @@ class WorkflowResumeTest(base.EngineTestCase):
task_2_result = data_flow.get_task_execution_result(task_2_ex)
self.assertEqual(updated_env['var1'], task_2_result)
self.assertEqual(updated_env['var1'], task_2_result)
# Check result of task3.
task_3_ex = self._assert_single_item(
task_execs,
name='task3'
)
# Check result of task3.
task_3_ex = self._assert_single_item(
task_execs,
name='task3'
)
self.assertEqual(states.SUCCESS, task_3_ex.state)
self.assertEqual(states.SUCCESS, task_3_ex.state)
# Re-read task execution, otherwise lazy loading of action executions
# may not work.
@ -478,4 +478,4 @@ class WorkflowResumeTest(base.EngineTestCase):
task_3_result = data_flow.get_task_execution_result(task_3_ex)
self.assertEqual(updated_env['var2'], task_3_result)
self.assertEqual(updated_env['var2'], task_3_result)

View File

@ -185,19 +185,19 @@ class SpecificationCachingTest(base.DbTestCase):
self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
wf_ex = db_api.create_workflow_execution({
'id': '1-2-3-4',
'name': 'wf',
'workflow_id': wf_def.id,
'spec': wf_spec.to_dict(),
'state': states.RUNNING
})
with db_api.transaction():
wf_ex = db_api.create_workflow_execution({
'id': '1-2-3-4',
'name': 'wf',
'workflow_id': wf_def.id,
'spec': wf_spec.to_dict(),
'state': states.RUNNING
})
# Check that we can get a valid spec by execution id.
wf_spec_by_exec_id = spec_parser.get_workflow_spec_by_execution_id(
wf_ex.id
)
# Check that we can get a valid spec by execution id.
wf_spec_by_exec_id = spec_parser.get_workflow_spec_by_execution_id(
wf_ex.id
)
self.assertEqual(1, len(wf_spec_by_exec_id.get_tasks()))

View File

@ -303,17 +303,17 @@ class WorkflowServiceTest(base.DbTestCase):
update_env
)
fetched = db_api.get_workflow_execution(created.id)
fetched = db_api.get_workflow_execution(created.id)
self.assertDictEqual(
wf_exec['params']['env'],
fetched.params['env']
)
self.assertDictEqual(
wf_exec['params']['env'],
fetched.params['env']
)
self.assertDictEqual(
wf_exec['context']['__env'],
fetched.context['__env']
)
self.assertDictEqual(
wf_exec['context']['__env'],
fetched.context['__env']
)
def test_with_long_task_name(self):
long_task_name = utils.generate_string(tasks.MAX_LENGTH_TASK_NAME + 1)