Refactoring task output: full redesign of DB models

* Created a Execution hierarcy mapped on a single table:
  - ActionExecution
  - WorkflowExecution
  - TaskExecution
* Created necessary associations within Execution hierarchy
* Created necessary methods for Execution objects on DB API
* Created Definition hierarchy mapped on separate tables:
  - Workbook
  - WorkflowDefinition
  - ActionDefinition
* Renamed attributes
  - 'wf_name' -> 'workflow_name'
  - 'wf_spec' -> 'spec'
* Fixed all unit tests

TODO:
* Complete renaming throughout the code
* Further refactoring of workflow engine

Change-Id: I0032bea573d9200025f9f7dd951e93cb6f1661bb
This commit is contained in:
Renat Akhmerov 2015-02-25 18:36:30 +06:00
parent 7707bd5b27
commit bdefdc6a6f
58 changed files with 1463 additions and 1157 deletions

View File

@ -224,11 +224,14 @@ class MistralHTTPAction(HTTPAction):
allow_redirects=None,
proxies=None,
verify=None):
a_ctx = action_context
headers = headers or {}
headers.update({
'Mistral-Workflow-Name': action_context.get('workflow_name'),
'Mistral-Execution-Id': action_context.get('execution_id'),
'Mistral-Task-Id': action_context.get('task_id'),
'Mistral-Workflow-Name': a_ctx.get('workflow_name'),
'Mistral-Execution-Id': a_ctx.get('workflow_execution_id'),
'Mistral-Task-Id': a_ctx.get('task_id'),
})
super(MistralHTTPAction, self).__init__(

View File

@ -87,7 +87,7 @@ class ActionsController(rest.RestController, hooks.HookController):
"""Return the named action."""
LOG.info("Fetch action [name=%s]" % name)
db_model = db_api.get_action(name)
db_model = db_api.get_action_definition(name)
return Action.from_dict(db_model.to_dict())
@ -136,13 +136,13 @@ class ActionsController(rest.RestController, hooks.HookController):
LOG.info("Delete action [name=%s]" % name)
with db_api.transaction():
db_model = db_api.get_action(name)
db_model = db_api.get_action_definition(name)
if db_model.is_system:
msg = "Attempt to delete a system action: %s" % name
raise exc.DataAccessException(msg)
db_api.delete_action(name)
db_api.delete_action_definition(name)
@wsme_pecan.wsexpose(Actions)
def get_all(self):
@ -154,6 +154,6 @@ class ActionsController(rest.RestController, hooks.HookController):
LOG.info("Fetch actions.")
action_list = [Action.from_dict(db_model.to_dict())
for db_model in db_api.get_actions()]
for db_model in db_api.get_action_definitions()]
return Actions(actions=action_list)

View File

@ -90,8 +90,6 @@ class Execution(resource.Resource):
if params:
setattr(e, 'params', json.dumps(params))
setattr(e, 'workflow_name', d['wf_name'])
return e
@classmethod
@ -125,7 +123,7 @@ class ExecutionsController(rest.RestController):
"""Return the specified Execution."""
LOG.info("Fetch execution [id=%s]" % id)
return Execution.from_dict(db_api.get_execution(id).to_dict())
return Execution.from_dict(db_api.get_workflow_execution(id).to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Execution, wtypes.text, body=Execution)
@ -137,7 +135,7 @@ class ExecutionsController(rest.RestController):
"""
LOG.info("Update execution [id=%s, execution=%s]" %
(id, execution))
db_api.ensure_execution_exists(id)
db_api.ensure_workflow_execution_exists(id)
# Currently we can change only state.
if not execution.state:
@ -192,7 +190,7 @@ class ExecutionsController(rest.RestController):
"""Delete the specified Execution."""
LOG.info("Delete execution [id=%s]" % id)
return db_api.delete_execution(id)
return db_api.delete_workflow_execution(id)
@wsme_pecan.wsexpose(Executions)
def get_all(self):
@ -200,6 +198,6 @@ class ExecutionsController(rest.RestController):
LOG.info("Fetch executions")
executions = [Execution.from_dict(db_model.to_dict())
for db_model in db_api.get_executions()]
for db_model in db_api.get_workflow_executions()]
return Executions(executions=executions)

View File

@ -39,9 +39,8 @@ class Task(resource.Resource):
id = wtypes.text
name = wtypes.text
# TODO(rakhmerov): Inconsistent with 'workflow_name' for executions.
wf_name = wtypes.text
execution_id = wtypes.text
workflow_name = wtypes.text
workflow_execution_id = wtypes.text
state = wtypes.text
"state can take one of the following values: \
@ -71,8 +70,8 @@ class Task(resource.Resource):
def sample(cls):
return cls(
id='123e4567-e89b-12d3-a456-426655440000',
wf_name='book',
execution_id='123e4567-e89b-12d3-a456-426655440000',
workflow_name='flow',
workflow_execution_id='123e4567-e89b-12d3-a456-426655440000',
name='task',
description='tell when you are done',
state=states.SUCCESS,
@ -102,7 +101,7 @@ class TasksController(rest.RestController):
"""Return the specified task."""
LOG.info("Fetch task [id=%s]" % id)
db_model = db_api.get_task(id)
db_model = db_api.get_task_execution(id)
return Task.from_dict(db_model.to_dict())
@ -139,18 +138,23 @@ class TasksController(rest.RestController):
LOG.info("Fetch tasks")
tasks = [Task.from_dict(db_model.to_dict())
for db_model in db_api.get_tasks()]
for db_model in db_api.get_task_executions()]
return Tasks(tasks=tasks)
class ExecutionTasksController(rest.RestController):
@wsme_pecan.wsexpose(Tasks, wtypes.text)
def get_all(self, execution_id):
"""Return all tasks within the execution."""
def get_all(self, workflow_execution_id):
"""Return all tasks within the workflow execution."""
LOG.info("Fetch tasks")
tasks = [Task.from_dict(db_model.to_dict())
for db_model in db_api.get_tasks(execution_id=execution_id)]
task_execs = db_api.get_task_executions(
workflow_execution_id=workflow_execution_id
)
return Tasks(tasks=tasks)
return Tasks(
tasks=[
Task.from_dict(db_model.to_dict()) for db_model in task_execs
]
)

View File

@ -95,7 +95,7 @@ class WorkflowsController(rest.RestController, hooks.HookController):
"""Return the named workflow."""
LOG.info("Fetch workflow [name=%s]" % name)
db_model = db_api.get_workflow(name)
db_model = db_api.get_workflow_definition(name)
return Workflow.from_dict(db_model.to_dict())
@ -144,7 +144,7 @@ class WorkflowsController(rest.RestController, hooks.HookController):
"""Delete the named workflow."""
LOG.info("Delete workflow [name=%s]" % name)
db_api.delete_workflow(name)
db_api.delete_workflow_definition(name)
@wsme_pecan.wsexpose(Workflows)
def get_all(self):
@ -156,6 +156,6 @@ class WorkflowsController(rest.RestController, hooks.HookController):
LOG.info("Fetch workflows.")
workflows_list = [Workflow.from_dict(db_model.to_dict())
for db_model in db_api.get_workflows()]
for db_model in db_api.get_workflow_definitions()]
return Workflows(workflows=workflows_list)

View File

@ -24,6 +24,7 @@ from sqlalchemy.ext import declarative
from sqlalchemy.orm import attributes
from mistral.services import security
from mistral import utils
def _generate_unicode_uuid():
@ -31,9 +32,11 @@ def _generate_unicode_uuid():
def id_column():
return sa.Column(sa.String(36),
primary_key=True,
default=_generate_unicode_uuid)
return sa.Column(
sa.String(36),
primary_key=True,
default=_generate_unicode_uuid
)
class _MistralModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
@ -41,12 +44,21 @@ class _MistralModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
__table__ = None
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def __eq__(self, other):
if type(self) is not type(other):
return False
for col in self.__table__.columns:
if getattr(self, col.name) != getattr(other, col.name):
# In case of single table inheritance a class attribute
# corresponding to a table column may not exist so we need
# to skip these attributes.
if (hasattr(self, col.name)
and hasattr(other, col.name)
and getattr(self, col.name) != getattr(other, col.name)):
return False
return True
@ -61,7 +73,7 @@ class _MistralModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
unloaded = attributes.instance_state(self).unloaded
for col in self.__table__.columns:
if col.name not in unloaded:
if col.name not in unloaded and hasattr(self, col.name):
d[col.name] = getattr(self, col.name)
datetime_to_str(d, 'created_at')
@ -85,7 +97,7 @@ MistralModelBase = declarative.declarative_base(cls=_MistralModelBase)
class MistralSecureModelBase(MistralModelBase):
"""Mixin adding model properties related to security."""
"""Base class for all secure models."""
__abstract__ = True
@ -99,10 +111,11 @@ def _set_project_id(target, value, oldvalue, initiator):
def register_secure_model_hooks():
# Make sure 'project_id' is always properly set.
for sec_model_class in MistralSecureModelBase.__subclasses__():
event.listen(
sec_model_class.project_id,
'set',
_set_project_id,
retval=True
)
for sec_model_class in utils.iter_subclasses(MistralSecureModelBase):
if '__abstract__' not in sec_model_class.__dict__:
event.listen(
sec_model_class.project_id,
'set',
_set_project_id,
retval=True
)

View File

@ -94,43 +94,78 @@ def delete_workbook(name):
def delete_workbooks(**kwargs):
IMPL.delete_workbooks(**kwargs)
# Workflows.
# Workflow definitions.
def get_workflow_definition(name):
return IMPL.get_workflow_definition(name)
def get_workflow(name):
return IMPL.get_workflow(name)
def load_workflow_definition(name):
"""Unlike get_workflow_definition this method is allowed to return None."""
return IMPL.load_workflow_definition(name)
def load_workflow(name):
"""Unlike get_workflow this method is allowed to return None."""
return IMPL.load_workflow(name)
def get_workflow_definitions():
return IMPL.get_workflow_definitions()
def get_workflows():
return IMPL.get_workflows()
def create_workflow_definition(values):
return IMPL.create_workflow_definition(values)
def create_workflow(values):
return IMPL.create_workflow(values)
def update_workflow_definition(name, values):
return IMPL.update_workflow_definition(name, values)
def update_workflow(name, values):
return IMPL.update_workflow(name, values)
def create_or_update_workflow_definition(name, values):
return IMPL.create_or_update_workflow_definition(name, values)
def create_or_update_workflow(name, values):
return IMPL.create_or_update_workflow(name, values)
def delete_workflow_definition(name):
IMPL.delete_workflow_definition(name)
def delete_workflow(name):
IMPL.delete_workflow(name)
def delete_workflow_definitions(**kwargs):
IMPL.delete_workflow_definitions(**kwargs)
def delete_workflows(**kwargs):
IMPL.delete_workflows(**kwargs)
# Action definitions.
def get_action_definition(name):
return IMPL.get_action_definition(name)
# Executions.
def load_action_definition(name):
"""Unlike get_action_definition this method is allowed to return None."""
return IMPL.load_action_definition(name)
def get_action_definitions(**kwargs):
return IMPL.get_action_definitions(**kwargs)
def create_action_definition(values):
return IMPL.create_action_definition(values)
def update_action_definition(name, values):
return IMPL.update_action_definition(name, values)
def create_or_update_action_definition(name, values):
return IMPL.create_or_update_action_definition(name, values)
def delete_action_definition(name):
return IMPL.delete_action_definition(name)
def delete_action_definitions(**kwargs):
return IMPL.delete_action_definitions(**kwargs)
# Common executions.
def get_execution(id):
return IMPL.get_execution(id)
@ -169,46 +204,78 @@ def delete_executions(**kwargs):
IMPL.delete_executions(**kwargs)
# Tasks.
# Workflow executions.
def get_task(id):
return IMPL.get_task(id)
def get_workflow_execution(id):
return IMPL.get_workflow_execution(id)
def load_task(name):
"""Unlike get_task this method is allowed to return None."""
return IMPL.load_task(name)
def load_workflow_execution(name):
"""Unlike get_execution this method is allowed to return None."""
return IMPL.load_workflow_execution(name)
def get_tasks(**kwargs):
return IMPL.get_tasks(**kwargs)
def get_workflow_executions(**kwargs):
return IMPL.get_workflow_executions(**kwargs)
def create_task(values):
return IMPL.create_task(values)
def ensure_workflow_execution_exists(id):
return IMPL.ensure_workflow_execution_exists(id)
def update_task(id, values):
return IMPL.update_task(id, values)
def create_workflow_execution(values):
return IMPL.create_workflow_execution(values)
def create_or_update_task(id, values):
return IMPL.create_or_update_task(id, values)
def update_workflow_execution(id, values):
return IMPL.update_workflow_execution(id, values)
def delete_task(id):
return IMPL.delete_task(id)
def create_or_update_workflow_execution(id, values):
return IMPL.create_or_update_workflow_execution(id, values)
def delete_tasks(**kwargs):
return IMPL.delete_tasks(**kwargs)
def delete_workflow_execution(id):
return IMPL.delete_workflow_execution(id)
# Action invocations.
def delete_workflow_executions(**kwargs):
IMPL.delete_workflow_executions(**kwargs)
def delete_action_invocations(**kwargs):
return IMPL.delete_action_invocations(**kwargs)
# Tasks executions.
def get_task_execution(id):
return IMPL.get_task_execution(id)
def load_task_execution(name):
"""Unlike get_task_execution this method is allowed to return None."""
return IMPL.load_task_execution(name)
def get_task_executions(**kwargs):
return IMPL.get_task_executions(**kwargs)
def create_task_execution(values):
return IMPL.create_task_execution(values)
def update_task_execution(id, values):
return IMPL.update_task_execution(id, values)
def create_or_update_task_execution(id, values):
return IMPL.create_or_update_task_execution(id, values)
def delete_task_execution(id):
return IMPL.delete_task_execution(id)
def delete_task_executions(**kwargs):
return IMPL.delete_task_executions(**kwargs)
# Delayed calls.
@ -226,41 +293,6 @@ def get_delayed_calls_to_start(time):
return IMPL.get_delayed_calls_to_start(time)
# Actions.
def get_action(name):
return IMPL.get_action(name)
def load_action(name):
"""Unlike get_action this method is allowed to return None."""
return IMPL.load_action(name)
def get_actions(**kwargs):
return IMPL.get_actions(**kwargs)
def create_action(values):
return IMPL.create_action(values)
def update_action(name, values):
return IMPL.update_action(name, values)
def create_or_update_action(name, values):
return IMPL.create_or_update_action(name, values)
def delete_action(name):
return IMPL.delete_action(name)
def delete_actions(**kwargs):
return IMPL.delete_actions(**kwargs)
# Cron triggers.
def get_cron_trigger(name):

View File

@ -42,7 +42,7 @@ def get_backend():
def setup_db():
try:
models.WorkbookDefinition.metadata.create_all(b.get_engine())
models.Workbook.metadata.create_all(b.get_engine())
except sa.exc.OperationalError as e:
raise exc.DBException("Failed to setup database: %s" % e)
@ -51,7 +51,7 @@ def drop_db():
global _facade
try:
models.WorkbookDefinition.metadata.drop_all(b.get_engine())
models.Workbook.metadata.drop_all(b.get_engine())
_facade = None
except Exception as e:
raise exc.DBException("Failed to drop database: %s" % e)
@ -121,7 +121,7 @@ def _get_db_object_by_id(model, id):
return _secure_query(model).filter_by(id=id).first()
# Workbooks.
# Workbook definitions.
def get_workbook(name):
wb = _get_workbook(name)
@ -138,20 +138,21 @@ def load_workbook(name):
def get_workbooks(**kwargs):
return _get_collection_sorted_by_name(models.WorkbookDefinition, **kwargs)
return _get_collection_sorted_by_name(models.Workbook, **kwargs)
@b.session_aware()
def create_workbook(values, session=None):
wb = models.WorkbookDefinition()
wb = models.Workbook()
wb.update(values.copy())
try:
wb.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for Workbook: %s"
% e.columns)
raise exc.DBDuplicateEntry(
"Duplicate entry for WorkbookDefinition: %s" % e.columns
)
return wb
@ -171,9 +172,7 @@ def update_workbook(name, values, session=None):
@b.session_aware()
def create_or_update_workbook(name, values, session=None):
wb = _get_workbook(name)
if not wb:
if not _get_workbook(name):
return create_workbook(values)
else:
return update_workbook(name, values)
@ -191,36 +190,37 @@ def delete_workbook(name, session=None):
def _get_workbook(name):
return _get_db_object_by_name(models.WorkbookDefinition, name)
return _get_db_object_by_name(models.Workbook, name)
@b.session_aware()
def delete_workbooks(**kwargs):
return _delete_all(models.WorkbookDefinition, **kwargs)
return _delete_all(models.Workbook, **kwargs)
# Workflows.
# Workflow definitions.
def get_workflow(name):
wf = _get_workflow(name)
def get_workflow_definition(name):
wf = _get_workflow_definition(name)
if not wf:
raise exc.NotFoundException(
"Workflow not found [workflow_name=%s]" % name)
"Workflow not found [workflow_name=%s]" % name
)
return wf
def load_workflow(name):
return _get_workflow(name)
def load_workflow_definition(name):
return _get_workflow_definition(name)
def get_workflows(**kwargs):
def get_workflow_definitions(**kwargs):
return _get_collection_sorted_by_name(models.WorkflowDefinition, **kwargs)
@b.session_aware()
def create_workflow(values, session=None):
def create_workflow_definition(values, session=None):
wf = models.WorkflowDefinition()
wf.update(values.copy())
@ -228,15 +228,16 @@ def create_workflow(values, session=None):
try:
wf.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for workflow: %s"
% e.columns)
raise exc.DBDuplicateEntry(
"Duplicate entry for WorkflowDefinition: %s" % e.columns
)
return wf
@b.session_aware()
def update_workflow(name, values, session=None):
wf = _get_workflow(name)
def update_workflow_definition(name, values, session=None):
wf = _get_workflow_definition(name)
if not wf:
raise exc.NotFoundException(
@ -248,43 +249,122 @@ def update_workflow(name, values, session=None):
@b.session_aware()
def create_or_update_workflow(name, values, session=None):
wf = _get_workflow(name)
if not wf:
return create_workflow(values)
def create_or_update_workflow_definition(name, values, session=None):
if not _get_workflow_definition(name):
return create_workflow_definition(values)
else:
return update_workflow(name, values)
return update_workflow_definition(name, values)
@b.session_aware()
def delete_workflow(name, session=None):
wf = _get_workflow(name)
def delete_workflow_definition(name, session=None):
wf = _get_workflow_definition(name)
if not wf:
raise exc.NotFoundException(
"Workflow not found [workflow_name=%s]" % name)
"Workflow not found [workflow_name=%s]" % name
)
session.delete(wf)
@b.session_aware()
def delete_workflows(**kwargs):
def delete_workflow_definitions(**kwargs):
return _delete_all(models.WorkflowDefinition, **kwargs)
def _get_workflow(name):
def _get_workflow_definition(name):
return _get_db_object_by_name(models.WorkflowDefinition, name)
# Executions.
# Action definitions.
def get_action_definition(name):
a_def = _get_action_definition(name)
if not a_def:
raise exc.NotFoundException(
"Action definition not found [action_name=%s]" % name
)
return a_def
def load_action_definition(name):
return _get_action_definition(name)
def get_action_definitions(**kwargs):
return _get_collection_sorted_by_name(models.ActionDefinition, **kwargs)
@b.session_aware()
def create_action_definition(values, session=None):
a_def = models.ActionDefinition()
a_def.update(values)
try:
a_def.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry(
"Duplicate entry for action %s: %s" % (a_def.name, e.columns)
)
return a_def
@b.session_aware()
def update_action_definition(name, values, session=None):
a_def = _get_action_definition(name)
if not a_def:
raise exc.NotFoundException(
"Action definition not found [action_name=%s]" % name)
a_def.update(values.copy())
return a_def
@b.session_aware()
def create_or_update_action_definition(name, values, session=None):
if not _get_action_definition(name):
return create_action_definition(values)
else:
return update_action_definition(name, values)
@b.session_aware()
def delete_action_definition(name, session=None):
a_def = _get_action_definition(name)
if not a_def:
raise exc.NotFoundException(
"Action definition not found [action_name=%s]" % name
)
session.delete(a_def)
@b.session_aware()
def delete_action_definitions(**kwargs):
return _delete_all(models.ActionDefinition, **kwargs)
def _get_action_definition(name):
return _get_db_object_by_name(models.ActionDefinition, name)
# Common executions.
def get_execution(id):
execution = _get_execution(id)
if not execution:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
"Execution not found [execution_id=%s]" % id
)
return execution
@ -303,37 +383,37 @@ def get_executions(**kwargs):
@b.session_aware()
def create_execution(values, session=None):
execution = models.WorkflowExecution()
ex = models.Execution()
execution.update(values.copy())
ex.update(values.copy())
try:
execution.save(session=session)
ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for Execution: %s"
% e.columns)
raise exc.DBDuplicateEntry(
"Duplicate entry for Execution: %s" % e.columns
)
return execution
return ex
@b.session_aware()
def update_execution(id, values, session=None):
execution = _get_execution(id)
ex = _get_execution(id)
if not execution:
if not ex:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
"Execution not found [execution_id=%s]" % id
)
execution.update(values.copy())
ex.update(values.copy())
return execution
return ex
@b.session_aware()
def create_or_update_execution(id, values, session=None):
execution = _get_execution(id)
if not execution:
if not _get_execution(id):
return create_execution(values)
else:
return update_execution(id, values)
@ -341,119 +421,190 @@ def create_or_update_execution(id, values, session=None):
@b.session_aware()
def delete_execution(id, session=None):
execution = _get_execution(id)
ex = _get_execution(id)
if not execution:
if not ex:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
session.delete(execution)
session.delete(ex)
@b.session_aware()
def delete_executions(**kwargs):
_delete_all(models.TaskExecution)
return _delete_all(models.WorkflowExecution, **kwargs)
return _delete_all(models.Execution, **kwargs)
def _get_executions(**kwargs):
return _get_collection_sorted_by_time(models.WorkflowExecution, **kwargs)
return _get_collection_sorted_by_time(models.Execution, **kwargs)
def _get_execution(id):
return _get_db_object_by_id(models.Execution, id)
# Workflow executions.
def get_workflow_execution(id):
wf_ex = _get_workflow_execution(id)
if not wf_ex:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
return wf_ex
def load_workflow_execution(id):
return _get_workflow_execution(id)
def ensure_workflow_execution_exists(id):
get_workflow_execution(id)
def get_workflow_executions(**kwargs):
return _get_workflow_executions(**kwargs)
@b.session_aware()
def create_workflow_execution(values, session=None):
wf_ex = models.WorkflowExecution()
wf_ex.update(values.copy())
try:
wf_ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for Execution: %s"
% e.columns)
return wf_ex
@b.session_aware()
def update_workflow_execution(id, values, session=None):
wf_ex = _get_workflow_execution(id)
if not wf_ex:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
wf_ex.update(values.copy())
return wf_ex
@b.session_aware()
def create_or_update_workflow_execution(id, values, session=None):
if not _get_workflow_execution(id):
return create_workflow_execution(values)
else:
return update_workflow_execution(id, values)
@b.session_aware()
def delete_workflow_execution(id, session=None):
wf_ex = _get_workflow_execution(id)
if not wf_ex:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
session.delete(wf_ex)
@b.session_aware()
def delete_workflow_executions(**kwargs):
return _delete_all(models.WorkflowExecution, **kwargs)
def _get_workflow_executions(**kwargs):
return _get_collection_sorted_by_time(models.WorkflowExecution, **kwargs)
def _get_workflow_execution(id):
return _get_db_object_by_id(models.WorkflowExecution, id)
# Tasks.
# Tasks executions.
def get_task(id):
task = _get_task(id)
def get_task_execution(id):
task_ex = _get_task_execution(id)
if not task:
raise exc.NotFoundException(
"Task not found [task_id=%s]" % id)
if not task_ex:
raise exc.NotFoundException("Task not found [task_id=%s]" % id)
return task
return task_ex
def load_task(id):
return _get_task(id)
def load_task_execution(id):
return _get_task_execution(id)
def get_tasks(**kwargs):
return _get_tasks(**kwargs)
def get_task_executions(**kwargs):
return _get_task_executions(**kwargs)
@b.session_aware()
def create_task(values, session=None):
task = models.TaskExecution()
def create_task_execution(values, session=None):
task_ex = models.TaskExecution()
task.update(values)
task_ex.update(values)
try:
task.save(session=session)
task_ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for Task: %s"
% e.columns)
raise exc.DBDuplicateEntry("Duplicate entry for Task: %s" % e.columns)
return task
return task_ex
@b.session_aware()
def update_task(id, values, session=None):
task = _get_task(id)
def update_task_execution(id, values, session=None):
task_ex = _get_task_execution(id)
if not task:
if not task_ex:
raise exc.NotFoundException(
"Task not found [task_id=%s]" % id)
task.update(values.copy())
task_ex.update(values.copy())
return task
return task_ex
@b.session_aware()
def create_or_update_task(id, values, session=None):
task = _get_task(id)
if not task:
return create_task(values)
def create_or_update_task_execution(id, values, session=None):
if not _get_task_execution(id):
return create_task_execution(values)
else:
return update_task(id, values)
return update_task_execution(id, values)
@b.session_aware()
def delete_task(id, session=None):
task = _get_task(id)
def delete_task_execution(id, session=None):
task_ex = _get_task_execution(id)
if not task:
if not task_ex:
raise exc.NotFoundException(
"Task not found [task_id=%s]" % id)
session.delete(task)
session.delete(task_ex)
@b.session_aware()
def delete_tasks(**kwargs):
def delete_task_executions(**kwargs):
return _delete_all(models.TaskExecution, **kwargs)
def _get_task(id):
def _get_task_execution(id):
return _get_db_object_by_id(models.TaskExecution, id)
def _get_tasks(**kwargs):
def _get_task_executions(**kwargs):
return _get_collection_sorted_by_time(models.TaskExecution, **kwargs)
# Action invocations.
@b.session_aware()
def delete_action_invocations(**kwargs):
return _delete_all(models.ActionExecution, **kwargs)
# Delayed calls.
@b.session_aware()
@ -499,84 +650,6 @@ def _get_delayed_call(delayed_call_id, session=None):
return query.filter_by(id=delayed_call_id).first()
# Actions.
def get_action(name):
action = _get_action(name)
if not action:
raise exc.NotFoundException(
"Action not found [action_name=%s]" % name)
return action
def load_action(name):
return _get_action(name)
def get_actions(**kwargs):
return _get_collection_sorted_by_name(models.ActionDefinition, **kwargs)
@b.session_aware()
def create_action(values, session=None):
action = models.ActionDefinition()
action.update(values)
try:
action.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for action %s: %s"
% (action.name, e.columns))
return action
@b.session_aware()
def update_action(name, values, session=None):
action = _get_action(name)
if not action:
raise exc.NotFoundException(
"Action not found [action_name=%s]" % name)
action.update(values.copy())
return action
@b.session_aware()
def create_or_update_action(name, values, session=None):
action = _get_action(name)
if not action:
return create_action(values)
else:
return update_action(name, values)
@b.session_aware()
def delete_action(name, session=None):
action = _get_action(name)
if not action:
raise exc.NotFoundException(
"Action not found [action_name=%s]" % name)
session.delete(action)
@b.session_aware()
def delete_actions(**kwargs):
return _delete_all(models.ActionDefinition, **kwargs)
def _get_action(name):
return _get_db_object_by_name(models.ActionDefinition, name)
# Cron triggers.
def get_cron_trigger(name):

View File

@ -17,6 +17,8 @@ import hashlib
import json
import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import backref
from sqlalchemy.orm import relationship
from mistral.db.sqlalchemy import model_base as mb
@ -27,7 +29,18 @@ from mistral import utils
# Definition objects.
class WorkbookDefinition(mb.MistralSecureModelBase):
class Definition(mb.MistralSecureModelBase):
__abstract__ = True
id = mb.id_column()
name = sa.Column(sa.String(80))
definition = sa.Column(sa.Text(), nullable=True)
spec = sa.Column(st.JsonDictType())
tags = sa.Column(st.JsonListType())
# There's no WorkbookExecution so we safely omit "Definition" in the name.
class Workbook(Definition):
"""Contains info about workbook (including definition in Mistral DSL)."""
__tablename__ = 'workbooks_v2'
@ -36,105 +49,124 @@ class WorkbookDefinition(mb.MistralSecureModelBase):
sa.UniqueConstraint('name', 'project_id'),
)
id = mb.id_column()
name = sa.Column(sa.String(80))
definition = sa.Column(sa.Text(), nullable=True)
spec = sa.Column(st.JsonDictType())
tags = sa.Column(st.JsonListType())
class WorkflowDefinition(mb.MistralSecureModelBase):
class WorkflowDefinition(Definition):
"""Contains info about workflow (including definition in Mistral DSL)."""
__tablename__ = 'workflows_v2'
__tablename__ = 'workflow_definitions_v2'
__table_args__ = (
sa.UniqueConstraint('name', 'project_id'),
)
id = mb.id_column()
name = sa.Column(sa.String(80))
definition = sa.Column(sa.Text(), nullable=True)
spec = sa.Column(st.JsonDictType())
tags = sa.Column(st.JsonListType())
class ActionDefinition(mb.MistralSecureModelBase):
class ActionDefinition(Definition):
"""Contains info about registered Actions."""
__tablename__ = 'actions_v2'
__tablename__ = 'action_definitions_v2'
__table_args__ = (
sa.UniqueConstraint('name', 'project_id'),
)
# Main properties.
id = mb.id_column()
name = sa.Column(sa.String(200))
description = sa.Column(sa.Text())
tags = sa.Column(st.JsonListType())
input = sa.Column(sa.Text())
# Ad-hoc action properties.
definition = sa.Column(sa.Text(), nullable=True)
spec = sa.Column(st.JsonDictType())
# Service properties.
action_class = sa.Column(sa.String(200))
attributes = sa.Column(st.JsonDictType())
is_system = sa.Column(sa.Boolean())
class WorkflowExecution(mb.MistralSecureModelBase):
"""Contains workflow execution information."""
# Execution objects.
class Execution(mb.MistralSecureModelBase):
"""Abstract execution object."""
__tablename__ = 'executions_v2'
id = mb.id_column()
wf_name = sa.Column(sa.String(80))
wf_spec = sa.Column(st.JsonDictType())
start_params = sa.Column(st.JsonDictType())
state = sa.Column(sa.String(20))
state_info = sa.Column(
sa.String(1024),
nullable=True)
input = sa.Column(st.JsonDictType())
output = sa.Column(st.JsonDictType())
context = sa.Column(st.JsonDictType())
# Can't use ForeignKey constraint here because SqlAlchemy will detect
# a circular dependency and raise an error.
parent_task_id = sa.Column(sa.String(36))
type = sa.Column(sa.String(50))
# TODO(nmakhotkin): It's not used now, must be fixed later.
trust_id = sa.Column(sa.String(80))
event.listen(
# Catch and trim WorkflowExecution.state_info to always fit allocated size.
WorkflowExecution.state_info,
'set',
lambda t, v, o, i: utils.cut(v, 1020),
retval=True
)
class TaskExecution(mb.MistralSecureModelBase):
"""Contains task runtime information."""
__tablename__ = 'tasks_v2'
__mapper_args__ = {
'polymorphic_on': type,
'polymorphic_identity': 'execution'
}
# Main properties.
id = mb.id_column()
name = sa.Column(sa.String(80))
wf_name = sa.Column(sa.String(80))
workflow_name = sa.Column(sa.String(80))
spec = sa.Column(st.JsonDictType())
action_spec = sa.Column(st.JsonDictType())
state = sa.Column(sa.String(20))
state_info = sa.Column(sa.String(1024), nullable=True)
tags = sa.Column(st.JsonListType())
class ActionExecution(Execution):
"""Contains action execution information."""
__mapper_args__ = {
'polymorphic_identity': 'action_execution'
}
# Main properties.
accepted = sa.Column(sa.Boolean(), default=False)
# TODO(rakhmerov): We have to use @declared_attr here temporarily to
# resolve naming conflict with TaskExecution.
@declared_attr
def input(cls):
"'input' column, if not present already."
return Execution.__table__.c.get(
'input',
sa.Column(st.JsonDictType(), nullable=True)
)
# Note: Corresponds to MySQL 'LONGTEXT' type which is of unlimited size.
# TODO(rakhmerov): Change to LongText after refactoring.
output = sa.Column(st.JsonDictType(), nullable=True)
# output = sa.orm.deferred(sa.Column(st.LongText(), nullable=True))
class WorkflowExecution(ActionExecution):
"""Contains workflow execution information."""
__mapper_args__ = {
'polymorphic_identity': 'workflow_execution'
}
# Main properties.
start_params = sa.Column(st.JsonDictType())
# TODO(rakhmerov): We need to get rid of this field at all.
context = sa.Column(st.JsonDictType())
class TaskExecution(Execution):
"""Contains task runtime information."""
__mapper_args__ = {
'polymorphic_identity': 'task_execution'
}
# Main properties.
name = sa.Column(sa.String(80))
action_spec = sa.Column(st.JsonDictType())
# Data Flow properties.
# TODO(rakhmerov): 'input' is obsolete and must be removed later.
@declared_attr
def input(cls):
"'input' column, if not present already."
return Execution.__table__.c.get(
'input',
sa.Column(st.JsonDictType(), nullable=True)
)
in_context = sa.Column(st.JsonDictType())
input = sa.Column(st.JsonDictType())
# TODO(rakhmerov): Do we just need to use invocation instead of output?
# TODO(rakhmerov): We need to use action executions in the future.
result = sa.Column(st.JsonDictType())
published = sa.Column(st.JsonDictType())
@ -143,36 +175,49 @@ class TaskExecution(mb.MistralSecureModelBase):
# execution of a task.
runtime_context = sa.Column(st.JsonDictType())
# Relations.
execution_id = sa.Column(sa.String(36), sa.ForeignKey('executions_v2.id'))
execution = relationship(
'mistral.db.v2.sqlalchemy.models.WorkflowExecution',
backref="tasks",
lazy='joined'
for cls in utils.iter_subclasses(Execution):
event.listen(
# Catch and trim Execution.state_info to always fit allocated size.
cls.state_info,
'set',
lambda t, v, o, i: utils.cut(v, 1020),
retval=True
)
action_executions = relationship(
'ActionExecution',
backref='task',
cascade='all, delete-orphan',
lazy='joined'
)
# Many-to-one for 'Execution' and 'TaskExecution'.
Execution.task_execution_id = sa.Column(
sa.String(36),
sa.ForeignKey(TaskExecution.id),
nullable=True
)
TaskExecution.executions = relationship(
Execution,
backref=backref('task_execution', remote_side=[TaskExecution.id]),
cascade='all, delete-orphan',
foreign_keys=Execution.task_execution_id,
lazy='select'
)
# Many-to-one for 'TaskExecution' and 'WorkflowExecution'.
TaskExecution.workflow_execution_id = sa.Column(
sa.String(36),
sa.ForeignKey(WorkflowExecution.id)
)
WorkflowExecution.task_executions = relationship(
TaskExecution,
backref=backref('workflow_execution', remote_side=[WorkflowExecution.id]),
cascade='all, delete-orphan',
foreign_keys=TaskExecution.workflow_execution_id,
lazy='select'
)
class ActionExecution(mb.MistralSecureModelBase):
"""Contains task action invocation information."""
__tablename__ = 'action_invocations_v2'
# Main properties.
id = mb.id_column()
state = sa.Column(sa.String(20))
# Note: Corresponds to MySQL 'LONGTEXT' type which is of unlimited size.
result = sa.orm.deferred(sa.Column(st.LongText()))
# Relations.
task_id = sa.Column(sa.String(36), sa.ForeignKey('tasks_v2.id'))
# Other objects.
class DelayedCall(mb.MistralModelBase):
@ -229,7 +274,10 @@ class CronTrigger(mb.MistralSecureModelBase):
next_execution_time = sa.Column(sa.DateTime, nullable=False)
workflow_name = sa.Column(sa.String(80))
workflow_id = sa.Column(sa.String(36), sa.ForeignKey('workflows_v2.id'))
workflow_id = sa.Column(
sa.String(36),
sa.ForeignKey(WorkflowDefinition.id)
)
workflow = relationship('WorkflowDefinition', lazy='joined')
workflow_input = sa.Column(st.JsonDictType())

View File

@ -87,7 +87,7 @@ class RunTask(EngineCommand):
self.task_db = task_db
if task_db:
self.exec_db = task_db.execution
self.exec_db = task_db.workflow_execution
def run_local(self, exec_db, wf_handler, cause_task_db=None):
if self.task_db and self.task_db.state == states.IDLE:
@ -111,7 +111,7 @@ class RunTask(EngineCommand):
return
self.task_db = self._create_db_task(exec_db)
self.exec_db = self.task_db.execution
self.exec_db = self.task_db.workflow_execution
# Evaluate Data Flow properties ('input', 'in_context').
data_flow.prepare_db_task(
@ -128,8 +128,8 @@ class RunTask(EngineCommand):
p.before_task_start(self.task_db, self.task_spec)
def _create_db_task(self, exec_db):
return db_api.create_task({
'execution_id': exec_db.id,
return db_api.create_task_execution({
'workflow_execution_id': exec_db.id,
'name': self.task_spec.get_name(),
'state': states.RUNNING,
'spec': self.task_spec.to_dict(),
@ -137,7 +137,7 @@ class RunTask(EngineCommand):
'in_context': None,
'output': None,
'runtime_context': None,
'wf_name': exec_db.wf_name,
'workflow_name': exec_db.workflow_name,
'project_id': exec_db.project_id
})
@ -178,12 +178,12 @@ class RunTask(EngineCommand):
def _run_action(self):
exec_db = self.exec_db
wf_spec = spec_parser.get_workflow_spec(exec_db.wf_spec)
wf_spec = spec_parser.get_workflow_spec(exec_db.spec)
action_spec_name = self.task_spec.get_action_name()
action_db = e_utils.resolve_action(
exec_db.wf_name,
exec_db.workflow_name,
wf_spec.get_name(),
action_spec_name
)
@ -198,7 +198,7 @@ class RunTask(EngineCommand):
base_name = action_spec.get_base()
action_db = e_utils.resolve_action(
exec_db.wf_name,
exec_db.workflow_name,
wf_spec.get_name(),
base_name
)
@ -269,12 +269,12 @@ class RunTask(EngineCommand):
def _run_workflow(self):
parent_exec_db = self.exec_db
parent_wf_spec = spec_parser.get_workflow_spec(parent_exec_db.wf_spec)
parent_wf_spec = spec_parser.get_workflow_spec(parent_exec_db.spec)
wf_spec_name = self.task_spec.get_workflow_name()
wf_db = e_utils.resolve_workflow(
parent_exec_db.wf_name,
parent_exec_db.workflow_name,
parent_wf_spec.get_name(),
wf_spec_name
)

View File

@ -49,7 +49,7 @@ class DefaultEngine(base.Engine):
params = self._canonize_workflow_params(params)
with db_api.transaction():
wf_db = db_api.get_workflow(workflow_name)
wf_db = db_api.get_workflow_definition(workflow_name)
wf_spec = spec_parser.get_workflow_spec(wf_db.spec)
@ -97,9 +97,11 @@ class DefaultEngine(base.Engine):
try:
with db_api.transaction():
task_db = db_api.get_task(task_id)
task_db = db_api.get_task_execution(task_id)
task_name = task_db.name
exec_db = db_api.get_execution(task_db.execution_id)
exec_db = db_api.get_workflow_execution(
task_db.workflow_execution_id
)
exec_id = exec_db.id
result = utils.transform_result(exec_db, task_db, result)
@ -146,7 +148,7 @@ class DefaultEngine(base.Engine):
try:
with db_api.transaction():
task_db = db_api.get_task(task_id)
task_db = db_api.get_task_execution(task_id)
task_name = task_db.name
u.wf_trace.info(
@ -155,14 +157,14 @@ class DefaultEngine(base.Engine):
% (task_db.name, task_db.state, states.RUNNING)
)
task_db = db_api.update_task(
task_db = db_api.update_task_execution(
task_id,
{'state': states.RUNNING}
)
task_spec = spec_parser.get_task_spec(task_db.spec)
exec_db = task_db.execution
exec_db = task_db.workflow_execution
exec_id = exec_db.id
wf_handler = wfh_factory.create_workflow_handler(exec_db)
@ -184,7 +186,7 @@ class DefaultEngine(base.Engine):
@u.log_exec(LOG)
def pause_workflow(self, execution_id):
with db_api.transaction():
exec_db = db_api.get_execution(execution_id)
exec_db = db_api.get_workflow_execution(execution_id)
wf_handler = wfh_factory.create_workflow_handler(exec_db)
@ -196,7 +198,7 @@ class DefaultEngine(base.Engine):
def resume_workflow(self, execution_id):
try:
with db_api.transaction():
exec_db = db_api.get_execution(execution_id)
exec_db = db_api.get_workflow_execution(execution_id)
wf_handler = wfh_factory.create_workflow_handler(exec_db)
@ -233,7 +235,7 @@ class DefaultEngine(base.Engine):
with db_api.transaction():
err_msg = str(err)
exec_db = db_api.load_execution(execution_id)
exec_db = db_api.load_workflow_execution(execution_id)
if exec_db is None:
LOG.error("Cant fail workflow execution id='%s': not found.",
@ -248,7 +250,7 @@ class DefaultEngine(base.Engine):
# 1) to avoid computing and triggering next tasks
# 2) to avoid a loop in case of error in transport
wf_handler.on_task_result(
db_api.get_task(task_id),
db_api.get_task_execution(task_id),
wf_utils.TaskResult(error=err_msg)
)
@ -292,15 +294,15 @@ class DefaultEngine(base.Engine):
@staticmethod
def _create_db_execution(wf_db, wf_spec, wf_input, params):
exec_db = db_api.create_execution({
'wf_name': wf_db.name,
'wf_spec': wf_spec.to_dict(),
exec_db = db_api.create_workflow_execution({
'workflow_name': wf_db.name,
'spec': wf_spec.to_dict(),
'start_params': params or {},
'state': states.RUNNING,
'input': wf_input or {},
'output': {},
'context': copy.copy(wf_input) or {},
'parent_task_id': params.get('parent_task_id'),
'task_execution_id': params.get('parent_task_id'),
})
data_flow.add_openstack_data_to_context(exec_db.context)
@ -315,18 +317,18 @@ class DefaultEngine(base.Engine):
p.after_task_complete(task_db, task_spec, result)
def _check_subworkflow_completion(self, exec_db):
if not exec_db.parent_task_id:
if not exec_db.task_execution_id:
return
if exec_db.state == states.SUCCESS:
self._engine_client.on_task_result(
exec_db.parent_task_id,
exec_db.task_execution_id,
wf_utils.TaskResult(data=exec_db.output)
)
elif exec_db.state == states.ERROR:
err_msg = 'Failed subworkflow [execution_id=%s]' % exec_db.id
self._engine_client.on_task_result(
exec_db.parent_task_id,
exec_db.task_execution_id,
wf_utils.TaskResult(error=err_msg)
)

View File

@ -347,10 +347,10 @@ class PauseBeforePolicy(base.TaskPolicy):
wf_trace.info(
task_db,
"Workflow paused before task '%s' [%s -> %s]" %
(task_db.name, task_db.execution.state, states.PAUSED)
(task_db.name, task_db.workflow_execution.state, states.PAUSED)
)
task_db.execution.state = states.PAUSED
task_db.workflow_execution.state = states.PAUSED
task_db.state = states.IDLE
@ -372,7 +372,7 @@ class ConcurrencyPolicy(base.TaskPolicy):
def fail_task_if_incomplete(task_id, timeout):
task_db = db_api.get_task(task_id)
task_db = db_api.get_task_execution(task_id)
if not states.is_completed(task_db.state):
msg = "Task timed out [task=%s, timeout(s)=%s]." % (task_id, timeout)

View File

@ -66,10 +66,10 @@ def resolve_action(wf_name, wf_spec_name, action_spec_name):
action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = db_api.load_action(action_full_name)
action_db = db_api.load_action_definition(action_full_name)
if not action_db:
action_db = db_api.load_action(action_spec_name)
action_db = db_api.load_action_definition(action_spec_name)
if not action_db:
raise exc.InvalidActionException(
@ -92,10 +92,10 @@ def resolve_workflow(parent_wf_name, parent_wf_spec_name, wf_spec_name):
wf_full_name = "%s.%s" % (wb_name, wf_spec_name)
wf_db = db_api.load_workflow(wf_full_name)
wf_db = db_api.load_workflow_definition(wf_full_name)
if not wf_db:
wf_db = db_api.load_workflow(wf_spec_name)
wf_db = db_api.load_workflow_definition(wf_spec_name)
if not wf_db:
raise exc.WorkflowException(
@ -122,12 +122,11 @@ def transform_result(exec_db, task_db, result):
action_spec_name = spec_parser.get_task_spec(
task_db.spec).get_action_name()
wf_spec_name = spec_parser.get_workflow_spec(
exec_db.wf_spec).get_name()
wf_spec_name = spec_parser.get_workflow_spec(exec_db.spec).get_name()
if action_spec_name:
return transform_action_result(
exec_db.wf_name,
exec_db.workflow_name,
wf_spec_name,
action_spec_name,
result

View File

@ -47,7 +47,7 @@ def register_standard_actions():
def get_registered_actions(**kwargs):
return db_api.get_actions(**kwargs)
return db_api.get_action_definitions(**kwargs)
def register_action_class(name, action_class_str, attributes,
@ -65,13 +65,13 @@ def register_action_class(name, action_class_str, attributes,
try:
LOG.debug("Registering action in DB: %s" % name)
db_api.create_action(values)
db_api.create_action_definition(values)
except exc.DBDuplicateEntry:
LOG.debug("Action %s already exists in DB." % name)
def _clear_system_action_db():
db_api.delete_actions(is_system=True)
db_api.delete_action_definitions(is_system=True)
def sync_db():
@ -129,7 +129,7 @@ def register_action_classes():
def get_action_db(action_name):
return db_api.load_action(action_name)
return db_api.load_action_definition(action_name)
def get_action_class(action_full_name):
@ -150,8 +150,8 @@ def get_action_class(action_full_name):
def get_action_context(task_db):
return {
_ACTION_CTX_PARAM: {
'workflow_name': task_db.execution.wf_name,
'execution_id': task_db.execution_id,
'workflow_name': task_db.workflow_name,
'workflow_execution_id': task_db.workflow_execution_id,
'task_id': task_db.id,
'task_name': task_db.name,
'task_tags': task_db.tags

View File

@ -48,13 +48,13 @@ def update_actions(definition, scope='private'):
def create_action(action_spec, definition, scope):
return db_api.create_action(
return db_api.create_action_definition(
_get_action_values(action_spec, definition, scope)
)
def create_or_update_action(action_spec, definition, scope):
action = db_api.load_action(action_spec.get_name())
action = db_api.load_action_definition(action_spec.get_name())
if action and action.is_system:
raise exc.InvalidActionException(
@ -64,7 +64,7 @@ def create_or_update_action(action_spec, definition, scope):
values = _get_action_values(action_spec, definition, scope)
return db_api.create_or_update_action(values['name'], values)
return db_api.create_or_update_action_definition(values['name'], values)
def _get_action_values(action_spec, definition, scope):

View File

@ -86,7 +86,7 @@ def create_cron_trigger(name, pattern, workflow_name, workflow_input,
start_time = datetime.datetime.now()
with db_api_v2.transaction():
wf = db_api_v2.get_workflow(workflow_name)
wf = db_api_v2.get_workflow_definition(workflow_name)
next_time = get_next_execution_time(pattern, start_time)

View File

@ -50,16 +50,16 @@ def create_workbook_v2(definition, scope='private'):
def update_workbook_v2(definition, scope='private'):
wb_values = _get_workbook_values(
values = _get_workbook_values(
spec_parser.get_workbook_spec_from_yaml(definition),
definition,
scope
)
with db_api_v2.transaction():
wb_db = db_api_v2.update_workbook(wb_values['name'], wb_values)
wb_db = db_api_v2.update_workbook(values['name'], values)
_on_workbook_update(wb_db, wb_values)
_on_workbook_update(wb_db, values)
return wb_db
@ -84,7 +84,7 @@ def _create_or_update_actions(wb_db, actions_spec):
'project_id': wb_db.project_id
}
db_api_v2.create_or_update_action(action_name, values)
db_api_v2.create_or_update_action_definition(action_name, values)
def _create_or_update_workflows(wb_db, workflows_spec):
@ -101,7 +101,7 @@ def _create_or_update_workflows(wb_db, workflows_spec):
security.add_trust_id(values)
db_api_v2.create_or_update_workflow(wf_name, values)
db_api_v2.create_or_update_workflow_definition(wf_name, values)
def _get_workbook_values(wb_spec, definition, scope):

View File

@ -73,7 +73,7 @@ def _get_workflow_values(wf_spec, definition, scope):
def _create_workflow(wf_spec, definition, scope):
return db_api.create_workflow(
return db_api.create_workflow_definition(
_get_workflow_values(wf_spec, definition, scope)
)
@ -81,4 +81,4 @@ def _create_workflow(wf_spec, definition, scope):
def _create_or_update_workflow(wf_spec, definition, scope):
values = _get_workflow_values(wf_spec, definition, scope)
return db_api.create_or_update_workflow(values['name'], values)
return db_api.create_or_update_workflow_definition(values['name'], values)

View File

@ -209,11 +209,9 @@ class DbTestCase(BaseTest):
with db_api_v2.transaction():
db_api_v2.delete_workbooks()
db_api_v2.delete_action_invocations()
db_api_v2.delete_tasks()
db_api_v2.delete_executions()
db_api_v2.delete_cron_triggers()
db_api_v2.delete_workflows()
db_api_v2.delete_workflow_definitions()
def setUp(self):
super(DbTestCase, self).setUp()

View File

@ -656,4 +656,4 @@ class TasksTestsV2(base.TestCase):
resp, body = self.client.get_list_obj('tasks')
self.assertEqual(200, resp.status)
self.assertEqual(self.direct_wf, body['tasks'][-1]['wf_name'])
self.assertEqual(self.direct_wf, body['tasks'][-1]['workflow_name'])

View File

@ -296,7 +296,7 @@ class MistralClientV2(MistralClientBase):
def get_wf_tasks(self, wf_name):
all_tasks = self.get_list_obj('tasks')[1]['tasks']
return [t for t in all_tasks if t['wf_name'] == wf_name]
return [t for t in all_tasks if t['workflow_name'] == wf_name]
class AuthProv(auth.KeystoneV2AuthProvider):

View File

@ -94,21 +94,23 @@ MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
class TestActionsController(base.FunctionalTest):
@mock.patch.object(db_api, "get_action", MOCK_ACTION)
@mock.patch.object(db_api, "get_action_definition", MOCK_ACTION)
def test_get(self):
resp = self.app.get('/v2/actions/my_action')
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(ACTION, resp.json)
@mock.patch.object(db_api, "get_action", MOCK_NOT_FOUND)
@mock.patch.object(db_api, "get_action_definition", MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/actions/my_action', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "get_action", MOCK_ACTION)
@mock.patch.object(db_api, "create_or_update_action", MOCK_UPDATED_ACTION)
@mock.patch.object(db_api, "get_action_definition", MOCK_ACTION)
@mock.patch.object(
db_api, "create_or_update_action_definition", MOCK_UPDATED_ACTION
)
def test_put(self):
resp = self.app.put(
'/v2/actions',
@ -120,7 +122,9 @@ class TestActionsController(base.FunctionalTest):
self.assertEqual({"actions": [UPDATED_ACTION]}, resp.json)
@mock.patch.object(db_api, "create_or_update_action", MOCK_NOT_FOUND)
@mock.patch.object(
db_api, "create_or_update_action_definition", MOCK_NOT_FOUND
)
def test_put_not_found(self):
resp = self.app.put(
'/v2/actions',
@ -131,7 +135,7 @@ class TestActionsController(base.FunctionalTest):
self.assertEqual(404, resp.status_int)
@mock.patch.object(db_api, "get_action", MOCK_SYSTEM_ACTION)
@mock.patch.object(db_api, "get_action_definition", MOCK_SYSTEM_ACTION)
def test_put_system(self):
resp = self.app.put(
'/v2/actions',
@ -144,7 +148,7 @@ class TestActionsController(base.FunctionalTest):
self.assertIn('Attempt to modify a system action: std.echo',
resp.text)
@mock.patch.object(db_api, "create_action")
@mock.patch.object(db_api, "create_action_definition")
def test_post(self, mock_mtd):
mock_mtd.return_value = ACTION_DB
@ -168,7 +172,7 @@ class TestActionsController(base.FunctionalTest):
self.assertIsNotNone(spec)
self.assertEqual(ACTION_DB.name, spec['name'])
@mock.patch.object(db_api, "create_action", MOCK_DUPLICATE)
@mock.patch.object(db_api, "create_action_definition", MOCK_DUPLICATE)
def test_post_dup(self):
resp = self.app.post(
'/v2/actions',
@ -179,20 +183,20 @@ class TestActionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 409)
@mock.patch.object(db_api, "get_action", MOCK_ACTION)
@mock.patch.object(db_api, "delete_action", MOCK_DELETE)
@mock.patch.object(db_api, "get_action_definition", MOCK_ACTION)
@mock.patch.object(db_api, "delete_action_definition", MOCK_DELETE)
def test_delete(self):
resp = self.app.delete('/v2/actions/my_action')
self.assertEqual(resp.status_int, 204)
@mock.patch.object(db_api, "delete_action", MOCK_NOT_FOUND)
@mock.patch.object(db_api, "delete_action_definition", MOCK_NOT_FOUND)
def test_delete_not_found(self):
resp = self.app.delete('/v2/actions/my_action', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "get_action", MOCK_SYSTEM_ACTION)
@mock.patch.object(db_api, "get_action_definition", MOCK_SYSTEM_ACTION)
def test_delete_system(self):
resp = self.app.delete('/v2/actions/std.echo', expect_errors=True)
@ -200,7 +204,7 @@ class TestActionsController(base.FunctionalTest):
self.assertIn('Attempt to delete a system action: std.echo',
resp.json['faultstring'])
@mock.patch.object(db_api, "get_actions", MOCK_ACTIONS)
@mock.patch.object(db_api, "get_action_definitions", MOCK_ACTIONS)
def test_get_all(self):
resp = self.app.get('/v2/actions')
@ -209,7 +213,7 @@ class TestActionsController(base.FunctionalTest):
self.assertEqual(len(resp.json['actions']), 1)
self.assertDictEqual(ACTION, resp.json['actions'][0])
@mock.patch.object(db_api, "get_actions", MOCK_EMPTY)
@mock.patch.object(db_api, "get_action_definitions", MOCK_EMPTY)
def test_get_all_empty(self):
resp = self.app.get('/v2/actions')

View File

@ -76,7 +76,7 @@ class TestCronTriggerController(base.FunctionalTest):
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "get_workflow", MOCK_WF)
@mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
@mock.patch.object(db_api, "create_cron_trigger")
def test_post(self, mock_mtd):
mock_mtd.return_value = TRIGGER_DB
@ -92,7 +92,7 @@ class TestCronTriggerController(base.FunctionalTest):
self.assertEqual('* * * * *', values['pattern'])
@mock.patch.object(db_api, "get_workflow", MOCK_WF)
@mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
@mock.patch.object(db_api, "create_cron_trigger", MOCK_DUPLICATE)
def test_post_dup(self):
resp = self.app.post_json(
@ -101,7 +101,7 @@ class TestCronTriggerController(base.FunctionalTest):
self.assertEqual(resp.status_int, 409)
@mock.patch.object(db_api, "get_workflow", MOCK_WF)
@mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
@mock.patch.object(db_api, "create_cron_trigger", MOCK_DUPLICATE)
def test_post_same_wf_and_input(self):
trig = TRIGGER.copy()

View File

@ -28,9 +28,10 @@ from mistral.workflow import states
EXEC_DB = models.WorkflowExecution(
id='123',
wf_name='some',
wf_spec={'name': 'some'},
workflow_name='some',
spec={'name': 'some'},
state=states.RUNNING,
state_info=None,
input={'foo': 'bar'},
output={},
start_params={'env': {'k1': 'abc'}},
@ -44,6 +45,7 @@ EXEC = {
'output': '{}',
'params': '{"env": {"k1": "abc"}}',
'state': 'RUNNING',
'state_info': None,
'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00',
'workflow_name': 'some'
@ -65,20 +67,26 @@ MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException())
class TestExecutionsController(base.FunctionalTest):
@mock.patch.object(db_api, 'get_execution', MOCK_EXECUTION)
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_EXECUTION)
def test_get(self):
resp = self.app.get('/v2/executions/123')
self.maxDiff = None
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(EXEC, resp.json)
@mock.patch.object(db_api, 'get_execution', MOCK_NOT_FOUND)
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/executions/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, 'ensure_execution_exists', MOCK_EXECUTION)
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
MOCK_EXECUTION
)
@mock.patch.object(rpc.EngineClient, 'pause_workflow',
MOCK_UPDATED_EXECUTION)
def test_put(self):
@ -87,7 +95,11 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(UPDATED_EXEC, resp.json)
@mock.patch.object(db_api, 'ensure_execution_exists', MOCK_EXECUTION)
@mock.patch.object(
db_api,
'ensure_workflow_execution_exists',
MOCK_EXECUTION
)
def test_put_stop(self):
update_exec = copy.copy(EXEC)
update_exec['state'] = states.ERROR
@ -105,7 +117,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertDictEqual(update_exec, resp.json)
mock_pw.assert_called_once_with('123', 'ERROR', "Force")
@mock.patch.object(db_api, 'update_execution', MOCK_NOT_FOUND)
@mock.patch.object(db_api, 'update_workflow_execution', MOCK_NOT_FOUND)
def test_put_not_found(self):
resp = self.app.put_json(
'/v2/executions/123',
@ -139,19 +151,19 @@ class TestExecutionsController(base.FunctionalTest):
EXEC)
self.assertIn('Bad response: 400', context.message)
@mock.patch.object(db_api, 'delete_execution', MOCK_DELETE)
@mock.patch.object(db_api, 'delete_workflow_execution', MOCK_DELETE)
def test_delete(self):
resp = self.app.delete('/v2/executions/123')
self.assertEqual(resp.status_int, 204)
@mock.patch.object(db_api, 'delete_execution', MOCK_NOT_FOUND)
@mock.patch.object(db_api, 'delete_workflow_execution', MOCK_NOT_FOUND)
def test_delete_not_found(self):
resp = self.app.delete('/v2/executions/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, 'get_executions', MOCK_EXECUTIONS)
@mock.patch.object(db_api, 'get_workflow_executions', MOCK_EXECUTIONS)
def test_get_all(self):
resp = self.app.get('/v2/executions')
@ -160,7 +172,7 @@ class TestExecutionsController(base.FunctionalTest):
self.assertEqual(len(resp.json['executions']), 1)
self.assertDictEqual(EXEC, resp.json['executions'][0])
@mock.patch.object(db_api, 'get_executions', MOCK_EMPTY)
@mock.patch.object(db_api, 'get_workflow_executions', MOCK_EMPTY)
def test_get_all_empty(self):
resp = self.app.get('/v2/executions')

View File

@ -32,7 +32,7 @@ from mistral.workflow import utils as wf_utils
TASK_DB = models.TaskExecution(
id='123',
name='task',
wf_name='flow',
workflow_name='flow',
spec={},
action_spec={},
state=states.RUNNING,
@ -41,7 +41,7 @@ TASK_DB = models.TaskExecution(
input={},
result={},
runtime_context={},
execution_id='123',
workflow_execution_id='123',
created_at=datetime.datetime(1970, 1, 1),
updated_at=datetime.datetime(1970, 1, 1)
)
@ -49,11 +49,11 @@ TASK_DB = models.TaskExecution(
TASK = {
'id': '123',
'name': 'task',
'wf_name': 'flow',
'workflow_name': 'flow',
'state': 'RUNNING',
'result': '{}',
'input': '{}',
'execution_id': '123',
'workflow_execution_id': '123',
'created_at': '1970-01-01 00:00:00',
'updated_at': '1970-01-01 00:00:00'
}
@ -80,14 +80,16 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
class TestTasksController(base.FunctionalTest):
@mock.patch.object(db_api, 'get_task', MOCK_TASK)
@mock.patch.object(db_api, 'get_task_execution', MOCK_TASK)
def test_get(self):
resp = self.app.get('/v2/tasks/123')
self.maxDiff = None
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(TASK, resp.json)
@mock.patch.object(db_api, 'get_task', MOCK_NOT_FOUND)
@mock.patch.object(db_api, 'get_task_execution', MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/tasks/123', expect_errors=True)
@ -138,7 +140,7 @@ class TestTasksController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
@mock.patch.object(db_api, 'get_tasks', MOCK_TASKS)
@mock.patch.object(db_api, 'get_task_executions', MOCK_TASKS)
def test_get_all(self):
resp = self.app.get('/v2/tasks')
@ -147,7 +149,7 @@ class TestTasksController(base.FunctionalTest):
self.assertEqual(len(resp.json['tasks']), 1)
self.assertDictEqual(TASK, resp.json['tasks'][0])
@mock.patch.object(db_api, 'get_tasks', MOCK_EMPTY)
@mock.patch.object(db_api, 'get_task_executions', MOCK_EMPTY)
def test_get_all_empty(self):
resp = self.app.get('/v2/tasks')

View File

@ -28,7 +28,7 @@ WORKBOOK_DEF = '---'
UPDATED_WORKBOOK_DEF = '---\nVersion: 2.0'
WORKBOOK_DB = models.WorkbookDefinition(
WORKBOOK_DB = models.Workbook(
id='123',
name='book',
definition=WORKBOOK_DEF,

View File

@ -83,20 +83,22 @@ MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
class TestWorkflowsController(base.FunctionalTest):
@mock.patch.object(db_api, "get_workflow", MOCK_WF)
@mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
def test_get(self):
resp = self.app.get('/v2/workflows/123')
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(WF, resp.json)
@mock.patch.object(db_api, "get_workflow", MOCK_NOT_FOUND)
@mock.patch.object(db_api, "get_workflow_definition", MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/workflows/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "create_or_update_workflow", MOCK_UPDATED_WF)
@mock.patch.object(
db_api, "create_or_update_workflow_definition", MOCK_UPDATED_WF
)
def test_put(self):
resp = self.app.put(
'/v2/workflows',
@ -109,7 +111,9 @@ class TestWorkflowsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
self.assertDictEqual({'workflows': [UPDATED_WF]}, resp.json)
@mock.patch.object(db_api, "create_or_update_workflow", MOCK_NOT_FOUND)
@mock.patch.object(
db_api, "create_or_update_workflow_definition", MOCK_NOT_FOUND
)
def test_put_not_found(self):
resp = self.app.put(
'/v2/workflows',
@ -120,7 +124,7 @@ class TestWorkflowsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "create_workflow")
@mock.patch.object(db_api, "create_workflow_definition")
def test_post(self, mock_mtd):
mock_mtd.return_value = WF_DB
@ -140,7 +144,7 @@ class TestWorkflowsController(base.FunctionalTest):
self.assertIsNotNone(spec)
self.assertEqual(WF_DB.name, spec['name'])
@mock.patch.object(db_api, "create_workflow", MOCK_DUPLICATE)
@mock.patch.object(db_api, "create_workflow_definition", MOCK_DUPLICATE)
def test_post_dup(self):
resp = self.app.post(
'/v2/workflows',
@ -151,19 +155,19 @@ class TestWorkflowsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 409)
@mock.patch.object(db_api, "delete_workflow", MOCK_DELETE)
@mock.patch.object(db_api, "delete_workflow_definition", MOCK_DELETE)
def test_delete(self):
resp = self.app.delete('/v2/workflows/123')
self.assertEqual(resp.status_int, 204)
@mock.patch.object(db_api, "delete_workflow", MOCK_NOT_FOUND)
@mock.patch.object(db_api, "delete_workflow_definition", MOCK_NOT_FOUND)
def test_delete_not_found(self):
resp = self.app.delete('/v2/workflows/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, "get_workflows", MOCK_WFS)
@mock.patch.object(db_api, "get_workflow_definitions", MOCK_WFS)
def test_get_all(self):
resp = self.app.get('/v2/workflows')
@ -172,7 +176,7 @@ class TestWorkflowsController(base.FunctionalTest):
self.assertEqual(len(resp.json['workflows']), 1)
self.assertDictEqual(WF, resp.json['workflows'][0])
@mock.patch.object(db_api, "get_workflows", MOCK_EMPTY)
@mock.patch.object(db_api, "get_workflow_definitions", MOCK_EMPTY)
def test_get_all_empty(self):
resp = self.app.get('/v2/workflows')

File diff suppressed because it is too large Load Diff

View File

@ -99,19 +99,19 @@ class EngineTestCase(base.DbTestCase):
[thread.kill() for thread in self.threads]
def is_execution_success(self, exec_id):
return db_api.get_execution(exec_id).state == states.SUCCESS
return db_api.get_workflow_execution(exec_id).state == states.SUCCESS
def is_execution_error(self, exec_id):
return db_api.get_execution(exec_id).state == states.ERROR
return db_api.get_workflow_execution(exec_id).state == states.ERROR
def is_execution_paused(self, exec_id):
return db_api.get_execution(exec_id).state == states.PAUSED
return db_api.get_workflow_execution(exec_id).state == states.PAUSED
def is_task_success(self, task_id):
return db_api.get_task(task_id).state == states.SUCCESS
return db_api.get_task_execution(task_id).state == states.SUCCESS
def is_task_error(self, task_id):
return db_api.get_task(task_id).state == states.ERROR
return db_api.get_task_execution(task_id).state == states.ERROR
def is_task_delayed(self, task_id):
return db_api.get_task(task_id).state == states.DELAYED
return db_api.get_task_execution(task_id).state == states.DELAYED

View File

@ -78,18 +78,27 @@ class ActionContextTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
task = self._assert_single_item(exec_db.tasks, name='task1')
task = self._assert_single_item(exec_db.task_executions, name='task1')
headers = {
'Mistral-Workflow-Name': exec_db.wf_name,
'Mistral-Workflow-Name': exec_db.workflow_name,
'Mistral-Task-Id': task.id,
'Mistral-Execution-Id': exec_db.id
}
requests.request.assert_called_with(
'GET', 'https://wiki.openstack.org/wiki/mistral',
params=None, data=None, headers=headers, cookies=None, auth=None,
timeout=None, allow_redirects=None, proxies=None, verify=None)
'GET',
'https://wiki.openstack.org/wiki/mistral',
params=None,
data=None,
headers=headers,
cookies=None,
auth=None,
timeout=None,
allow_redirects=None,
proxies=None,
verify=None
)

View File

@ -126,10 +126,10 @@ class ActionDefaultTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
self._assert_single_item(exec_db.tasks, name='task1')
self._assert_single_item(exec_db.task_executions, name='task1')
requests.request.assert_called_with(
'GET', 'https://api.library.org/books',
@ -151,17 +151,18 @@ class ActionDefaultTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
self._assert_single_item(exec_db.tasks, name='task1')
self._assert_single_item(exec_db.task_executions, name='task1')
requests.request.assert_called_with(
'GET', 'https://api.library.org/books',
params=None, data=None, headers=None, cookies=None,
allow_redirects=None, proxies=None, verify=None,
auth=EXPECTED_ENV_AUTH,
timeout=60)
timeout=60
)
@mock.patch.object(
requests, 'request',
@ -179,16 +180,18 @@ class ActionDefaultTest(base.EngineTestCase):
]
}
exec_db = self.engine.start_workflow('wf1_with_items',
wf_input,
env=ENV)
exec_db = self.engine.start_workflow(
'wf1_with_items',
wf_input,
env=ENV
)
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
self._assert_single_item(exec_db.tasks, name='task1')
self._assert_single_item(exec_db.task_executions, name='task1')
calls = [mock.call('GET', url, params=None, data=None,
headers=None, cookies=None,
@ -215,16 +218,18 @@ class ActionDefaultTest(base.EngineTestCase):
]
}
exec_db = self.engine.start_workflow('wf2_with_items',
wf_input,
env=ENV)
exec_db = self.engine.start_workflow(
'wf2_with_items',
wf_input,
env=ENV
)
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
self._assert_single_item(exec_db.tasks, name='task1')
self._assert_single_item(exec_db.task_executions, name='task1')
calls = [mock.call('GET', url, params=None, data=None,
headers=None, cookies=None,

View File

@ -74,7 +74,7 @@ class AdhocActionsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.maxDiff = None

View File

@ -64,11 +64,11 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_error(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -78,11 +78,11 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -92,11 +92,11 @@ class SimpleEngineCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_paused(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -142,11 +142,11 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_error(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -156,11 +156,11 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -170,11 +170,11 @@ class SimpleEngineWorkflowLevelCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_paused(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -252,11 +252,11 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_error(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -266,15 +266,18 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_error(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
task2_db = self._assert_single_item(exec_db.tasks, name='task2')
task2_db = self._assert_single_item(
exec_db.task_executions,
name='task2'
)
self._await(lambda: self.is_task_success(task2_db.id))
self._await(lambda: self.is_execution_error(exec_db.id))
@ -284,11 +287,11 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -298,15 +301,18 @@ class OrderEngineCommandsTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(exec_db.task_executions))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
task2_db = self._assert_single_item(exec_db.tasks, name='task2')
task2_db = self._assert_single_item(
exec_db.task_executions,
name='task2'
)
self._await(lambda: self.is_task_error(task2_db.id))
self._await(lambda: self.is_execution_success(exec_db.id))

View File

@ -174,11 +174,11 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
@ -201,11 +201,11 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(2, len(tasks))
@ -230,11 +230,11 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(6, len(tasks))
@ -278,11 +278,11 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')

View File

@ -119,13 +119,13 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIn('__execution', exec_db.context)
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
task_db = exec_db.tasks[0]
task_db = exec_db.task_executions[0]
self.assertEqual('wb.wf1', task_db.wf_name)
self.assertEqual('wb.wf1', task_db.workflow_name)
self.assertEqual('task1', task_db.name)
self.assertEqual(states.RUNNING, task_db.state)
self.assertIsNotNone(task_db.spec)
@ -152,7 +152,7 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIsNotNone(exec_db)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertDictEqual(exec_db.start_params.get('env', {}), env)
@ -173,7 +173,7 @@ class DefaultEngineTest(base.DbTestCase):
self.assertIsNotNone(exec_db)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertDictEqual(exec_db.start_params.get('env', {}), env)
@ -226,11 +226,11 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual(states.RUNNING, exec_db.state)
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
task_db = exec_db.tasks[0]
task_db = exec_db.task_executions[0]
self.assertEqual('task1', task_db.name)
self.assertEqual(states.RUNNING, task_db.state)
@ -242,7 +242,7 @@ class DefaultEngineTest(base.DbTestCase):
# Finish 'task1'.
task1_db = self.engine.on_task_result(
exec_db.tasks[0].id,
exec_db.task_executions[0].id,
wf_utils.TaskResult(data='Hey')
)
@ -256,14 +256,17 @@ class DefaultEngineTest(base.DbTestCase):
self.assertDictEqual({'output': 'Hey'}, task1_db.input)
self.assertDictEqual({'result': 'Hey'}, task1_db.result)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertIsNotNone(exec_db)
self.assertEqual(states.RUNNING, exec_db.state)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(exec_db.task_executions))
task2_db = self._assert_single_item(exec_db.tasks, name='task2')
task2_db = self._assert_single_item(
exec_db.task_executions,
name='task2'
)
self.assertEqual(states.RUNNING, task2_db.state)
@ -273,7 +276,7 @@ class DefaultEngineTest(base.DbTestCase):
wf_utils.TaskResult(data='Hi')
)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertIsNotNone(exec_db)
self.assertEqual(states.SUCCESS, exec_db.state)
@ -290,10 +293,10 @@ class DefaultEngineTest(base.DbTestCase):
self.assertDictEqual({'output': 'Hi'}, task2_db.input)
self.assertDictEqual({}, task2_db.result)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(exec_db.task_executions))
self._assert_single_item(exec_db.tasks, name='task1')
self._assert_single_item(exec_db.tasks, name='task2')
self._assert_single_item(exec_db.task_executions, name='task1')
self._assert_single_item(exec_db.task_executions, name='task2')
def test_stop_workflow_fail(self):
# Start workflow.

View File

@ -40,7 +40,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wf', {})
self._await(lambda: self.is_execution_error(exec_db.id))
return db_api.get_execution(exec_db.id)
return db_api.get_workflow_execution(exec_db.id)
def test_direct_workflow_on_closures(self):
WORKFLOW = """
@ -76,7 +76,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
"""
exec_db = self._run_workflow(WORKFLOW)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task3 = self._assert_single_item(tasks, name='task3')
task4 = self._assert_single_item(tasks, name='task4')
@ -106,7 +106,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
action: std.echo wrong_input="Hahaha"
"""
exec_db = exec_db = self._run_workflow(WORKFLOW_WRONG_TASK_INPUT)
task_db2 = exec_db.tasks[1]
task_db2 = exec_db.task_executions[1]
self.assertIn(
"Failed to initialize action",
@ -132,7 +132,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
action: std.echo wrong_input="Ha-ha"
"""
exec_db = self._run_workflow(WORKFLOW_WRONG_FIRST_TASK_INPUT)
task_db = exec_db.tasks[0]
task_db = exec_db.task_executions[0]
self.assertIn(
"Failed to initialize action",

View File

@ -115,7 +115,7 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertDictEqual({}, exec1_db.input)
self.assertDictEqual({'env': env}, exec1_db.start_params)
db_execs = db_api.get_executions()
db_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(db_execs))
@ -127,7 +127,7 @@ class SubworkflowsTest(base.EngineTestCase):
expected_start_params = {
'task_name': 'task2',
'parent_task_id': exec2_db.parent_task_id,
'parent_task_id': exec2_db.task_execution_id,
'env': env
}
@ -136,14 +136,14 @@ class SubworkflowsTest(base.EngineTestCase):
'param2': 'Clyde'
}
self.assertIsNotNone(exec2_db.parent_task_id)
self.assertIsNotNone(exec2_db.task_execution_id)
self.assertDictEqual(exec2_db.start_params, expected_start_params)
self.assertDictEqual(exec2_db.input, expected_wf1_input)
# Wait till workflow 'wf1' is completed.
self._await(lambda: self.is_execution_success(exec2_db.id))
exec2_db = db_api.get_execution(exec2_db.id)
exec2_db = db_api.get_workflow_execution(exec2_db.id)
expected_wf1_output = {'final_result': "'Bonnie & Clyde'"}
@ -152,14 +152,16 @@ class SubworkflowsTest(base.EngineTestCase):
# Wait till workflow 'wf2' is completed.
self._await(lambda: self.is_execution_success(exec1_db.id))
exec1_db = db_api.get_execution(exec1_db.id)
exec1_db = db_api.get_workflow_execution(exec1_db.id)
expected_wf2_output = {'slogan': "'Bonnie & Clyde' is a cool movie!"}
self.assertDictEqual(exec1_db.output, expected_wf2_output)
# Check if target is resolved.
tasks_exec2 = db_api.get_tasks(execution_id=exec2_db.id)
tasks_exec2 = db_api.get_task_executions(
workflow_execution_id=exec2_db.id
)
self._assert_single_item(tasks_exec2, name='task1')
self._assert_single_item(tasks_exec2, name='task2')

View File

@ -79,8 +79,8 @@ class JavaScriptEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(states.SUCCESS, task_db.state)
self.assertDictEqual({}, task_db.runtime_context)
@ -98,8 +98,8 @@ class JavaScriptEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(states.SUCCESS, task_db.state)
self.assertDictEqual({}, task_db.runtime_context)

View File

@ -269,9 +269,9 @@ class JoinEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
@ -292,9 +292,9 @@ class JoinEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task2 = self._assert_single_item(tasks, name='task2')
@ -315,9 +315,9 @@ class JoinEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(3, len(tasks))
@ -340,9 +340,9 @@ class JoinEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(4, len(tasks))
@ -374,8 +374,9 @@ class JoinEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
tasks = exec_db.tasks
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.task_executions
self.assertEqual(5, len(tasks))
@ -405,9 +406,9 @@ class JoinEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(4, len(tasks))
@ -428,7 +429,7 @@ class JoinEngineTest(base.EngineTestCase):
def test_full_join_parallel_published_vars(self):
wfs_tasks_join_complex = """---
version: "2.0"
version: 2.0
main:
type: direct
@ -436,6 +437,7 @@ class JoinEngineTest(base.EngineTestCase):
var1: <% $.var1 %>
var2: <% $.var2 %>
is_done: <% $.is_done %>
tasks:
init:
publish:

View File

@ -85,9 +85,9 @@ class NoopTaskEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(4, len(tasks))
@ -112,9 +112,9 @@ class NoopTaskEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(4, len(tasks))

View File

@ -308,8 +308,8 @@ class PoliciesTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wb.wf1', {})
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(states.DELAYED, task_db.state)
self.assertDictEqual(
@ -327,7 +327,7 @@ class PoliciesTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
task_db = exec_db.task_executions[0]
self.assertEqual(states.DELAYED, task_db.state)
@ -340,8 +340,8 @@ class PoliciesTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wb.wf1', {})
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(states.RUNNING, task_db.state)
self.assertDictEqual({}, task_db.runtime_context)
@ -359,8 +359,8 @@ class PoliciesTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wb.wf1', {})
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(states.RUNNING, task_db.state)
self.assertDictEqual({}, task_db.runtime_context)
@ -373,8 +373,8 @@ class PoliciesTest(base.EngineTestCase):
self._await(lambda: self.is_execution_error(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(
2,
@ -388,15 +388,18 @@ class PoliciesTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wb.wf1', {})
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(states.RUNNING, task_db.state)
self._await(lambda: self.is_task_error(task_db.id))
exec_db = db_api.get_execution(exec_db.id)
task_db = self._assert_single_item(exec_db.tasks, name="task1")
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = self._assert_single_item(
exec_db.task_executions,
name='task1'
)
self.assertEqual(states.ERROR, task_db.state)
self.assertIsNotNone(exec_db)
@ -410,8 +413,8 @@ class PoliciesTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wb.wf1', {})
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.assertEqual(states.RUNNING, task_db.state)
@ -420,8 +423,8 @@ class PoliciesTest(base.EngineTestCase):
# Wait until timeout exceeds.
self._sleep(2)
exec_db = db_api.get_execution(exec_db.id)
tasks_db = exec_db.tasks
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks_db = exec_db.task_executions
# Make sure that engine did not create extra tasks.
self.assertEqual(1, len(tasks_db))
@ -432,8 +435,11 @@ class PoliciesTest(base.EngineTestCase):
# Start workflow.
exec_db = self.engine.start_workflow('wb.wf1', {})
exec_db = db_api.get_execution(exec_db.id)
task_db = self._assert_single_item(exec_db.tasks, name="task1")
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = self._assert_single_item(
exec_db.task_executions,
name='task1'
)
self.assertEqual(states.IDLE, task_db.state)
@ -441,14 +447,20 @@ class PoliciesTest(base.EngineTestCase):
self.engine.resume_workflow(exec_db.id)
exec_db = db_api.get_execution(exec_db.id)
task_db = self._assert_single_item(exec_db.tasks, name="task1")
exec_db = db_api.get_workflow_execution(exec_db.id)
self._assert_single_item(exec_db.task_executions, name='task1')
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
task_db = self._assert_single_item(exec_db.tasks, name="task1")
next_task_db = self._assert_single_item(exec_db.tasks, name="task2")
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = self._assert_single_item(
exec_db.task_executions,
name='task1'
)
next_task_db = self._assert_single_item(
exec_db.task_executions,
name='task2'
)
self.assertEqual(states.SUCCESS, task_db.state)
self.assertEqual(states.SUCCESS, next_task_db.state)
@ -461,8 +473,11 @@ class PoliciesTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
task_db = self._assert_single_item(exec_db.tasks, name="task1")
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = self._assert_single_item(
exec_db.task_executions,
name='task1'
)
self.assertEqual(states.SUCCESS, task_db.state)

View File

@ -148,25 +148,25 @@ class LongActionTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wf', None)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.RUNNING, exec_db.state)
self.assertEqual(states.RUNNING, exec_db.tasks[0].state)
self.assertEqual(states.RUNNING, exec_db.task_executions[0].state)
self.wait_for_action()
# Here's the point when the action is blocked but already running.
# Do the same check again, it should always pass.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.RUNNING, exec_db.state)
self.assertEqual(states.RUNNING, exec_db.tasks[0].state)
self.assertEqual(states.RUNNING, exec_db.task_executions[0].state)
self.unblock_action()
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertDictEqual({'result': 'test'}, exec_db.output)
@ -177,13 +177,13 @@ class LongActionTest(base.EngineTestCase):
exec_db = self.engine.start_workflow('wf', None)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.RUNNING, exec_db.state)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(exec_db.tasks, name='task1')
task1 = self._assert_single_item(exec_db.task_executions, name='task1')
task2 = self._assert_single_item(
tasks,
name='task2',
@ -197,7 +197,7 @@ class LongActionTest(base.EngineTestCase):
self._await(lambda: self.is_task_success(task2.id))
self._await(lambda: self.is_execution_success(exec_db.id))
task1 = db_api.get_task(task1.id)
task1 = db_api.get_task_execution(task1.id)
self.assertDictEqual(
{

View File

@ -77,13 +77,13 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
# Wait till workflow 'wf1' is completed.
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(db_api.get_tasks()))
self.assertEqual(1, len(exec_db.task_executions))
self.assertEqual(1, len(db_api.get_task_executions()))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
@ -108,19 +108,19 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
# Wait till workflow 'wf1' is completed.
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(db_api.get_tasks()))
self.assertEqual(2, len(exec_db.task_executions))
self.assertEqual(2, len(db_api.get_task_executions()))
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task1',
state=states.SUCCESS
)
self._assert_single_item(
exec_db.tasks,
exec_db.task_executions,
name='task2',
state=states.SUCCESS
)

View File

@ -87,7 +87,7 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertDictEqual({}, exec1_db.input)
self.assertDictEqual({}, exec1_db.start_params)
db_execs = db_api.get_executions()
db_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(db_execs))
@ -98,11 +98,11 @@ class SubworkflowsTest(base.EngineTestCase):
exec2_db = db_execs[1]
self.assertEqual(project_id, exec2_db.project_id)
self.assertIsNotNone(exec2_db.parent_task_id)
self.assertIsNotNone(exec2_db.task_execution_id)
self.assertDictEqual(
{
'task_name': 'task2',
'parent_task_id': exec2_db.parent_task_id
'parent_task_id': exec2_db.task_execution_id
},
exec2_db.start_params
)
@ -117,7 +117,7 @@ class SubworkflowsTest(base.EngineTestCase):
# Wait till workflow 'wf1' is completed.
self._await(lambda: self.is_execution_success(exec2_db.id))
exec2_db = db_api.get_execution(exec2_db.id)
exec2_db = db_api.get_workflow_execution(exec2_db.id)
self.assertDictEqual(
{
@ -129,7 +129,7 @@ class SubworkflowsTest(base.EngineTestCase):
# Wait till workflow 'wf2' is completed.
self._await(lambda: self.is_execution_success(exec1_db.id))
exec1_db = db_api.get_execution(exec1_db.id)
exec1_db = db_api.get_workflow_execution(exec1_db.id)
self.assertDictEqual(
{
@ -139,8 +139,12 @@ class SubworkflowsTest(base.EngineTestCase):
)
# Check project_id in tasks.
tasks_exec1 = db_api.get_tasks(execution_id=exec1_db.id)
tasks_exec2 = db_api.get_tasks(execution_id=exec2_db.id)
tasks_exec1 = db_api.get_task_executions(
workflow_execution_id=exec1_db.id
)
tasks_exec2 = db_api.get_task_executions(
workflow_execution_id=exec2_db.id
)
task1_exec1 = self._assert_single_item(tasks_exec1, name="task1")
task1_exec2 = self._assert_single_item(tasks_exec2, name="task1")
@ -155,7 +159,7 @@ class SubworkflowsTest(base.EngineTestCase):
def test_subworkflow_error(self):
exec1_db = self.engine.start_workflow('my_wb.wf2', None)
db_execs = db_api.get_executions()
db_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(db_execs))
@ -181,7 +185,7 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertDictEqual({}, exec1_db.input)
self.assertDictEqual({'env': env}, exec1_db.start_params)
db_execs = db_api.get_executions()
db_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(db_execs))
@ -193,11 +197,11 @@ class SubworkflowsTest(base.EngineTestCase):
expected_start_params = {
'task_name': 'task2',
'parent_task_id': exec2_db.parent_task_id,
'parent_task_id': exec2_db.task_execution_id,
'env': env
}
self.assertIsNotNone(exec2_db.parent_task_id)
self.assertIsNotNone(exec2_db.task_execution_id)
self.assertDictEqual(exec2_db.start_params, expected_start_params)
# Wait till workflow 'wf1' is completed.

View File

@ -64,9 +64,9 @@ class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_success(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
task3 = self._assert_single_item(tasks, name='task3')
@ -154,9 +154,9 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_error(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(1, len(tasks))
@ -179,9 +179,9 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
self._await(lambda: self.is_execution_error(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(1, len(tasks))
@ -204,9 +204,9 @@ class TaskDefaultsReverseWorkflowEngineTest(base.EngineTestCase):
)
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
self.assertEqual(1, len(tasks))

View File

@ -150,9 +150,9 @@ class WithItemsEngineTest(base.EngineTestCase):
)
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
with_items_context = task1.runtime_context['with_items']
@ -185,9 +185,9 @@ class WithItemsEngineTest(base.EngineTestCase):
)
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
result = task1.result['result']
@ -213,9 +213,9 @@ class WithItemsEngineTest(base.EngineTestCase):
)
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
tasks = exec_db.tasks
tasks = exec_db.task_executions
task1 = self._assert_single_item(tasks, name='task1')
# Since we know that we can receive results in random order,
@ -239,8 +239,8 @@ class WithItemsEngineTest(base.EngineTestCase):
'wb1.wf1_with_items', WF_INPUT_URLS
)
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = exec_db.task_executions[0]
self.engine.on_task_result(task_db.id, wf_utils.TaskResult("Ivan"))
self.engine.on_task_result(task_db.id, wf_utils.TaskResult("John"))
@ -251,9 +251,9 @@ class WithItemsEngineTest(base.EngineTestCase):
)
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
task_db = db_api.get_task(task_db.id)
task_db = db_api.get_task_execution(task_db.id)
result = task_db.result['result']
self.assertTrue(isinstance(result, list))

View File

@ -176,21 +176,21 @@ class WorkflowResumeTest(base.EngineTestCase):
self._await(lambda: self.is_execution_paused(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.PAUSED, exec_db.state)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self.engine.resume_workflow(exec_db.id)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.RUNNING, exec_db.state)
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(exec_db.task_executions))
def test_resume_reverse(self):
wb_service.create_workbook_v2(RESUME_WORKBOOK_REVERSE)
@ -204,21 +204,21 @@ class WorkflowResumeTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
self.engine.pause_workflow(exec_db.id)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.PAUSED, exec_db.state)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self.engine.resume_workflow(exec_db.id)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.RUNNING, exec_db.state)
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(exec_db.task_executions))
def test_resume_two_branches(self):
wb_service.create_workbook_v2(WORKBOOK_TWO_BRANCHES)
@ -228,23 +228,23 @@ class WorkflowResumeTest(base.EngineTestCase):
self._await(lambda: self.is_execution_paused(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.PAUSED, exec_db.state)
self.assertEqual(1, len(exec_db.tasks))
self.assertEqual(1, len(exec_db.task_executions))
self.engine.resume_workflow(exec_db.id)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.RUNNING, exec_db.state)
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
# We can see 3 tasks in execution.
self.assertEqual(3, len(exec_db.tasks))
self.assertEqual(3, len(exec_db.task_executions))
def test_resume_two_start_tasks(self):
wb_service.create_workbook_v2(WORKBOOK_TWO_START_TASKS)
@ -254,21 +254,21 @@ class WorkflowResumeTest(base.EngineTestCase):
self._await(lambda: self.is_execution_paused(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.PAUSED, exec_db.state)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(exec_db.task_executions))
self.engine.resume_workflow(exec_db.id)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.RUNNING, exec_db.state)
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
self.assertEqual(3, len(exec_db.tasks))
self.assertEqual(3, len(exec_db.task_executions))
def test_resume_different_task_states(self):
wb_service.create_workbook_v2(WORKBOOK_DIFFERENT_TASK_STATES)
@ -278,18 +278,18 @@ class WorkflowResumeTest(base.EngineTestCase):
self._await(lambda: self.is_execution_paused(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.PAUSED, exec_db.state)
self.assertEqual(2, len(exec_db.tasks))
self.assertEqual(2, len(exec_db.task_executions))
task2 = self._assert_single_item(exec_db.tasks, name="task2")
task2 = self._assert_single_item(exec_db.task_executions, name='task2')
# Task2 is not finished yet.
self.assertFalse(states.is_completed(task2.state))
self.engine.resume_workflow(exec_db.id)
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.RUNNING, exec_db.state)
@ -297,10 +297,10 @@ class WorkflowResumeTest(base.EngineTestCase):
self.engine.on_task_result(task2.id, utils.TaskResult())
self._await(lambda: self.is_execution_success(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.SUCCESS, exec_db.state)
self.assertEqual(4, len(exec_db.tasks))
self.assertEqual(4, len(exec_db.task_executions))
@mock.patch.object(de.DefaultEngine, '_fail_workflow')
def test_resume_fails(self, mock_fw):
@ -310,7 +310,7 @@ class WorkflowResumeTest(base.EngineTestCase):
self._await(lambda: self.is_execution_paused(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
exec_db = db_api.get_workflow_execution(exec_db.id)
self.assertEqual(states.PAUSED, exec_db.state)
# Simulate failure and check if it is handled.

View File

@ -18,8 +18,8 @@ from mistral.tests import base
class ActionManagerTest(base.DbTestCase):
def test_action_input(self):
std_http = db_api.get_action("std.http")
std_email = db_api.get_action("std.email")
std_http = db_api.get_action_definition("std.http")
std_email = db_api.get_action_definition("std.email")
http_action_input = (
"url, method=GET, params=None, body=None, "
@ -38,8 +38,8 @@ class ActionManagerTest(base.DbTestCase):
self.assertEqual(std_email_input, std_email.input)
def test_action_description(self):
std_http = db_api.get_action("std.http")
std_echo = db_api.get_action("std.echo")
std_http = db_api.get_action_definition("std.http")
std_echo = db_api.get_action_definition("std.echo")
self.assertIn("Constructs an HTTP action", std_http.description)
self.assertIn("param body: (optional) Dictionary, bytes",

View File

@ -60,8 +60,8 @@ class ActionServiceTest(base.DbTestCase):
def setUp(self):
super(ActionServiceTest, self).setUp()
self.addCleanup(db_api.delete_actions, name='action1')
self.addCleanup(db_api.delete_actions, name='action2')
self.addCleanup(db_api.delete_action_definitions, name='action1')
self.addCleanup(db_api.delete_action_definitions, name='action2')
def test_create_actions(self):
db_actions = action_service.create_actions(ACTION_LIST)

View File

@ -116,7 +116,7 @@ class WorkbookServiceTest(base.DbTestCase):
self.assertIsNotNone(wb_db.spec)
self.assertListEqual(['test'], wb_db.tags)
db_actions = db_api.get_actions(name='my_wb.concat')
db_actions = db_api.get_action_definitions(name='my_wb.concat')
self.assertEqual(1, len(db_actions))
@ -130,7 +130,7 @@ class WorkbookServiceTest(base.DbTestCase):
self.assertEqual('concat', action_spec.get_name())
self.assertEqual('std.echo', action_spec.get_base())
db_wfs = db_api.get_workflows()
db_wfs = db_api.get_workflow_definitions()
self.assertEqual(2, len(db_wfs))
@ -153,7 +153,7 @@ class WorkbookServiceTest(base.DbTestCase):
wb_db = wb_service.create_workbook_v2(WORKBOOK)
self.assertIsNotNone(wb_db)
self.assertEqual(2, len(db_api.get_workflows()))
self.assertEqual(2, len(db_api.get_workflow_definitions()))
# Update workbook.
wb_db = wb_service.update_workbook_v2(UPDATED_WORKBOOK)
@ -163,7 +163,7 @@ class WorkbookServiceTest(base.DbTestCase):
self.assertEqual(UPDATED_WORKBOOK, wb_db.definition)
self.assertListEqual(['test'], wb_db.tags)
db_wfs = db_api.get_workflows()
db_wfs = db_api.get_workflow_definitions()
self.assertEqual(2, len(db_wfs))

View File

@ -60,7 +60,7 @@ class DirectWorkflowHandlerTest(base.BaseTest):
exec_db = models.WorkflowExecution()
exec_db.update({
'id': '1-2-3-4',
'wf_spec': wb_spec.get_workflows().get('wf1').to_dict(),
'spec': wb_spec.get_workflows().get('wf1').to_dict(),
'state': states.IDLE
})
@ -79,7 +79,7 @@ class DirectWorkflowHandlerTest(base.BaseTest):
'state': state
})
self.exec_db.tasks.append(task_db)
self.exec_db.task_executions.append(task_db)
return task_db

View File

@ -52,7 +52,7 @@ class ReverseWorkflowHandlerTest(base.BaseTest):
exec_db = models.WorkflowExecution()
exec_db.update({
'id': '1-2-3-4',
'wf_spec': wb_spec.get_workflows().get('wf1').to_dict(),
'spec': wb_spec.get_workflows().get('wf1').to_dict(),
'state': states.IDLE
})
@ -71,7 +71,7 @@ class ReverseWorkflowHandlerTest(base.BaseTest):
'state': state
})
self.exec_db.tasks.append(task_db)
self.exec_db.task_executions.append(task_db)
return task_db

View File

@ -33,7 +33,7 @@ def info(obj, msg, *args, **kvargs):
task_id = ''
if type(obj) is models.TaskExecution:
exec_id = obj.execution_id
exec_id = obj.workflow_execution_id
task_id = obj.id
elif type(obj) is models.WorkflowExecution:
exec_id = obj.id

View File

@ -45,7 +45,7 @@ class WorkflowHandler(object):
:param exec_db: Execution.
"""
self.exec_db = exec_db
self.wf_spec = spec_parser.get_workflow_spec(exec_db.wf_spec)
self.wf_spec = spec_parser.get_workflow_spec(exec_db.spec)
@abc.abstractmethod
def start_workflow(self, **params):
@ -286,7 +286,7 @@ class WorkflowHandler(object):
"""
self._set_execution_state(states.RUNNING)
tasks = self.exec_db.tasks
tasks = self.exec_db.task_executions
if not all([t.state == states.RUNNING for t in tasks]):
return self._find_commands_to_resume(tasks)
@ -309,7 +309,7 @@ class WorkflowHandler(object):
wf_trace.info(
self.exec_db,
"Execution of workflow '%s' [%s -> %s]"
% (self.exec_db.wf_name, cur_state, state)
% (self.exec_db.workflow_name, cur_state, state)
)
self.exec_db.state = state
self.exec_db.state_info = state_info

View File

@ -195,7 +195,7 @@ def add_execution_to_context(exec_db, context):
context['__execution'] = {
'id': exec_db.id,
'wf_spec': exec_db['wf_spec'],
'spec': exec_db.spec,
'start_params': exec_db.start_params,
'input': exec_db.input
}

View File

@ -78,7 +78,7 @@ class ReverseWorkflowHandler(base.WorkflowHandler):
resolved_task_specs = []
success_task_names = set()
for t in self.exec_db.tasks:
for t in self.exec_db.task_executions:
if t.state == states.SUCCESS:
success_task_names.add(t.name)
@ -145,6 +145,8 @@ class ReverseWorkflowHandler(base.WorkflowHandler):
return dep_t_specs
def _find_db_task(self, name):
db_tasks = filter(lambda t: t.name == name, self.exec_db.tasks)
db_tasks = filter(
lambda t: t.name == name, self.exec_db.task_executions
)
return db_tasks[0] if db_tasks else None

View File

@ -47,8 +47,8 @@ class TaskResultSerializer(serializer.Serializer):
def find_db_task(exec_db, task_spec):
db_tasks = [
t_db for t_db in exec_db.tasks
if t_db.name == task_spec.get_name()
t for t in exec_db.task_executions
if t.name == task_spec.get_name()
]
return db_tasks[0] if len(db_tasks) > 0 else None
@ -59,16 +59,18 @@ def find_db_tasks(exec_db, task_specs):
def find_running_tasks(exec_db):
return [t_db for t_db in exec_db.tasks if t_db.state == states.RUNNING]
return [t for t in exec_db.task_executions if t.state == states.RUNNING]
def find_completed_tasks(exec_db):
return [t_db for t_db in exec_db.tasks if states.is_completed(t_db.state)]
return [
t for t in exec_db.task_executions if states.is_completed(t.state)
]
def find_successful_tasks(exec_db):
return [t_db for t_db in exec_db.tasks if t_db.state == states.SUCCESS]
return [t for t in exec_db.task_executions if t.state == states.SUCCESS]
def find_error_tasks(exec_db):
return [t_db for t_db in exec_db.tasks if t_db.state == states.ERROR]
return [t for t in exec_db.task_executions if t.state == states.ERROR]

View File

@ -22,7 +22,7 @@ from mistral.workflow import reverse_workflow
def create_workflow_handler(exec_db, wf_spec=None):
if not wf_spec:
wf_spec = spec_parser.get_workflow_spec(exec_db['wf_spec'])
wf_spec = spec_parser.get_workflow_spec(exec_db.spec)
handler_cls = _select_workflow_handler(wf_spec)