Change execution mechanism for 'join' tasks

* In order to avoid duplicates of the same 'join' task we now use
  named locks to exclusively create a task execution for a 'join'
  task in the DB. The transaction that does this is always separate
  from the task completion logic and is very short, which is needed
  to eliminate DB contention on the same records of the task
  execution table. This is also the reason to use a separate
  mechanism such as named locks; additionally, it reduces the number
  of possible deadlock scenarios, because task executions have too
  many different access patterns that could lead to deadlocks if we
  locked directly on their table records. So this approach guarantees
  that only one transaction creates a new task execution object for
  a 'join' task and schedules the 'refresh_task_state' job that
  checks 'join' completion (see the sketch after this list).
* Dropped the scheduler's 'unique_key' column with its unique
  constraint because in practice it caused DB deadlocks (at least on
  MySQL) when the table was inserted into and updated simultaneously
* Instead of the 'unique_key' column we added a non-unique 'key'
  column that can potentially be used by the scheduler itself for
  squashing delayed calls (not implemented yet)
* Adjusted Scheduler implementation and tests accordingly
* Fixed the task() YAQL function to precisely resolve the task
  execution object when it's called for the current task. Previously
  the resolution depended on luck, and we were merely lucky that the
  tests were passing.
* Increased the length of the 'unique_key' column for task
  executions to 250, which is close to the limit for string fields
  participating in unique constraints.
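
For illustration, a minimal sketch of the new 'join' creation flow under
a named lock (names mirror the diff below; the exact lock key and the
'_schedule_refresh_task_state' call site are assumptions):

    # Illustrative sketch only, not part of the diff.
    with db_api.named_lock(unique_key):
        t_execs = db_api.get_task_executions(
            workflow_execution_id=wf_ex.id,
            unique_key=unique_key
        )

        if not t_execs:
            # Only one concurrent transaction gets here for a given key.
            task_ex = db_api.create_task_execution(values)

            _schedule_refresh_task_state(task_ex)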

Change-Id: Ib7aaa20c2c8834ab0f2d9c90457677c9edb62805
Renat Akhmerov 2016-08-17 16:39:13 +07:00
parent c7bf9bb91a
commit 1b0f0cddd6
25 changed files with 469 additions and 254 deletions

View File

@ -0,0 +1,33 @@
# Copyright 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""increate_task_execution_unique_key_size
Revision ID: 018
Revises: 017
Create Date: 2016-08-17 17:47:30.325182
"""
# revision identifiers, used by Alembic.
revision = '018'
down_revision = '017'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.alter_column('task_executions_v2', 'unique_key', type_=sa.String(250))

View File

@ -0,0 +1,49 @@
# Copyright 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Change scheduler schema.
Revision ID: 019
Revises: 018
Create Date: 2016-08-17 17:54:51.952949
"""
# revision identifiers, used by Alembic.
revision = '019'
down_revision = '018'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.drop_index(
'delayed_calls_v2_processing_execution_time',
table_name='delayed_calls_v2'
)
op.drop_index('unique_key', table_name='delayed_calls_v2')
op.drop_column('delayed_calls_v2', 'unique_key')
op.add_column(
'delayed_calls_v2',
sa.Column('key', sa.String(length=250), nullable=True)
)
op.create_index(
'delayed_calls_v2_execution_time',
'delayed_calls_v2',
['execution_time'],
unique=False
)

View File

@ -308,14 +308,14 @@ def get_task_executions(limit=None, marker=None, sort_keys=['created_at'],
)
def get_incomplete_task_executions(**kwargs):
return IMPL.get_incomplete_task_executions(**kwargs)
def create_task_execution(values):
return IMPL.create_task_execution(values)
def insert_or_ignore_task_execution(values):
return IMPL.insert_or_ignore_task_execution(values)
def update_task_execution(id, values):
return IMPL.update_task_execution(id, values)
@ -342,10 +342,6 @@ def create_delayed_call(values):
return IMPL.create_delayed_call(values)
def insert_or_ignore_delayed_call(values):
return IMPL.insert_or_ignore_delayed_call(values)
def delete_delayed_call(id):
return IMPL.delete_delayed_call(id)
@ -533,8 +529,8 @@ def get_named_locks(limit=None, marker=None):
return IMPL.get_named_locks(limit=limit, marker=marker)
def delete_named_lock(name):
return IMPL.delete_named_lock(name)
def delete_named_lock(lock_id):
return IMPL.delete_named_lock(lock_id)
@contextlib.contextmanager

View File

@ -34,6 +34,7 @@ from mistral.db.v2.sqlalchemy import filters as db_filters
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import security
from mistral import utils
CONF = cfg.CONF
@ -796,6 +797,23 @@ def get_task_executions(**kwargs):
return _get_task_executions(**kwargs)
def get_incomplete_task_executions(**kwargs):
    query = b.model_query(models.TaskExecution)

    query = query.filter_by(**kwargs)

    query = query.filter(
        sa.or_(
            models.TaskExecution.state == 'IDLE',
            models.TaskExecution.state == 'RUNNING',
            models.TaskExecution.state == 'WAITING',
            models.TaskExecution.state == 'DELAYED'
        )
    )

    return query.all()
@b.session_aware()
def create_task_execution(values, session=None):
task_ex = models.TaskExecution()
@ -812,10 +830,6 @@ def create_task_execution(values, session=None):
return task_ex
def insert_or_ignore_task_execution(values):
insert_or_ignore(models.TaskExecution, values.copy())
@b.session_aware()
def update_task_execution(id, values, session=None):
task_ex = get_task_execution(id)
@ -866,10 +880,6 @@ def create_delayed_call(values, session=None):
return delayed_call
def insert_or_ignore_delayed_call(values):
insert_or_ignore(models.DelayedCall, values.copy())
@b.session_aware()
def delete_delayed_call(id, session=None):
delayed_call = get_delayed_call(id)
@ -1388,9 +1398,17 @@ def create_named_lock(name, session=None):
# session may not immediately issue an SQL query to a database
# and instead just schedule it whereas we need to make sure to
# issue a query immediately.
session.flush()
insert = models.NamedLock.__table__.insert()
session.execute(insert.values(name=name))
lock_id = utils.generate_unicode_uuid()
session.execute(insert.values(id=lock_id, name=name))
session.flush()
return lock_id
def get_named_locks(**kwargs):
@ -1398,22 +1416,27 @@ def get_named_locks(**kwargs):
@b.session_aware()
def delete_named_lock(name, session=None):
def delete_named_lock(lock_id, session=None):
# This method has to work without SQLAlchemy session because
# session may not immediately issue an SQL query to a database
# and instead just schedule it whereas we need to make sure to
# issue a query immediately.
session.flush()
table = models.NamedLock.__table__
delete = table.delete()
session.execute(delete.where(table.c.name == name))
session.execute(delete.where(table.c.id == lock_id))
session.flush()
@contextlib.contextmanager
def named_lock(name):
lock_id = None
try:
create_named_lock(name)
lock_id = create_named_lock(name)
yield
finally:
delete_named_lock(name)
delete_named_lock(lock_id)

View File

@ -227,7 +227,7 @@ class TaskExecution(Execution):
# Main properties.
action_spec = sa.Column(st.JsonLongDictType())
unique_key = sa.Column(sa.String(200), nullable=True)
unique_key = sa.Column(sa.String(250), nullable=True)
# Whether the task is fully processed (publishing and calculating commands
# after it). It allows to simplify workflow controller implementations
@ -242,7 +242,7 @@ class TaskExecution(Execution):
def executions(self):
return (
self.action_executions
if self.spec.get('action')
if not self.spec.get('workflow')
else self.workflow_executions
)
@ -328,26 +328,23 @@ class DelayedCall(mb.MistralModelBase):
__tablename__ = 'delayed_calls_v2'
__table_args__ = (
sa.Index(
'%s_processing_execution_time' % __tablename__,
'processing',
'execution_time'
),
sa.UniqueConstraint('unique_key', 'processing')
)
id = mb.id_column()
factory_method_path = sa.Column(sa.String(200), nullable=True)
target_method_name = sa.Column(sa.String(80), nullable=False)
method_arguments = sa.Column(st.JsonDictType())
serializers = sa.Column(st.JsonDictType())
unique_key = sa.Column(sa.String(80), nullable=True)
key = sa.Column(sa.String(250), nullable=True)
auth_context = sa.Column(st.JsonDictType())
execution_time = sa.Column(sa.DateTime, nullable=False)
processing = sa.Column(sa.Boolean, default=False, nullable=False)
sa.Index(
'%s_execution_time' % DelayedCall.__tablename__,
DelayedCall.execution_time
)
class Environment(mb.MistralSecureModelBase):
"""Contains environment variables for workflow execution."""

View File

@ -174,9 +174,10 @@ class Action(object):
if prev_state != state:
wf_trace.info(
None,
"Action '%s' (%s) [%s -> %s, %s]" %
"Action '%s' (%s)(task=%s) [%s -> %s, %s]" %
(self.action_ex.name,
self.action_ex.id,
self.task_ex.name if self.task_ex else None,
prev_state,
state,
_result_msg())

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from osprofiler import profiler
from mistral import exceptions as exc
@ -20,6 +21,64 @@ from mistral.workflow import commands
from mistral.workflow import states
def _compare_task_commands(a, b):
    if not isinstance(a, commands.RunTask) or not a.is_waiting():
        return -1
    if not isinstance(b, commands.RunTask) or not b.is_waiting():
        return 1

    if a.unique_key == b.unique_key:
        return 0
    if a.unique_key < b.unique_key:
        return -1

    return 1


def _rearrange_commands(cmds):
    """Takes workflow commands and does required pre-processing.

    The main idea of the method is to sort task commands with the 'waiting'
    flag by their 'unique_key' property in order to guarantee the same
    locking order for them in parallel transactions and thereby prevent
    deadlocks. It also removes commands that don't make sense. For example,
    if there are some commands after a command that changes the workflow
    state then they must not be dispatched.
    """

    # Remove all 'noop' commands.
    cmds = list(filter(lambda c: not isinstance(c, commands.Noop), cmds))

    state_cmd_idx = -1
    state_cmd = None

    for i, cmd in enumerate(cmds):
        if isinstance(cmd, commands.SetWorkflowState):
            state_cmd_idx = i
            state_cmd = cmd

            break

    # Find a position of a 'fail|succeed|pause' command
    # and sort all task commands before it.
    if state_cmd_idx < 0:
        cmds.sort(key=functools.cmp_to_key(_compare_task_commands))

        return cmds
    elif state_cmd_idx == 0:
        return cmds[0:1]

    res = cmds[0:state_cmd_idx]

    res.sort(key=functools.cmp_to_key(_compare_task_commands))

    res.append(state_cmd)

    return res
@profiler.trace('dispatcher-dispatch-commands')
def dispatch_workflow_commands(wf_ex, wf_cmds):
# TODO(rakhmerov): I don't like these imports but otherwise we have
@ -30,14 +89,11 @@ def dispatch_workflow_commands(wf_ex, wf_cmds):
if not wf_cmds:
return
for cmd in wf_cmds:
for cmd in _rearrange_commands(wf_cmds):
if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
task_handler.run_task(cmd)
elif isinstance(cmd, commands.SetWorkflowState):
wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg)
elif isinstance(cmd, commands.Noop):
# Do nothing.
pass
else:
raise exc.MistralError('Unsupported workflow command: %s' % cmd)

View File

@ -48,7 +48,7 @@ class OsloRPCServer(rpc_base.RPCServer):
rpc.get_transport(),
target,
self.endpoints,
executor='eventlet',
executor='blocking',
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
)

View File

@ -72,20 +72,9 @@ def run_task(wf_cmd):
return
if not task.task_ex:
# It is possible that task execution was not created
# (and therefore not associated with Task instance).
# For example, in case of 'join' that has already been
# created by a different transaction. In this case
# we should skip post completion scheduled checks.
return
if task.is_waiting():
if task.is_waiting() and (task.is_created() or task.is_state_changed()):
_schedule_refresh_task_state(task.task_ex)
if task.is_completed():
wf_handler.schedule_on_task_complete(task.task_ex)
@profiler.trace('task-handler-on-action-complete')
def _on_action_complete(action_ex):
@ -128,9 +117,6 @@ def _on_action_complete(action_ex):
return
if task.is_completed():
wf_handler.schedule_on_task_complete(task_ex)
def fail_task(task_ex, msg):
wf_spec = spec_parser.get_workflow_spec_by_execution_id(
@ -171,9 +157,6 @@ def continue_task(task_ex):
return
if task.is_completed():
wf_handler.schedule_on_task_complete(task_ex)
def complete_task(task_ex, state, state_info):
wf_spec = spec_parser.get_workflow_spec_by_execution_id(
@ -200,15 +183,12 @@ def complete_task(task_ex, state, state_info):
return
if task.is_completed():
wf_handler.schedule_on_task_complete(task_ex)
def _build_task_from_execution(wf_spec, task_ex, task_spec=None):
def _build_task_from_execution(wf_spec, task_ex):
return _create_task(
task_ex.workflow_execution,
wf_spec,
task_spec or spec_parser.get_task_spec(task_ex.spec),
wf_spec.get_task(task_ex.name),
task_ex.in_context,
task_ex
)
@ -223,7 +203,8 @@ def _build_task_from_command(cmd):
spec_parser.get_task_spec(cmd.task_ex.spec),
cmd.ctx,
task_ex=cmd.task_ex,
unique_key=cmd.task_ex.unique_key
unique_key=cmd.task_ex.unique_key,
waiting=cmd.task_ex.state == states.WAITING
)
if cmd.reset:
@ -237,39 +218,26 @@ def _build_task_from_command(cmd):
cmd.wf_spec,
cmd.task_spec,
cmd.ctx,
unique_key=cmd.unique_key
unique_key=cmd.unique_key,
waiting=cmd.is_waiting()
)
if cmd.is_waiting():
task.defer()
return task
raise exc.MistralError('Unsupported workflow command: %s' % cmd)
def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
unique_key=None):
unique_key=None, waiting=False):
if task_spec.get_with_items():
return tasks.WithItemsTask(
wf_ex,
wf_spec,
task_spec,
ctx,
task_ex,
unique_key
)
cls = tasks.WithItemsTask
else:
cls = tasks.RegularTask
return tasks.RegularTask(
wf_ex,
wf_spec,
task_spec,
ctx,
task_ex,
unique_key
)
return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting)
@profiler.trace('task-handler-refresh-task-state')
def _refresh_task_state(task_ex_id):
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id)
@ -315,7 +283,6 @@ def _schedule_refresh_task_state(task_ex, delay=0):
:param task_ex: Task execution.
:param delay: Delay.
:return:
"""
key = 'th_c_t_s_a-%s' % task_ex.id
@ -323,7 +290,7 @@ def _schedule_refresh_task_state(task_ex, delay=0):
None,
_REFRESH_TASK_STATE_PATH,
delay,
unique_key=key,
key=key,
task_ex_id=task_ex.id
)
@ -366,7 +333,7 @@ def schedule_on_action_complete(action_ex, delay=0):
None,
_SCHEDULED_ON_ACTION_COMPLETE_PATH,
delay,
unique_key=key,
key=key,
action_ex_id=action_ex.id,
wf_action=isinstance(action_ex, models.WorkflowExecution)
)

View File

@ -47,15 +47,17 @@ class Task(object):
@profiler.trace('task-create')
def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None,
unique_key=None):
unique_key=None, waiting=False):
self.wf_ex = wf_ex
self.task_spec = task_spec
self.ctx = ctx
self.task_ex = task_ex
self.wf_spec = wf_spec
self.unique_key = unique_key
self.waiting = False
self.waiting = waiting
self.reset_flag = False
self.created = False
self.state_changed = False
def is_completed(self):
return self.task_ex and states.is_completed(self.task_ex.state)
@ -63,6 +65,12 @@ class Task(object):
def is_waiting(self):
return self.waiting
def is_created(self):
return self.created
def is_state_changed(self):
return self.state_changed
@abc.abstractmethod
def on_action_complete(self, action_ex):
"""Handle action completion.
@ -82,21 +90,24 @@ class Task(object):
This method puts task to a waiting state.
"""
with db_api.named_lock(self.unique_key):
if not self.task_ex:
t_execs = db_api.get_task_executions(
workflow_execution_id=self.wf_ex.id,
name=self.task_spec.get_name()
unique_key=self.unique_key
)
self.task_ex = t_execs[0] if t_execs else None
msg = 'Task is waiting.'
if not self.task_ex:
self._create_task_execution()
if self.task_ex:
self.set_state(states.WAITING, 'Task is deferred.')
self.waiting = True
self._create_task_execution(
state=states.WAITING,
state_info=msg
)
elif self.task_ex.state != states.WAITING:
self.set_state(states.WAITING, msg)
def reset(self):
self.reset_flag = True
@ -124,6 +135,8 @@ class Task(object):
state_info)
)
self.state_changed = True
self.task_ex.state = state
self.task_ex.state_info = state_info
@ -172,7 +185,7 @@ class Task(object):
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(self.task_ex)
cmds = wf_ctrl.continue_workflow(task_ex=self.task_ex)
# Mark task as processed after all decisions have been made
# upon its completion.
@ -192,14 +205,20 @@ class Task(object):
for p in policies.build_policies(policies_spec, self.wf_spec):
p.after_task_complete(self.task_ex, self.task_spec)
def _create_task_execution(self, state=states.RUNNING):
def _create_task_execution(self, state=states.RUNNING, state_info=None):
task_id = utils.generate_unicode_uuid()
task_name = self.task_spec.get_name()
data_flow.add_current_task_to_context(self.ctx, task_id, task_name)
values = {
'id': utils.generate_unicode_uuid(),
'name': self.task_spec.get_name(),
'id': task_id,
'name': task_name,
'workflow_execution_id': self.wf_ex.id,
'workflow_name': self.wf_ex.workflow_name,
'workflow_id': self.wf_ex.workflow_id,
'state': state,
'state_info': state_info,
'spec': self.task_spec.to_dict(),
'unique_key': self.unique_key,
'in_context': self.ctx,
@ -208,23 +227,13 @@ class Task(object):
'project_id': self.wf_ex.project_id
}
db_api.insert_or_ignore_task_execution(values)
# Since 'insert_or_ignore' cannot return a valid count of updated
# rows the only reliable way to check if insert operation has created
# an object is try to load this object by just generated uuid.
task_ex = db_api.load_task_execution(values['id'])
if not task_ex:
return False
self.task_ex = task_ex
self.task_ex = db_api.create_task_execution(values)
# Add to collection explicitly so that it's in a proper
# state within the current session.
self.wf_ex.task_executions.append(self.task_ex)
return True
self.created = True
def _get_action_defaults(self):
action_name = self.task_spec.get_action_name()
@ -262,12 +271,12 @@ class RegularTask(Task):
self._run_existing()
def _run_new(self):
if not self._create_task_execution():
# Task with the same unique key has already been created.
if self.waiting:
self.defer()
return
if self.waiting:
return
self._create_task_execution()
LOG.debug(
'Starting task [workflow=%s, task_spec=%s, init_state=%s]' %

View File

@ -27,7 +27,9 @@ from mistral.workflow import states
LOG = logging.getLogger(__name__)
_ON_TASK_COMPLETE_PATH = 'mistral.engine.workflow_handler._on_task_complete'
_CHECK_AND_COMPLETE_PATH = (
'mistral.engine.workflow_handler._check_and_complete'
)
@profiler.trace('workflow-handler-start-workflow')
@ -38,6 +40,8 @@ def start_workflow(wf_identifier, wf_input, desc, params):
wf.start(wf_input, desc=desc, params=params)
_schedule_check_and_complete(wf.wf_ex)
return wf.wf_ex
@ -73,13 +77,11 @@ def cancel_workflow(wf_ex, msg=None):
stop_workflow(wf_ex, states.CANCELLED, msg)
@profiler.trace('workflow-handler-on-task-complete')
def _on_task_complete(task_ex_id):
@profiler.trace('workflow-handler-check-and-complete')
def _check_and_complete(wf_ex_id):
# Note: This method can only be called via scheduler.
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id)
wf_ex = task_ex.workflow_execution
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf = workflows.Workflow(
db_api.get_workflow_definition(wf_ex.workflow_id),
@ -87,11 +89,11 @@ def _on_task_complete(task_ex_id):
)
try:
wf.on_task_complete(task_ex)
wf.check_and_complete()
except exc.MistralException as e:
msg = (
"Failed to handle task completion [wf_ex=%s, task_ex=%s]:"
" %s\n%s" % (wf_ex, task_ex, e, tb.format_exc())
"Failed to check and complete [wf_ex=%s]:"
" %s\n%s" % (wf_ex, e, tb.format_exc())
)
LOG.error(msg)
@ -105,7 +107,7 @@ def _on_task_complete(task_ex_id):
# algorithm for increasing delay for rescheduling so that we don't
# put too serious load onto scheduler.
delay = 1
schedule_on_task_complete(task_ex, delay)
_schedule_check_and_complete(wf_ex, delay)
def pause_workflow(wf_ex, msg=None):
@ -128,6 +130,11 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
wf.rerun(task_ex, reset=reset, env=env)
_schedule_check_and_complete(wf_ex)
if wf_ex.task_execution_id:
_schedule_check_and_complete(wf_ex.task_execution.workflow_execution)
def resume_workflow(wf_ex, env=None):
if not states.is_paused_or_idle(wf_ex.state):
@ -153,9 +160,9 @@ def set_workflow_state(wf_ex, state, msg=None):
)
@profiler.trace('workflow-handler-schedule-on-task-complete')
def schedule_on_task_complete(task_ex, delay=0):
"""Schedules task completion check.
@profiler.trace('workflow-handler-schedule-check-and-complete')
def _schedule_check_and_complete(wf_ex, delay=0):
"""Schedules workflow completion check.
This method provides transactional decoupling of task completion from
workflow completion check. It's needed in non-locking model in order to
@ -165,16 +172,17 @@ def schedule_on_task_complete(task_ex, delay=0):
have in this case (time between transactions) whereas scheduler is a
special component that is designed to be resistant to failures.
:param task_ex: Task execution.
:param wf_ex: Workflow execution.
:param delay: Minimum amount of time before task completion check
should be made.
"""
key = 'wfh_on_t_c-%s' % task_ex.workflow_execution_id
# TODO(rakhmerov): update docstring
key = 'wfh_on_c_a_c-%s' % wf_ex.id
scheduler.schedule_call(
None,
_ON_TASK_COMPLETE_PATH,
_CHECK_AND_COMPLETE_PATH,
delay,
unique_key=key,
task_ex_id=task_ex.id
key=key,
wf_ex_id=wf_ex.id
)

View File

@ -56,6 +56,14 @@ class Workflow(object):
def __init__(self, wf_def, wf_ex=None):
self.wf_def = wf_def
self.wf_ex = wf_ex
if wf_ex:
# We're processing a workflow that's already in progress.
self.wf_spec = spec_parser.get_workflow_spec_by_execution_id(
wf_ex.id
)
else:
# New workflow execution.
self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
wf_def.id,
wf_def.updated_at
@ -115,17 +123,6 @@ class Workflow(object):
elif state == states.CANCELLED:
return self._cancel_workflow(msg)
@profiler.trace('workflow-on-task-complete')
def on_task_complete(self, task_ex):
"""Handle task completion event.
:param task_ex: Task execution that's completed.
"""
assert self.wf_ex
self._check_and_complete()
def resume(self, env=None):
"""Resume workflow.
@ -190,10 +187,10 @@ class Workflow(object):
if states.is_completed(t_ex.state) and not t_ex.processed:
t_ex.processed = True
if cmds:
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
if not cmds:
self._check_and_complete()
else:
self.check_and_complete()
@profiler.trace('workflow-lock')
def lock(self):
@ -276,13 +273,16 @@ class Workflow(object):
parent_task_ex.state_info = None
parent_task_ex.processed = False
def _check_and_complete(self):
@profiler.trace('workflow-check-and-complete')
def check_and_complete(self):
if states.is_paused_or_completed(self.wf_ex.state):
return
# Workflow is not completed if there are any incomplete task
# executions.
incomplete_tasks = wf_utils.find_incomplete_task_executions(self.wf_ex)
incomplete_tasks = db_api.get_incomplete_task_executions(
workflow_execution_id=self.wf_ex.id,
)
if incomplete_tasks:
return

View File

@ -37,7 +37,7 @@ _schedulers = {}
def schedule_call(factory_method_path, target_method_name,
run_after, serializers=None, unique_key=None, **method_args):
run_after, serializers=None, key=None, **method_args):
"""Schedules call and lately invokes target_method.
Add this call specification to DB, and then after run_after
@ -54,11 +54,8 @@ def schedule_call(factory_method_path, target_method_name,
{ "result": "mistral.utils.serializer.ResultSerializer"}
Serializer for the object type must implement serializer interface
in mistral/utils/serializer.py
:param unique_key: Unique key which in combination with 'processing'
flag restricts a number of delayed calls if it's passed. For example,
if we schedule two calls but pass the same unique key for them then
we won't get two of them in DB if both have same value of 'processing'
flag.
:param key: Key which can potentially be used for squashing similar
delayed calls.
:param method_args: Target method keyword arguments.
"""
ctx_serializer = context.RpcContextSerializer(
@ -95,12 +92,12 @@ def schedule_call(factory_method_path, target_method_name,
'execution_time': execution_time,
'auth_context': ctx,
'serializers': serializers,
'unique_key': unique_key,
'key': key,
'method_arguments': method_args,
'processing': False
}
db_api.insert_or_ignore_delayed_call(values)
db_api.create_delayed_call(values)
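
For reference, a sketch of a call site after this change (it mirrors the
task handler changes above; 'delay' is illustrative):

    scheduler.schedule_call(
        None,
        _REFRESH_TASK_STATE_PATH,  # import path of the target method
        delay,
        key='th_c_t_s_a-%s' % task_ex.id,
        task_ex_id=task_ex.id
    )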
class CallScheduler(periodic_task.PeriodicTasks):

View File

@ -15,20 +15,39 @@
from oslo_config import cfg
from oslo_utils import timeutils
from mistral.db.v2.sqlalchemy import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.tests.unit import base as test_base
WF_EX = {
'id': '1',
'spec': {},
'start_params': {'task': 'my_task1'},
'state': 'IDLE',
'state_info': "Running...",
'created_at': None,
'updated_at': None,
'context': None,
'task_id': None,
'trust_id': None,
'description': None,
'output': None
}
DELAYED_CALL = {
'factory_method_path': 'my_factory_method',
'target_method_name': 'my_target_method',
'method_arguments': None,
'serializers': None,
'auth_context': None,
'execution_time': timeutils.utcnow()
TASK_EX = {
'workflow_execution_id': '1',
'workflow_name': 'my_wf',
'name': 'my_task1',
'spec': None,
'action_spec': None,
'state': 'IDLE',
'tags': ['deployment'],
'in_context': None,
'runtime_context': None,
'created_at': None,
'updated_at': None
}
@ -38,6 +57,8 @@ class InsertOrIgnoreTest(test_base.DbTestCase):
cfg.CONF.set_default('auth_enable', True, group='pecan')
db_api.create_workflow_execution(WF_EX)
self.addCleanup(
cfg.CONF.set_default,
'auth_enable',
@ -47,46 +68,46 @@ class InsertOrIgnoreTest(test_base.DbTestCase):
def test_insert_or_ignore_without_conflicts(self):
db_api.insert_or_ignore(
db_models.DelayedCall,
DELAYED_CALL.copy()
db_models.TaskExecution,
TASK_EX.copy()
)
delayed_calls = db_api.get_delayed_calls()
task_execs = db_api.get_task_executions()
self.assertEqual(1, len(delayed_calls))
self.assertEqual(1, len(task_execs))
delayed_call = delayed_calls[0]
task_ex = task_execs[0]
self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict())
self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict())
def test_insert_or_ignore_with_conflicts(self):
# Insert the first object.
values = DELAYED_CALL.copy()
values = TASK_EX.copy()
values['unique_key'] = 'key'
db_api.insert_or_ignore(db_models.DelayedCall, values)
db_api.insert_or_ignore(db_models.TaskExecution, values)
delayed_calls = db_api.get_delayed_calls()
task_execs = db_api.get_task_executions()
self.assertEqual(1, len(delayed_calls))
self.assertEqual(1, len(task_execs))
delayed_call = delayed_calls[0]
task_ex = task_execs[0]
self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict())
self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict())
# Insert the second object with the same unique key.
# We must not get exceptions and new object must not be saved.
values = DELAYED_CALL.copy()
values = TASK_EX.copy()
values['unique_key'] = 'key'
db_api.insert_or_ignore(db_models.DelayedCall, values)
db_api.insert_or_ignore(db_models.TaskExecution, values)
delayed_calls = db_api.get_delayed_calls()
task_execs = db_api.get_task_executions()
self.assertEqual(1, len(delayed_calls))
self.assertEqual(1, len(task_execs))
delayed_call = delayed_calls[0]
task_ex = task_execs[0]
self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict())
self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict())

View File

@ -2246,13 +2246,13 @@ class LockTest(SQLAlchemyTest):
self.assertEqual('lock1', locks[0].name)
db_api.delete_named_lock('invalid_lock_name')
db_api.delete_named_lock('invalid_lock_id')
locks = db_api.get_named_locks()
self.assertEqual(1, len(locks))
db_api.delete_named_lock(locks[0].name)
db_api.delete_named_lock(locks[0].id)
locks = db_api.get_named_locks()

View File

@ -141,8 +141,8 @@ class EngineTestCase(base.DbTestCase):
for w in wf_execs:
print(
"\n%s [state=%s, state_info=%s, output=%s]" %
(w.name, w.state, w.state_info, w.output)
"\n%s (%s) [state=%s, state_info=%s, output=%s]" %
(w.name, w.id, w.state, w.state_info, w.output)
)
for t in w.task_executions:

View File

@ -163,6 +163,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertDictEqual(exp_published_arr[0], task1.published)
self.assertDictEqual(exp_published_arr[1], task2.published)
self.assertDictEqual(exp_published_arr[2], task3.published)
self.assertIn(exp_published_arr[0]['progress'], notify_published_arr)
self.assertIn(exp_published_arr[1]['progress'], notify_published_arr)
self.assertIn(exp_published_arr[2]['progress'], notify_published_arr)

View File

@ -341,46 +341,3 @@ class SchedulerServiceTest(base.DbTestCase):
db_api.get_delayed_call(calls[0].id)
db_api.delete_delayed_call(calls[0].id)
def test_schedule_with_unique_key(self):
method_args = {'name': 'task', 'id': '321'}
key = 'my_unique_key'
scheduler.schedule_call(
None,
TARGET_METHOD_PATH,
DELAY,
unique_key=key,
**method_args
)
self.assertEqual(1, len(db_api.get_delayed_calls()))
# Schedule the call for the second time, number of calls
# must not change due to the same unique key.
scheduler.schedule_call(
None,
TARGET_METHOD_PATH,
DELAY,
unique_key=key,
**method_args
)
calls = db_api.get_delayed_calls()
self.assertEqual(1, len(calls))
# Now change 'processing' flag and make sure we can schedule
# one more call because DB constraint allows it.
db_api.update_delayed_call(calls[0].id, {'processing': True})
scheduler.schedule_call(
None,
TARGET_METHOD_PATH,
DELAY,
unique_key=key,
**method_args
)
self.assertEqual(2, len(db_api.get_delayed_calls()))

View File

@ -0,0 +1,75 @@
# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine import dispatcher
from mistral.tests.unit import base
from mistral.workflow import commands
def _print_commands(cmds):
    print("commands:")

    for cmd in cmds:
        if isinstance(cmd, commands.RunTask):
            print("%s, %s, %s" % (type(cmd), cmd.is_waiting(), cmd.unique_key))
        else:
            print("%s" % type(cmd))


class CommandDispatcherTest(base.BaseTest):
    def setUp(self):
        super(CommandDispatcherTest, self).setUp()

    def test_rearrange_commands(self):
        no_wait = commands.RunTask(None, None, None, None)
        fail = commands.FailWorkflow(None, None, None, None)
        succeed = commands.SucceedWorkflow(None, None, None, None)

        wait1 = commands.RunTask(None, None, None, None)
        wait1.wait = True
        wait1.unique_key = 'wait1'

        wait2 = commands.RunTask(None, None, None, None)
        wait2.wait = True
        wait2.unique_key = 'wait2'

        wait3 = commands.RunTask(None, None, None, None)
        wait3.wait = True
        wait3.unique_key = 'wait3'

        # 'set state' command is the first, others must be ignored.
        initial = [fail, no_wait, wait1, wait3, wait2]
        expected = [fail]

        cmds = dispatcher._rearrange_commands(initial)

        self.assertEqual(expected, cmds)

        # 'set state' command is the last, tasks before it must be sorted.
        initial = [no_wait, wait2, wait1, wait3, succeed]
        expected = [no_wait, wait1, wait2, wait3, succeed]

        cmds = dispatcher._rearrange_commands(initial)

        self.assertEqual(expected, cmds)

        # 'set state' command is in the middle, tasks before it must be
        # sorted and the task after it must be ignored.
        initial = [wait3, wait2, no_wait, succeed, wait1]
        expected = [no_wait, wait2, wait3, succeed]

        cmds = dispatcher._rearrange_commands(initial)

        self.assertEqual(expected, cmds)

View File

@ -19,6 +19,8 @@ from mistral.db.v2 import api as db_api
from mistral.workflow import utils as wf_utils
from oslo_serialization import jsonutils
from stevedore import extension
ROOT_CONTEXT = None
@ -36,6 +38,7 @@ def get_yaql_context(data_context):
if isinstance(data_context, dict):
new_ctx['__env'] = data_context.get('__env')
new_ctx['__execution'] = data_context.get('__execution')
new_ctx['__task_execution'] = data_context.get('__task_execution')
return new_ctx
@ -85,6 +88,13 @@ def task_(context, task_name):
wf_ex = db_api.get_workflow_execution(context['__execution']['id'])
# This section may not exist in a context if it's not calculated
# in a task scope.
cur_task = context['__task_execution']
if cur_task and cur_task['name'] == task_name:
task_ex = db_api.get_task_execution(cur_task['id'])
else:
task_execs = wf_utils.find_task_executions_by_name(wf_ex, task_name)
# TODO(rakhmerov): Account for multiple executions (i.e. in case of

View File

@ -130,6 +130,9 @@ class WorkflowSpec(base.BaseSpec):
def get_tasks(self):
return self._tasks
def get_task(self, name):
return self._tasks[name]
class DirectWorkflowSpec(WorkflowSpec):
_polymorphic_value = 'direct'

View File

@ -138,6 +138,15 @@ def evaluate_workflow_output(wf_spec, ctx):
return output or ctx
def add_current_task_to_context(ctx, task_id, task_name):
    ctx['__task_execution'] = {
        'id': task_id,
        'name': task_name
    }

    return ctx
def add_openstack_data_to_context(wf_ex):
wf_ex.context = wf_ex.context or {}

View File

@ -113,6 +113,8 @@ class DirectWorkflowController(base.WorkflowController):
cmds = []
ctx = data_flow.evaluate_task_outbound_context(task_ex)
for t_n, params in self._find_next_tasks(task_ex):
t_s = self.wf_spec.get_tasks()[t_n]
@ -126,7 +128,7 @@ class DirectWorkflowController(base.WorkflowController):
self.wf_ex,
self.wf_spec,
t_s,
data_flow.evaluate_task_outbound_context(task_ex),
ctx,
params
)

View File

@ -40,7 +40,7 @@ class ReverseWorkflowController(base.WorkflowController):
__workflow_type__ = "reverse"
def _find_next_commands(self, task_ex=None):
def _find_next_commands(self, task_ex):
"""Finds all tasks with resolved dependencies.
This method finds all tasks with resolved dependencies and

View File

@ -70,6 +70,12 @@ class ResultSerializer(serializers.Serializer):
)
# TODO(rakhmerov): Most of these 'find..' methods should be replaced
# with corresponding methods on DB API because in most cases
# we don't need to fetch the whole collection of task executions
# from DB. We can just query only needed objects by needed criteria.
# In practice it leads to serious performance loss.
def find_task_execution_not_state(wf_ex, task_spec, state):
task_execs = [
t for t in wf_ex.task_executions
@ -123,11 +129,6 @@ def find_successful_task_executions(wf_ex):
return find_task_executions_with_state(wf_ex, states.SUCCESS)
def find_incomplete_task_executions(wf_ex):
return [t for t in wf_ex.task_executions
if not states.is_completed(t.state)]
def find_error_task_executions(wf_ex):
return find_task_executions_with_state(wf_ex, states.ERROR)