Splitting executions into different tables

* Having different types of execution objects in different
  tables will give less contention on DB tables and hence better
  performance so DB schema was changed accordingly
* Fixed all unit tests and places in the code where we assumed
  polymorphic access to execution objects
* Other minor fixes

TODO(in upcoming patches):
* DB migration script

Change-Id: Ibc8408e12dd85e143302d7fdddace32954551ac5
This commit is contained in:
Renat Akhmerov 2016-07-26 14:50:56 +07:00
parent 3f204aa061
commit c7aa89e03d
50 changed files with 753 additions and 715 deletions

View File

@ -201,45 +201,6 @@ def delete_action_definitions(**kwargs):
return IMPL.delete_action_definitions(**kwargs)
# Common executions.
def get_execution(id):
return IMPL.get_execution(id)
def load_execution(name):
"""Unlike get_execution this method is allowed to return None."""
return IMPL.load_execution(name)
def get_executions(**kwargs):
return IMPL.get_executions(**kwargs)
def ensure_execution_exists(id):
return IMPL.ensure_execution_exists(id)
def create_execution(values):
return IMPL.create_execution(values)
def update_execution(id, values):
return IMPL.update_execution(id, values)
def create_or_update_execution(id, values):
return IMPL.create_or_update_execution(id, values)
def delete_execution(id):
return IMPL.delete_execution(id)
def delete_executions(**kwargs):
IMPL.delete_executions(**kwargs)
# Action executions.
def get_action_execution(id):

View File

@ -565,80 +565,6 @@ def delete_action_definitions(**kwargs):
return _delete_all(models.ActionDefinition, **kwargs)
# Common executions.
def get_execution(id):
ex = _get_db_object_by_id(models.Execution, id)
if not ex:
raise exc.DBEntityNotFoundError(
"Execution not found [execution_id=%s]" % id
)
return ex
def load_execution(id):
return _get_db_object_by_id(models.Execution, id)
def ensure_execution_exists(id):
get_execution(id)
def get_executions(**kwargs):
return _get_executions(**kwargs)
@b.session_aware()
def create_execution(values, session=None):
ex = models.Execution()
ex.update(values.copy())
try:
ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryError(
"Duplicate entry for Execution: %s" % e.columns
)
return ex
@b.session_aware()
def update_execution(id, values, session=None):
ex = get_execution(id)
ex.update(values.copy())
return ex
@b.session_aware()
def create_or_update_execution(id, values, session=None):
if not _get_db_object_by_id(models.Execution, id):
return create_execution(values)
else:
return update_execution(id, values)
@b.session_aware()
def delete_execution(id, session=None):
ex = get_execution(id)
session.delete(ex)
@b.session_aware()
def delete_executions(**kwargs):
return _delete_all(models.Execution, **kwargs)
def _get_executions(**kwargs):
return _get_collection_sorted_by_time(models.Execution, **kwargs)
# Action executions.
def get_action_execution(id):

View File

