Merge "Change execution mechanism for 'join' tasks"

Jenkins 2016-09-08 10:10:32 +00:00 committed by Gerrit Code Review
commit f3a40c1866
25 changed files with 469 additions and 254 deletions

View File

@@ -0,0 +1,33 @@
# Copyright 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""increate_task_execution_unique_key_size
Revision ID: 018
Revises: 017
Create Date: 2016-08-17 17:47:30.325182
"""
# revision identifiers, used by Alembic.
revision = '018'
down_revision = '017'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.alter_column('task_executions_v2', 'unique_key', type_=sa.String(250))

View File

@@ -0,0 +1,49 @@
# Copyright 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Change scheduler schema.
Revision ID: 019
Revises: 018
Create Date: 2016-08-17 17:54:51.952949
"""
# revision identifiers, used by Alembic.
revision = '019'
down_revision = '018'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.drop_index(
'delayed_calls_v2_processing_execution_time',
table_name='delayed_calls_v2'
)
op.drop_index('unique_key', table_name='delayed_calls_v2')
op.drop_column('delayed_calls_v2', 'unique_key')
op.add_column(
'delayed_calls_v2',
sa.Column('key', sa.String(length=250), nullable=True)
)
op.create_index(
'delayed_calls_v2_execution_time',
'delayed_calls_v2',
['execution_time'],
unique=False
)

View File

@@ -308,14 +308,14 @@ def get_task_executions(limit=None, marker=None, sort_keys=['created_at'],
    )

+def get_incomplete_task_executions(**kwargs):
+    return IMPL.get_incomplete_task_executions(**kwargs)
+
def create_task_execution(values):
    return IMPL.create_task_execution(values)

-def insert_or_ignore_task_execution(values):
-    return IMPL.insert_or_ignore_task_execution(values)
-
def update_task_execution(id, values):
    return IMPL.update_task_execution(id, values)

@@ -342,10 +342,6 @@ def create_delayed_call(values):
    return IMPL.create_delayed_call(values)

-def insert_or_ignore_delayed_call(values):
-    return IMPL.insert_or_ignore_delayed_call(values)
-
def delete_delayed_call(id):
    return IMPL.delete_delayed_call(id)

@@ -533,8 +529,8 @@ def get_named_locks(limit=None, marker=None):
    return IMPL.get_named_locks(limit=limit, marker=marker)

-def delete_named_lock(name):
-    return IMPL.delete_named_lock(name)
+def delete_named_lock(lock_id):
+    return IMPL.delete_named_lock(lock_id)

@contextlib.contextmanager

View File

@@ -34,6 +34,7 @@ from mistral.db.v2.sqlalchemy import filters as db_filters
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import security
+from mistral import utils

CONF = cfg.CONF

@@ -796,6 +797,23 @@ def get_task_executions(**kwargs):
    return _get_task_executions(**kwargs)

+def get_incomplete_task_executions(**kwargs):
+    query = b.model_query(models.TaskExecution)
+
+    query = query.filter_by(**kwargs)
+
+    query = query.filter(
+        sa.or_(
+            models.TaskExecution.state == 'IDLE',
+            models.TaskExecution.state == 'RUNNING',
+            models.TaskExecution.state == 'WAITING',
+            models.TaskExecution.state == 'DELAYED'
+        )
+    )
+
+    return query.all()

@b.session_aware()
def create_task_execution(values, session=None):
    task_ex = models.TaskExecution()

@@ -812,10 +830,6 @@ def create_task_execution(values, session=None):
    return task_ex

-def insert_or_ignore_task_execution(values):
-    insert_or_ignore(models.TaskExecution, values.copy())
-
@b.session_aware()
def update_task_execution(id, values, session=None):
    task_ex = get_task_execution(id)

@@ -866,10 +880,6 @@ def create_delayed_call(values, session=None):
    return delayed_call

-def insert_or_ignore_delayed_call(values):
-    insert_or_ignore(models.DelayedCall, values.copy())
-
@b.session_aware()
def delete_delayed_call(id, session=None):
    delayed_call = get_delayed_call(id)

@@ -1388,9 +1398,17 @@ def create_named_lock(name, session=None):
    # session may not immediately issue an SQL query to a database
    # and instead just schedule it whereas we need to make sure to
    # issue a query immediately.
+    session.flush()

    insert = models.NamedLock.__table__.insert()

-    session.execute(insert.values(name=name))
+    lock_id = utils.generate_unicode_uuid()
+
+    session.execute(insert.values(id=lock_id, name=name))
+
+    session.flush()
+
+    return lock_id

def get_named_locks(**kwargs):

@@ -1398,22 +1416,27 @@ def get_named_locks(**kwargs):
@b.session_aware()
-def delete_named_lock(name, session=None):
+def delete_named_lock(lock_id, session=None):
    # This method has to work without SQLAlchemy session because
    # session may not immediately issue an SQL query to a database
    # and instead just schedule it whereas we need to make sure to
    # issue a query immediately.
+    session.flush()

    table = models.NamedLock.__table__
    delete = table.delete()

-    session.execute(delete.where(table.c.name == name))
+    session.execute(delete.where(table.c.id == lock_id))
+
+    session.flush()

@contextlib.contextmanager
def named_lock(name):
+    lock_id = None
+
    try:
-        create_named_lock(name)
+        lock_id = create_named_lock(name)

        yield
    finally:
-        delete_named_lock(name)
+        delete_named_lock(lock_id)
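
The reworked lock API above is what makes the new 'join' handling safe: create_named_lock() now returns the generated primary key and delete_named_lock() deletes by that key, so a transaction can only ever remove the lock row it inserted itself, never a row inserted concurrently under the same name. A minimal usage sketch, assuming only the named_lock() context manager shown in this diff (the key string is hypothetical):

from mistral.db.v2.sqlalchemy import api as db_api

# Two engine transactions processing the same 'join' serialize here.
# On exit, each transaction deletes only the row whose id it received
# from create_named_lock(), so it cannot delete the other's lock.
with db_api.named_lock('task-unique-key'):
    pass  # critical section: find-or-create the task execution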

View File

@@ -227,7 +227,7 @@ class TaskExecution(Execution):
    # Main properties.
    action_spec = sa.Column(st.JsonLongDictType())
-    unique_key = sa.Column(sa.String(200), nullable=True)
+    unique_key = sa.Column(sa.String(250), nullable=True)

    # Whether the task is fully processed (publishing and calculating commands
    # after it). It allows to simplify workflow controller implementations

@@ -242,7 +242,7 @@ class TaskExecution(Execution):
    def executions(self):
        return (
            self.action_executions
-            if self.spec.get('action')
+            if not self.spec.get('workflow')
            else self.workflow_executions
        )

@@ -328,26 +328,23 @@ class DelayedCall(mb.MistralModelBase):
    __tablename__ = 'delayed_calls_v2'

-    __table_args__ = (
-        sa.Index(
-            '%s_processing_execution_time' % __tablename__,
-            'processing',
-            'execution_time'
-        ),
-        sa.UniqueConstraint('unique_key', 'processing')
-    )
-
    id = mb.id_column()
    factory_method_path = sa.Column(sa.String(200), nullable=True)
    target_method_name = sa.Column(sa.String(80), nullable=False)
    method_arguments = sa.Column(st.JsonDictType())
    serializers = sa.Column(st.JsonDictType())
-    unique_key = sa.Column(sa.String(80), nullable=True)
+    key = sa.Column(sa.String(250), nullable=True)
    auth_context = sa.Column(st.JsonDictType())
    execution_time = sa.Column(sa.DateTime, nullable=False)
    processing = sa.Column(sa.Boolean, default=False, nullable=False)

+sa.Index(
+    '%s_execution_time' % DelayedCall.__tablename__,
+    DelayedCall.execution_time
+)

class Environment(mb.MistralSecureModelBase):
    """Contains environment variables for workflow execution."""

View File

@@ -174,9 +174,10 @@ class Action(object):
        if prev_state != state:
            wf_trace.info(
                None,
-                "Action '%s' (%s) [%s -> %s, %s]" %
+                "Action '%s' (%s)(task=%s) [%s -> %s, %s]" %
                (self.action_ex.name,
                 self.action_ex.id,
+                 self.task_ex.name if self.task_ex else None,
                 prev_state,
                 state,
                 _result_msg())

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+import functools

from osprofiler import profiler

from mistral import exceptions as exc

@@ -20,6 +21,64 @@ from mistral.workflow import commands
from mistral.workflow import states

+def _compare_task_commands(a, b):
+    if not isinstance(a, commands.RunTask) or not a.is_waiting():
+        return -1
+
+    if not isinstance(b, commands.RunTask) or not b.is_waiting():
+        return 1
+
+    if a.unique_key == b.unique_key:
+        return 0
+
+    if a.unique_key < b.unique_key:
+        return -1
+
+    return 1
+
+def _rearrange_commands(cmds):
+    """Takes workflow commands and does required pre-processing.
+
+    The main idea of the method is to sort task commands that have the
+    'waiting' flag by their 'unique_key' property in order to guarantee
+    the same locking order for them in parallel transactions and thereby
+    prevent deadlocks.
+
+    It also removes commands that don't make sense. For example, if
+    there are commands after a command that changes the workflow state,
+    they must not be dispatched.
+    """
+
+    # Remove all 'noop' commands.
+    cmds = list(filter(lambda c: not isinstance(c, commands.Noop), cmds))
+
+    state_cmd_idx = -1
+    state_cmd = None
+
+    for i, cmd in enumerate(cmds):
+        if isinstance(cmd, commands.SetWorkflowState):
+            state_cmd_idx = i
+            state_cmd = cmd
+
+            break
+
+    # Find a position of a 'fail|succeed|pause' command
+    # and sort all task commands before it.
+    if state_cmd_idx < 0:
+        cmds.sort(key=functools.cmp_to_key(_compare_task_commands))
+
+        return cmds
+    elif state_cmd_idx == 0:
+        return cmds[0:1]
+
+    res = cmds[0:state_cmd_idx]
+
+    res.sort(key=functools.cmp_to_key(_compare_task_commands))
+
+    res.append(state_cmd)
+
+    return res

@profiler.trace('dispatcher-dispatch-commands')
def dispatch_workflow_commands(wf_ex, wf_cmds):
    # TODO(rakhmerov): I don't like these imports but otherwise we have

@@ -30,14 +89,11 @@ def dispatch_workflow_commands(wf_ex, wf_cmds):
    if not wf_cmds:
        return

-    for cmd in wf_cmds:
+    for cmd in _rearrange_commands(wf_cmds):
        if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
            task_handler.run_task(cmd)
        elif isinstance(cmd, commands.SetWorkflowState):
            wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg)
-        elif isinstance(cmd, commands.Noop):
-            # Do nothing.
-            pass
        else:
            raise exc.MistralError('Unsupported workflow command: %s' % cmd)
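
The deadlock argument above relies on every transaction acquiring named locks in the same order: because waiting RunTask commands are sorted by unique_key, two parallel transactions that both dispatch joins 'join-a' and 'join-b' always lock 'join-a' before 'join-b'. A self-contained sketch of the ordering rule, with a hypothetical FakeCmd standing in for commands.RunTask:

import functools

class FakeCmd(object):
    # Minimal stand-in for commands.RunTask: just what the comparator reads.
    def __init__(self, unique_key, waiting=True):
        self.unique_key = unique_key
        self.waiting = waiting

    def is_waiting(self):
        return self.waiting

def compare(a, b):
    # Mirrors _compare_task_commands above, restricted to FakeCmd objects.
    if not a.is_waiting():
        return -1
    if not b.is_waiting():
        return 1
    return (a.unique_key > b.unique_key) - (a.unique_key < b.unique_key)

cmds = [FakeCmd('join-b'), FakeCmd('join-a'), FakeCmd('run', waiting=False)]
cmds.sort(key=functools.cmp_to_key(compare))

# Non-waiting commands stay in front; waiting ones follow in unique_key
# order, so this prints: ['run', 'join-a', 'join-b']
print([c.unique_key for c in cmds])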

View File

@@ -48,7 +48,7 @@ class OsloRPCServer(rpc_base.RPCServer):
            rpc.get_transport(),
            target,
            self.endpoints,
-            executor='eventlet',
+            executor='blocking',
            serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
        )

View File

@@ -72,20 +72,9 @@ def run_task(wf_cmd):
        return

-    if not task.task_ex:
-        # It is possible that task execution was not created
-        # (and therefore not associated with Task instance).
-        # For example, in case of 'join' that has already been
-        # created by a different transaction. In this case
-        # we should skip post completion scheduled checks.
-        return
-
-    if task.is_waiting():
+    if task.is_waiting() and (task.is_created() or task.is_state_changed()):
        _schedule_refresh_task_state(task.task_ex)

-    if task.is_completed():
-        wf_handler.schedule_on_task_complete(task.task_ex)
-
@profiler.trace('task-handler-on-action-complete')
def _on_action_complete(action_ex):

@@ -128,9 +117,6 @@ def _on_action_complete(action_ex):
        return

-    if task.is_completed():
-        wf_handler.schedule_on_task_complete(task_ex)
-
def fail_task(task_ex, msg):
    wf_spec = spec_parser.get_workflow_spec_by_execution_id(

@@ -171,9 +157,6 @@ def continue_task(task_ex):
        return

-    if task.is_completed():
-        wf_handler.schedule_on_task_complete(task_ex)
-
def complete_task(task_ex, state, state_info):
    wf_spec = spec_parser.get_workflow_spec_by_execution_id(

@@ -200,15 +183,12 @@ def complete_task(task_ex, state, state_info):
        return

-    if task.is_completed():
-        wf_handler.schedule_on_task_complete(task_ex)
-
-def _build_task_from_execution(wf_spec, task_ex, task_spec=None):
+def _build_task_from_execution(wf_spec, task_ex):
    return _create_task(
        task_ex.workflow_execution,
        wf_spec,
-        task_spec or spec_parser.get_task_spec(task_ex.spec),
+        wf_spec.get_task(task_ex.name),
        task_ex.in_context,
        task_ex
    )

@@ -223,7 +203,8 @@ def _build_task_from_command(cmd):
            spec_parser.get_task_spec(cmd.task_ex.spec),
            cmd.ctx,
            task_ex=cmd.task_ex,
-            unique_key=cmd.task_ex.unique_key
+            unique_key=cmd.task_ex.unique_key,
+            waiting=cmd.task_ex.state == states.WAITING
        )

        if cmd.reset:

@@ -237,39 +218,26 @@ def _build_task_from_command(cmd):
            cmd.wf_spec,
            cmd.task_spec,
            cmd.ctx,
-            unique_key=cmd.unique_key
+            unique_key=cmd.unique_key,
+            waiting=cmd.is_waiting()
        )

-        if cmd.is_waiting():
-            task.defer()
-
        return task

    raise exc.MistralError('Unsupported workflow command: %s' % cmd)

def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
-                 unique_key=None):
+                 unique_key=None, waiting=False):
    if task_spec.get_with_items():
-        return tasks.WithItemsTask(
-            wf_ex,
-            wf_spec,
-            task_spec,
-            ctx,
-            task_ex,
-            unique_key
-        )
-
-    return tasks.RegularTask(
-        wf_ex,
-        wf_spec,
-        task_spec,
-        ctx,
-        task_ex,
-        unique_key
-    )
+        cls = tasks.WithItemsTask
+    else:
+        cls = tasks.RegularTask
+
+    return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting)

+@profiler.trace('task-handler-refresh-task-state')
def _refresh_task_state(task_ex_id):
    with db_api.transaction():
        task_ex = db_api.get_task_execution(task_ex_id)

@@ -315,7 +283,6 @@ def _schedule_refresh_task_state(task_ex, delay=0):
    :param task_ex: Task execution.
    :param delay: Delay.
-    :return:
    """

    key = 'th_c_t_s_a-%s' % task_ex.id

@@ -323,7 +290,7 @@ def _schedule_refresh_task_state(task_ex, delay=0):
        None,
        _REFRESH_TASK_STATE_PATH,
        delay,
-        unique_key=key,
+        key=key,
        task_ex_id=task_ex.id
    )

@@ -366,7 +333,7 @@ def schedule_on_action_complete(action_ex, delay=0):
        None,
        _SCHEDULED_ON_ACTION_COMPLETE_PATH,
        delay,
-        unique_key=key,
+        key=key,
        action_ex_id=action_ex.id,
        wf_action=isinstance(action_ex, models.WorkflowExecution)
    )

View File

@@ -47,15 +47,17 @@ class Task(object):
    @profiler.trace('task-create')
    def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None,
-                 unique_key=None):
+                 unique_key=None, waiting=False):
        self.wf_ex = wf_ex
        self.task_spec = task_spec
        self.ctx = ctx
        self.task_ex = task_ex
        self.wf_spec = wf_spec
        self.unique_key = unique_key
-        self.waiting = False
+        self.waiting = waiting
        self.reset_flag = False
+        self.created = False
+        self.state_changed = False

    def is_completed(self):
        return self.task_ex and states.is_completed(self.task_ex.state)

@@ -63,6 +65,12 @@ class Task(object):
    def is_waiting(self):
        return self.waiting

+    def is_created(self):
+        return self.created
+
+    def is_state_changed(self):
+        return self.state_changed

    @abc.abstractmethod
    def on_action_complete(self, action_ex):
        """Handle action completion.

@@ -82,21 +90,24 @@ class Task(object):
        This method puts the task into a waiting state.
        """
+        with db_api.named_lock(self.unique_key):
            if not self.task_ex:
                t_execs = db_api.get_task_executions(
                    workflow_execution_id=self.wf_ex.id,
-                    name=self.task_spec.get_name()
+                    unique_key=self.unique_key
                )

                self.task_ex = t_execs[0] if t_execs else None

+            msg = 'Task is waiting.'
+
            if not self.task_ex:
-                self._create_task_execution()
-
-            if self.task_ex:
-                self.set_state(states.WAITING, 'Task is deferred.')
-
-            self.waiting = True
+                self._create_task_execution(
+                    state=states.WAITING,
+                    state_info=msg
+                )
+            elif self.task_ex.state != states.WAITING:
+                self.set_state(states.WAITING, msg)

    def reset(self):
        self.reset_flag = True

@@ -124,6 +135,8 @@ class Task(object):
                state_info)
        )

+        self.state_changed = True
+
        self.task_ex.state = state
        self.task_ex.state_info = state_info

@@ -172,7 +185,7 @@ class Task(object):
        wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)

        # Calculate commands to process next.
-        cmds = wf_ctrl.continue_workflow(self.task_ex)
+        cmds = wf_ctrl.continue_workflow(task_ex=self.task_ex)

        # Mark task as processed after all decisions have been made
        # upon its completion.

@@ -192,14 +205,20 @@ class Task(object):
        for p in policies.build_policies(policies_spec, self.wf_spec):
            p.after_task_complete(self.task_ex, self.task_spec)

-    def _create_task_execution(self, state=states.RUNNING):
+    def _create_task_execution(self, state=states.RUNNING, state_info=None):
+        task_id = utils.generate_unicode_uuid()
+        task_name = self.task_spec.get_name()
+
+        data_flow.add_current_task_to_context(self.ctx, task_id, task_name)
+
        values = {
-            'id': utils.generate_unicode_uuid(),
-            'name': self.task_spec.get_name(),
+            'id': task_id,
+            'name': task_name,
            'workflow_execution_id': self.wf_ex.id,
            'workflow_name': self.wf_ex.workflow_name,
            'workflow_id': self.wf_ex.workflow_id,
            'state': state,
+            'state_info': state_info,
            'spec': self.task_spec.to_dict(),
            'unique_key': self.unique_key,
            'in_context': self.ctx,

@@ -208,23 +227,13 @@ class Task(object):
            'project_id': self.wf_ex.project_id
        }

-        db_api.insert_or_ignore_task_execution(values)
-
-        # Since 'insert_or_ignore' cannot return a valid count of updated
-        # rows, the only reliable way to check whether the insert operation
-        # created an object is to try to load that object by the just
-        # generated uuid.
-        task_ex = db_api.load_task_execution(values['id'])
-
-        if not task_ex:
-            return False
-
-        self.task_ex = task_ex
+        self.task_ex = db_api.create_task_execution(values)

        # Add to collection explicitly so that it's in a proper
        # state within the current session.
        self.wf_ex.task_executions.append(self.task_ex)

-        return True
+        self.created = True

    def _get_action_defaults(self):
        action_name = self.task_spec.get_action_name()

@@ -262,12 +271,12 @@ class RegularTask(Task):
            self._run_existing()

    def _run_new(self):
-        if not self._create_task_execution():
-            # Task with the same unique key has already been created.
+        if self.waiting:
+            self.defer()
+
            return

-        if self.waiting:
-            return
+        self._create_task_execution()

        LOG.debug(
            'Starting task [workflow=%s, task_spec=%s, init_state=%s]' %
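
The defer() logic above is a find-or-create under a mutex: however many upstream branches complete concurrently, exactly one WAITING execution of the join task exists afterwards. A toy, self-contained model of that invariant (threading stands in for parallel DB transactions; all names are hypothetical):

import threading

_store = {}               # unique_key -> task execution row (toy DB table)
_lock = threading.Lock()  # stands in for db_api.named_lock(unique_key)

def defer(unique_key):
    # Find-or-create: the lock makes the check-then-insert atomic.
    with _lock:
        if unique_key not in _store:
            _store[unique_key] = {'state': 'WAITING'}
        return _store[unique_key]

threads = [
    threading.Thread(target=defer, args=('wf1-join-task',))
    for _ in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert len(_store) == 1  # one WAITING record, despite five racers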

View File

@@ -27,7 +27,9 @@ from mistral.workflow import states
LOG = logging.getLogger(__name__)

-_ON_TASK_COMPLETE_PATH = 'mistral.engine.workflow_handler._on_task_complete'
+_CHECK_AND_COMPLETE_PATH = (
+    'mistral.engine.workflow_handler._check_and_complete'
+)

@profiler.trace('workflow-handler-start-workflow')

@@ -38,6 +40,8 @@ def start_workflow(wf_identifier, wf_input, desc, params):
    wf.start(wf_input, desc=desc, params=params)

+    _schedule_check_and_complete(wf.wf_ex)
+
    return wf.wf_ex

@@ -73,13 +77,11 @@ def cancel_workflow(wf_ex, msg=None):
    stop_workflow(wf_ex, states.CANCELLED, msg)

-@profiler.trace('workflow-handler-on-task-complete')
-def _on_task_complete(task_ex_id):
+@profiler.trace('workflow-handler-check-and-complete')
+def _check_and_complete(wf_ex_id):
    # Note: This method can only be called via scheduler.
    with db_api.transaction():
-        task_ex = db_api.get_task_execution(task_ex_id)
-
-        wf_ex = task_ex.workflow_execution
+        wf_ex = db_api.get_workflow_execution(wf_ex_id)

        wf = workflows.Workflow(
            db_api.get_workflow_definition(wf_ex.workflow_id),

@@ -87,11 +89,11 @@
        )

        try:
-            wf.on_task_complete(task_ex)
+            wf.check_and_complete()
        except exc.MistralException as e:
            msg = (
-                "Failed to handle task completion [wf_ex=%s, task_ex=%s]:"
-                " %s\n%s" % (wf_ex, task_ex, e, tb.format_exc())
+                "Failed to check and complete [wf_ex=%s]:"
+                " %s\n%s" % (wf_ex, e, tb.format_exc())
            )

            LOG.error(msg)

@@ -105,7 +107,7 @@
            # algorithm for increasing delay for rescheduling so that we don't
            # put too serious load onto scheduler.
            delay = 1

-            schedule_on_task_complete(task_ex, delay)
+            _schedule_check_and_complete(wf_ex, delay)

def pause_workflow(wf_ex, msg=None):

@@ -128,6 +130,11 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
    wf.rerun(task_ex, reset=reset, env=env)

+    _schedule_check_and_complete(wf_ex)
+
+    if wf_ex.task_execution_id:
+        _schedule_check_and_complete(wf_ex.task_execution.workflow_execution)

def resume_workflow(wf_ex, env=None):
    if not states.is_paused_or_idle(wf_ex.state):

@@ -153,9 +160,9 @@ def set_workflow_state(wf_ex, state, msg=None):
    )

-@profiler.trace('workflow-handler-schedule-on-task-complete')
-def schedule_on_task_complete(task_ex, delay=0):
-    """Schedules task completion check.
+@profiler.trace('workflow-handler-schedule-check-and-complete')
+def _schedule_check_and_complete(wf_ex, delay=0):
+    """Schedules workflow completion check.

    This method provides transactional decoupling of task completion from
    workflow completion check. It's needed in non-locking model in order to

@@ -165,16 +172,17 @@
    have in this case (time between transactions) whereas scheduler is a
    special component that is designed to be resistant to failures.

-    :param task_ex: Task execution.
+    :param wf_ex: Workflow execution.
    :param delay: Minimum amount of time before task completion check
        should be made.
    """

+    # TODO(rakhmerov): update docstring
-    key = 'wfh_on_t_c-%s' % task_ex.workflow_execution_id
+    key = 'wfh_on_c_a_c-%s' % wf_ex.id

    scheduler.schedule_call(
        None,
-        _ON_TASK_COMPLETE_PATH,
+        _CHECK_AND_COMPLETE_PATH,
        delay,
-        unique_key=key,
+        key=key,
-        task_ex_id=task_ex.id
+        wf_ex_id=wf_ex.id
    )

View File

@@ -56,6 +56,14 @@ class Workflow(object):
    def __init__(self, wf_def, wf_ex=None):
        self.wf_def = wf_def
        self.wf_ex = wf_ex

+        if wf_ex:
+            # We're processing a workflow that's already in progress.
+            self.wf_spec = spec_parser.get_workflow_spec_by_execution_id(
+                wf_ex.id
+            )
+        else:
+            # New workflow execution.
            self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
                wf_def.id,
                wf_def.updated_at
            )

@@ -115,17 +123,6 @@ class Workflow(object):
        elif state == states.CANCELLED:
            return self._cancel_workflow(msg)

-    @profiler.trace('workflow-on-task-complete')
-    def on_task_complete(self, task_ex):
-        """Handle task completion event.
-
-        :param task_ex: Task execution that's completed.
-        """
-        assert self.wf_ex
-
-        self._check_and_complete()
-
    def resume(self, env=None):
        """Resume workflow.

@@ -190,10 +187,10 @@ class Workflow(object):
            if states.is_completed(t_ex.state) and not t_ex.processed:
                t_ex.processed = True

-        dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
-
-        if not cmds:
-            self._check_and_complete()
+        if cmds:
+            dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
+        else:
+            self.check_and_complete()

    @profiler.trace('workflow-lock')
    def lock(self):

@@ -276,13 +273,16 @@ class Workflow(object):
            parent_task_ex.state_info = None
            parent_task_ex.processed = False

-    def _check_and_complete(self):
+    @profiler.trace('workflow-check-and-complete')
+    def check_and_complete(self):
        if states.is_paused_or_completed(self.wf_ex.state):
            return

        # Workflow is not completed if there are any incomplete task
        # executions.
-        incomplete_tasks = wf_utils.find_incomplete_task_executions(self.wf_ex)
+        incomplete_tasks = db_api.get_incomplete_task_executions(
+            workflow_execution_id=self.wf_ex.id,
+        )

        if incomplete_tasks:
            return

View File

@@ -37,7 +37,7 @@ _schedulers = {}

def schedule_call(factory_method_path, target_method_name,
-                  run_after, serializers=None, unique_key=None, **method_args):
+                  run_after, serializers=None, key=None, **method_args):
    """Schedules a call and later invokes the target method.

    Adds this call specification to DB, and then after run_after

@@ -54,11 +54,8 @@ def schedule_call(factory_method_path, target_method_name,
        { "result": "mistral.utils.serializer.ResultSerializer"}
        Serializer for the object type must implement serializer interface
        in mistral/utils/serializer.py
-    :param unique_key: Unique key which, in combination with the 'processing'
-        flag, restricts the number of delayed calls if it's passed. For
-        example, if we schedule two calls with the same unique key, we won't
-        get two of them in DB if both have the same value of the 'processing'
-        flag.
+    :param key: Key which can potentially be used for squashing similar
+        delayed calls.
    :param method_args: Target method keyword arguments.
    """
    ctx_serializer = context.RpcContextSerializer(

@@ -95,12 +92,12 @@ def schedule_call(factory_method_path, target_method_name,
        'execution_time': execution_time,
        'auth_context': ctx,
        'serializers': serializers,
-        'unique_key': unique_key,
+        'key': key,
        'method_arguments': method_args,
        'processing': False
    }

-    db_api.insert_or_ignore_delayed_call(values)
+    db_api.create_delayed_call(values)
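
With the unique-key constraint gone from delayed_calls_v2, schedule_call() now always inserts a row; 'key' is only a label that future logic may use to squash similar calls. A sketch of the updated caller shape from task_handler above (the path value is assumed from the constant's name, and the ids are hypothetical):

from mistral.services import scheduler

_REFRESH_TASK_STATE_PATH = (
    'mistral.engine.task_handler._refresh_task_state'  # assumed value
)

scheduler.schedule_call(
    None,                      # no factory method: target is module-level
    _REFRESH_TASK_STATE_PATH,  # dotted path to the target function
    1,                         # run_after, in seconds
    key='th_c_t_s_a-%s' % 'TASK_EX_ID',  # informational label only
    task_ex_id='TASK_EX_ID'    # keyword argument forwarded to the target
)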
class CallScheduler(periodic_task.PeriodicTasks): class CallScheduler(periodic_task.PeriodicTasks):

View File

@@ -15,20 +15,39 @@
from oslo_config import cfg
-from oslo_utils import timeutils

from mistral.db.v2.sqlalchemy import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models
from mistral.tests.unit import base as test_base

+WF_EX = {
+    'id': '1',
+    'spec': {},
+    'start_params': {'task': 'my_task1'},
+    'state': 'IDLE',
+    'state_info': "Running...",
+    'created_at': None,
+    'updated_at': None,
+    'context': None,
+    'task_id': None,
+    'trust_id': None,
+    'description': None,
+    'output': None
+}

-DELAYED_CALL = {
-    'factory_method_path': 'my_factory_method',
-    'target_method_name': 'my_target_method',
-    'method_arguments': None,
-    'serializers': None,
-    'auth_context': None,
-    'execution_time': timeutils.utcnow()
-}
+TASK_EX = {
+    'workflow_execution_id': '1',
+    'workflow_name': 'my_wf',
+    'name': 'my_task1',
+    'spec': None,
+    'action_spec': None,
+    'state': 'IDLE',
+    'tags': ['deployment'],
+    'in_context': None,
+    'runtime_context': None,
+    'created_at': None,
+    'updated_at': None
+}

@@ -38,6 +57,8 @@ class InsertOrIgnoreTest(test_base.DbTestCase):
        cfg.CONF.set_default('auth_enable', True, group='pecan')

+        db_api.create_workflow_execution(WF_EX)
+
        self.addCleanup(
            cfg.CONF.set_default,
            'auth_enable',

@@ -47,46 +68,46 @@ class InsertOrIgnoreTest(test_base.DbTestCase):
    def test_insert_or_ignore_without_conflicts(self):
        db_api.insert_or_ignore(
-            db_models.DelayedCall,
-            DELAYED_CALL.copy()
+            db_models.TaskExecution,
+            TASK_EX.copy()
        )

-        delayed_calls = db_api.get_delayed_calls()
+        task_execs = db_api.get_task_executions()

-        self.assertEqual(1, len(delayed_calls))
+        self.assertEqual(1, len(task_execs))

-        delayed_call = delayed_calls[0]
+        task_ex = task_execs[0]

-        self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict())
+        self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict())

    def test_insert_or_ignore_with_conflicts(self):
        # Insert the first object.
-        values = DELAYED_CALL.copy()
+        values = TASK_EX.copy()
        values['unique_key'] = 'key'

-        db_api.insert_or_ignore(db_models.DelayedCall, values)
+        db_api.insert_or_ignore(db_models.TaskExecution, values)

-        delayed_calls = db_api.get_delayed_calls()
+        task_execs = db_api.get_task_executions()

-        self.assertEqual(1, len(delayed_calls))
+        self.assertEqual(1, len(task_execs))

-        delayed_call = delayed_calls[0]
+        task_ex = task_execs[0]

-        self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict())
+        self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict())

        # Insert the second object with the same unique key.
        # We must not get exceptions and the new object must not be saved.
-        values = DELAYED_CALL.copy()
+        values = TASK_EX.copy()
        values['unique_key'] = 'key'

-        db_api.insert_or_ignore(db_models.DelayedCall, values)
+        db_api.insert_or_ignore(db_models.TaskExecution, values)

-        delayed_calls = db_api.get_delayed_calls()
+        task_execs = db_api.get_task_executions()

-        self.assertEqual(1, len(delayed_calls))
+        self.assertEqual(1, len(task_execs))

-        delayed_call = delayed_calls[0]
+        task_ex = task_execs[0]

-        self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict())
+        self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict())

View File

@@ -2246,13 +2246,13 @@ class LockTest(SQLAlchemyTest):
        self.assertEqual('lock1', locks[0].name)

-        db_api.delete_named_lock('invalid_lock_name')
+        db_api.delete_named_lock('invalid_lock_id')

        locks = db_api.get_named_locks()

        self.assertEqual(1, len(locks))

-        db_api.delete_named_lock(locks[0].name)
+        db_api.delete_named_lock(locks[0].id)

        locks = db_api.get_named_locks()

View File

@@ -141,8 +141,8 @@ class EngineTestCase(base.DbTestCase):
        for w in wf_execs:
            print(
-                "\n%s [state=%s, state_info=%s, output=%s]" %
-                (w.name, w.state, w.state_info, w.output)
+                "\n%s (%s) [state=%s, state_info=%s, output=%s]" %
+                (w.name, w.id, w.state, w.state_info, w.output)
            )

            for t in w.task_executions:

View File

@@ -163,6 +163,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
        self.assertDictEqual(exp_published_arr[0], task1.published)
        self.assertDictEqual(exp_published_arr[1], task2.published)
        self.assertDictEqual(exp_published_arr[2], task3.published)
+
        self.assertIn(exp_published_arr[0]['progress'], notify_published_arr)
        self.assertIn(exp_published_arr[1]['progress'], notify_published_arr)
        self.assertIn(exp_published_arr[2]['progress'], notify_published_arr)

View File

@@ -341,46 +341,3 @@ class SchedulerServiceTest(base.DbTestCase):
        db_api.get_delayed_call(calls[0].id)
        db_api.delete_delayed_call(calls[0].id)

-    def test_schedule_with_unique_key(self):
-        method_args = {'name': 'task', 'id': '321'}
-
-        key = 'my_unique_key'
-
-        scheduler.schedule_call(
-            None,
-            TARGET_METHOD_PATH,
-            DELAY,
-            unique_key=key,
-            **method_args
-        )
-
-        self.assertEqual(1, len(db_api.get_delayed_calls()))
-
-        # Schedule the call for the second time, the number of calls
-        # must not change due to the same unique key.
-        scheduler.schedule_call(
-            None,
-            TARGET_METHOD_PATH,
-            DELAY,
-            unique_key=key,
-            **method_args
-        )
-
-        calls = db_api.get_delayed_calls()
-
-        self.assertEqual(1, len(calls))
-
-        # Now change the 'processing' flag and make sure we can schedule
-        # one more call because the DB constraint allows it.
-        db_api.update_delayed_call(calls[0].id, {'processing': True})
-
-        scheduler.schedule_call(
-            None,
-            TARGET_METHOD_PATH,
-            DELAY,
-            unique_key=key,
-            **method_args
-        )
-
-        self.assertEqual(2, len(db_api.get_delayed_calls()))

View File

@@ -0,0 +1,75 @@
# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine import dispatcher
from mistral.tests.unit import base
from mistral.workflow import commands
def _print_commands(cmds):
print("commands:")
for cmd in cmds:
if isinstance(cmd, commands.RunTask):
print("%s, %s, %s" % (type(cmd), cmd.is_waiting(), cmd.unique_key))
else:
print("%s" % type(cmd))
class CommandDispatcherTest(base.BaseTest):
def setUp(self):
super(CommandDispatcherTest, self).setUp()
def test_rearrange_commands(self):
no_wait = commands.RunTask(None, None, None, None)
fail = commands.FailWorkflow(None, None, None, None)
succeed = commands.SucceedWorkflow(None, None, None, None)
wait1 = commands.RunTask(None, None, None, None)
wait1.wait = True
wait1.unique_key = 'wait1'
wait2 = commands.RunTask(None, None, None, None)
wait2.wait = True
wait2.unique_key = 'wait2'
wait3 = commands.RunTask(None, None, None, None)
wait3.wait = True
wait3.unique_key = 'wait3'
# 'set state' command is the first, others must be ignored.
initial = [fail, no_wait, wait1, wait3, wait2]
expected = [fail]
cmds = dispatcher._rearrange_commands(initial)
self.assertEqual(expected, cmds)
# 'set state' command is the last, tasks before it must be sorted.
initial = [no_wait, wait2, wait1, wait3, succeed]
expected = [no_wait, wait1, wait2, wait3, succeed]
cmds = dispatcher._rearrange_commands(initial)
self.assertEqual(expected, cmds)
# 'set state' command is in the middle, tasks before it must be sorted
# and the task after it must be ignored.
initial = [wait3, wait2, no_wait, succeed, wait1]
expected = [no_wait, wait2, wait3, succeed]
cmds = dispatcher._rearrange_commands(initial)
self.assertEqual(expected, cmds)

View File

@@ -20,6 +20,8 @@ from mistral import utils
from mistral.workflow import utils as wf_utils

from oslo_serialization import jsonutils
from stevedore import extension

ROOT_CONTEXT = None

@@ -37,6 +39,7 @@ def get_yaql_context(data_context):
    if isinstance(data_context, dict):
        new_ctx['__env'] = data_context.get('__env')
        new_ctx['__execution'] = data_context.get('__execution')
+        new_ctx['__task_execution'] = data_context.get('__task_execution')

    return new_ctx

@@ -86,6 +89,13 @@ def task_(context, task_name):
    wf_ex = db_api.get_workflow_execution(context['__execution']['id'])

+    # This section may not exist in a context if it's calculated not in
+    # task scope.
+    cur_task = context['__task_execution']
+
+    if cur_task and cur_task['name'] == task_name:
+        task_ex = db_api.get_task_execution(cur_task['id'])
+    else:
        task_execs = wf_utils.find_task_executions_by_name(wf_ex, task_name)

    # TODO(rakhmerov): Account for multiple executions (i.e. in case of
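
The effect is visible in expressions such as <% task('my_task1') %>: when evaluated in that task's own scope, the lookup goes straight to the execution id stashed in the context instead of scanning every task execution by name. A hedged sketch of the context shape consumed above (ids are hypothetical; the field names are those set by data_flow.add_current_task_to_context()):

# Input to get_yaql_context() when an expression runs in task scope.
data_context = {
    '__env': {},
    '__execution': {'id': 'WF_EX_ID'},
    '__task_execution': {'id': 'TASK_EX_ID', 'name': 'my_task1'},
}

# get_yaql_context() copies '__task_execution' into the YAQL root context,
# so task_(ctx, 'my_task1') resolves via
# db_api.get_task_execution('TASK_EX_ID'); any other task name falls back
# to wf_utils.find_task_executions_by_name().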

View File

@@ -130,6 +130,9 @@ class WorkflowSpec(base.BaseSpec):
    def get_tasks(self):
        return self._tasks

+    def get_task(self, name):
+        return self._tasks[name]

class DirectWorkflowSpec(WorkflowSpec):
    _polymorphic_value = 'direct'

View File

@@ -138,6 +138,15 @@ def evaluate_workflow_output(wf_spec, ctx):
    return output or ctx

+def add_current_task_to_context(ctx, task_id, task_name):
+    ctx['__task_execution'] = {
+        'id': task_id,
+        'name': task_name
+    }
+
+    return ctx

def add_openstack_data_to_context(wf_ex):
    wf_ex.context = wf_ex.context or {}

View File

@@ -113,6 +113,8 @@ class DirectWorkflowController(base.WorkflowController):
        cmds = []

+        ctx = data_flow.evaluate_task_outbound_context(task_ex)
+
        for t_n, params in self._find_next_tasks(task_ex):
            t_s = self.wf_spec.get_tasks()[t_n]

@@ -126,7 +128,7 @@ class DirectWorkflowController(base.WorkflowController):
                self.wf_ex,
                self.wf_spec,
                t_s,
-                data_flow.evaluate_task_outbound_context(task_ex),
+                ctx,
                params
            )

View File

@@ -40,7 +40,7 @@ class ReverseWorkflowController(base.WorkflowController):
    __workflow_type__ = "reverse"

-    def _find_next_commands(self, task_ex=None):
+    def _find_next_commands(self, task_ex):
        """Finds all tasks with resolved dependencies.

        This method finds all tasks with resolved dependencies and

View File

@@ -73,6 +73,12 @@ class ResultSerializer(serializers.Serializer):
        )

+# TODO(rakhmerov): Most of these 'find..' methods should be replaced
+# with corresponding methods on DB API because in most cases
+# we don't need to fetch the whole collection of task executions
+# from DB. We can just query only needed objects by needed criteria.
+# In practice it leads to serious performance loss.

def find_task_execution_not_state(wf_ex, task_spec, state):
    task_execs = [
        t for t in wf_ex.task_executions

@@ -126,11 +132,6 @@ def find_successful_task_executions(wf_ex):
    return find_task_executions_with_state(wf_ex, states.SUCCESS)

-def find_incomplete_task_executions(wf_ex):
-    return [t for t in wf_ex.task_executions
-            if not states.is_completed(t.state)]
-
def find_error_task_executions(wf_ex):
    return find_task_executions_with_state(wf_ex, states.ERROR)