Refactoring workflow handler

* Introduced a new class, Workflow, that manages the life cycle of
  running workflows and is responsible for their persistent state
* Moved all workflow-level logic into the workflow handler and the
  Workflow class
* Changed the semantics of how workflow start errors are handled.
  Previously, in case of invalid user input the Mistral engine would
  store information about the error in the "state_info" field of the
  workflow execution and also bubble up an exception to the user.
  This approach was incorrect for a number of reasons, including
  broken semantics: if an exception is raised due to invalid input,
  it's natural to expect that the system state has not changed. After
  this refactoring, the engine only raises an exception in case of
  bad user input, without persisting anything, so the behavior is
  consistent with the idea of exceptional situations (see the sketch
  after this list).
* Fixed unit tests in accordance with the previous point
* Fixed a number of logical issues in tests. For example, in
  test_default_engine.py we expected one type of error (e.g. env not
  found) but effectively received another one (invalid input).
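
The sketch below illustrates the new contract; it is not part of the
diff, and the engine handle and helper name are hypothetical:

    from mistral import exceptions as exc

    def start_or_report(engine, wf_identifier, wf_input, **params):
        try:
            # On success a workflow execution is created and returned.
            return engine.start_workflow(wf_identifier, wf_input, **params)
        except exc.InputException as e:
            # With the new semantics invalid input raises before anything
            # is persisted, so there is no half-created execution carrying
            # the error in its "state_info" field.
            print("Bad input, nothing was persisted: %s" % e)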

Partially implements: blueprint mistral-engine-error-handling

Change-Id: I09070411fd833df8284cb80db69b8401a40eb6fe
Renat Akhmerov 2016-06-01 13:33:20 +07:00
parent 7b2857a17b
commit e2c89f777d
36 changed files with 773 additions and 620 deletions


@ -48,7 +48,7 @@ def setup_db():
try:
models.Workbook.metadata.create_all(b.get_engine())
except sa.exc.OperationalError as e:
raise exc.DBException("Failed to setup database: %s" % e)
raise exc.DBError("Failed to setup database: %s" % e)
def drop_db():
@ -58,7 +58,7 @@ def drop_db():
models.Workbook.metadata.drop_all(b.get_engine())
_facade = None
except Exception as e:
raise exc.DBException("Failed to drop database: %s" % e)
raise exc.DBError("Failed to drop database: %s" % e)
# Transaction management.
@ -183,7 +183,7 @@ def get_workbook(name):
wb = _get_workbook(name)
if not wb:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Workbook not found [workbook_name=%s]" % name
)
@ -207,7 +207,7 @@ def create_workbook(values, session=None):
try:
wb.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for WorkbookDefinition: %s" % e.columns
)
@ -219,7 +219,7 @@ def update_workbook(name, values, session=None):
wb = _get_workbook(name)
if not wb:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Workbook not found [workbook_name=%s]" % name
)
@ -241,7 +241,7 @@ def delete_workbook(name, session=None):
wb = _get_workbook(name)
if not wb:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Workbook not found [workbook_name=%s]" % name
)
@ -284,7 +284,7 @@ def get_workflow_definition(identifier):
else _get_workflow_definition(identifier))
if not wf_def:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Workflow not found [workflow_identifier=%s]" % identifier
)
@ -295,7 +295,7 @@ def get_workflow_definition_by_id(id):
wf_def = _get_workflow_definition_by_id(id)
if not wf_def:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Workflow not found [workflow_id=%s]" % id
)
@ -324,7 +324,7 @@ def get_workflow_definitions(limit=None, marker=None, sort_keys=None,
query
)
except Exception as e:
raise exc.DBQueryEntryException(
raise exc.DBQueryEntryError(
"Failed when querying database, error type: %s, "
"error message: %s" % (e.__class__.__name__, e.message)
)
@ -339,7 +339,7 @@ def create_workflow_definition(values, session=None):
try:
wf_def.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for WorkflowDefinition: %s" % e.columns
)
@ -366,7 +366,7 @@ def update_workflow_definition(identifier, values, session=None):
try:
[get_cron_trigger(name) for name in cron_triggers]
except exc.DBEntityNotFoundException:
except exc.DBEntityNotFoundError:
raise exc.NotAllowedException(
"Can not update scope of workflow that has triggers "
"associated in other tenants."
@ -403,7 +403,7 @@ def delete_workflow_definition(identifier, session=None):
cron_triggers = _get_associated_cron_triggers(identifier)
if cron_triggers:
raise exc.DBException(
raise exc.DBError(
"Can't delete workflow that has triggers associated. "
"[workflow_identifier=%s], [cron_trigger_name(s)=%s]" %
(identifier, ', '.join(cron_triggers))
@ -449,7 +449,7 @@ def get_action_definition_by_id(id):
action_def = _get_db_object_by_id(models.ActionDefinition, id)
if not action_def:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Action not found [action_id=%s]" % id
)
@ -460,7 +460,7 @@ def get_action_definition(name):
a_def = _get_action_definition(name)
if not a_def:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Action definition not found [action_name=%s]" % name
)
@ -485,7 +485,7 @@ def get_action_definitions(limit=None, marker=None, sort_keys=['name'],
query
)
except Exception as e:
raise exc.DBQueryEntryException(
raise exc.DBQueryEntryError(
"Failed when querying database, error type: %s, "
"error message: %s" % (e.__class__.__name__, e.message)
)
@ -500,7 +500,7 @@ def create_action_definition(values, session=None):
try:
a_def.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for action %s: %s" % (a_def.name, e.columns)
)
@ -512,7 +512,7 @@ def update_action_definition(name, values, session=None):
a_def = _get_action_definition(name)
if not a_def:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Action definition not found [action_name=%s]" % name
)
@ -534,7 +534,7 @@ def delete_action_definition(name, session=None):
a_def = _get_action_definition(name)
if not a_def:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Action definition not found [action_name=%s]" % name
)
@ -556,7 +556,7 @@ def get_execution(id):
ex = _get_execution(id)
if not ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Execution not found [execution_id=%s]" % id
)
@ -584,7 +584,7 @@ def create_execution(values, session=None):
try:
ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for Execution: %s" % e.columns
)
@ -596,7 +596,7 @@ def update_execution(id, values, session=None):
ex = _get_execution(id)
if not ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Execution not found [execution_id=%s]" % id
)
@ -618,7 +618,7 @@ def delete_execution(id, session=None):
ex = _get_execution(id)
if not ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Execution not found [execution_id=%s]" % id
)
@ -644,7 +644,7 @@ def get_action_execution(id):
a_ex = _get_action_execution(id)
if not a_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"ActionExecution not found [id=%s]" % id
)
@ -672,7 +672,7 @@ def create_action_execution(values, session=None):
try:
a_ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for ActionExecution: %s" % e.columns
)
@ -684,7 +684,7 @@ def update_action_execution(id, values, session=None):
a_ex = _get_action_execution(id)
if not a_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"ActionExecution not found [id=%s]" % id
)
@ -706,7 +706,7 @@ def delete_action_execution(id, session=None):
a_ex = _get_action_execution(id)
if not a_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"ActionExecution not found [id=%s]" % id
)
@ -732,7 +732,7 @@ def get_workflow_execution(id):
wf_ex = _get_workflow_execution(id)
if not wf_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"WorkflowExecution not found [id=%s]" % id
)
@ -761,7 +761,7 @@ def get_workflow_executions(limit=None, marker=None, sort_keys=['created_at'],
query
)
except Exception as e:
raise exc.DBQueryEntryException(
raise exc.DBQueryEntryError(
"Failed when quering database, error type: %s, "
"error message: %s" % (e.__class__.__name__, e.message)
)
@ -776,7 +776,7 @@ def create_workflow_execution(values, session=None):
try:
wf_ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for WorkflowExecution: %s" % e.columns
)
@ -788,7 +788,7 @@ def update_workflow_execution(id, values, session=None):
wf_ex = _get_workflow_execution(id)
if not wf_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"WorkflowExecution not found [id=%s]" % id
)
@ -810,7 +810,7 @@ def delete_workflow_execution(id, session=None):
wf_ex = _get_workflow_execution(id)
if not wf_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"WorkflowExecution not found [id=%s]" % id
)
@ -832,7 +832,7 @@ def get_task_execution(id):
task_ex = _get_task_execution(id)
if not task_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Task execution not found [id=%s]" % id
)
@ -856,7 +856,7 @@ def create_task_execution(values, session=None):
try:
task_ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for TaskExecution: %s" % e.columns
)
@ -868,7 +868,7 @@ def update_task_execution(id, values, session=None):
task_ex = _get_task_execution(id)
if not task_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"TaskExecution not found [id=%s]" % id
)
@ -890,7 +890,7 @@ def delete_task_execution(id, session=None):
task_ex = _get_task_execution(id)
if not task_ex:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"TaskExecution not found [id=%s]" % id
)
@ -920,7 +920,7 @@ def create_delayed_call(values, session=None):
try:
delayed_call.save(session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for DelayedCall: %s" % e.columns
)
@ -932,7 +932,7 @@ def delete_delayed_call(id, session=None):
delayed_call = _get_delayed_call(id)
if not delayed_call:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"DelayedCall not found [id=%s]" % id
)
@ -982,7 +982,7 @@ def get_delayed_call(id, session=None):
delayed_call = _get_delayed_call(id=id, session=session)
if not delayed_call:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Delayed Call not found [id=%s]" % id
)
@ -1020,7 +1020,7 @@ def get_cron_trigger(name):
cron_trigger = _get_cron_trigger(name)
if not cron_trigger:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Cron trigger not found [name=%s]" % name
)
@ -1054,14 +1054,14 @@ def create_cron_trigger(values, session=None):
try:
cron_trigger.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for cron trigger %s: %s"
% (cron_trigger.name, e.columns)
)
# TODO(nmakhotkin): Remove this 'except' after fixing
# https://bugs.launchpad.net/oslo.db/+bug/1458583.
except db_exc.DBError as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for cron trigger: %s" % e
)
@ -1073,7 +1073,7 @@ def update_cron_trigger(name, values, session=None, query_filter=None):
cron_trigger = _get_cron_trigger(name)
if not cron_trigger:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Cron trigger not found [name=%s]" % name
)
@ -1122,7 +1122,7 @@ def delete_cron_trigger(name, session=None):
cron_trigger = _get_cron_trigger(name)
if not cron_trigger:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Cron trigger not found [name=%s]" % name
)
@ -1156,7 +1156,7 @@ def get_environment(name):
env = _get_environment(name)
if not env:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Environment not found [name=%s]" % name
)
@ -1180,7 +1180,7 @@ def create_environment(values, session=None):
try:
env.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for Environment: %s" % e.columns
)
@ -1192,7 +1192,7 @@ def update_environment(name, values, session=None):
env = _get_environment(name)
if not env:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Environment not found [name=%s]" % name
)
@ -1216,7 +1216,7 @@ def delete_environment(name, session=None):
env = _get_environment(name)
if not env:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Environment not found [name=%s]" % name
)
@ -1278,7 +1278,7 @@ def create_resource_member(values, session=None):
try:
res_member.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryException(
raise exc.DBDuplicateEntryError(
"Duplicate entry for ResourceMember: %s" % e.columns
)
@ -1299,7 +1299,7 @@ def get_resource_member(resource_id, res_type, member_id):
).first()
if not res_member:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Resource member not found [resource_id=%s, member_id=%s]" %
(resource_id, member_id)
)
@ -1329,7 +1329,7 @@ def update_resource_member(resource_id, res_type, member_id, values,
# Only member who is not the owner of the resource can update the
# membership status.
if member_id != security.get_project_id():
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Resource member not found [resource_id=%s, member_id=%s]" %
(resource_id, member_id)
)
@ -1343,7 +1343,7 @@ def update_resource_member(resource_id, res_type, member_id, values,
).first()
if not res_member:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Resource member not found [resource_id=%s, member_id=%s]" %
(resource_id, member_id)
)
@ -1362,7 +1362,7 @@ def delete_resource_member(resource_id, res_type, member_id, session=None):
res_member = query.filter(_get_criterion(resource_id, member_id)).first()
if not res_member:
raise exc.DBEntityNotFoundException(
raise exc.DBEntityNotFoundError(
"Resource member not found [resource_id=%s, member_id=%s]" %
(resource_id, member_id)
)


@ -20,10 +20,10 @@ import six
from mistral.db.v2 import api as db_api
from mistral.engine import rpc
from mistral.engine import utils as e_utils
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.services import action_manager as a_m
from mistral.services import executions as wf_ex_service
from mistral.services import scheduler
from mistral.services import security
from mistral import utils
@ -36,7 +36,6 @@ from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
_RUN_EXISTING_ACTION_PATH = 'mistral.engine.actions._run_existing_action'
_RESUME_WORKFLOW_PATH = 'mistral.engine.actions._resume_workflow'
@six.add_metaclass(abc.ABCMeta)
@ -381,24 +380,13 @@ class WorkflowAction(Action):
wf_params[k] = v
del input_dict[k]
wf_ex, _ = wf_ex_service.create_workflow_execution(
wf_def.name,
wf_handler.start_workflow(
wf_def.id,
input_dict,
"sub-workflow execution",
wf_params,
wf_spec
wf_params
)
scheduler.schedule_call(
None,
_RESUME_WORKFLOW_PATH,
0,
wf_ex_id=wf_ex.id,
env=None
)
# TODO(rakhmerov): Add info logging.
def run(self, input_dict, target, index=0, desc='', save=True):
raise NotImplementedError('Does not apply to this WorkflowAction.')
@ -411,10 +399,6 @@ class WorkflowAction(Action):
pass
def _resume_workflow(wf_ex_id, env):
rpc.get_engine_client().resume_workflow(wf_ex_id, env=env)
def _run_existing_action(action_ex_id, target):
action_ex = db_api.get_action_execution(action_ex_id)
action_def = db_api.get_action_definition(action_ex.name)


@ -78,19 +78,19 @@ class Engine(object):
raise NotImplementedError
@abc.abstractmethod
def resume_workflow(self, wf_ex_id):
def resume_workflow(self, wf_ex_id, env=None):
"""Resumes workflow.
:param wf_ex_id: Execution id.
:param env: Workflow environment.
:return: Workflow execution object.
"""
raise NotImplementedError
@abc.abstractmethod
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None):
def rerun_workflow(self, task_ex_id, reset=True, env=None):
"""Rerun workflow from the specified task.
:param wf_ex_id: Workflow execution id.
:param task_ex_id: Task execution id.
:param reset: If True, reset task state including deleting its action
executions.


@ -20,14 +20,8 @@ from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler
from mistral.engine import base
from mistral.engine import dispatcher
from mistral.engine import workflow_handler as wf_handler
from mistral.services import executions as wf_ex_service
from mistral.services import workflows as wf_service
from mistral import utils as u
from mistral.workflow import base as wf_base
from mistral.workflow import commands
from mistral.workflow import states
LOG = logging.getLogger(__name__)
@ -47,23 +41,12 @@ class DefaultEngine(base.Engine, coordination.Service):
def start_workflow(self, wf_identifier, wf_input, description='',
**params):
with db_api.transaction():
# TODO(rakhmerov): It needs to be hidden in workflow_handler and
# Workflow abstraction.
# The new workflow execution will be in an IDLE
# state on initial record creation.
wf_ex, wf_spec = wf_ex_service.create_workflow_execution(
wf_ex = wf_handler.start_workflow(
wf_identifier,
wf_input,
description,
params
)
wf_handler.set_workflow_state(wf_ex, states.RUNNING)
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
cmds = wf_ctrl.continue_workflow()
dispatcher.dispatch_workflow_commands(wf_ex, cmds)
return wf_ex.get_clone()
@ -115,88 +98,40 @@ class DefaultEngine(base.Engine, coordination.Service):
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
wf_handler.set_workflow_state(wf_ex, states.PAUSED)
wf_handler.pause_workflow(wf_ex)
return wf_ex
@staticmethod
def _continue_workflow(wf_ex, task_ex=None, reset=True, env=None):
wf_ex = wf_service.update_workflow_execution_env(wf_ex, env)
wf_handler.set_workflow_state(
wf_ex,
states.RUNNING,
set_upstream=True
)
wf_ctrl = wf_base.get_controller(wf_ex)
# TODO(rakhmerov): Add error handling.
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
# When resuming a workflow we need to ignore all 'pause'
# commands because workflow controller takes tasks that
# completed within the period when the workflow was paused.
# TODO(rakhmerov): This all should be in workflow handler, it's too
# specific for engine level.
cmds = list(
filter(
lambda c: not isinstance(c, commands.PauseWorkflow),
cmds
)
)
# Since there's no explicit task causing the operation
# we need to mark all not processed tasks as processed
# because workflow controller takes only completed tasks
# with flag 'processed' equal to False.
for t_ex in wf_ex.task_executions:
if states.is_completed(t_ex.state) and not t_ex.processed:
t_ex.processed = True
dispatcher.dispatch_workflow_commands(wf_ex, cmds)
if not cmds:
wf_handler.check_workflow_completion(wf_ex)
return wf_ex.get_clone()
return wf_ex.get_clone()
@u.log_exec(LOG)
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None):
# TODO(rakhmerov): Rewrite this functionality with Task abstraction.
def rerun_workflow(self, task_ex_id, reset=True, env=None):
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
task_ex = db_api.get_task_execution(task_ex_id)
if task_ex.workflow_execution.id != wf_ex_id:
raise ValueError('Workflow execution ID does not match.')
wf_ex = wf_handler.lock_workflow_execution(
task_ex.workflow_execution_id
)
if wf_ex.state == states.PAUSED:
return wf_ex.get_clone()
wf_handler.rerun_workflow(wf_ex, task_ex, reset=reset, env=env)
# TODO(rakhmerov): This should be a call to workflow handler.
return self._continue_workflow(wf_ex, task_ex, reset, env=env)
return wf_ex.get_clone()
@u.log_exec(LOG)
def resume_workflow(self, wf_ex_id, env=None):
# TODO(rakhmerov): Rewrite this functionality with Task abstraction.
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
if (not states.is_paused(wf_ex.state) and
not states.is_idle(wf_ex.state)):
return wf_ex.get_clone()
wf_handler.resume_workflow(wf_ex, env=env)
return self._continue_workflow(wf_ex, env=env)
return wf_ex.get_clone()
@u.log_exec(LOG)
def stop_workflow(self, wf_ex_id, state, message=None):
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
return wf_handler.stop_workflow(wf_ex, state, message)
wf_handler.stop_workflow(wf_ex, state, message)
return wf_ex.get_clone()
@u.log_exec(LOG)
def rollback_workflow(self, wf_ex_id):


@ -31,11 +31,7 @@ def dispatch_workflow_commands(wf_ex, wf_cmds):
if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
task_handler.run_task(cmd)
elif isinstance(cmd, commands.SetWorkflowState):
# TODO(rakhmerov): Make just a single call to workflow_handler
if states.is_completed(cmd.new_state):
wf_handler.stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
else:
wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg)
wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg)
elif isinstance(cmd, commands.Noop):
# Do nothing.
pass


@ -58,7 +58,7 @@ def run_task(wf_cmd):
return
if task.is_completed():
wf_handler.check_workflow_completion(wf_cmd.wf_ex)
wf_handler.on_task_complete(task.task_ex)
def on_action_complete(action_ex):
@ -102,7 +102,7 @@ def on_action_complete(action_ex):
return
if task.is_completed():
wf_handler.check_workflow_completion(wf_ex)
wf_handler.on_task_complete(task_ex)
def fail_task(task_ex, msg):
@ -120,7 +120,7 @@ def continue_task(task_ex):
task.run()
if task.is_completed():
wf_handler.check_workflow_completion(task_ex.workflow_execution)
wf_handler.on_task_complete(task_ex)
def complete_task(task_ex, state, state_info):
@ -130,7 +130,7 @@ def complete_task(task_ex, state, state_info):
task.complete(state, state_info)
if task.is_completed():
wf_handler.check_workflow_completion(task_ex.workflow_execution)
wf_handler.on_task_complete(task_ex)
def _build_task_from_execution(task_ex, task_spec=None):


@ -22,10 +22,12 @@ from mistral import utils
# TODO(rakhmerov): This method is too abstract, validation rules may vary
# depending on object type (action, wf), it's not clear what it can be
# applied to.
def validate_input(definition, input, spec=None):
input_param_names = copy.deepcopy(list((input or {}).keys()))
# TODO(rakhmerov): It must not do any manipulations with parameters
# (input_dict)!
def validate_input(definition, input_dict, spec=None):
input_param_names = copy.deepcopy(list((input_dict or {}).keys()))
missing_param_names = []
spec_input = (spec.get_input() if spec else
@ -33,7 +35,8 @@ def validate_input(definition, input, spec=None):
for p_name, p_value in six.iteritems(spec_input):
if p_value is utils.NotDefined and p_name not in input_param_names:
missing_param_names.append(p_name)
missing_param_names.append(str(p_name))
if p_name in input_param_names:
input_param_names.remove(p_name)
@ -55,7 +58,7 @@ def validate_input(definition, input, spec=None):
msg % tuple(msg_props)
)
else:
utils.merge_dicts(input, spec_input, overwrite=False)
utils.merge_dicts(input_dict, spec_input, overwrite=False)
def resolve_workflow_definition(parent_wf_name, parent_wf_spec_name,


@ -12,208 +12,110 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
import traceback as tb
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import rpc
from mistral.engine import workflows
from mistral import exceptions as exc
from mistral.services import scheduler
from mistral import utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
def start_workflow(wf_identifier, wf_input, desc, params):
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_identifier)
)
wf.start(wf_input, desc=desc, params=params)
return wf.wf_ex
def stop_workflow(wf_ex, state, msg=None):
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
# In this case we should not try to handle possible errors. Instead,
# we need to let them pop up since the typical way of failing objects
# doesn't work here. Failing a workflow is the same as stopping it
# with ERROR state.
wf.stop(state, msg)
def fail_workflow(wf_ex, msg=None):
stop_workflow(wf_ex, states.ERROR, msg)
def on_task_complete(task_ex):
wf_ex = task_ex.workflow_execution
check_workflow_completion(wf_ex)
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
def check_workflow_completion(wf_ex):
if states.is_paused_or_completed(wf_ex.state):
return
# Workflow is not completed if there are any incomplete task
# executions that are not in WAITING state. If all incomplete
# tasks are waiting and there are no unhandled errors, then these
# tasks will not reach completion. In this case, mark the
# workflow complete.
incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex)
if any(not states.is_waiting(t.state) for t in incomplete_tasks):
return
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)
if wf_ctrl.all_errors_handled():
succeed_workflow(
wf_ex,
wf_ctrl.evaluate_workflow_final_context(),
wf_spec
)
else:
state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)
fail_workflow(wf_ex, state_info)
def stop_workflow(wf_ex, state, message=None):
if state == states.SUCCESS:
wf_ctrl = wf_base.get_controller(wf_ex)
final_context = {}
try:
final_context = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warning(
'Failed to get final context for %s: %s' % (wf_ex, e)
)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
return succeed_workflow(
wf_ex,
final_context,
wf_spec,
message
)
elif state == states.ERROR:
return fail_workflow(wf_ex, message)
return wf_ex
def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None):
# Fail workflow if output is not successfully evaluated.
try:
wf_ex.output = data_flow.evaluate_workflow_output(
wf_spec,
final_context
wf.on_task_complete(task_ex)
except exc.MistralException as e:
msg = (
"Failed to handle task completion [wf_ex=%s, task_ex=%s]: %s\n%s"
% (wf_ex, task_ex, e, tb.format_exc())
)
except Exception as e:
return fail_workflow(wf_ex, e.message)
# Set workflow execution to success until after output is evaluated.
set_workflow_state(wf_ex, states.SUCCESS, state_info)
LOG.error(msg)
if wf_ex.task_execution_id:
_schedule_send_result_to_parent_workflow(wf_ex)
return wf_ex
fail_workflow(wf.wf_ex, msg)
def fail_workflow(wf_ex, state_info):
if states.is_paused_or_completed(wf_ex.state):
return wf_ex
set_workflow_state(wf_ex, states.ERROR, state_info)
# When we set an ERROR state we should safely set output value getting
# w/o exceptions due to field size limitations.
state_info = utils.cut_by_kb(
state_info,
cfg.CONF.engine.execution_field_size_limit_kb
def pause_workflow(wf_ex, msg=None):
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf_ex.output = {'result': state_info}
if wf_ex.task_execution_id:
_schedule_send_result_to_parent_workflow(wf_ex)
return wf_ex
wf.set_state(states.PAUSED, msg)
def _schedule_send_result_to_parent_workflow(wf_ex):
scheduler.schedule_call(
None,
'mistral.engine.workflow_handler.send_result_to_parent_workflow',
0,
wf_ex_id=wf_ex.id
def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
if wf_ex.state == states.PAUSED:
return wf_ex.get_clone()
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
def send_result_to_parent_workflow(wf_ex_id):
wf_ex = db_api.get_workflow_execution(wf_ex_id)
if wf_ex.state == states.SUCCESS:
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(data=wf_ex.output)
)
elif wf_ex.state == states.ERROR:
err_msg = (
wf_ex.state_info or
'Failed subworkflow [execution_id=%s]' % wf_ex.id
)
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(error=err_msg)
)
wf.rerun(task_ex, reset=reset, env=env)
# TODO(rakhmerov): Should not be public, should be encapsulated inside Workflow
# abstraction.
def set_workflow_state(wf_ex, state, state_info=None, set_upstream=False):
cur_state = wf_ex.state
def resume_workflow(wf_ex, env=None):
if not states.is_paused_or_idle(wf_ex.state):
return wf_ex.get_clone()
if states.is_valid_transition(cur_state, state):
wf_ex.state = state
wf_ex.state_info = state_info
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf_trace.info(
wf_ex,
"Execution of workflow '%s' [%s -> %s]"
% (wf_ex.workflow_name, cur_state, state)
)
wf.resume(env=env)
def set_workflow_state(wf_ex, state, msg=None):
if states.is_completed(state):
stop_workflow(wf_ex, state, msg)
elif states.is_paused(state):
pause_workflow(wf_ex, msg)
else:
msg = ("Can't change workflow execution state from %s to %s. "
"[workflow=%s, execution_id=%s]" %
(cur_state, state, wf_ex.name, wf_ex.id))
raise exc.WorkflowException(msg)
# Workflow result should be accepted by parent workflows (if any)
# only if it completed successfully or failed.
wf_ex.accepted = wf_ex.state in (states.SUCCESS, states.ERROR)
# If specified, then recursively set the state of the parent workflow
# executions to the same state. Only changing state to RUNNING is
# supported.
# TODO(rakhmerov): I don't like this hardcoded special case. It's
# used only to continue the workflow (rerun) but at the first glance
# seems like a generic behavior. Need to handle it differently.
if set_upstream and state == states.RUNNING and wf_ex.task_execution_id:
task_ex = db_api.get_task_execution(wf_ex.task_execution_id)
parent_wf_ex = lock_workflow_execution(task_ex.workflow_execution_id)
set_workflow_state(
parent_wf_ex,
state,
state_info=state_info,
set_upstream=set_upstream
raise exc.MistralError(
'Invalid workflow state [wf_ex=%s, state=%s]' % (wf_ex, state)
)
# TODO(rakhmerov): How do we need to set task state properly?
# It doesn't seem right to intervene into the parent workflow
# internals. We just need to communicate changes back to parent
# workflow and it should do what's needed itself.
task_ex.state = state
task_ex.state_info = None
task_ex.processed = False
def lock_workflow_execution(wf_ex_id):
# Locks a workflow execution using the db_api.acquire_lock function.

mistral/engine/workflows.py (new file, 371 lines)

@ -0,0 +1,371 @@
# Copyright 2016 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import copy
from oslo_config import cfg
from oslo_log import log as logging
import six
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import dispatcher
from mistral.engine import rpc
from mistral.engine import utils as eng_utils
from mistral import exceptions as exc
from mistral.services import scheduler
from mistral.services import workflows as wf_service
from mistral import utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
_SEND_RESULT_TO_PARENT_WORKFLOW_PATH = (
'mistral.engine.workflows._send_result_to_parent_workflow'
)
@six.add_metaclass(abc.ABCMeta)
class Workflow(object):
"""Workflow.
Represents a workflow and defines an interface that the Mistral engine
or its components can use to manipulate workflows.
"""
def __init__(self, wf_def, wf_ex=None):
self.wf_def = wf_def
self.wf_ex = wf_ex
self.wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
def start(self, input_dict, desc='', params=None):
"""Start workflow.
:param input_dict: Workflow input.
:param desc: Workflow execution description.
:param params: Workflow type specific parameters.
"""
wf_trace.info(self.wf_ex, "Starting workflow: %s" % self.wf_def)
# TODO(rakhmerov): This call implicitly changes input_dict! Fix it!
# After fix we need to move validation after adding risky fields.
eng_utils.validate_input(self.wf_def, input_dict, self.wf_spec)
self._create_execution(input_dict, desc, params)
self.set_state(states.RUNNING)
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
cmds = wf_ctrl.continue_workflow()
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
def stop(self, state, msg=None):
"""Stop workflow.
:param state: New workflow state.
:param msg: Additional explaining message.
"""
if state == states.SUCCESS:
wf_ctrl = wf_base.get_controller(self.wf_ex)
final_context = {}
try:
final_context = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warning(
'Failed to get final context for %s: %s' % (self.wf_ex, e)
)
return self._succeed_workflow(final_context, msg)
elif state == states.ERROR:
return self._fail_workflow(msg)
def on_task_complete(self, task_ex):
"""Handle task completion event.
:param task_ex: Task execution that's completed.
"""
self._check_and_complete()
def resume(self, env=None):
"""Resume workflow.
:param env: Environment.
"""
wf_service.update_workflow_execution_env(self.wf_ex, env)
self.set_state(states.RUNNING, recursive=True)
self._continue_workflow(env=env)
def rerun(self, task_ex, reset=True, env=None):
"""Rerun workflow from the given task.
:param task_ex: Task execution that the workflow needs to rerun from.
:param reset: If True, reset task state including deleting its action
executions.
:param env: Environment.
"""
wf_service.update_workflow_execution_env(self.wf_ex, env)
self.set_state(states.RUNNING, recursive=True)
self._continue_workflow(task_ex, reset, env=env)
def lock(self):
return db_api.acquire_lock(db_models.WorkflowExecution, self.wf_ex.id)
def _create_execution(self, input_dict, desc, params):
self.wf_ex = db_api.create_workflow_execution({
'name': self.wf_def.name,
'description': desc,
'workflow_name': self.wf_def.name,
'workflow_id': self.wf_def.id,
'spec': self.wf_spec.to_dict(),
'state': states.IDLE,
'output': {},
'task_execution_id': params.get('task_execution_id'),
'runtime_context': {
'index': params.get('index', 0)
},
})
self.wf_ex.input = input_dict or {}
self.wf_ex.context = copy.deepcopy(input_dict) or {}
env = _get_environment(params)
if env:
params['env'] = env
self.wf_ex.params = params
data_flow.add_openstack_data_to_context(self.wf_ex)
data_flow.add_execution_to_context(self.wf_ex)
data_flow.add_environment_to_context(self.wf_ex)
data_flow.add_workflow_variables_to_context(self.wf_ex, self.wf_spec)
def set_state(self, state, state_info=None, recursive=False):
cur_state = self.wf_ex.state
if states.is_valid_transition(cur_state, state):
self.wf_ex.state = state
self.wf_ex.state_info = state_info
wf_trace.info(
self.wf_ex,
"Execution of workflow '%s' [%s -> %s]"
% (self.wf_ex.workflow_name, cur_state, state)
)
else:
msg = ("Can't change workflow execution state from %s to %s. "
"[workflow=%s, execution_id=%s]" %
(cur_state, state, self.wf_ex.name, self.wf_ex.id))
raise exc.WorkflowException(msg)
# Workflow result should be accepted by parent workflows (if any)
# only if it completed successfully or failed.
self.wf_ex.accepted = state in (states.SUCCESS, states.ERROR)
if recursive and self.wf_ex.task_execution_id:
parent_task_ex = db_api.get_task_execution(
self.wf_ex.task_execution_id
)
parent_wf = Workflow(
db_api.get_workflow_definition(parent_task_ex.workflow_id),
parent_task_ex.workflow_execution
)
parent_wf.lock()
parent_wf.set_state(state, recursive=recursive)
# TODO(rakhmerov): It'd be better to use instance of Task here.
parent_task_ex.state = state
parent_task_ex.state_info = None
parent_task_ex.processed = False
def _continue_workflow(self, task_ex=None, reset=True, env=None):
wf_ctrl = wf_base.get_controller(self.wf_ex)
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
# When resuming a workflow we need to ignore all 'pause'
# commands because workflow controller takes tasks that
# completed within the period when the workflow was paused.
cmds = list(
filter(lambda c: not isinstance(c, commands.PauseWorkflow), cmds)
)
# Since there's no explicit task causing the operation
# we need to mark all not processed tasks as processed
# because workflow controller takes only completed tasks
# with flag 'processed' equal to False.
for t_ex in self.wf_ex.task_executions:
if states.is_completed(t_ex.state) and not t_ex.processed:
t_ex.processed = True
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
if not cmds:
self._check_and_complete()
def _check_and_complete(self):
if states.is_paused_or_completed(self.wf_ex.state):
return
# Workflow is not completed if there are any incomplete task
# executions that are not in WAITING state. If all incomplete
# tasks are waiting and there are unhandled errors, then these
# tasks will not reach completion. In this case, mark the
# workflow complete.
incomplete_tasks = wf_utils.find_incomplete_task_executions(self.wf_ex)
if any(not states.is_waiting(t.state) for t in incomplete_tasks):
return
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
if wf_ctrl.all_errors_handled():
self._succeed_workflow(wf_ctrl.evaluate_workflow_final_context())
else:
self._fail_workflow(_build_fail_info_message(wf_ctrl, self.wf_ex))
def _succeed_workflow(self, final_context, msg=None):
self.wf_ex.output = data_flow.evaluate_workflow_output(
self.wf_spec,
final_context
)
# Set workflow execution to success until after output is evaluated.
self.set_state(states.SUCCESS, msg)
if self.wf_ex.task_execution_id:
self._schedule_send_result_to_parent_workflow()
def _fail_workflow(self, msg):
if states.is_paused_or_completed(self.wf_ex.state):
return
self.set_state(states.ERROR, state_info=msg)
# When we set an ERROR state we should safely set output value getting
# w/o exceptions due to field size limitations.
msg = utils.cut_by_kb(
msg,
cfg.CONF.engine.execution_field_size_limit_kb
)
self.wf_ex.output = {'result': msg}
if self.wf_ex.task_execution_id:
self._schedule_send_result_to_parent_workflow()
def _schedule_send_result_to_parent_workflow(self):
scheduler.schedule_call(
None,
_SEND_RESULT_TO_PARENT_WORKFLOW_PATH,
0,
wf_ex_id=self.wf_ex.id
)
def _get_environment(params):
env = params.get('env', {})
if isinstance(env, dict):
return env
if isinstance(env, six.string_types):
env_db = db_api.load_environment(env)
if not env_db:
raise exc.InputException(
'Environment is not found: %s' % env
)
return env_db.variables
raise exc.InputException(
'Unexpected value type for environment [env=%s, type=%s]'
% (env, type(env))
)
def _send_result_to_parent_workflow(wf_ex_id):
wf_ex = db_api.get_workflow_execution(wf_ex_id)
if wf_ex.state == states.SUCCESS:
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(data=wf_ex.output)
)
elif wf_ex.state == states.ERROR:
err_msg = (
wf_ex.state_info or
'Failed subworkflow [execution_id=%s]' % wf_ex.id
)
rpc.get_engine_client().on_action_complete(
wf_ex.id,
wf_utils.Result(error=err_msg)
)
def _build_fail_info_message(wf_ctrl, wf_ex):
# Try to find where error is exactly.
failed_tasks = sorted(
filter(
lambda t: not wf_ctrl.is_error_handled_for(t),
wf_utils.find_error_task_executions(wf_ex)
),
key=lambda t: t.name
)
msg = ('Failure caused by error in tasks: %s\n' %
', '.join([t.name for t in failed_tasks]))
for t in failed_tasks:
msg += '\n %s [task_ex_id=%s] -> %s\n' % (t.name, t.id, t.state_info)
for i, ex in enumerate(t.executions):
if ex.state == states.ERROR:
output = (ex.output or dict()).get('result', 'Unknown')
msg += (
' [action_ex_id=%s, idx=%s]: %s\n' % (
ex.id,
i,
str(output)
)
)
return msg
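
For orientation (not part of the diff): a hedged sketch of how
workflow_handler.py above is expected to drive this new class,
assuming only the methods shown in this file:

    from mistral.db.v2 import api as db_api
    from mistral.engine import workflows

    def start(wf_identifier, wf_input, desc='', params=None):
        # Wrap the stored definition in the new Workflow abstraction and
        # let it create and run its own execution (see Workflow.start).
        wf = workflows.Workflow(db_api.get_workflow_definition(wf_identifier))
        wf.start(wf_input, desc=desc, params=params or {})
        return wf.wf_ex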


@ -14,18 +14,39 @@
# limitations under the License.
# TODO(rakhmerov): Can we make one parent for errors and exceptions?
class MistralError(Exception):
"""Mistral specific error.
Reserved for situations that can't automatically handled. When it occurs
Reserved for situations that can't be automatically handled. When it occurs
it signals that there is a major environmental problem, like an invalid
startup configuration or an implementation problem (e.g. some code doesn't
take care of certain corner cases). From an architectural perspective it's
pointless to try to handle this type of problem, except for doing some
finalization work like transaction rollback, deleting temporary files, etc.
"""
message = "An unknown error occurred"
http_code = 500
def __init__(self, message=None):
super(MistralError, self).__init__(message)
if message is not None:
self.message = message
super(MistralError, self).__init__(
'%d: %s' % (self.http_code, self.message))
@property
def code(self):
"""This is here for webob to read.
https://github.com/Pylons/webob/blob/master/webob/exc.py
"""
return self.http_code
def __str__(self):
return self.message
class MistralException(Exception):
@ -46,6 +67,13 @@ class MistralException(Exception):
message = "An unknown exception occurred"
http_code = 500
def __init__(self, message=None):
if message is not None:
self.message = message
super(MistralException, self).__init__(
'%d: %s' % (self.http_code, self.message))
@property
def code(self):
"""This is here for webob to read.
@ -57,30 +85,23 @@ class MistralException(Exception):
def __str__(self):
return self.message
def __init__(self, message=None):
if message is not None:
self.message = message
super(MistralException, self).__init__(
'%d: %s' % (self.http_code, self.message))
# Database errors.
# Database exceptions.
class DBException(MistralException):
class DBError(MistralError):
http_code = 400
class DBDuplicateEntryException(DBException):
class DBDuplicateEntryError(DBError):
http_code = 409
message = "Database object already exists"
class DBQueryEntryException(DBException):
class DBQueryEntryError(DBError):
http_code = 400
class DBEntityNotFoundException(DBException):
class DBEntityNotFoundError(DBError):
http_code = 404
message = "Object not found"
@ -101,7 +122,7 @@ class InvalidModelException(DSLParsingException):
message = "Wrong entity definition"
# Various common exceptions.
# Various common exceptions and errors.
class YaqlEvaluationException(MistralException):
http_code = 400
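
To make the renaming concrete, a small usage example (illustrative
only, based solely on the class bodies shown above):

    from mistral import exceptions as exc

    try:
        raise exc.DBEntityNotFoundError("Workbook not found [workbook_name=wb1]")
    except exc.MistralError as e:
        # DBEntityNotFoundError now derives from MistralError via DBError,
        # carries http_code = 404 and exposes it through the 'code'
        # property that webob reads.
        assert e.code == 404
        assert str(e) == "Workbook not found [workbook_name=wb1]"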


@ -69,7 +69,7 @@ def register_action_class(name, action_class_str, attributes,
LOG.debug("Registering action in DB: %s" % name)
db_api.create_action_definition(values)
except exc.DBDuplicateEntryException:
except exc.DBDuplicateEntryError:
LOG.debug("Action %s already exists in DB." % name)


@ -1,95 +0,0 @@
# Copyright 2016 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import six
from oslo_log import log as logging
from mistral.db.v2 import api as db_api
from mistral.engine import utils as eng_utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow
from mistral.workflow import states
LOG = logging.getLogger(__name__)
def canonize_workflow_params(params):
# Resolve environment parameter.
env = params.get('env', {})
if not isinstance(env, dict) and not isinstance(env, six.string_types):
raise ValueError(
'Unexpected type for environment [environment=%s]' % str(env)
)
if isinstance(env, six.string_types):
env_db = db_api.get_environment(env)
env = env_db.variables
params['env'] = env
return params
def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
wf_ex = db_api.create_workflow_execution({
'name': wf_def.name,
'description': desc,
'workflow_name': wf_def.name,
'workflow_id': wf_def.id,
'spec': wf_spec.to_dict(),
'params': params or {},
'state': states.IDLE,
'input': wf_input or {},
'output': {},
'context': copy.deepcopy(wf_input) or {},
'task_execution_id': params.get('task_execution_id'),
'runtime_context': {
'index': params.get('index', 0)
},
})
data_flow.add_openstack_data_to_context(wf_ex)
data_flow.add_execution_to_context(wf_ex)
data_flow.add_environment_to_context(wf_ex)
data_flow.add_workflow_variables_to_context(wf_ex, wf_spec)
return wf_ex
def create_workflow_execution(wf_identifier, wf_input, description, params,
wf_spec=None):
params = canonize_workflow_params(params)
wf_def = db_api.get_workflow_definition(wf_identifier)
if wf_spec is None:
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
eng_utils.validate_input(wf_def, wf_input, wf_spec)
wf_ex = _create_workflow_execution(
wf_def,
wf_spec,
wf_input,
description,
params
)
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier)
return wf_ex, wf_spec


@ -102,7 +102,7 @@ def advance_cron_trigger(t):
'next_execution_time': t.next_execution_time
}
)
except exc.DBEntityNotFoundException as e:
except exc.DBEntityNotFoundError as e:
# Cron trigger was probably already deleted by a different process.
LOG.debug(
"Cron trigger named '%s' does not exist anymore: %s",


@ -111,7 +111,7 @@ def update_workflow_execution_env(wf_ex, env):
if wf_ex.state not in [states.IDLE, states.PAUSED, states.ERROR]:
raise exc.NotAllowedException(
'Updating env to workflow execution is only permitted if '
'it is in idle, paused, or re-runnable state.'
'it is in IDLE, PAUSED, or ERROR state.'
)
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)


@ -142,7 +142,7 @@ MOCK_ACTION_NOT_COMPLETE = mock.MagicMock(
MOCK_AD_HOC_ACTION = mock.MagicMock(return_value=AD_HOC_ACTION_EX_DB)
MOCK_ACTIONS = mock.MagicMock(return_value=[ACTION_EX_DB])
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DELETE = mock.MagicMock(return_value=None)


@ -119,8 +119,8 @@ MOCK_ACTIONS = mock.MagicMock(return_value=[ACTION_DB])
MOCK_UPDATED_ACTION = mock.MagicMock(return_value=UPDATED_ACTION_DB)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError())
class TestActionsController(base.APITest):


@ -63,8 +63,8 @@ MOCK_TRIGGER = mock.MagicMock(return_value=TRIGGER_DB)
MOCK_TRIGGERS = mock.MagicMock(return_value=[TRIGGER_DB])
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError())
class TestCronTriggerController(base.APITest):


@ -107,8 +107,8 @@ MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB)
MOCK_ENVIRONMENTS = mock.MagicMock(return_value=[ENVIRONMENT_DB])
MOCK_UPDATED_ENVIRONMENT = mock.MagicMock(return_value=UPDATED_ENVIRONMENT_DB)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError())
MOCK_DELETE = mock.MagicMock(return_value=None)


@ -111,7 +111,7 @@ MOCK_WF_EXECUTIONS = mock.MagicMock(return_value=[WF_EX])
MOCK_UPDATED_WF_EX = mock.MagicMock(return_value=UPDATED_WF_EX)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException())


@ -127,7 +127,7 @@ MOCK_WF_EX = mock.MagicMock(return_value=WF_EX)
MOCK_TASK = mock.MagicMock(return_value=TASK_EX)
MOCK_TASKS = mock.MagicMock(return_value=[TASK_EX])
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_ERROR_TASK = mock.MagicMock(return_value=ERROR_TASK_EX)
MOCK_ERROR_ITEMS_TASK = mock.MagicMock(return_value=ERROR_ITEMS_TASK_EX)


@ -97,8 +97,8 @@ MOCK_WORKBOOKS = mock.MagicMock(return_value=[WORKBOOK_DB])
MOCK_UPDATED_WORKBOOK = mock.MagicMock(return_value=UPDATED_WORKBOOK_DB)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError())
class TestWorkbooksController(base.APITest):


@ -160,8 +160,8 @@ MOCK_WFS = mock.MagicMock(return_value=[WF_DB])
MOCK_UPDATED_WF = mock.MagicMock(return_value=UPDATED_WF_DB)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError())
class TestWorkflowsController(base.APITest):


@ -83,7 +83,7 @@ class WorkbookTest(SQLAlchemyTest):
db_api.create_workbook(WORKBOOKS[0])
self.assertRaises(
exc.DBDuplicateEntryException,
exc.DBDuplicateEntryError,
db_api.create_workbook,
WORKBOOKS[0]
)
@ -153,7 +153,7 @@ class WorkbookTest(SQLAlchemyTest):
db_api.delete_workbook(created.name)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workbook,
created.name
)
@ -284,7 +284,7 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
db_api.create_workflow_definition(WF_DEFINITIONS[0])
self.assertRaises(
exc.DBDuplicateEntryException,
exc.DBDuplicateEntryError,
db_api.create_workflow_definition,
WF_DEFINITIONS[0]
)
@ -424,7 +424,7 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
db_api.delete_workflow_definition(identifier)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workflow_definition,
identifier
)
@ -536,7 +536,7 @@ class ActionDefinitionTest(SQLAlchemyTest):
db_api.create_action_definition(ACTION_DEFINITIONS[0])
self.assertRaises(
exc.DBDuplicateEntryException,
exc.DBDuplicateEntryError,
db_api.create_action_definition,
ACTION_DEFINITIONS[0]
)
@ -606,7 +606,7 @@ class ActionDefinitionTest(SQLAlchemyTest):
db_api.delete_action_definition(created.name)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_action_definition,
created.name
)
@ -726,7 +726,7 @@ class ActionExecutionTest(SQLAlchemyTest):
db_api.delete_action_execution(created.id)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_action_execution,
created.id
)
@ -738,7 +738,7 @@ class ActionExecutionTest(SQLAlchemyTest):
auth_context.set_ctx(test_base.get_context(default=False))
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.delete_action_execution,
created.id
)
@ -880,7 +880,7 @@ class WorkflowExecutionTest(SQLAlchemyTest):
db_api.delete_workflow_execution(created.id)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workflow_execution,
created.id
)
@ -1130,7 +1130,7 @@ class TaskExecutionTest(SQLAlchemyTest):
db_api.delete_task_execution(created.id)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_task_execution,
created.id
)
@ -1203,7 +1203,7 @@ class CronTriggerTest(SQLAlchemyTest):
db_api.create_cron_trigger(CRON_TRIGGERS[0])
self.assertRaises(
exc.DBDuplicateEntryException,
exc.DBDuplicateEntryError,
db_api.create_cron_trigger,
CRON_TRIGGERS[0]
)
@ -1302,7 +1302,7 @@ class CronTriggerTest(SQLAlchemyTest):
self.assertEqual(1, rowcount)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_cron_trigger,
created.name
)
@ -1365,7 +1365,7 @@ class EnvironmentTest(SQLAlchemyTest):
db_api.create_environment(ENVIRONMENTS[0])
self.assertRaises(
exc.DBDuplicateEntryException,
exc.DBDuplicateEntryError,
db_api.create_environment,
ENVIRONMENTS[0]
)
@ -1432,7 +1432,7 @@ class EnvironmentTest(SQLAlchemyTest):
db_api.delete_environment(created.name)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_environment,
created.name
)
@ -1462,7 +1462,7 @@ class TXTest(SQLAlchemyTest):
self.assertFalse(self.is_db_session_open())
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workbook,
created['id']
)
@ -1525,12 +1525,12 @@ class TXTest(SQLAlchemyTest):
self.assertFalse(self.is_db_session_open())
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workflow_execution,
created.id
)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workbook,
created_wb.name
)
@ -1548,12 +1548,12 @@ class TXTest(SQLAlchemyTest):
db_api.create_workbook(WORKBOOKS[0])
except exc.DBDuplicateEntryException:
except exc.DBDuplicateEntryError:
pass
self.assertFalse(self.is_db_session_open())
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workbook,
created.name
)
@ -1633,7 +1633,7 @@ class ResourceMemberTest(SQLAlchemyTest):
# Tenant A can not see membership of resource shared to Tenant B.
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_resource_member,
'123e4567-e89b-12d3-a456-426655440000',
'workflow',
@ -1644,7 +1644,7 @@ class ResourceMemberTest(SQLAlchemyTest):
db_api.create_resource_member(RESOURCE_MEMBERS[0])
self.assertRaises(
exc.DBDuplicateEntryException,
exc.DBDuplicateEntryError,
db_api.create_resource_member,
RESOURCE_MEMBERS[0]
)
@ -1695,7 +1695,7 @@ class ResourceMemberTest(SQLAlchemyTest):
created = db_api.create_resource_member(RESOURCE_MEMBERS[0])
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.update_resource_member,
created.resource_id,
'workflow',
@ -1726,7 +1726,7 @@ class ResourceMemberTest(SQLAlchemyTest):
auth_context.set_ctx(user_context)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.delete_resource_member,
created.resource_id,
'workflow',
@ -1743,7 +1743,7 @@ class ResourceMemberTest(SQLAlchemyTest):
)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.delete_resource_member,
created.resource_id,
'workflow',
@ -1752,7 +1752,7 @@ class ResourceMemberTest(SQLAlchemyTest):
def test_delete_nonexistent_resource_member(self):
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.delete_resource_member,
'nonexistent_resource',
'workflow',
@ -1768,7 +1768,7 @@ class WorkflowSharingTest(SQLAlchemyTest):
auth_context.set_ctx(user_context)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workflow_definition,
wf.id
)
@ -1836,7 +1836,7 @@ class WorkflowSharingTest(SQLAlchemyTest):
auth_context.set_ctx(user_context)
self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
db_api.get_workflow_definition,
wf.id
)
@ -1872,7 +1872,7 @@ class WorkflowSharingTest(SQLAlchemyTest):
auth_context.set_ctx(test_base.get_context())
self.assertRaises(
exc.DBException,
exc.DBError,
db_api.delete_workflow_definition,
wf.id
)
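
The hunks above consistently swap the *Exception suffix for *Error on DB-layer failures. A minimal sketch of what the renamed hierarchy presumably looks like, inferred only from the HTTP codes and default messages asserted in the ExceptionTestCase hunks further down; the base-class layout and the 400 code on DBError are assumptions, not part of this diff:

# Sketch only; inferred from the assertions in this diff, not the real module.
class MistralError(Exception):
    http_code = 500
    message = "An unknown error occurred"

    def __init__(self, message=None):
        if message is not None:
            self.message = message

        super(MistralError, self).__init__(
            '%d: %s' % (self.http_code, self.message)
        )


class DBError(MistralError):
    http_code = 400  # assumption


class DBEntityNotFoundError(DBError):
    http_code = 404
    message = "Object not found"


class DBDuplicateEntryError(DBError):
    http_code = 409
    message = "Database object already exists"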

View File

@ -89,7 +89,7 @@ ENVIRONMENT_DB = models.Environment(
)
MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB)
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundException())
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
class DefaultEngineTest(base.DbTestCase):
@ -210,7 +210,7 @@ class DefaultEngineTest(base.DbTestCase):
self.assertDictEqual(wf_ex.params.get('env', {}), env)
@mock.patch.object(db_api, "get_environment", MOCK_ENVIRONMENT)
@mock.patch.object(db_api, "load_environment", MOCK_ENVIRONMENT)
def test_start_workflow_with_saved_env(self):
wf_input = {
'param1': '<% env().key1 %>',
@ -223,7 +223,8 @@ class DefaultEngineTest(base.DbTestCase):
'wb.wf',
wf_input,
env='test',
task_name='task2')
task_name='task2'
)
self.assertIsNotNone(wf_ex)
@ -233,23 +234,40 @@ class DefaultEngineTest(base.DbTestCase):
@mock.patch.object(db_api, "get_environment", MOCK_NOT_FOUND)
def test_start_workflow_env_not_found(self):
self.assertRaises(exc.DBEntityNotFoundException,
self.engine.start_workflow,
'wb.wf',
{'param1': '<% env().key1 %>'},
env='foo',
task_name='task2')
e = self.assertRaises(
exc.InputException,
self.engine.start_workflow,
'wb.wf',
{
'param1': '<% env().key1 %>',
'param2': 'some value'
},
env='foo',
task_name='task2'
)
self.assertEqual("Environment is not found: foo", e.message)
def test_start_workflow_with_env_type_error(self):
self.assertRaises(ValueError,
self.engine.start_workflow,
'wb.wf',
{'param1': '<% env().key1 %>'},
env=True,
task_name='task2')
e = self.assertRaises(
exc.InputException,
self.engine.start_workflow,
'wb.wf',
{
'param1': '<% env().key1 %>',
'param2': 'some value'
},
env=True,
task_name='task2'
)
self.assertIn(
'Unexpected value type for environment',
e.message
)
def test_start_workflow_missing_parameters(self):
self.assertRaises(
e = self.assertRaises(
exc.InputException,
self.engine.start_workflow,
'wb.wf',
@ -257,15 +275,25 @@ class DefaultEngineTest(base.DbTestCase):
task_name='task2'
)
self.assertIn("Invalid input", e.message)
self.assertIn("missing=['param2']", e.message)
def test_start_workflow_unexpected_parameters(self):
self.assertRaises(
e = self.assertRaises(
exc.InputException,
self.engine.start_workflow,
'wb.wf',
{'param1': 'Hey', 'param2': 'Hi', 'unexpected_param': 'val'},
{
'param1': 'Hey',
'param2': 'Hi',
'unexpected_param': 'val'
},
task_name='task2'
)
self.assertIn("Invalid input", e.message)
self.assertIn("unexpected=['unexpected_param']", e.message)
def test_on_action_complete(self):
wf_input = {'param1': 'Hey', 'param2': 'Hi'}
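
These rewritten tests pin down the new fail-fast contract: bad user input now raises InputException before any workflow execution is persisted, and the message is asserted directly on the exception returned by assertRaises. A hedged sketch of the kind of up-front checks the engine presumably performs; the helper names and the env_db.variables attribute are illustrative assumptions:

import six

from mistral.db.v2 import api as db_api
from mistral import exceptions as exc


def _validate_input(spec_input_names, wf_input):
    # Fail fast: nothing has been persisted yet when this raises.
    missing = sorted(set(spec_input_names) - set(wf_input))
    unexpected = sorted(set(wf_input) - set(spec_input_names))

    if missing or unexpected:
        raise exc.InputException(
            "Invalid input [missing=%s, unexpected=%s]"
            % (missing, unexpected)
        )


def _resolve_environment(env):
    if isinstance(env, dict):
        return env

    if isinstance(env, six.string_types):
        try:
            env_db = db_api.get_environment(env)
        except exc.DBEntityNotFoundError:
            # Translate the DB error into an input error for the caller.
            raise exc.InputException("Environment is not found: %s" % env)

        return env_db.variables  # assumed attribute

    raise exc.InputException(
        "Unexpected value type for environment [env=%s, type=%s]"
        % (env, type(env).__name__)
    )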

View File

@ -241,7 +241,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNotNone(task_2_ex.state_info)
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_2_ex.id)
self.engine.rerun_workflow(task_2_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -347,7 +347,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
}
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_21_ex.id, env=updated_env)
self.engine.rerun_workflow(task_21_ex.id, env=updated_env)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -482,7 +482,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
e = self.assertRaises(
exc.MistralError,
self.engine.rerun_workflow,
wf_ex.id,
task_1_ex.id
)
@ -531,7 +530,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(3, len(task_1_action_exs))
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False)
self.engine.rerun_workflow(task_1_ex.id, reset=False)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -616,7 +615,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(4, len(task_1_action_exs))
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False)
self.engine.rerun_workflow(task_1_ex.id, reset=False)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -705,7 +704,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(
wf_ex.id,
task_1_ex.id,
reset=False,
env=updated_env
@ -798,7 +796,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNotNone(task_3_ex.state_info)
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_3_ex.id)
self.engine.rerun_workflow(task_3_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -894,7 +892,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNotNone(task_2_ex.state_info)
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_1_ex.id)
self.engine.rerun_workflow(task_1_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -921,7 +919,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.WAITING, task_3_ex.state)
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_2_ex.id)
self.engine.rerun_workflow(task_2_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1022,7 +1020,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(3, len(task_1_action_exs))
# Resume workflow and re-run failed task. Re-run #1 with no reset.
self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False)
self.engine.rerun_workflow(task_1_ex.id, reset=False)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1043,7 +1041,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(5, len(task_1_action_exs))
# Resume workflow and re-run failed task. Re-run #2 with reset.
self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=True)
self.engine.rerun_workflow(task_1_ex.id, reset=True)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1064,7 +1062,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(8, len(task_1_action_exs))
# Resume workflow and re-run failed task. Re-run #3 with no reset.
self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False)
self.engine.rerun_workflow(task_1_ex.id, reset=False)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1085,7 +1083,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(10, len(task_1_action_exs))
# Resume workflow and re-run failed task. Re-run #4 with no reset.
self.engine.rerun_workflow(wf_ex.id, task_1_ex.id, reset=False)
self.engine.rerun_workflow(task_1_ex.id, reset=False)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1160,7 +1158,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNotNone(task_2_ex.state_info)
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_2_ex.id)
self.engine.rerun_workflow(task_2_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -1261,7 +1259,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNotNone(sub_wf_task_ex.state_info)
# Resume workflow and re-run failed subworkflow task.
self.engine.rerun_workflow(sub_wf_ex.id, sub_wf_task_ex.id)
self.engine.rerun_workflow(sub_wf_task_ex.id)
sub_wf_ex = db_api.get_workflow_execution(sub_wf_ex.id)
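
Every call site in this file drops the leading wf_ex.id argument: rerun_workflow is now keyed by the task execution alone. A sketch of how the engine can presumably recover the workflow execution from the task; the relationship attribute and the _rerun_task helper are assumptions:

def rerun_workflow(self, task_ex_id, reset=True, env=None):
    with db_api.transaction():
        task_ex = db_api.get_task_execution(task_ex_id)

        # Assumed relationship: a task execution references its
        # enclosing workflow execution, so callers need not pass it.
        wf_ex = task_ex.workflow_execution

        return self._rerun_task(wf_ex, task_ex, reset=reset, env=env)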

View File

@ -13,7 +13,6 @@
# limitations under the License.
from oslo_config import cfg
import testtools
from mistral.actions import base as actions_base
from mistral.db.v2 import api as db_api
@ -63,20 +62,6 @@ class MyAction(actions_base.Action):
raise NotImplementedError
def expect_size_limit_exception(field_name):
def logger(test_func):
def wrapped(*args, **kwargs):
with testtools.ExpectedException(exc.SizeLimitExceededException,
value_re="Size of '%s' is 1KB "
"which exceeds the limit"
" of 0KB" % field_name):
return test_func(*args, **kwargs)
return wrapped
return logger
def generate_workflow(tokens):
new_wf = WF
long_string = ''.join('A' for _ in range(1024))
@ -91,17 +76,35 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
def setUp(self):
"""Resets the size limit config between tests"""
super(ExecutionFieldsSizeLimitTest, self).setUp()
cfg.CONF.set_default('execution_field_size_limit_kb', 0,
group='engine')
cfg.CONF.set_default(
'execution_field_size_limit_kb',
0,
group='engine'
)
test_base.register_action_class('my_action', MyAction)
def tearDown(self):
"""Restores the size limit config to default"""
super(ExecutionFieldsSizeLimitTest, self).tearDown()
cfg.CONF.set_default(
'execution_field_size_limit_kb',
1024,
group='engine'
)
def test_default_limit(self):
cfg.CONF.set_default('execution_field_size_limit_kb', -1,
group='engine')
cfg.CONF.set_default(
'execution_field_size_limit_kb',
-1,
group='engine'
)
new_wf = generate_workflow(
['__ACTION_INPUT_', '__WORKFLOW_INPUT__',
'__TASK_PUBLISHED__'])
['__ACTION_INPUT_', '__WORKFLOW_INPUT__', '__TASK_PUBLISHED__']
)
wf_service.create_workflows(new_wf)
@ -110,25 +113,38 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
self.await_execution_success(wf_ex.id)
@expect_size_limit_exception('input')
def test_workflow_input_default_value_limit(self):
new_wf = generate_workflow(['__WORKFLOW_INPUT__'])
wf_service.create_workflows(new_wf)
# Start workflow.
self.engine.start_workflow('wf', {})
e = self.assertRaises(
exc.SizeLimitExceededException,
self.engine.start_workflow,
'wf',
{}
)
self.assertEqual(
"Size of 'input' is 1KB which exceeds the limit of 0KB",
e.message
)
@expect_size_limit_exception('input')
def test_workflow_input_limit(self):
wf_service.create_workflows(WF)
# Start workflow.
self.engine.start_workflow(
e = self.assertRaises(
exc.SizeLimitExceededException,
self.engine.start_workflow,
'wf',
{
'workflow_input': ''.join('A' for _ in range(1024))
}
{'workflow_input': ''.join('A' for _ in range(1024))}
)
self.assertEqual(
"Size of 'input' is 1KB which exceeds the limit of 0KB",
e.message
)
def test_action_input_limit(self):
@ -149,9 +165,10 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
wf_service.create_workflows(WF)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {
'action_output_length': 1024
})
wf_ex = self.engine.start_workflow(
'wf',
{'action_output_length': 1024}
)
self.await_execution_error(wf_ex.id)
@ -189,16 +206,22 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
task_ex.state_info
)
@expect_size_limit_exception('params')
def test_workflow_params_limit(self):
wf_service.create_workflows(WF)
# Start workflow.
long_string = ''.join('A' for _ in range(1024))
self.engine.start_workflow('wf', {}, '', env={'param': long_string})
def tearDown(self):
"""Restores the size limit config to default"""
super(ExecutionFieldsSizeLimitTest, self).tearDown()
cfg.CONF.set_default('execution_field_size_limit_kb', 1024,
group='engine')
e = self.assertRaises(
exc.SizeLimitExceededException,
self.engine.start_workflow,
'wf',
{},
'',
env={'param': long_string}
)
self.assertIn(
"Size of 'params' is 1KB which exceeds the limit of 0KB",
e.message
)
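
The rewritten tests assert the violation directly on the exception returned by assertRaises instead of going through the removed expect_size_limit_exception decorator. A hypothetical sketch of the check that produces the asserted message; the function name and the KB rounding are assumptions, while the negative-limit opt-out matches test_default_limit above:

import json

from mistral import exceptions as exc


def validate_field_size(field_name, value, limit_kb):
    if limit_kb < 0:
        # A negative limit disables the check entirely.
        return

    size_kb = len(json.dumps(value)) // 1024

    if size_kb > limit_kb:
        raise exc.SizeLimitExceededException(
            "Size of '%s' is %dKB which exceeds the limit of %dKB"
            % (field_name, size_kb, limit_kb)
        )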

View File

@ -104,7 +104,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertIsNotNone(task_2_ex.state_info)
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_2_ex.id)
self.engine.rerun_workflow(task_2_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -203,7 +203,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
}
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_2_ex.id, env=updated_env)
self.engine.rerun_workflow(task_2_ex.id, env=updated_env)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -305,7 +305,6 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
e = self.assertRaises(
exc.MistralError,
self.engine.rerun_workflow,
wf_ex.id,
task_1_ex.id
)

View File

@ -289,7 +289,7 @@ class SchedulerServiceTest(base.DbTestCase):
eventlet.sleep(WAIT)
self.assertRaises(exc.DBEntityNotFoundException,
self.assertRaises(exc.DBEntityNotFoundError,
db_api.get_delayed_call,
calls[0].id
)

View File

@ -145,7 +145,7 @@ class TriggerServiceV2Test(base.DbTestCase):
# But creating a trigger with the same count and first time
# simultaneously leads to an error.
self.assertRaises(
exc.DBDuplicateEntryException,
exc.DBDuplicateEntryError,
t_s.create_cron_trigger,
'trigger-%s' % utils.generate_unicode_uuid(),
self.wf.name,

View File

@ -165,7 +165,7 @@ class WorkflowServiceTest(base.DbTestCase):
def test_update_non_existing_workflow_failed(self):
exception = self.assertRaises(
exc.DBEntityNotFoundException,
exc.DBEntityNotFoundError,
wf_service.update_workflows,
WORKFLOW
)

View File

@ -24,19 +24,19 @@ class ExceptionTestCase(base.BaseTest):
"""Test cases for exception code."""
def test_nf_with_message(self):
exc = exceptions.DBEntityNotFoundException('check_for_this')
exc = exceptions.DBEntityNotFoundError('check_for_this')
self.assertIn('check_for_this',
six.text_type(exc))
self.assertEqual(404, exc.http_code)
def test_nf_with_no_message(self):
exc = exceptions.DBEntityNotFoundException()
exc = exceptions.DBEntityNotFoundError()
self.assertIn("Object not found",
six.text_type(exc))
self.assertEqual(404, exc.http_code,)
def test_duplicate_obj_code(self):
exc = exceptions.DBDuplicateEntryException()
exc = exceptions.DBDuplicateEntryError()
self.assertIn("Database object already exists",
six.text_type(exc))
self.assertEqual(409, exc.http_code,)

View File

@ -20,9 +20,9 @@ import json
import pecan
import six
from webob import Response
from wsme import exc
from wsme import exc as wsme_exc
from mistral import exceptions as ex
from mistral import exceptions as exc
def wrap_wsme_controller_exception(func):
@ -35,10 +35,14 @@ def wrap_wsme_controller_exception(func):
def wrapped(*args, **kwargs):
try:
return func(*args, **kwargs)
except ex.MistralException as excp:
pecan.response.translatable_error = excp
raise exc.ClientSideError(msg=six.text_type(excp),
status_code=excp.http_code)
except (exc.MistralException, exc.MistralError) as e:
pecan.response.translatable_error = e
raise wsme_exc.ClientSideError(
msg=six.text_type(e),
status_code=e.http_code
)
return wrapped
@ -52,31 +56,33 @@ def wrap_pecan_controller_exception(func):
def wrapped(*args, **kwargs):
try:
return func(*args, **kwargs)
except ex.MistralException as excp:
except (exc.MistralException, exc.MistralError) as e:
return Response(
status=excp.http_code,
status=e.http_code,
content_type='application/json',
body=json.dumps(dict(
faultstring=six.text_type(excp))))
body=json.dumps(dict(faultstring=six.text_type(e)))
)
return wrapped
def validate_query_params(limit, sort_keys, sort_dirs):
if limit is not None and limit <= 0:
raise exc.ClientSideError("Limit must be positive.")
raise wsme_exc.ClientSideError("Limit must be positive.")
if len(sort_keys) < len(sort_dirs):
raise exc.ClientSideError("Length of sort_keys must be equal or "
"greater than sort_dirs.")
raise wsme_exc.ClientSideError(
"Length of sort_keys must be equal or greater than sort_dirs."
)
if len(sort_keys) > len(sort_dirs):
sort_dirs.extend(['asc'] * (len(sort_keys) - len(sort_dirs)))
for sort_dir in sort_dirs:
if sort_dir not in ['asc', 'desc']:
raise exc.ClientSideError("Unknown sort direction, must be 'desc' "
"or 'asc'.")
raise wsme_exc.ClientSideError(
"Unknown sort direction, must be 'desc' or 'asc'."
)
def validate_fields(fields, object_fields):
@ -93,6 +99,6 @@ def validate_fields(fields, object_fields):
invalid_fields = set(fields) - set(object_fields)
if invalid_fields:
raise exc.ClientSideError(
raise wsme_exc.ClientSideError(
'Field(s) %s are invalid.' % ', '.join(invalid_fields)
)
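
For context, this is how the reworked pecan decorator would typically be applied; the controller class, method, and to_dict() call are illustrative, not part of this diff:

import pecan

from mistral.db.v2 import api as db_api


class ExampleController(object):

    @wrap_pecan_controller_exception
    @pecan.expose('json')
    def get(self, identifier):
        # A DBEntityNotFoundError raised here is converted into a JSON
        # error response carrying the exception's http_code (404).
        return db_api.get_workflow_definition(identifier).to_dict()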

View File

@ -111,6 +111,8 @@ class WorkflowController(object):
if self._is_paused_or_completed():
return []
# TODO(rakhmerov): I think it should rather be a new method
# rerun_task() because it covers a different use case.
if task_ex:
return self._get_rerun_commands([task_ex], reset, env=env)
@ -199,7 +201,11 @@ class WorkflowController(object):
:param env: A set of environment variables to overwrite.
:return: List of workflow commands.
"""
for task_ex in task_exs:
# TODO(rakhmerov): It is wrong that we update something in
# workflow controller, by design it should not change system
# state. Fix it, it should happen outside.
self._update_task_ex_env(task_ex, env)
cmds = [commands.RunExistingTask(t_e, reset) for t_e in task_exs]

View File

@ -69,6 +69,10 @@ def is_paused_or_completed(state):
return is_paused(state) or is_completed(state)
def is_paused_or_idle(state):
return is_paused(state) or is_idle(state)
def is_valid_transition(from_state, to_state):
if is_invalid(from_state) or is_invalid(to_state):
return False

View File

@ -106,33 +106,3 @@ def find_incomplete_task_executions(wf_ex):
def find_error_task_executions(wf_ex):
return find_task_executions_with_state(wf_ex, states.ERROR)
def construct_fail_info_message(wf_ctrl, wf_ex):
# Try to find where error is exactly.
failed_tasks = sorted(
filter(
lambda t: not wf_ctrl.is_error_handled_for(t),
find_error_task_executions(wf_ex)
),
key=lambda t: t.name
)
msg = ('Failure caused by error in tasks: %s\n' %
', '.join([t.name for t in failed_tasks]))
for t in failed_tasks:
msg += '\n %s [task_ex_id=%s] -> %s\n' % (t.name, t.id, t.state_info)
for i, ex in enumerate(t.executions):
if ex.state == states.ERROR:
output = (ex.output or dict()).get('result', 'Unknown')
msg += (
' [action_ex_id=%s, idx=%s]: %s\n' % (
ex.id,
i,
str(output)
)
)
return msg

View File

@ -607,10 +607,12 @@ class ExecutionTestsV2(base.TestCase):
@test.attr(type='negative')
def test_create_execution_forgot_input_params(self):
self.assertRaises(exceptions.BadRequest,
self.client.create_execution,
self.reverse_wf['name'],
params={"task_name": "nonexist"})
self.assertRaises(
exceptions.BadRequest,
self.client.create_execution,
self.reverse_wf['name'],
params={"task_name": "nonexist"}
)
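
End to end, the engine's fail-fast InputException surfaces through the controller wrappers shown earlier as an HTTP 400 BadRequest, which is what this tempest test asserts. A hedged illustration of how a client would observe it; the endpoint URL and payload shape are assumptions:

import json

import requests

resp = requests.post(
    'http://mistral.example:8989/v2/executions',
    headers={'Content-Type': 'application/json'},
    data=json.dumps({
        'workflow_name': 'reverse_wf',
        'params': '{"task_name": "nonexist"}'
    })
)

print(resp.status_code)            # expected: 400
print(resp.json()['faultstring'])  # e.g. "Invalid input ..."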
@test.attr(type='sanity')
def test_action_ex_concurrency(self):