@ -98,26 +98,9 @@ class ActionDefinition(Definition):
# Execution objects.
class Execution(mb.MistralSecureModelBase):
"""Abstract execution object."""
__abstract__ = True
__tablename__ = 'executions_v2'
__table_args__ = (
sa.Index('%s_project_id' % __tablename__, 'project_id'),
sa.Index('%s_scope' % __tablename__, 'scope'),
sa.Index('%s_state' % __tablename__, 'state'),
sa.Index('%s_type' % __tablename__, 'type'),
sa.Index('%s_updated_at' % __tablename__, 'updated_at'),
)
type = sa.Column(sa.String(50))
__mapper_args__ = {
'polymorphic_on': type,
'polymorphic_identity': 'execution'
}
# Main properties.
# Common properties.
id = mb.id_column()
name = sa.Column(sa.String(80))
description = sa.Column(sa.String(255), nullable=True)
@ -128,34 +111,44 @@ class Execution(mb.MistralSecureModelBase):
state_info = sa.Column(sa.Text(), nullable=True)
tags = sa.Column(st.JsonListType())
# Runtime context like iteration_no of a repeater.
# Effectively internal engine properties which will be used to determine
# execution of a task.
# Internal properties which can be used by engine.
runtime_context = sa.Column(st.JsonLongDictType())
class ActionExecution(Execution):
"""Contains action execution information."""
__mapper_args__ = {
'polymorphic_identity': 'action_execution'
}
__tablename__ = 'action_executions_v2'
__table_args__ = (
sa.Index('%s_project_id' % __tablename__, 'project_id'),
sa.Index('%s_scope' % __tablename__, 'scope'),
sa.Index('%s_state' % __tablename__, 'state'),
sa.Index('%s_updated_at' % __tablename__, 'updated_at')
)
# Main properties.
accepted = sa.Column(sa.Boolean(), default=False)
input = sa.Column(st.JsonLongDictType(), nullable=True)
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
class WorkflowExecution(ActionExecution):
class WorkflowExecution(Execution):
"""Contains workflow execution information."""
__mapper_args__ = {
'polymorphic_identity': 'workflow_execution'
}
__tablename__ = 'workflow_executions_v2'
__table_args__ = (
sa.Index('%s_project_id' % __tablename__, 'project_id'),
sa.Index('%s_scope' % __tablename__, 'scope'),
sa.Index('%s_state' % __tablename__, 'state'),
sa.Index('%s_updated_at' % __tablename__, 'updated_at'),
)
# Main properties.
accepted = sa.Column(sa.Boolean(), default=False)
input = sa.Column(st.JsonLongDictType(), nullable=True)
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
params = sa.Column(st.JsonLongDictType())
# TODO(rakhmerov): We need to get rid of this field at all.
@ -165,9 +158,14 @@ class WorkflowExecution(ActionExecution):
class TaskExecution(Execution):
"""Contains task runtime information."""
__mapper_args__ = {
'polymorphic_identity': 'task_execution'
}
__tablename__ = 'task_executions_v2'
__table_args__ = (
sa.Index('%s_project_id' % __tablename__, 'project_id'),
sa.Index('%s_scope' % __tablename__, 'scope'),
sa.Index('%s_state' % __tablename__, 'state'),
sa.Index('%s_updated_at' % __tablename__, 'updated_at'),
)
# Main properties.
action_spec = sa.Column(st.JsonLongDictType())
@ -181,6 +179,14 @@ class TaskExecution(Execution):
in_context = sa.Column(st.JsonLongDictType())
published = sa.Column(st.JsonLongDictType())
@property
def executions(self):
return (
self.action_executions
if self.spec.get('action')
else self.workflow_executions
)
for cls in utils.iter_subclasses(Execution):
event.listen(
@ -232,33 +238,53 @@ def register_length_validator(attr_name):
lambda t, v, o, i: validate_long_type_length(cls, attr_name, v)
)
# Many-to-one for 'Execution' and 'TaskExecution'.
# Many-to-one for 'ActionExecution' and 'TaskExecution'.
Execution.task_execution_id = sa.Column(
ActionExecution.task_execution_id = sa.Column(
sa.String(36),
sa.ForeignKey(TaskExecution.id),
sa.ForeignKey(TaskExecution.id, ondelete='CASCADE'),
nullable=True
)
TaskExecution.executions = relationship(
Execution,
TaskExecution.action_executions = relationship(
ActionExecution,
backref=backref('task_execution', remote_side=[TaskExecution.id]),
cascade='all, delete-orphan',
foreign_keys=Execution.task_execution_id,
foreign_keys=ActionExecution.task_execution_id,
lazy='select'
)
sa.Index(
'%s_task_execution_id' % Execution.__tablename__,
Execution.task_execution_id
'%s_task_execution_id' % ActionExecution.__tablename__,
'task_execution_id'
)
# Many-to-one for 'WorkflowExecution' and 'TaskExecution'.
WorkflowExecution.task_execution_id = sa.Column(
sa.String(36),
sa.ForeignKey(TaskExecution.id, ondelete='CASCADE'),
nullable=True
)
TaskExecution.workflow_executions = relationship(
WorkflowExecution,
backref=backref('task_execution', remote_side=[TaskExecution.id]),
cascade='all, delete-orphan',
foreign_keys=WorkflowExecution.task_execution_id,
lazy='select'
)
sa.Index(
'%s_task_execution_id' % WorkflowExecution.__tablename__,
'task_execution_id'
)
# Many-to-one for 'TaskExecution' and 'WorkflowExecution'.
TaskExecution.workflow_execution_id = sa.Column(
sa.String(36),
sa.ForeignKey(WorkflowExecution.id)
sa.ForeignKey(WorkflowExecution.id, ondelete='CASCADE')
)
WorkflowExecution.task_executions = relationship(

View File

@ -158,7 +158,7 @@ class Action(object):
if self.task_ex:
# Add to collection explicitly so that it's in a proper
# state within the current session.
self.task_ex.executions.append(self.action_ex)
self.task_ex.action_executions.append(self.action_ex)
def _inject_action_ctx_for_validating(self, input_dict):
if a_m.has_action_context(

View File

@ -56,7 +56,7 @@ class Engine(object):
raise NotImplementedError
@abc.abstractmethod
def on_action_complete(self, action_ex_id, result):
def on_action_complete(self, action_ex_id, result, wf_action=False):
"""Accepts action result and continues the workflow.
Action execution result here is a result which comes from an
@ -64,7 +64,11 @@ class Engine(object):
:param action_ex_id: Action execution id.
:param result: Action/workflow result. Instance of
mistral.workflow.base.Result
:return:
:param wf_action: If True it means that the given id points to
a workflow execution rather than action execution. It happens
when a nested workflow execution sends its result to a parent
workflow.
:return: Action(or workflow if wf_action=True) execution object.
"""
raise NotImplementedError

View File

@ -82,9 +82,12 @@ class DefaultEngine(base.Engine, coordination.Service):
@u.log_exec(LOG)
@profiler.trace('engine-on-action-complete')
def on_action_complete(self, action_ex_id, result):
def on_action_complete(self, action_ex_id, result, wf_action=False):
with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex_id)
if wf_action:
action_ex = db_api.get_workflow_execution(action_ex_id)
else:
action_ex = db_api.get_action_execution(action_ex_id)
task_ex = action_ex.task_execution

View File

@ -291,13 +291,12 @@ class RetryPolicy(base.TaskPolicy):
"""
super(RetryPolicy, self).after_task_complete(task_ex, task_spec)
# TODO(m4dcoder): If the task_ex.executions collection is not called,
# TODO(m4dcoder): If the task_ex.action_executions and
# task_ex.workflow_executions collection are not called,
# then the retry_no in the runtime_context of the task_ex will not
# be updated accurately. To be exact, the retry_no will be one
# iteration behind. task_ex.executions was originally called in
# get_task_execution_result but it was refactored to use
# db_api.get_action_executions to support session-less use cases.
action_ex = task_ex.executions # noqa
# iteration behind.
ex = task_ex.executions # noqa
context_key = 'retry_task_policy'

View File

@ -168,13 +168,14 @@ class EngineServer(object):
return self._engine.on_task_state_change(task_ex_id, state, state_info)
def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
result_error):
result_error, wf_action):
"""Receives RPC calls to communicate action result to engine.
:param rpc_ctx: RPC request context.
:param action_ex_id: Action execution id.
:param result_data: Action result data.
:param result_error: Action result error.
:param wf_action: True if given id points to a workflow execution.
:return: Action execution.
"""
@ -185,7 +186,7 @@ class EngineServer(object):
" action_ex_id=%s, result=%s]" % (rpc_ctx, action_ex_id, result)
)
return self._engine.on_action_complete(action_ex_id, result)
return self._engine.on_action_complete(action_ex_id, result, wf_action)
def pause_workflow(self, rpc_ctx, execution_id):
"""Receives calls over RPC to pause workflows on engine.
@ -358,7 +359,7 @@ class EngineClient(base.Engine):
)
@wrap_messaging_exception
def on_action_complete(self, action_ex_id, result):
def on_action_complete(self, action_ex_id, result, wf_action=False):
"""Conveys action result to Mistral Engine.
This method should be used by clients of Mistral Engine to update
@ -372,7 +373,11 @@ class EngineClient(base.Engine):
:param action_ex_id: Action execution id.
:param result: Action execution result.
:return: Task.
:param wf_action: If True it means that the given id points to
a workflow execution rather than action execution. It happens
when a nested workflow execution sends its result to a parent
workflow.
:return: Action(or workflow if wf_action=True) execution object.
"""
return self._client.sync_call(
@ -380,7 +385,8 @@ class EngineClient(base.Engine):
'on_action_complete',
action_ex_id=action_ex_id,
result_data=result.data,
result_error=result.error
result_error=result.error,
wf_action=wf_action
)
@wrap_messaging_exception

View File

@ -90,7 +90,6 @@ def on_action_complete(action_ex):
try:
task.on_action_complete(action_ex)
except exc.MistralException as e:
task_ex = action_ex.task_execution
wf_ex = task_ex.workflow_execution
msg = ("Failed to handle action completion [wf=%s, task=%s,"

View File

@ -142,7 +142,7 @@ class Task(object):
if not self.task_spec.get_keep_result():
# Destroy task result.
for ex in self.task_ex.executions:
for ex in self.task_ex.action_executions:
if hasattr(ex, 'output'):
ex.output = {}
@ -291,16 +291,15 @@ class RegularTask(Task):
# Reset state of processed task and related action executions.
if self.reset_flag:
action_exs = self.task_ex.executions
execs = self.task_ex.executions
else:
action_exs = db_api.get_action_executions(
task_execution_id=self.task_ex.id,
state=states.ERROR,
accepted=True
execs = filter(
lambda e: e.accepted and e.state == states.ERROR,
self.task_ex.executions
)
for action_ex in action_exs:
action_ex.accepted = False
for ex in execs:
ex.accepted = False
def _schedule_actions(self):
# Regular task schedules just one action.

View File

@ -369,31 +369,33 @@ def _send_result_to_parent_workflow(wf_ex_id):
wf_ex = db_api.get_workflow_execution(wf_ex_id)
if wf_ex.state == states.SUCCESS:
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(data=wf_ex.output)
)
result = wf_utils.Result(data=wf_ex.output)
elif wf_ex.state == states.ERROR:
err_msg = (
wf_ex.state_info or
'Failed subworkflow [execution_id=%s]' % wf_ex.id
)
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(error=err_msg)
)
result = wf_utils.Result(error=err_msg)
elif wf_ex.state == states.CANCELLED:
err_msg = (
wf_ex.state_info or
'Cancelled subworkflow [execution_id=%s]' % wf_ex.id
)
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(error=err_msg, cancel=True)
result = wf_utils.Result(error=err_msg, cancel=True)
else:
raise RuntimeError(
"Method _send_result_to_parent_workflow() must never be called"
" if a workflow is not in SUCCESS, ERROR or CNCELLED state."
)
rpc.get_engine_client().on_action_complete(
wf_ex.id,
result,
wf_action=True
)
def _build_fail_info_message(wf_ctrl, wf_ex):
# Try to find where error is exactly.
@ -411,7 +413,7 @@ def _build_fail_info_message(wf_ctrl, wf_ex):
for t in failed_tasks:
msg += '\n %s [task_ex_id=%s] -> %s\n' % (t.name, t.id, t.state_info)
for i, ex in enumerate(t.executions):
for i, ex in enumerate(t.action_executions):
if ex.state == states.ERROR:
output = (ex.output or dict()).get('result', 'Unknown')
msg += (
@ -422,6 +424,17 @@ def _build_fail_info_message(wf_ctrl, wf_ex):
)
)
for i, ex in enumerate(t.workflow_executions):
if ex.state == states.ERROR:
output = (ex.output or dict()).get('result', 'Unknown')
msg += (
' [wf_ex_id=%s, idx=%s]: %s\n' % (
ex.id,
i,
str(output)
)
)
return msg

View File

@ -29,7 +29,7 @@ import testtools.matchers as ttm
from mistral import context as auth_context
from mistral.db.sqlalchemy import base as db_sa_base
from mistral.db.sqlalchemy import sqlite_lock
from mistral.db.v2 import api as db_api_v2
from mistral.db.v2 import api as db_api
from mistral.services import action_manager
from mistral.services import security
from mistral.tests.unit import config as test_config
@ -229,7 +229,7 @@ class DbTestCase(BaseTest):
cfg.CONF.set_default('max_overflow', -1, group='database')
cfg.CONF.set_default('max_pool_size', 1000, group='database')
db_api_v2.setup_db()
db_api.setup_db()
action_manager.sync_db()
@ -244,14 +244,16 @@ class DbTestCase(BaseTest):
with mock.patch('mistral.services.security.get_project_id',
new=mock.MagicMock(return_value=ctx.project_id)):
with db_api_v2.transaction():
db_api_v2.delete_event_triggers()
db_api_v2.delete_executions()
db_api_v2.delete_workbooks()
db_api_v2.delete_cron_triggers()
db_api_v2.delete_workflow_definitions()
db_api_v2.delete_environments()
db_api_v2.delete_resource_members()
with db_api.transaction():
db_api.delete_event_triggers()
db_api.delete_cron_triggers()
db_api.delete_workflow_executions()
db_api.delete_task_executions()
db_api.delete_action_executions()
db_api.delete_workbooks()
db_api.delete_workflow_definitions()
db_api.delete_environments()
db_api.delete_resource_members()
sqlite_lock.cleanup()

View File

@ -755,25 +755,26 @@ class ActionExecutionTest(SQLAlchemyTest):
self.assertIsNone(db_api.load_action_execution("not-existing-id"))
def test_update_action_execution(self):
created = db_api.create_action_execution(ACTION_EXECS[0])
with db_api.transaction():
created = db_api.create_action_execution(ACTION_EXECS[0])
self.assertIsNone(created.updated_at)
self.assertIsNone(created.updated_at)
updated = db_api.update_execution(
created.id,
{'state': 'RUNNING', 'state_info': "Running..."}
)
updated = db_api.update_action_execution(
created.id,
{'state': 'RUNNING', 'state_info': "Running..."}
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_action_execution(updated.id).state
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_action_execution(updated.id).state
)
fetched = db_api.get_action_execution(created.id)
fetched = db_api.get_action_execution(created.id)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
def test_create_or_update_action_execution(self):
id = 'not-existing-id'
@ -785,20 +786,21 @@ class ActionExecutionTest(SQLAlchemyTest):
self.assertIsNotNone(created)
self.assertIsNotNone(created.id)
updated = db_api.create_or_update_action_execution(
created.id,
{'state': 'RUNNING'}
)
with db_api.transaction():
updated = db_api.create_or_update_action_execution(
created.id,
{'state': 'RUNNING'}
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_action_execution(updated.id).state
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_action_execution(updated.id).state
)
fetched = db_api.get_action_execution(created.id)
fetched = db_api.get_action_execution(created.id)
self.assertEqual(updated, fetched)
self.assertEqual(updated, fetched)
def test_get_action_executions(self):
created0 = db_api.create_action_execution(WF_EXECS[0])
@ -909,50 +911,55 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertIsNone(db_api.load_workflow_execution("not-existing-id"))
def test_update_workflow_execution(self):
created = db_api.create_workflow_execution(WF_EXECS[0])
with db_api.transaction():
created = db_api.create_workflow_execution(WF_EXECS[0])
self.assertIsNone(created.updated_at)
self.assertIsNone(created.updated_at)
updated = db_api.update_execution(
created.id,
{'state': 'RUNNING', 'state_info': "Running..."}
)
updated = db_api.update_workflow_execution(
created.id,
{'state': 'RUNNING', 'state_info': "Running..."}
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_workflow_execution(updated.id).state
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_workflow_execution(updated.id).state
)
fetched = db_api.get_workflow_execution(created.id)
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
def test_create_or_update_workflow_execution(self):
id = 'not-existing-id'
self.assertIsNone(db_api.load_workflow_execution(id))
created = db_api.create_or_update_workflow_execution(id, WF_EXECS[0])
with db_api.transaction():
created = db_api.create_or_update_workflow_execution(
id,
WF_EXECS[0]
)
self.assertIsNotNone(created)
self.assertIsNotNone(created.id)
self.assertIsNotNone(created)
self.assertIsNotNone(created.id)
updated = db_api.create_or_update_workflow_execution(
created.id,
{'state': 'RUNNING'}
)
updated = db_api.create_or_update_workflow_execution(
created.id,
{'state': 'RUNNING'}
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_workflow_execution(updated.id).state
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_workflow_execution(updated.id).state
)
fetched = db_api.get_workflow_execution(created.id)
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(updated, fetched)
self.assertEqual(updated, fetched)
def test_get_workflow_executions(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
@ -991,7 +998,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
)
self.assertEqual('FAILED', updated.state)
state_info = db_api.load_execution(updated.id).state_info
state_info = db_api.load_workflow_execution(updated.id).state_info
self.assertEqual(
65535,
len(state_info)
@ -1020,8 +1027,6 @@ class WorkflowExecutionTest(SQLAlchemyTest):
self.assertEqual(TASK_EXECS[0]['name'], task_ex.name)
# Make sure that polymorphic load works correctly.
self.assertEqual(2, len(db_api.get_executions()))
self.assertEqual(1, len(db_api.get_workflow_executions()))
self.assertEqual(1, len(db_api.get_task_executions()))
@ -1107,35 +1112,40 @@ class TaskExecutionTest(SQLAlchemyTest):
task = db_api.create_task_execution(values)
self.assertEqual(0, len(task.executions))
self.assertEqual(0, len(task.action_executions))
self.assertEqual(0, len(task.workflow_executions))
a_ex1 = db_models.ActionExecution()
a_ex2 = db_models.ActionExecution()
task.executions.append(a_ex1)
task.executions.append(a_ex2)
task.action_executions.append(a_ex1)
task.action_executions.append(a_ex2)
self.assertEqual(2, len(task.executions))
self.assertEqual(2, len(task.action_executions))
self.assertEqual(0, len(task.workflow_executions))
# Make sure associated objects were saved.
with db_api.transaction():
task = db_api.get_task_execution(task.id)
self.assertEqual(2, len(task.executions))
self.assertEqual(2, len(task.action_executions))
self.assertNotIsInstance(task.executions[0].task_execution, list)
self.assertNotIsInstance(
task.action_executions[0].task_execution,
list
)
# Remove associated objects from collection.
with db_api.transaction():
task = db_api.get_task_execution(task.id)
del task.executions[:]
del task.action_executions[:]
# Make sure associated objects were deleted.
with db_api.transaction():
task = db_api.get_task_execution(task.id)
self.assertEqual(0, len(task.executions))
self.assertEqual(0, len(task.action_executions))
def test_update_task_execution(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])

View File

@ -156,11 +156,9 @@ class EngineTestCase(base.DbTestCase):
t.published)
)
a_execs = db_api.get_action_executions(
task_execution_id=t.id
)
child_execs = t.executions
for a in a_execs:
for a in child_execs:
print(
"\t\t%s [id=%s, state=%s, state_info=%s,"
" accepted=%s, output=%s]" %
@ -174,9 +172,9 @@ class EngineTestCase(base.DbTestCase):
print("\nPrinting standalone action executions...")
a_execs = db_api.get_action_executions(task_execution_id=None)
child_execs = db_api.get_action_executions(task_execution_id=None)
for a in a_execs:
for a in child_execs:
print(
"\t\t%s [id=%s, state=%s, state_info=%s, accepted=%s,"
" output=%s]" %
@ -188,110 +186,103 @@ class EngineTestCase(base.DbTestCase):
a.output)
)
# Various methods for abstract execution objects.
def is_execution_in_state(self, ex_id, state):
return db_api.get_execution(ex_id).state == state
def is_execution_success(self, ex_id):
return self.is_execution_in_state(ex_id, states.SUCCESS)
def is_execution_error(self, ex_id):
return self.is_execution_in_state(ex_id, states.ERROR)
def is_execution_paused(self, ex_id):
return self.is_execution_in_state(ex_id, states.PAUSED)
def await_execution_state(self, ex_id, state, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self._await(
lambda: self.is_execution_in_state(ex_id, state), delay, timeout
)
def await_execution_success(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_state(ex_id, states.SUCCESS, delay, timeout)
def await_execution_error(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_state(ex_id, states.ERROR, delay, timeout)
def await_execution_paused(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_state(ex_id, states.PAUSED, delay, timeout)
def await_execution_cancelled(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_state(ex_id, states.CANCELLED, delay, timeout)
# Various methods for action execution objects.
def is_action_success(self, a_ex_id):
return self.is_execution_in_state(a_ex_id, states.SUCCESS)
def is_action_in_state(self, ex_id, state):
return db_api.get_action_execution(ex_id).state == state
def is_action_error(self, a_ex_id):
return self.is_execution_in_state(a_ex_id, states.ERROR)
def await_action_state(self, a_ex_id, state, delay=DEFAULT_DELAY,
def await_action_state(self, ex_id, state, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_state(a_ex_id, state, delay, timeout)
self._await(
lambda: self.is_action_in_state(ex_id, state),
delay,
timeout
)
def await_action_success(self, t_ex_id, delay=DEFAULT_DELAY,
def is_action_success(self, ex_id):
return self.is_action_in_state(ex_id, states.SUCCESS)
def is_action_error(self, ex_id):
return self.is_action_in_state(ex_id, states.ERROR)
def await_action_success(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_success(t_ex_id, delay, timeout)
self.await_action_state(ex_id, states.SUCCESS, delay, timeout)
def await_action_error(self, t_ex_id, delay=DEFAULT_DELAY,
def await_action_error(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_error(t_ex_id, delay, timeout)
self.await_action_state(ex_id, states.ERROR, delay, timeout)
# Various methods for task execution objects.
def is_task_in_state(self, ex_id, state):
return db_api.get_task_execution(ex_id).state == state
def await_task_state(self, ex_id, state, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self._await(
lambda: self.is_task_in_state(ex_id, state),
delay,
timeout
)
def is_task_success(self, task_ex_id):
return self.is_execution_in_state(task_ex_id, states.SUCCESS)
return self.is_task_in_state(task_ex_id, states.SUCCESS)
def is_task_error(self, task_ex_id):
return self.is_execution_in_state(task_ex_id, states.ERROR)
return self.is_task_in_state(task_ex_id, states.ERROR)
def is_task_delayed(self, task_ex_id):
return self.is_execution_in_state(task_ex_id, states.RUNNING_DELAYED)
return self.is_task_in_state(task_ex_id, states.RUNNING_DELAYED)
def is_task_processed(self, task_ex_id):
return db_api.get_task_execution(task_ex_id).processed
def await_task_state(self, t_ex_id, state, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_state(t_ex_id, state, delay, timeout)
def await_task_success(self, t_ex_id, delay=DEFAULT_DELAY,
def await_task_success(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_success(t_ex_id, delay, timeout)
self.await_task_state(ex_id, states.SUCCESS, delay, timeout)
def await_task_error(self, t_ex_id, delay=DEFAULT_DELAY,
def await_task_error(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_error(t_ex_id, delay, timeout)
self.await_task_state(ex_id, states.ERROR, delay, timeout)
def await_task_delayed(self, t_ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_task_state(t_ex_id, states.RUNNING_DELAYED, delay, timeout)
def await_task_processed(self, t_ex_id, delay=DEFAULT_DELAY,
def await_task_cancelled(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self._await(lambda: self.is_task_processed(t_ex_id), delay, timeout)
self.await_task_state(ex_id, states.CANCELLED, delay, timeout)
def await_task_delayed(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_task_state(ex_id, states.RUNNING_DELAYED, delay, timeout)
def await_task_processed(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self._await(lambda: self.is_task_processed(ex_id), delay, timeout)
# Various methods for workflow execution objects.
def is_workflow_in_state(self, ex_id, state):
return db_api.get_workflow_execution(ex_id).state == state
def await_workflow_state(self, ex_id, state, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_state(ex_id, state, delay, timeout)
self._await(
lambda: self.is_workflow_in_state(ex_id, state),
delay,
timeout
)
def await_workflow_success(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_success(ex_id, delay, timeout)
self.await_workflow_state(ex_id, states.SUCCESS, delay, timeout)
def await_workflow_error(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_error(ex_id, delay, timeout)
self.await_workflow_state(ex_id, states.ERROR, delay, timeout)
def await_workflow_paused(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_execution_paused(ex_id, delay, timeout)
self.await_workflow_state(ex_id, states.PAUSED, delay, timeout)
def await_workflow_cancelled(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_workflow_state(ex_id, states.CANCELLED, delay, timeout)

View File

@ -62,7 +62,7 @@ class ActionContextTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wb.wf1', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -107,7 +107,7 @@ class ActionDefaultTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf1', None, env=ENV)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -132,7 +132,7 @@ class ActionDefaultTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf2', None, env=ENV)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -169,7 +169,7 @@ class ActionDefaultTest(base.EngineTestCase):
env=ENV
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -207,7 +207,7 @@ class ActionDefaultTest(base.EngineTestCase):
env=ENV
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -96,7 +96,7 @@ class AdhocActionsTest(base.EngineTestCase):
{'str1': 'a', 'str2': 'b'}
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -116,7 +116,7 @@ class AdhocActionsTest(base.EngineTestCase):
{'str1': 'a', 'str2': 'b'}
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -60,7 +60,7 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
def test_fail(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 1})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -74,7 +74,7 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
def test_succeed(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 2})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -88,7 +88,7 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
def test_pause(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 3})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -137,7 +137,7 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
def test_fail(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 1})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -151,7 +151,7 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
def test_succeed(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 2})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -165,7 +165,7 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
def test_pause(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 3})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -247,7 +247,7 @@ class OrderEngineCommandsTest(base.EngineTestCase):
def test_fail_first(self):
wf_ex = self.engine.start_workflow('my_wb.fail_first_wf', None)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -261,7 +261,7 @@ class OrderEngineCommandsTest(base.EngineTestCase):
def test_fail_second(self):
wf_ex = self.engine.start_workflow('my_wb.fail_second_wf', None)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -277,12 +277,12 @@ class OrderEngineCommandsTest(base.EngineTestCase):
)
self.await_task_success(task2_db.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
def test_succeed_first(self):
wf_ex = self.engine.start_workflow('my_wb.succeed_first_wf', None)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -296,7 +296,7 @@ class OrderEngineCommandsTest(base.EngineTestCase):
def test_succeed_second(self):
wf_ex = self.engine.start_workflow('my_wb.succeed_second_wf', None)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -312,7 +312,7 @@ class OrderEngineCommandsTest(base.EngineTestCase):
)
self.await_task_error(task2_db.id)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
WORKBOOK4 = """
---
@ -349,7 +349,7 @@ class SimpleEngineCmdsWithMsgTest(base.EngineTestCase):
def test_fail(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 1})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -365,7 +365,7 @@ class SimpleEngineCmdsWithMsgTest(base.EngineTestCase):
def test_succeed(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 2})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -381,7 +381,7 @@ class SimpleEngineCmdsWithMsgTest(base.EngineTestCase):
def test_pause(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 3})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -431,7 +431,7 @@ class SimpleEngineWorkflowLevelCmdsWithMsgTest(base.EngineTestCase):
def test_fail(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 1})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -447,7 +447,7 @@ class SimpleEngineWorkflowLevelCmdsWithMsgTest(base.EngineTestCase):
def test_succeed(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 2})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -463,7 +463,7 @@ class SimpleEngineWorkflowLevelCmdsWithMsgTest(base.EngineTestCase):
def test_pause(self):
wf_ex = self.engine.start_workflow('my_wb.wf', {'my_var': 3})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from mistral.db.v2 import api as db_api
@ -64,7 +62,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, env={'from': 'Neo'})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -129,7 +127,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, env={'from': 'Neo'})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -193,7 +191,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -269,7 +267,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -350,7 +348,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
env={'from': 'Neo'}
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -409,7 +407,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -442,7 +440,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -481,7 +479,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf1_with_items', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -501,41 +499,38 @@ class DataFlowTest(test_base.BaseTest):
"version": '2.0',
'name': 'task1',
'with-items': 'var in [1]',
'type': 'direct'
'type': 'direct',
'action': 'my_action'
},
runtime_context={
'with_items_context': {'count': 1}
}
)
action_exs = [models.ActionExecution(
task_ex.action_executions = [models.ActionExecution(
name='my_action',
output={'result': 1},
accepted=True,
runtime_context={'index': 0}
)]
with mock.patch.object(db_api, 'get_action_executions',
return_value=action_exs):
self.assertEqual([1], data_flow.get_task_execution_result(task_ex))
self.assertEqual([1], data_flow.get_task_execution_result(task_ex))
action_exs.append(models.ActionExecution(
task_ex.action_executions.append(models.ActionExecution(
name='my_action',
output={'result': 1},
accepted=True,
runtime_context={'index': 0}
))
action_exs.append(models.ActionExecution(
task_ex.action_executions.append(models.ActionExecution(
name='my_action',
output={'result': 1},
accepted=False,
runtime_context={'index': 0}
))
with mock.patch.object(db_api, 'get_action_executions',
return_value=action_exs):
self.assertEqual(
[1, 1],
data_flow.get_task_execution_result(task_ex)
)
self.assertEqual(
[1, 1],
data_flow.get_task_execution_result(task_ex)
)

View File

@ -408,12 +408,12 @@ class DefaultEngineTest(base.DbTestCase):
wf_ex = self.engine.start_workflow(
'wb.wf', {'param1': 'Hey', 'param2': 'Hi'}, task_name="task2")
# Re-read execution to access related tasks.
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.engine.stop_workflow(wf_ex.id, 'ERROR', "Stop this!")
# Re-read from DB again
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual('ERROR', wf_ex.state)
self.assertEqual("Stop this!", wf_ex.state_info)
@ -423,12 +423,12 @@ class DefaultEngineTest(base.DbTestCase):
wf_ex = self.engine.start_workflow(
'wb.wf', {'param1': 'Hey', 'param2': 'Hi'}, task_name="task2")
# Re-read execution to access related tasks.
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.engine.stop_workflow(wf_ex.id, 'SUCCESS', "Like this, done")
# Re-read from DB again
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual('SUCCESS', wf_ex.state)
self.assertEqual("Like this, done", wf_ex.state_info)
@ -437,7 +437,7 @@ class DefaultEngineTest(base.DbTestCase):
wf_ex = self.engine.start_workflow(
'wb.wf', {'param1': 'Hey', 'param2': 'Hi'}, task_name="task2")
# Re-read execution to access related tasks.
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertNotEqual(
'PAUSE',

View File

@ -34,7 +34,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_state(wf_ex.id, expected_state)
self.await_workflow_state(wf_ex.id, expected_state)
return db_api.get_workflow_execution(wf_ex.id)
@ -114,7 +114,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
@ -146,7 +146,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
self.assertEqual(
states.SUCCESS,
@ -453,7 +453,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
def test_task_on_clause_has_yaql_error(self):
wf_text = """

View File

@ -225,7 +225,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -249,7 +249,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(wf_ex.state_info)
# Wait for the workflow to succeed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -319,7 +319,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {}, env=env)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -357,7 +357,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertDictEqual(updated_env, wf_ex.context['__env'])
# Wait for the workflow to succeed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -454,7 +454,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -507,7 +507,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb3.wf1', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -537,7 +537,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.await_execution_success(wf_ex.id, delay=10)
self.await_workflow_success(wf_ex.id, delay=10)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -596,7 +596,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb3.wf1', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -622,7 +622,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.await_execution_success(wf_ex.id, delay=10)
self.await_workflow_success(wf_ex.id, delay=10)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -678,7 +678,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb3.wf1', {}, env=env)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -714,7 +714,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.await_execution_success(wf_ex.id, delay=10)
self.await_workflow_success(wf_ex.id, delay=10)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -778,7 +778,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -804,7 +804,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(wf_ex.state_info)
# Wait for the workflow to succeed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -875,7 +875,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.await_task_error(task_1_ex.id)
self.await_task_error(task_2_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -927,7 +927,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(wf_ex.state_info)
# Wait for the workflow to succeed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1000,7 +1000,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb3.wf1', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1027,7 +1027,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.await_execution_error(wf_ex.id, delay=10)
self.await_workflow_error(wf_ex.id, delay=10)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1048,7 +1048,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.await_execution_error(wf_ex.id, delay=10)
self.await_workflow_error(wf_ex.id, delay=10)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1069,7 +1069,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.await_execution_error(wf_ex.id, delay=10)
self.await_workflow_error(wf_ex.id, delay=10)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1090,7 +1090,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.await_execution_success(wf_ex.id, delay=10)
self.await_workflow_success(wf_ex.id, delay=10)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1142,7 +1142,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1165,7 +1165,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(wf_ex.state_info)
# Wait for the workflow to succeed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1190,8 +1190,9 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_2_ex.state)
self.assertIsNone(task_2_ex.state_info)
task_2_action_exs = db_api.get_action_executions(
task_execution_id=task_2_ex.id)
task_2_action_exs = db_api.get_workflow_executions(
task_execution_id=task_2_ex.id
)
self.assertEqual(2, len(task_2_action_exs))
self.assertEqual(states.ERROR, task_2_action_exs[0].state)
@ -1224,7 +1225,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1272,7 +1273,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(wf_ex.state_info)
# Wait for the subworkflow to succeed.
self.await_execution_success(sub_wf_ex.id)
self.await_workflow_success(sub_wf_ex.id)
sub_wf_ex = db_api.get_workflow_execution(sub_wf_ex.id)
@ -1297,7 +1298,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, sub_wf_task_ex_action_exs[1].state)
# Wait for the main workflow to succeed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1322,8 +1323,9 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_2_ex.state)
self.assertIsNone(task_2_ex.state_info)
task_2_action_exs = db_api.get_action_executions(
task_execution_id=task_2_ex.id)
task_2_action_exs = db_api.get_workflow_executions(
task_execution_id=task_2_ex.id
)
self.assertEqual(1, len(task_2_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)

View File

@ -60,11 +60,14 @@ class DirectWorkflowWithCyclesTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
t_execs = wf_ex.task_executions
self.assertDictEqual({'cnt': 2}, wf_ex.output)
t_execs = wf_ex.task_executions
# Expecting one execution for task1 and two executions
# for task2 and task3 because of the cycle 'task2 <-> task3'.
@ -77,8 +80,6 @@ class DirectWorkflowWithCyclesTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertTrue(all(states.SUCCESS == t_ex.state for t_ex in t_execs))
self.assertDictEqual({'cnt': 2}, wf_ex.output)
def test_complex_cycle(self):
wf_text = """
version: '2.0'
@ -121,11 +122,14 @@ class DirectWorkflowWithCyclesTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
t_execs = wf_ex.task_executions
self.assertDictEqual({'cnt': 2}, wf_ex.output)
t_execs = wf_ex.task_executions
# Expecting one execution for task1 and task5 and two executions
# for task2, task3 and task4 because of the cycle
@ -134,6 +138,7 @@ class DirectWorkflowWithCyclesTest(base.EngineTestCase):
self._assert_multiple_items(t_execs, 2, name='task2')
self._assert_multiple_items(t_execs, 2, name='task3')
self._assert_multiple_items(t_execs, 2, name='task4')
task5_ex = self._assert_single_item(t_execs, name='task5')
self.assertEqual(8, len(t_execs))
@ -141,8 +146,10 @@ class DirectWorkflowWithCyclesTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertTrue(all(states.SUCCESS == t_ex.state for t_ex in t_execs))
self.assertEqual(2, data_flow.get_task_execution_result(task5_ex))
self.assertDictEqual({'cnt': 2}, wf_ex.output)
with db_api.transaction():
task5_ex = db_api.get_task_execution(task5_ex.id)
self.assertEqual(2, data_flow.get_task_execution_result(task5_ex))
def test_parallel_cycles(self):
wf_text = """
@ -190,11 +197,13 @@ class DirectWorkflowWithCyclesTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
t_execs = wf_ex.task_executions
wf_output = wf_ex.output
t_execs = wf_ex.task_executions
# NOTE: We have two cycles in parallel workflow branches
# and those branches will have their own copy of "cnt" variable
@ -215,4 +224,4 @@ class DirectWorkflowWithCyclesTest(base.EngineTestCase):
# Now workflow output is almost always 3 because the second cycle
# takes longer hence it wins because of how DB queries work: they
# order entities in ascending of creation time.
self.assertTrue(wf_ex.output['cnt'] == 2 or wf_ex.output['cnt'] == 3)
self.assertTrue(wf_output['cnt'] == 2 or wf_output['cnt'] == 3)

View File

@ -140,7 +140,7 @@ class EnvironmentTest(base.EngineTestCase):
self.assertDictEqual(wf1_ex.input, expected_wf1_input)
# Wait till workflow 'wf1' is completed.
self.await_execution_success(wf1_ex.id)
self.await_workflow_success(wf1_ex.id)
wf1_ex = db_api.get_workflow_execution(wf1_ex.id)
@ -149,7 +149,7 @@ class EnvironmentTest(base.EngineTestCase):
self.assertDictEqual(wf1_ex.output, expected_wf1_output)
# Wait till workflow 'wf2' is completed.
self.await_execution_success(wf2_ex.id)
self.await_workflow_success(wf2_ex.id)
wf2_ex = db_api.get_workflow_execution(wf2_ex.id)
@ -166,7 +166,7 @@ class EnvironmentTest(base.EngineTestCase):
self._assert_single_item(wf1_task_execs, name='task2')
for t_ex in wf1_task_execs:
a_ex = t_ex.executions[0]
a_ex = t_ex.action_executions[0]
rpc.ExecutorClient.run_action.assert_any_call(
a_ex.id,

View File

@ -91,7 +91,7 @@ class ErrorResultTest(base.EngineTestCase):
}
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -123,7 +123,7 @@ class ErrorResultTest(base.EngineTestCase):
}
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -155,7 +155,7 @@ class ErrorResultTest(base.EngineTestCase):
}
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -111,7 +111,7 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
def test_workflow_input_default_value_limit(self):
new_wf = generate_workflow(['__WORKFLOW_INPUT__'])
@ -170,7 +170,7 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
{'action_output_length': 1024}
)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -189,7 +189,7 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -74,7 +74,7 @@ class JavaScriptEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('test_js.js_test', {'num': 50})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -93,7 +93,7 @@ class JavaScriptEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('test_js.js_test', {'num': 50})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -64,12 +64,15 @@ class JoinEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertDictEqual({'result': '1,2'}, wf_ex.output)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
@ -79,8 +82,6 @@ class JoinEngineTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task2.state)
self.assertEqual(states.SUCCESS, task3.state)
self.assertDictEqual({'result': '1,2'}, wf_ex.output)
def test_full_join_with_errors(self):
wf_text = """---
version: '2.0'
@ -116,12 +117,15 @@ class JoinEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertDictEqual({'result': '1-1'}, wf_ex.output)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
@ -131,8 +135,6 @@ class JoinEngineTest(base.EngineTestCase):
self.assertEqual(states.ERROR, task2.state)
self.assertEqual(states.SUCCESS, task3.state)
self.assertDictEqual({'result': '1-1'}, wf_ex.output)
def test_full_join_with_conditions(self):
wf_text = """---
version: '2.0'
@ -248,12 +250,15 @@ class JoinEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertDictEqual({'result': '1,2'}, wf_ex.output)
tasks = wf_ex.task_executions
self.assertEqual(4, len(tasks))
@ -271,7 +276,6 @@ class JoinEngineTest(base.EngineTestCase):
self.await_task_error(task3.id)
self.assertDictEqual({'result4': '1,2'}, task4.published)
self.assertDictEqual({'result': '1,2'}, wf_ex.output)
def test_partial_join_triggers_once(self):
wf_text = """---
@ -328,12 +332,13 @@ class JoinEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
tasks = wf_ex.task_executions
self.assertEqual(5, len(tasks))
@ -399,12 +404,13 @@ class JoinEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
tasks = wf_ex.task_executions
self.assertEqual(4, len(tasks))
@ -479,19 +485,20 @@ class JoinEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('main', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual(
{
'var1': True,
'is_done': True,
'var2': True
},
wf_ex.output
)
self.assertDictEqual(
{
'var1': True,
'is_done': True,
'var2': True
},
wf_ex.output
)
@testtools.skip('https://bugs.launchpad.net/mistral/+bug/1424461')
def test_full_join_parallel_published_vars_complex(self):
@ -541,22 +548,23 @@ class JoinEngineTest(base.EngineTestCase):
wf_service.create_workflows(wf_text)
# Start workflow.
exec_db = self.engine.start_workflow('main', {})
wf_ex = self.engine.start_workflow('main', {})
self.await_execution_success(exec_db.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual(
{
'var_a': 1,
'var_b': 1,
'var_c': 1,
'var_d': 1
},
exec_db.output
)
self.assertDictEqual(
{
'var_a': 1,
'var_b': 1,
'var_c': 1,
'var_d': 1
},
wf_ex.output
)
def test_full_join_with_branch_errors(self):
wf_text = """---
@ -601,10 +609,12 @@ class JoinEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('main', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task10 = self._assert_single_item(tasks, name='task10')
task21 = self._assert_single_item(tasks, name='task21')
@ -651,11 +661,12 @@ class JoinEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('test-join', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
tasks = wf_ex.task_executions
self._assert_multiple_items(tasks, 5, state=states.SUCCESS)
@ -682,10 +693,11 @@ class JoinEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
tasks = wf_ex.task_executions
self._assert_multiple_items(tasks, 3, state=states.SUCCESS)
self._assert_multiple_items(tasks, 3, state=states.SUCCESS)

View File

@ -80,7 +80,7 @@ class NoopTaskEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {'num1': 1, 'num2': 1})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -107,7 +107,7 @@ class NoopTaskEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {'num1': 1, 'num2': 2})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -422,7 +422,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex.runtime_context
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
def test_wait_before_policy_from_var(self):
wb_service.create_workbook_v2(WAIT_BEFORE_FROM_VAR)
@ -431,12 +431,12 @@ class PoliciesTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wb.wf1', {'wait_before': 1})
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(states.RUNNING_DELAYED, task_db.state)
self.await_execution_success(exec_db.id)
self.await_workflow_success(exec_db.id)
def test_wait_after_policy(self):
wb_service.create_workbook_v2(WAIT_AFTER_WB)
@ -484,7 +484,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_task_delayed(task_ex.id, delay=0.5)
self.await_task_error(task_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -536,7 +536,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_task_success(task_ex.id)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -575,7 +575,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_task_error(task_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -614,7 +614,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_task_error(task_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -654,7 +654,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_task_success(task_ex.id)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -691,7 +691,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_task_success(task_ex.id)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -727,7 +727,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_task_error(task_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -769,7 +769,7 @@ class PoliciesTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
self.await_task_error(task_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -813,7 +813,7 @@ class PoliciesTest(base.EngineTestCase):
self.await_task_success(task_ex.id)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
@ -843,7 +843,7 @@ class PoliciesTest(base.EngineTestCase):
self._assert_single_item(wf_ex.task_executions, name='task1')
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
def test_timeout_policy_success_after_timeout(self):
wb_service.create_workbook_v2(TIMEOUT_WB2)
@ -857,7 +857,7 @@ class PoliciesTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, task_ex.state)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Wait until timeout exceeds.
self._sleep(1)
@ -896,7 +896,7 @@ class PoliciesTest(base.EngineTestCase):
self.assertEqual(states.IDLE, task_ex.state)
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
self._sleep(1)
@ -905,7 +905,7 @@ class PoliciesTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self._assert_single_item(wf_ex.task_executions, name='task1')
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
@ -935,7 +935,7 @@ class PoliciesTest(base.EngineTestCase):
self.assertEqual(states.IDLE, task_ex.state)
# Verify wf paused by pause-before
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
# Allow wait-before to expire
self._sleep(2)
@ -943,7 +943,7 @@ class PoliciesTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
# Verify wf still paused (wait-before didn't reactivate)
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
task_ex = db_api.get_task_execution(task_ex.id)
self.assertEqual(states.IDLE, task_ex.state)
@ -953,7 +953,7 @@ class PoliciesTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self._assert_single_item(wf_ex.task_executions, name='task1')
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
@ -974,7 +974,7 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
@ -1046,9 +1046,9 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(2, len(wf_ex.task_executions))

View File

@ -71,7 +71,7 @@ class EngineProfilerTest(base.EngineTestCase):
self.assertIsNotNone(wf_ex)
self.assertEqual(states.RUNNING, wf_ex['state'])
self.await_execution_success(wf_ex['id'])
self.await_workflow_success(wf_ex['id'])
self.assertGreater(self.mock_profiler_log_func.call_count, 0)
@ -94,6 +94,6 @@ class EngineProfilerTest(base.EngineTestCase):
self.assertIsNotNone(wf_ex)
self.assertEqual(states.RUNNING, wf_ex['state'])
self.await_execution_success(wf_ex['id'])
self.await_workflow_success(wf_ex['id'])
self.assertEqual(self.mock_profiler_log_func.call_count, 0)

View File

@ -159,7 +159,7 @@ class EngineActionRaceConditionTest(base.EngineTestCase):
self.unblock_action()
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -194,7 +194,7 @@ class EngineActionRaceConditionTest(base.EngineTestCase):
self.unblock_action()
self.await_task_success(task2_ex.id)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
task1_ex = db_api.get_task_execution(task1_ex.id)
task1_action_ex = db_api.get_action_executions(

View File

@ -82,7 +82,7 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
self.assertDictEqual({'task_name': 'task1'}, wf_ex.params)
# Wait till workflow 'wf1' is completed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -112,7 +112,7 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
self.assertDictEqual({'task_name': 'task2'}, wf_ex.params)
# Wait till workflow 'wf1' is completed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -144,7 +144,7 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
task_name='task4'
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
tasks = db_api.get_task_executions()

View File

@ -89,7 +89,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {}, task_name='t3')
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
@ -111,7 +111,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertIsNone(wf_ex.state_info)
# Wait for the workflow to succeed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
@ -180,7 +180,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
env=env
)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
@ -212,7 +212,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertDictEqual(updated_env, wf_ex.context['__env'])
# Wait for the workflow to succeed.
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
@ -287,7 +287,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {}, task_name='t3')
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)

View File

@ -74,7 +74,7 @@ class TestSafeRerun(base.EngineTestCase):
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -119,7 +119,7 @@ class TestSafeRerun(base.EngineTestCase):
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -153,7 +153,7 @@ class TestSafeRerun(base.EngineTestCase):
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -48,7 +48,7 @@ class ExecutionStateInfoTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('test_wf', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -72,7 +72,7 @@ class ExecutionStateInfoTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('test_wf', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -98,7 +98,7 @@ class ExecutionStateInfoTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('test_wf', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -133,7 +133,7 @@ class ExecutionStateInfoTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -143,7 +143,7 @@ class SubworkflowsTest(base.EngineTestCase):
)
# Wait till workflow 'wf1' is completed.
self.await_execution_success(wf1_ex.id)
self.await_workflow_success(wf1_ex.id)
wf1_ex = db_api.get_workflow_execution(wf1_ex.id)
@ -153,7 +153,7 @@ class SubworkflowsTest(base.EngineTestCase):
)
# Wait till workflow 'wf2' is completed.
self.await_execution_success(wf2_ex.id)
self.await_workflow_success(wf2_ex.id, timeout=4)
wf2_ex = db_api.get_workflow_execution(wf2_ex.id)
@ -193,15 +193,15 @@ class SubworkflowsTest(base.EngineTestCase):
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
# Wait till workflow 'wf1' is completed.
self.await_execution_error(wf1_ex.id)
self.await_workflow_error(wf1_ex.id)
# Wait till workflow 'wf2' is completed, its state must be ERROR.
self.await_execution_error(wf2_ex.id)
self.await_workflow_error(wf2_ex.id)
def test_subworkflow_yaql_error(self):
wf_ex = self.engine.start_workflow('wb2.wf1', None)
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_execs = db_api.get_workflow_executions()
@ -248,7 +248,7 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertDictContainsSubset(expected_start_params, wf1_ex.params)
# Wait till workflow 'wf1' is completed.
self.await_execution_success(wf1_ex.id)
self.await_workflow_success(wf1_ex.id)
# Wait till workflow 'wf2' is completed.
self.await_execution_success(wf2_ex.id)
self.await_workflow_success(wf2_ex.id)

View File

@ -58,7 +58,7 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -107,7 +107,7 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -150,7 +150,7 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -187,7 +187,7 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, task_name='task1')
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Workflow must work at least 2 seconds (1+1).
self.assertGreater(
@ -232,7 +232,7 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {}, task_name='task2')
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -69,7 +69,7 @@ class TaskPublishTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -185,7 +185,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -200,7 +200,10 @@ class WithItemsEngineTest(base.EngineTestCase):
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task1_ex)
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
result = data_flow.get_task_execution_result(task1_ex)
self.assertIsInstance(result, list)
@ -237,7 +240,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('with_items', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -262,14 +265,18 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('with_items', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
result = data_flow.get_task_execution_result(task1)
with db_api.transaction():
task1 = db_api.get_task_execution(task1.id)
result = data_flow.get_task_execution_result(task1)
self.assertEqual(states.ERROR, task1.state)
self.assertIsInstance(result, list)
@ -307,7 +314,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -323,14 +330,18 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
result = data_flow.get_task_execution_result(task1)
with db_api.transaction():
task1 = db_api.get_task_execution(task1.id)
result = data_flow.get_task_execution_result(task1)
self.assertIsInstance(result, list)
@ -349,7 +360,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -360,7 +371,10 @@ class WithItemsEngineTest(base.EngineTestCase):
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task1_ex)
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
result = data_flow.get_task_execution_result(task1_ex)
self.assertIsInstance(result, list)
@ -389,7 +403,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_utils.Result("Mistral")
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -435,7 +449,7 @@ class WithItemsEngineTest(base.EngineTestCase):
wf_input = {'names_info': []}
wf_ex = self.engine.start_workflow('wb1.with_items', wf_input)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -470,7 +484,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -481,7 +495,10 @@ class WithItemsEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
result = data_flow.get_task_execution_result(task1_ex)
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
result = data_flow.get_task_execution_result(task1_ex)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
@ -538,7 +555,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -560,7 +577,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT_ONE_ITEM)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -575,7 +592,10 @@ class WithItemsEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
result = data_flow.get_task_execution_result(task1_ex)
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
result = data_flow.get_task_execution_result(task1_ex)
self.assertIsInstance(result, list)
self.assertIn('Guy', result)
@ -604,7 +624,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
task_ex = db_api.get_task_execution(task_ex.id)
@ -644,13 +664,14 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assert_capacity(1, task_ex)
self.await_execution_success(wf_ex.id)
task_ex = db_api.get_task_execution(task_ex.id)
self.await_workflow_success(wf_ex.id)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
@ -686,7 +707,7 @@ class WithItemsEngineTest(base.EngineTestCase):
{'concurrency': 2}
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -696,7 +717,10 @@ class WithItemsEngineTest(base.EngineTestCase):
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
@ -758,7 +782,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assert_capacity(0, task_ex)
@ -806,13 +830,14 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assert_capacity(2, task_ex)
self.await_execution_success(wf_ex.id)
task_ex = db_api.get_task_execution(task_ex.id)
self.await_workflow_success(wf_ex.id)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
@ -846,9 +871,9 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test_fail', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_exs = wf_ex.task_executions
@ -856,10 +881,12 @@ class WithItemsEngineTest(base.EngineTestCase):
task_2 = self._assert_single_item(task_exs, name='task2')
self.assertEqual(
'With-items failed',
data_flow.get_task_execution_result(task_2)
)
with db_api.transaction():
task_2 = db_api.get_task_execution(task_2.id)
result = data_flow.get_task_execution_result(task_2)
self.assertEqual('With-items failed', result)
def test_with_items_concurrency_3(self):
wf_with_concurrency_3 = """---
@ -884,7 +911,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assert_capacity(0, task_ex)
@ -920,15 +947,16 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assert_capacity(3, task_ex)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
task_ex = db_api.get_task_execution(task_ex.id)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
self.assertEqual(states.SUCCESS, task_ex.state)
self.assertEqual(states.SUCCESS, task_ex.state)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
@ -958,9 +986,9 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('concurrency_test', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
@ -968,7 +996,10 @@ class WithItemsEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
result = data_flow.get_task_execution_result(task_ex)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
result = data_flow.get_task_execution_result(task_ex)
self.assertIsInstance(result, list)
self.assertIn('John', result)
@ -997,7 +1028,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('with_items_retry', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1039,7 +1070,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('with_items_retry_concurrency', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1073,7 +1104,7 @@ class WithItemsEngineTest(base.EngineTestCase):
env={'name': 'Mistral'}
)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1081,7 +1112,10 @@ class WithItemsEngineTest(base.EngineTestCase):
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
result = data_flow.get_task_execution_result(task1)
with db_api.transaction():
task1 = db_api.get_task_execution(task1.id)
result = data_flow.get_task_execution_result(task1)
self.assertEqual(
[
@ -1120,7 +1154,7 @@ class WithItemsEngineTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1138,8 +1172,12 @@ class WithItemsEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
result_task1 = data_flow.get_task_execution_result(task1_ex)
result_task2 = data_flow.get_task_execution_result(task2_ex)
with db_api.transaction():
task1_ex = db_api.get_task_execution(task1_ex.id)
task2_ex = db_api.get_task_execution(task2_ex.id)
result_task1 = data_flow.get_task_execution_result(task1_ex)
result_task2 = data_flow.get_task_execution_result(task2_ex)
# Since we know that we can receive results in random order,
# check is not depend on order of items.
@ -1185,9 +1223,9 @@ class WithItemsEngineTest(base.EngineTestCase):
names = ["Peter", "Susan", "Edmund", "Lucy", "Aslan", "Caspian"]
wf_ex = self.engine.start_workflow('wb1.main', {'names': names})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(
wf_ex.task_executions,
@ -1195,9 +1233,11 @@ class WithItemsEngineTest(base.EngineTestCase):
state=states.SUCCESS
)
result = [
item['result']
for item in data_flow.get_task_execution_result(task_ex)
]
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
task_result = data_flow.get_task_execution_result(task_ex)
result = [item['result'] for item in task_result]
self.assertListEqual(sorted(result), sorted(names))

View File

@ -21,7 +21,6 @@ from mistral.workflow import states
class WorkflowCancelTest(base.EngineTestCase):
def test_cancel_workflow(self):
workflow = """
version: '2.0'
@ -40,6 +39,7 @@ class WorkflowCancelTest(base.EngineTestCase):
"""
wf_service.create_workflows(workflow)
wf_ex = self.engine.start_workflow('wf', {})
self.engine.stop_workflow(
@ -48,18 +48,18 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
self.await_execution_cancelled(wf_ex.id)
self.await_workflow_cancelled(wf_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
self.await_execution_success(task_1_ex.id)
self.await_task_success(task_1_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
@ -89,11 +89,12 @@ class WorkflowCancelTest(base.EngineTestCase):
"""
wf_service.create_workflows(workflow)
wf_ex = self.engine.start_workflow('wf', {})
self.engine.pause_workflow(wf_ex.id)
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
self.engine.stop_workflow(
wf_ex.id,
@ -101,18 +102,18 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
self.await_execution_cancelled(wf_ex.id)
self.await_workflow_cancelled(wf_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
self.await_execution_success(task_1_ex.id)
self.await_task_success(task_1_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
@ -136,9 +137,10 @@ class WorkflowCancelTest(base.EngineTestCase):
"""
wf_service.create_workflows(workflow)
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
self.engine.stop_workflow(
wf_ex.id,
@ -146,7 +148,7 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
@ -185,6 +187,7 @@ class WorkflowCancelTest(base.EngineTestCase):
"""
wb_service.create_workbook_v2(workbook)
wf_ex = self.engine.start_workflow('wb.wf', {})
self.engine.stop_workflow(
@ -193,24 +196,29 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
self.await_execution_cancelled(wf_ex.id)
self.await_workflow_cancelled(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
self.await_execution_cancelled(task_ex.id)
self.await_task_cancelled(task_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
action_exs = db_api.get_action_executions(task_execution_id=task_ex.id)
subwf_execs = db_api.get_workflow_executions(
task_execution_id=task_ex.id
)
self.assertEqual(states.CANCELLED, wf_ex.state)
self.assertEqual("Cancelled by user.", wf_ex.state_info)
self.assertEqual(states.CANCELLED, task_ex.state)
self.assertEqual("Cancelled by user.", task_ex.state_info)
self.assertEqual(1, len(action_exs))
self.assertEqual(states.CANCELLED, action_exs[0].state)
self.assertEqual("Cancelled by user.", action_exs[0].state_info)
self.assertEqual(1, len(subwf_execs))
self.assertEqual(states.CANCELLED, subwf_execs[0].state)
self.assertEqual("Cancelled by user.", subwf_execs[0].state_info)
def test_cancel_child_workflow(self):
workbook = """
@ -239,9 +247,11 @@ class WorkflowCancelTest(base.EngineTestCase):
"""
wb_service.create_workbook_v2(workbook)
wf_ex = self.engine.start_workflow('wb.wf', {})
self.engine.start_workflow('wb.wf', {})
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_ex = self._assert_single_item(wf_execs, name='wb.subwf')
@ -252,11 +262,12 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
self.await_execution_cancelled(subwf_ex.id)
self.await_execution_cancelled(task_ex.id)
self.await_execution_cancelled(wf_ex.id)
self.await_workflow_cancelled(subwf_ex.id)
self.await_task_cancelled(task_ex.id)
self.await_workflow_cancelled(wf_ex.id)
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_ex = self._assert_single_item(wf_execs, name='wb.subwf')
@ -295,6 +306,7 @@ class WorkflowCancelTest(base.EngineTestCase):
wait-before: 1
"""
wb_service.create_workbook_v2(workbook)
wf_ex = self.engine.start_workflow('wb.wf', {})
self.engine.stop_workflow(
@ -303,13 +315,15 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
wf_ex = db_api.get_execution(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
self.await_execution_cancelled(wf_ex.id)
self.await_execution_cancelled(task_ex.id)
self.await_workflow_cancelled(wf_ex.id)
self.await_task_cancelled(task_ex.id)
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
@ -351,9 +365,11 @@ class WorkflowCancelTest(base.EngineTestCase):
"""
wb_service.create_workbook_v2(workbook)
wf_ex = self.engine.start_workflow('wb.wf', {})
self.engine.start_workflow('wb.wf', {})
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
@ -364,12 +380,13 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
self.await_execution_cancelled(subwf_exs[0].id)
self.await_execution_success(subwf_exs[1].id)
self.await_execution_cancelled(task_ex.id)
self.await_execution_cancelled(wf_ex.id)
self.await_workflow_cancelled(subwf_exs[0].id)
self.await_workflow_success(subwf_exs[1].id)
self.await_task_cancelled(task_ex.id)
self.await_workflow_cancelled(wf_ex.id)
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
@ -411,9 +428,11 @@ class WorkflowCancelTest(base.EngineTestCase):
"""
wb_service.create_workbook_v2(workbook)
wf_ex = self.engine.start_workflow('wb.wf', {})
self.engine.start_workflow('wb.wf', {})
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
@ -430,12 +449,13 @@ class WorkflowCancelTest(base.EngineTestCase):
"Failed by user."
)
self.await_execution_cancelled(subwf_exs[0].id)
self.await_execution_error(subwf_exs[1].id)
self.await_execution_error(task_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_cancelled(subwf_exs[0].id)
self.await_workflow_error(subwf_exs[1].id)
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
@ -477,9 +497,11 @@ class WorkflowCancelTest(base.EngineTestCase):
"""
wb_service.create_workbook_v2(workbook)
wf_ex = self.engine.start_workflow('wb.wf', {})
self.engine.start_workflow('wb.wf', {})
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')
@ -496,12 +518,13 @@ class WorkflowCancelTest(base.EngineTestCase):
"Cancelled by user."
)
self.await_execution_cancelled(subwf_exs[0].id)
self.await_execution_error(subwf_exs[1].id)
self.await_execution_error(task_ex.id)
self.await_execution_error(wf_ex.id)
self.await_workflow_cancelled(subwf_exs[0].id)
self.await_workflow_error(subwf_exs[1].id)
self.await_task_error(task_ex.id)
self.await_workflow_error(wf_ex.id)
wf_execs = db_api.get_workflow_executions()
wf_ex = self._assert_single_item(wf_execs, name='wb.wf')
task_ex = self._assert_single_item(wf_ex.task_executions, name='taskx')
subwf_exs = self._assert_multiple_items(wf_execs, 2, name='wb.subwf')

View File

@ -197,7 +197,7 @@ class WorkflowResumeTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -210,7 +210,7 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(2, len(wf_ex.task_executions))
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -242,7 +242,7 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
@ -254,7 +254,7 @@ class WorkflowResumeTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -263,7 +263,7 @@ class WorkflowResumeTest(base.EngineTestCase):
wf_ex = self.engine.resume_workflow(wf_ex.id)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -278,7 +278,7 @@ class WorkflowResumeTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -298,7 +298,7 @@ class WorkflowResumeTest(base.EngineTestCase):
self.engine.resume_workflow(wf_ex.id)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -311,7 +311,7 @@ class WorkflowResumeTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -343,7 +343,7 @@ class WorkflowResumeTest(base.EngineTestCase):
self.engine.on_action_complete(task2_action_ex.id, utils.Result())
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -356,7 +356,7 @@ class WorkflowResumeTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wb.wf1', {})
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -388,7 +388,7 @@ class WorkflowResumeTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {}, env=env)
self.await_execution_paused(wf_ex.id)
self.await_workflow_paused(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -418,7 +418,7 @@ class WorkflowResumeTest(base.EngineTestCase):
# Update the env variables and resume workflow.
self.engine.resume_workflow(wf_ex.id, env=updated_env)
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -434,7 +434,12 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_2_ex.state)
task_2_result = data_flow.get_task_execution_result(task_2_ex)
# Re-read task execution, otherwise lazy loading of action executions
# may not work.
with db_api.transaction():
task_2_ex = db_api.get_task_execution(task_2_ex.id)
task_2_result = data_flow.get_task_execution_result(task_2_ex)
self.assertEqual(updated_env['var1'], task_2_result)
@ -446,6 +451,11 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, task_3_ex.state)
task_3_result = data_flow.get_task_execution_result(task_3_ex)
# Re-read task execution, otherwise lazy loading of action executions
# may not work.
with db_api.transaction():
task_3_ex = db_api.get_task_execution(task_3_ex.id)
task_3_result = data_flow.get_task_execution_result(task_3_ex)
self.assertEqual(updated_env['var2'], task_3_result)

View File

@ -44,9 +44,9 @@ class WorkflowStopTest(base.EngineTestCase):
def test_stop_failed(self):
self.engine.stop_workflow(self.exec_id, states.SUCCESS, "Force stop")
self.await_execution_success(self.exec_id)
self.await_workflow_success(self.exec_id)
wf_ex = db_api.get_execution(self.exec_id)
wf_ex = db_api.get_workflow_execution(self.exec_id)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual("Force stop", wf_ex.state_info)
@ -54,9 +54,9 @@ class WorkflowStopTest(base.EngineTestCase):
def test_stop_succeeded(self):
self.engine.stop_workflow(self.exec_id, states.ERROR, "Failure")
self.await_execution_error(self.exec_id)
self.await_workflow_error(self.exec_id)
wf_ex = db_api.get_execution(self.exec_id)
wf_ex = db_api.get_workflow_execution(self.exec_id)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertEqual("Failure", wf_ex.state_info)

View File

@ -53,7 +53,7 @@ class WorkflowVariablesTest(base.EngineTestCase):
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {'param2': 'Renat'})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -64,7 +64,7 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_success(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -126,7 +126,7 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf', {})
self.await_execution_error(wf_ex.id)
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)

View File

@ -23,8 +23,9 @@ from mistral.tests.unit import base
from oslo_config import cfg
def _load_executions():
def _create_workflow_executions():
time_now = datetime.datetime.now()
wf_execs = [
{
'id': '123',
@ -58,15 +59,6 @@ def _load_executions():
'workflow_name': 'test_exec',
'state': "SUCCESS",
},
{
'id': '654',
'name': 'expired but not a parent',
'created_at': time_now - datetime.timedelta(days=15),
'updated_at': time_now - datetime.timedelta(days=10),
'workflow_name': 'test_exec',
'state': "SUCCESS",
'task_execution_id': '789'
},
{
'id': 'abc',
'name': 'cancelled_expired',
@ -88,6 +80,28 @@ def _load_executions():
for wf_exec in wf_execs:
db_api.create_workflow_execution(wf_exec)
# Create a nested workflow execution.
db_api.create_task_execution(
{
'id': '789',
'workflow_execution_id': '987',
'name': 'my_task'
}
)
db_api.create_workflow_execution(
{
'id': '654',
'name': 'expired but not a parent',
'created_at': time_now - datetime.timedelta(days=15),
'updated_at': time_now - datetime.timedelta(days=10),
'workflow_name': 'test_exec',
'state': "SUCCESS",
'task_execution_id': '789'
}
)
def _switch_context(project_id, is_admin):
_ctx = ctx.MistralContext(
@ -110,13 +124,13 @@ class ExpirationPolicyTest(base.DbTestCase):
# we want to load the executions with other project_id.
_switch_context('non_admin_project', False)
_load_executions()
_create_workflow_executions()
now = datetime.datetime.now()
# This execution has a parent wf and testing that we are
# querying only for parent wfs.
exec_child = db_api.get_execution('654')
exec_child = db_api.get_workflow_execution('654')
self.assertEqual('789', exec_child.task_execution_id)

View File

@ -96,7 +96,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
get_task_execution.return_value = task1_ex
task1_ex.executions.append(
task1_ex.action_executions.append(
models.ActionExecution(
name='std.echo',
workflow_name='wf',
@ -119,7 +119,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
# Now assume that 'task2' completed successfully.
task2_ex = self._create_task_execution('task2', states.SUCCESS)
task2_ex.executions.append(
task2_ex.action_executions.append(
models.ActionExecution(
name='std.echo',
workflow_name='wf',

View File

@ -31,17 +31,21 @@ class WithItemsTest(base.BaseTest):
def test_get_indices(self):
# Task execution for running 6 items with concurrency=3.
task_ex = models.TaskExecution(
spec={
'action': 'myaction'
},
runtime_context={
'with_items_context': {
'capacity': 3,
'count': 6
}
},
executions=[]
action_executions=[],
workflow_executions=[]
)
# Set 3 items: 2 success and 1 error unaccepted.
task_ex.executions += [
task_ex.action_executions += [
self.get_action_ex(True, states.SUCCESS, 0),
self.get_action_ex(True, states.SUCCESS, 1),
self.get_action_ex(False, states.ERROR, 2)

View File

@ -19,7 +19,6 @@ from oslo_config import cfg
from oslo_log import log as logging
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import expressions as expr
from mistral import utils
@ -65,20 +64,14 @@ def invalidate_task_execution_result(task_ex):
def get_task_execution_result(task_ex):
# Use of task_ex.executions requires a session to lazy load the action
# executions. This get_task_execution_result method is also invoked
# from get_all in the task execution API controller. If there is a lot of
# read against the API, it will lead to a lot of unnecessary DB locks
# which result in possible deadlocks and WF execution failures. Therefore,
# use db_api.get_action_executions here to avoid session-less use cases.
action_execs = db_api.get_action_executions(task_execution_id=task_ex.id)
action_execs.sort(
execs = task_ex.executions
execs.sort(
key=lambda x: x.runtime_context.get('index')
)
results = [
_extract_execution_result(ex)
for ex in action_execs
for ex in execs
if hasattr(ex, 'output') and ex.accepted
]

View File

@ -16,7 +16,6 @@
import copy
import six
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.workflow import states
@ -45,13 +44,11 @@ def get_count(task_ex):
def is_completed(task_ex):
action_exs = db_api.get_action_executions(
task_execution_id=task_ex.id,
accepted=True
)
execs = list(filter(lambda t: t.accepted, task_ex.executions))
count = get_count(task_ex) or 1
return count == len(action_exs)
return count == len(execs)
def get_index(task_ex):
@ -89,7 +86,7 @@ def _get_with_item_indices(exs):
return sorted(set([ex.runtime_context['index'] for ex in exs]))
def _get_accepted_act_exs(task_ex):
def _get_accepted_executions(task_ex):
# Choose only if not accepted but completed.
return list(
filter(
@ -99,7 +96,7 @@ def _get_accepted_act_exs(task_ex):
)
def _get_unaccepted_act_exs(task_ex):
def _get_unaccepted_executions(task_ex):
# Choose only if not accepted but completed.
return list(
filter(
@ -113,8 +110,8 @@ def get_indices_for_loop(task_ex):
capacity = _get_context(task_ex)[_CAPACITY]
count = get_count(task_ex)
accepted = _get_with_item_indices(_get_accepted_act_exs(task_ex))
unaccepted = _get_with_item_indices(_get_unaccepted_act_exs(task_ex))
accepted = _get_with_item_indices(_get_accepted_executions(task_ex))
unaccepted = _get_with_item_indices(_get_unaccepted_executions(task_ex))
candidates = sorted(list(set(unaccepted) - set(accepted)))
if candidates: