diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/018_increate_task_execution_unique_key_size.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/018_increate_task_execution_unique_key_size.py
new file mode 100644
index 000000000..ee7868f64
--- /dev/null
+++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/018_increate_task_execution_unique_key_size.py
@@ -0,0 +1,33 @@
+# Copyright 2016 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""increate_task_execution_unique_key_size
+
+Revision ID: 018
+Revises: 017
+Create Date: 2016-08-17 17:47:30.325182
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '018'
+down_revision = '017'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.alter_column('task_executions_v2', 'unique_key', type_=sa.String(250))
diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/019_change_scheduler_schema.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/019_change_scheduler_schema.py
new file mode 100644
index 000000000..e53214572
--- /dev/null
+++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/019_change_scheduler_schema.py
@@ -0,0 +1,49 @@
+# Copyright 2016 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Change scheduler schema.
+
+Revision ID: 019
+Revises: 018
+Create Date: 2016-08-17 17:54:51.952949
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '019'
+down_revision = '018'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.drop_index(
+        'delayed_calls_v2_processing_execution_time',
+        table_name='delayed_calls_v2'
+    )
+    op.drop_index('unique_key', table_name='delayed_calls_v2')
+    op.drop_column('delayed_calls_v2', 'unique_key')
+
+    op.add_column(
+        'delayed_calls_v2',
+        sa.Column('key', sa.String(length=250), nullable=True)
+    )
+    op.create_index(
+        'delayed_calls_v2_execution_time',
+        'delayed_calls_v2',
+        ['execution_time'],
+        unique=False
+    )
diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py
index a9f6e1655..fe88d9afe 100644
--- a/mistral/db/v2/api.py
+++ b/mistral/db/v2/api.py
@@ -308,14 +308,14 @@ def get_task_executions(limit=None, marker=None, sort_keys=['created_at'],
     )
 
 
+def get_incomplete_task_executions(**kwargs):
+    return IMPL.get_incomplete_task_executions(**kwargs)
+
+
 def create_task_execution(values):
     return IMPL.create_task_execution(values)
 
 
-def insert_or_ignore_task_execution(values):
-    return IMPL.insert_or_ignore_task_execution(values)
-
-
 def update_task_execution(id, values):
     return IMPL.update_task_execution(id, values)
 
@@ -342,10 +342,6 @@ def create_delayed_call(values):
     return IMPL.create_delayed_call(values)
 
 
-def insert_or_ignore_delayed_call(values):
-    return IMPL.insert_or_ignore_delayed_call(values)
-
-
 def delete_delayed_call(id):
     return IMPL.delete_delayed_call(id)
 
@@ -533,8 +529,8 @@ def get_named_locks(limit=None, marker=None):
     return IMPL.get_named_locks(limit=limit, marker=marker)
 
 
-def delete_named_lock(name):
-    return IMPL.delete_named_lock(name)
+def delete_named_lock(lock_id):
+    return IMPL.delete_named_lock(lock_id)
 
 
 @contextlib.contextmanager
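For orientation, a minimal editorial sketch (not part of the patch) of how callers are expected to use the new state-filtered query that replaces the "insert or ignore" pattern; the workflow execution id is hypothetical:

    from mistral.db.v2 import api as db_api

    wf_ex_id = '1'  # hypothetical workflow execution id

    # Returns only tasks in IDLE, RUNNING, WAITING or DELAYED state,
    # mirroring the filter in the SQLAlchemy implementation below.
    incomplete = db_api.get_incomplete_task_executions(
        workflow_execution_id=wf_ex_id
    )

    if not incomplete:
        print('All tasks are complete; the workflow itself can complete.')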
diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py
index 48925af92..3d51e8500 100644
--- a/mistral/db/v2/sqlalchemy/api.py
+++ b/mistral/db/v2/sqlalchemy/api.py
@@ -34,6 +34,7 @@ from mistral.db.v2.sqlalchemy import filters as db_filters
 from mistral.db.v2.sqlalchemy import models
 from mistral import exceptions as exc
 from mistral.services import security
+from mistral import utils
 
 CONF = cfg.CONF
 
@@ -796,6 +797,23 @@ def get_task_executions(**kwargs):
     return _get_task_executions(**kwargs)
 
 
+def get_incomplete_task_executions(**kwargs):
+    query = b.model_query(models.TaskExecution)
+
+    query = query.filter_by(**kwargs)
+
+    query = query.filter(
+        sa.or_(
+            models.TaskExecution.state == 'IDLE',
+            models.TaskExecution.state == 'RUNNING',
+            models.TaskExecution.state == 'WAITING',
+            models.TaskExecution.state == 'DELAYED'
+        )
+    )
+
+    return query.all()
+
+
 @b.session_aware()
 def create_task_execution(values, session=None):
     task_ex = models.TaskExecution()
@@ -812,10 +830,6 @@ def create_task_execution(values, session=None):
     return task_ex
 
 
-def insert_or_ignore_task_execution(values):
-    insert_or_ignore(models.TaskExecution, values.copy())
-
-
 @b.session_aware()
 def update_task_execution(id, values, session=None):
     task_ex = get_task_execution(id)
@@ -866,10 +880,6 @@ def create_delayed_call(values, session=None):
     return delayed_call
 
 
-def insert_or_ignore_delayed_call(values):
-    insert_or_ignore(models.DelayedCall, values.copy())
-
-
 @b.session_aware()
 def delete_delayed_call(id, session=None):
     delayed_call = get_delayed_call(id)
@@ -1388,9 +1398,17 @@ def create_named_lock(name, session=None):
     # This method has to work without SQLAlchemy session because
     # session may not immediately issue an SQL query to a database
     # and instead just schedule it whereas we need to make sure to
     # issue a query immediately.
+    session.flush()
+
     insert = models.NamedLock.__table__.insert()
 
-    session.execute(insert.values(name=name))
+    lock_id = utils.generate_unicode_uuid()
+
+    session.execute(insert.values(id=lock_id, name=name))
+
+    session.flush()
+
+    return lock_id
 
 
 def get_named_locks(**kwargs):
@@ -1398,22 +1416,27 @@ def get_named_locks(**kwargs):
 
 
 @b.session_aware()
-def delete_named_lock(name, session=None):
+def delete_named_lock(lock_id, session=None):
     # This method has to work without SQLAlchemy session because
     # session may not immediately issue an SQL query to a database
     # and instead just schedule it whereas we need to make sure to
     # issue a query immediately.
+    session.flush()
+
     table = models.NamedLock.__table__
 
     delete = table.delete()
 
-    session.execute(delete.where(table.c.name == name))
+    session.execute(delete.where(table.c.id == lock_id))
+
+    session.flush()
 
 
 @contextlib.contextmanager
 def named_lock(name):
+    lock_id = None
+
     try:
-        create_named_lock(name)
+        lock_id = create_named_lock(name)
 
         yield
     finally:
-        delete_named_lock(lock_id)
+        delete_named_lock(lock_id)
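A usage sketch of the reworked named-lock contract (editorial, assuming an open DB transaction; the lock name is hypothetical). create_named_lock() now returns the generated id and delete_named_lock() deletes by that id, so a finally-block can't remove a row created by another transaction under the same name; the extra flush() calls force the INSERT/DELETE to reach the database immediately instead of staying pending in the session.

    from mistral.db.v2 import api as db_api

    with db_api.transaction():
        # Serializes concurrent transactions that use the same lock name.
        with db_api.named_lock('task-unique-key-1'):
            pass  # critical section

    # Equivalent explicit form:
    with db_api.transaction():
        lock_id = db_api.create_named_lock('task-unique-key-1')

        try:
            pass  # critical section
        finally:
            db_api.delete_named_lock(lock_id)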
diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py
index 3c84af5a5..fdfdcd091 100644
--- a/mistral/db/v2/sqlalchemy/models.py
+++ b/mistral/db/v2/sqlalchemy/models.py
@@ -227,7 +227,7 @@ class TaskExecution(Execution):
     # Main properties.
     action_spec = sa.Column(st.JsonLongDictType())
 
-    unique_key = sa.Column(sa.String(200), nullable=True)
+    unique_key = sa.Column(sa.String(250), nullable=True)
 
     # Whether the task is fully processed (publishing and calculating commands
     # after it). It allows to simplify workflow controller implementations
@@ -242,7 +242,7 @@ class TaskExecution(Execution):
     def executions(self):
         return (
             self.action_executions
-            if self.spec.get('action')
+            if not self.spec.get('workflow')
             else self.workflow_executions
         )
 
@@ -328,26 +328,23 @@ class DelayedCall(mb.MistralModelBase):
 
     __tablename__ = 'delayed_calls_v2'
 
-    __table_args__ = (
-        sa.Index(
-            '%s_processing_execution_time' % __tablename__,
-            'processing',
-            'execution_time'
-        ),
-        sa.UniqueConstraint('unique_key', 'processing')
-    )
-
     id = mb.id_column()
     factory_method_path = sa.Column(sa.String(200), nullable=True)
     target_method_name = sa.Column(sa.String(80), nullable=False)
     method_arguments = sa.Column(st.JsonDictType())
     serializers = sa.Column(st.JsonDictType())
-    unique_key = sa.Column(sa.String(80), nullable=True)
+    key = sa.Column(sa.String(250), nullable=True)
     auth_context = sa.Column(st.JsonDictType())
     execution_time = sa.Column(sa.DateTime, nullable=False)
     processing = sa.Column(sa.Boolean, default=False, nullable=False)
 
 
+sa.Index(
+    '%s_execution_time' % DelayedCall.__tablename__,
+    DelayedCall.execution_time
+)
+
+
 class Environment(mb.MistralSecureModelBase):
     """Contains environment variables for workflow execution."""
diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py
index 53e8bbef6..8de2a0a69 100644
--- a/mistral/engine/actions.py
+++ b/mistral/engine/actions.py
@@ -174,9 +174,10 @@ class Action(object):
         if prev_state != state:
             wf_trace.info(
                 None,
-                "Action '%s' (%s) [%s -> %s, %s]" %
+                "Action '%s' (%s)(task=%s) [%s -> %s, %s]" %
                 (self.action_ex.name,
                  self.action_ex.id,
+                 self.task_ex.name if self.task_ex else None,
                  prev_state,
                  state,
                  _result_msg())
diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py
index d5bed6521..32ad72c77 100644
--- a/mistral/engine/dispatcher.py
+++ b/mistral/engine/dispatcher.py
@@ -13,6 +13,7 @@
 #    See the License for the specific language governing permissions and
 #    limitations under the License.
 
+import functools
 from osprofiler import profiler
 
 from mistral import exceptions as exc
@@ -20,6 +21,64 @@ from mistral.workflow import commands
 from mistral.workflow import states
 
 
+def _compare_task_commands(a, b):
+    if not isinstance(a, commands.RunTask) or not a.is_waiting():
+        return -1
+
+    if not isinstance(b, commands.RunTask) or not b.is_waiting():
+        return 1
+
+    if a.unique_key == b.unique_key:
+        return 0
+
+    if a.unique_key < b.unique_key:
+        return -1
+
+    return 1
+
+
+def _rearrange_commands(cmds):
+    """Takes workflow commands and does required pre-processing.
+
+    The main idea of the method is to sort task commands with the 'waiting'
+    flag by the 'unique_key' property in order to guarantee the same locking
+    order for them in parallel transactions and thereby prevent deadlocks.
+    It also removes commands that don't make sense. For example, if
+    there are some commands after a command that changes a workflow state
+    then they must not be dispatched.
+    """
+
+    # Remove all 'noop' commands.
+    cmds = list(filter(lambda c: not isinstance(c, commands.Noop), cmds))
+
+    state_cmd_idx = -1
+    state_cmd = None
+
+    for i, cmd in enumerate(cmds):
+        if isinstance(cmd, commands.SetWorkflowState):
+            state_cmd_idx = i
+            state_cmd = cmd
+
+            break
+
+    # Find a position of a 'fail|succeed|pause' command
+    # and sort all task commands before it.
+    if state_cmd_idx < 0:
+        cmds.sort(key=functools.cmp_to_key(_compare_task_commands))
+
+        return cmds
+    elif state_cmd_idx == 0:
+        return cmds[0:1]
+
+    res = cmds[0:state_cmd_idx]
+
+    res.sort(key=functools.cmp_to_key(_compare_task_commands))
+
+    res.append(state_cmd)
+
+    return res
+
+
 @profiler.trace('dispatcher-dispatch-commands')
 def dispatch_workflow_commands(wf_ex, wf_cmds):
     # TODO(rakhmerov): I don't like these imports but otherwise we have
@@ -30,14 +89,11 @@ def dispatch_workflow_commands(wf_ex, wf_cmds):
     if not wf_cmds:
         return
 
-    for cmd in wf_cmds:
+    for cmd in _rearrange_commands(wf_cmds):
        if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
             task_handler.run_task(cmd)
         elif isinstance(cmd, commands.SetWorkflowState):
             wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg)
-        elif isinstance(cmd, commands.Noop):
-            # Do nothing.
-            pass
         else:
             raise exc.MistralError('Unsupported workflow command: %s' % cmd)
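To make the ordering contract concrete, an editorial sketch mirroring the new test_command_dispatcher.py added later in this patch (constructor arguments are stubbed with None, as in that test):

    from mistral.engine import dispatcher
    from mistral.workflow import commands

    wait_b = commands.RunTask(None, None, None, None)
    wait_b.wait = True
    wait_b.unique_key = 'b'

    wait_a = commands.RunTask(None, None, None, None)
    wait_a.wait = True
    wait_a.unique_key = 'a'

    # Waiting tasks come out sorted by 'unique_key', so any two parallel
    # transactions acquire the corresponding named locks in the same order
    # and cannot deadlock on each other.
    assert dispatcher._rearrange_commands([wait_b, wait_a]) == [wait_a, wait_b]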
diff --git a/mistral/engine/rpc_backend/oslo/oslo_server.py b/mistral/engine/rpc_backend/oslo/oslo_server.py
index 399a87e9c..c3e54212e 100644
--- a/mistral/engine/rpc_backend/oslo/oslo_server.py
+++ b/mistral/engine/rpc_backend/oslo/oslo_server.py
@@ -48,7 +48,7 @@ class OsloRPCServer(rpc_base.RPCServer):
             rpc.get_transport(),
             target,
             self.endpoints,
-            executor='eventlet',
+            executor='blocking',
             serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
         )
diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py
index 0fb9995d7..4a630f46a 100644
--- a/mistral/engine/task_handler.py
+++ b/mistral/engine/task_handler.py
@@ -72,20 +72,9 @@ def run_task(wf_cmd):
 
         return
 
-    if not task.task_ex:
-        # It is possible that task execution was not created
-        # (and therefore not associated with Task instance).
-        # For example, in case of 'join' that has already been
-        # created by a different transaction. In this case
-        # we should skip post completion scheduled checks.
-        return
-
-    if task.is_waiting():
+    if task.is_waiting() and (task.is_created() or task.is_state_changed()):
         _schedule_refresh_task_state(task.task_ex)
 
-    if task.is_completed():
-        wf_handler.schedule_on_task_complete(task.task_ex)
-
 
 @profiler.trace('task-handler-on-action-complete')
 def _on_action_complete(action_ex):
@@ -128,9 +117,6 @@ def _on_action_complete(action_ex):
 
         return
 
-    if task.is_completed():
-        wf_handler.schedule_on_task_complete(task_ex)
-
 
 def fail_task(task_ex, msg):
     wf_spec = spec_parser.get_workflow_spec_by_execution_id(
@@ -171,9 +157,6 @@ def continue_task(task_ex):
 
        return
 
-    if task.is_completed():
-        wf_handler.schedule_on_task_complete(task_ex)
-
 
 def complete_task(task_ex, state, state_info):
     wf_spec = spec_parser.get_workflow_spec_by_execution_id(
@@ -200,15 +183,12 @@ def complete_task(task_ex, state, state_info):
 
         return
 
-    if task.is_completed():
-        wf_handler.schedule_on_task_complete(task_ex)
-
 
-def _build_task_from_execution(wf_spec, task_ex, task_spec=None):
+def _build_task_from_execution(wf_spec, task_ex):
     return _create_task(
         task_ex.workflow_execution,
         wf_spec,
-        task_spec or spec_parser.get_task_spec(task_ex.spec),
+        wf_spec.get_task(task_ex.name),
         task_ex.in_context,
         task_ex
     )
@@ -223,7 +203,8 @@ def _build_task_from_command(cmd):
             spec_parser.get_task_spec(cmd.task_ex.spec),
             cmd.ctx,
             task_ex=cmd.task_ex,
-            unique_key=cmd.task_ex.unique_key
+            unique_key=cmd.task_ex.unique_key,
+            waiting=cmd.task_ex.state == states.WAITING
         )
 
         if cmd.reset:
@@ -237,39 +218,26 @@ def _build_task_from_command(cmd):
             cmd.wf_spec,
             cmd.task_spec,
             cmd.ctx,
-            unique_key=cmd.unique_key
+            unique_key=cmd.unique_key,
+            waiting=cmd.is_waiting()
         )
 
-        if cmd.is_waiting():
-            task.defer()
-
         return task
 
     raise exc.MistralError('Unsupported workflow command: %s' % cmd)
 
 
 def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
-                 unique_key=None):
+                 unique_key=None, waiting=False):
     if task_spec.get_with_items():
-        return tasks.WithItemsTask(
-            wf_ex,
-            wf_spec,
-            task_spec,
-            ctx,
-            task_ex,
-            unique_key
-        )
+        cls = tasks.WithItemsTask
+    else:
+        cls = tasks.RegularTask
 
-    return tasks.RegularTask(
-        wf_ex,
-        wf_spec,
-        task_spec,
-        ctx,
-        task_ex,
-        unique_key
-    )
+    return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting)
 
 
+@profiler.trace('task-handler-refresh-task-state')
 def _refresh_task_state(task_ex_id):
     with db_api.transaction():
         task_ex = db_api.get_task_execution(task_ex_id)
@@ -315,7 +283,6 @@ def _schedule_refresh_task_state(task_ex, delay=0):
 
     :param task_ex: Task execution.
     :param delay: Delay.
-    :return:
     """
     key = 'th_c_t_s_a-%s' % task_ex.id
 
@@ -323,7 +290,7 @@ def _schedule_refresh_task_state(task_ex, delay=0):
         None,
         _REFRESH_TASK_STATE_PATH,
         delay,
-        unique_key=key,
+        key=key,
         task_ex_id=task_ex.id
     )
 
@@ -366,7 +333,7 @@ def schedule_on_action_complete(action_ex, delay=0):
         None,
         _SCHEDULED_ON_ACTION_COMPLETE_PATH,
         delay,
-        unique_key=key,
+        key=key,
         action_ex_id=action_ex.id,
         wf_action=isinstance(action_ex, models.WorkflowExecution)
     )
diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py
index caac96c53..180b8fd79 100644
--- a/mistral/engine/tasks.py
+++ b/mistral/engine/tasks.py
@@ -47,15 +47,17 @@ class Task(object):
 
     @profiler.trace('task-create')
     def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None,
-                 unique_key=None):
+                 unique_key=None, waiting=False):
         self.wf_ex = wf_ex
         self.task_spec = task_spec
         self.ctx = ctx
         self.task_ex = task_ex
         self.wf_spec = wf_spec
         self.unique_key = unique_key
-        self.waiting = False
+        self.waiting = waiting
         self.reset_flag = False
+        self.created = False
+        self.state_changed = False
 
     def is_completed(self):
         return self.task_ex and states.is_completed(self.task_ex.state)
@@ -63,6 +65,12 @@ class Task(object):
     def is_waiting(self):
         return self.waiting
 
+    def is_created(self):
+        return self.created
+
+    def is_state_changed(self):
+        return self.state_changed
+
     @abc.abstractmethod
     def on_action_complete(self, action_ex):
         """Handle action completion.
@@ -82,21 +90,24 @@ class Task(object):
 
         This method puts task to a waiting state.
         """
-        if not self.task_ex:
-            t_execs = db_api.get_task_executions(
-                workflow_execution_id=self.wf_ex.id,
-                name=self.task_spec.get_name()
-            )
+        with db_api.named_lock(self.unique_key):
+            if not self.task_ex:
+                t_execs = db_api.get_task_executions(
+                    workflow_execution_id=self.wf_ex.id,
+                    unique_key=self.unique_key
+                )
 
-            self.task_ex = t_execs[0] if t_execs else None
+                self.task_ex = t_execs[0] if t_execs else None
 
-        if not self.task_ex:
-            self._create_task_execution()
+            msg = 'Task is waiting.'
 
-        if self.task_ex:
-            self.set_state(states.WAITING, 'Task is deferred.')
-
-        self.waiting = True
+            if not self.task_ex:
+                self._create_task_execution(
+                    state=states.WAITING,
+                    state_info=msg
+                )
+            elif self.task_ex.state != states.WAITING:
+                self.set_state(states.WAITING, msg)
 
     def reset(self):
         self.reset_flag = True
@@ -124,6 +135,8 @@ class Task(object):
                 state_info)
             )
 
+            self.state_changed = True
+
         self.task_ex.state = state
         self.task_ex.state_info = state_info
 
@@ -172,7 +185,7 @@ class Task(object):
         wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
 
         # Calculate commands to process next.
-        cmds = wf_ctrl.continue_workflow(self.task_ex)
+        cmds = wf_ctrl.continue_workflow(task_ex=self.task_ex)
 
         # Mark task as processed after all decisions have been made
         # upon its completion.
@@ -192,14 +205,20 @@ class Task(object):
         for p in policies.build_policies(policies_spec, self.wf_spec):
             p.after_task_complete(self.task_ex, self.task_spec)
 
-    def _create_task_execution(self, state=states.RUNNING):
+    def _create_task_execution(self, state=states.RUNNING, state_info=None):
+        task_id = utils.generate_unicode_uuid()
+        task_name = self.task_spec.get_name()
+
+        data_flow.add_current_task_to_context(self.ctx, task_id, task_name)
+
         values = {
-            'id': utils.generate_unicode_uuid(),
-            'name': self.task_spec.get_name(),
+            'id': task_id,
+            'name': task_name,
             'workflow_execution_id': self.wf_ex.id,
             'workflow_name': self.wf_ex.workflow_name,
             'workflow_id': self.wf_ex.workflow_id,
             'state': state,
+            'state_info': state_info,
             'spec': self.task_spec.to_dict(),
             'unique_key': self.unique_key,
             'in_context': self.ctx,
@@ -208,23 +227,13 @@ class Task(object):
             'project_id': self.wf_ex.project_id
         }
 
-        db_api.insert_or_ignore_task_execution(values)
-
-        # Since 'insert_or_ignore' cannot return a valid count of updated
-        # rows the only reliable way to check if insert operation has created
-        # an object is try to load this object by just generated uuid.
-        task_ex = db_api.load_task_execution(values['id'])
-
-        if not task_ex:
-            return False
-
-        self.task_ex = task_ex
+        self.task_ex = db_api.create_task_execution(values)
 
         # Add to collection explicitly so that it's in a proper
         # state within the current session.
         self.wf_ex.task_executions.append(self.task_ex)
 
-        return True
+        self.created = True
 
     def _get_action_defaults(self):
         action_name = self.task_spec.get_action_name()
@@ -262,12 +271,12 @@ class RegularTask(Task):
         self._run_existing()
 
     def _run_new(self):
-        if not self._create_task_execution():
-            # Task with the same unique key has already been created.
+        if self.waiting:
+            self.defer()
+
             return
 
-        if self.waiting:
-            return
+        self._create_task_execution()
 
         LOG.debug(
             'Starting task [workflow=%s, task_spec=%s, init_state=%s]' %
diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py
index 1c47192bc..a66693eae 100644
--- a/mistral/engine/workflow_handler.py
+++ b/mistral/engine/workflow_handler.py
@@ -27,7 +27,9 @@ from mistral.workflow import states
 
 LOG = logging.getLogger(__name__)
 
-_ON_TASK_COMPLETE_PATH = 'mistral.engine.workflow_handler._on_task_complete'
+_CHECK_AND_COMPLETE_PATH = (
+    'mistral.engine.workflow_handler._check_and_complete'
+)
 
 
 @profiler.trace('workflow-handler-start-workflow')
@@ -38,6 +40,8 @@ def start_workflow(wf_identifier, wf_input, desc, params):
 
     wf.start(wf_input, desc=desc, params=params)
 
+    _schedule_check_and_complete(wf.wf_ex)
+
     return wf.wf_ex
 
 
@@ -73,13 +77,11 @@ def cancel_workflow(wf_ex, msg=None):
     stop_workflow(wf_ex, states.CANCELLED, msg)
 
 
-@profiler.trace('workflow-handler-on-task-complete')
-def _on_task_complete(task_ex_id):
+@profiler.trace('workflow-handler-check-and-complete')
+def _check_and_complete(wf_ex_id):
     # Note: This method can only be called via scheduler.
     with db_api.transaction():
-        task_ex = db_api.get_task_execution(task_ex_id)
-
-        wf_ex = task_ex.workflow_execution
+        wf_ex = db_api.get_workflow_execution(wf_ex_id)
 
         wf = workflows.Workflow(
             db_api.get_workflow_definition(wf_ex.workflow_id),
@@ -87,11 +89,11 @@ def _check_and_complete(wf_ex_id):
         )
 
         try:
-            wf.on_task_complete(task_ex)
+            wf.check_and_complete()
         except exc.MistralException as e:
             msg = (
-                "Failed to handle task completion [wf_ex=%s, task_ex=%s]:"
-                " %s\n%s" % (wf_ex, task_ex, e, tb.format_exc())
+                "Failed to check and complete [wf_ex=%s]:"
+                " %s\n%s" % (wf_ex, e, tb.format_exc())
             )
 
             LOG.error(msg)
@@ -105,7 +107,7 @@ def _check_and_complete(wf_ex_id):
         # algorithm for increasing delay for rescheduling so that we don't
         # put too serious load onto scheduler.
         delay = 1
-        schedule_on_task_complete(task_ex, delay)
+        _schedule_check_and_complete(wf_ex, delay)
 
 
 def pause_workflow(wf_ex, msg=None):
@@ -128,6 +130,11 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
 
     wf.rerun(task_ex, reset=reset, env=env)
 
+    _schedule_check_and_complete(wf_ex)
+
+    if wf_ex.task_execution_id:
+        _schedule_check_and_complete(wf_ex.task_execution.workflow_execution)
+
 
 def resume_workflow(wf_ex, env=None):
     if not states.is_paused_or_idle(wf_ex.state):
@@ -153,9 +160,9 @@ def set_workflow_state(wf_ex, state, msg=None):
         )
 
 
-@profiler.trace('workflow-handler-schedule-on-task-complete')
-def schedule_on_task_complete(task_ex, delay=0):
-    """Schedules task completion check.
+@profiler.trace('workflow-handler-schedule-check-and-complete')
+def _schedule_check_and_complete(wf_ex, delay=0):
+    """Schedules workflow completion check.
 
     This method provides transactional decoupling of task completion from
     workflow completion check. It's needed in non-locking model in order to
@@ -165,16 +172,17 @@
     have in this case (time between transactions) whereas scheduler is a
     special component that is designed to be resistant to failures.
 
-    :param task_ex: Task execution.
+    :param wf_ex: Workflow execution.
     :param delay: Minimum amount of time before task completion check
         should be made.
     """
-    key = 'wfh_on_t_c-%s' % task_ex.workflow_execution_id
+    # TODO(rakhmerov): update docstring
+    key = 'wfh_on_c_a_c-%s' % wf_ex.id
 
     scheduler.schedule_call(
         None,
-        _ON_TASK_COMPLETE_PATH,
+        _CHECK_AND_COMPLETE_PATH,
         delay,
-        unique_key=key,
-        task_ex_id=task_ex.id
+        key=key,
+        wf_ex_id=wf_ex.id
     )
diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py
index acf985ca6..3cde89651 100644
--- a/mistral/engine/workflows.py
+++ b/mistral/engine/workflows.py
@@ -56,10 +56,18 @@ class Workflow(object):
     def __init__(self, wf_def, wf_ex=None):
         self.wf_def = wf_def
         self.wf_ex = wf_ex
-        self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
-            wf_def.id,
-            wf_def.updated_at
-        )
+
+        if wf_ex:
+            # We're processing a workflow that's already in progress.
+            self.wf_spec = spec_parser.get_workflow_spec_by_execution_id(
+                wf_ex.id
+            )
+        else:
+            # New workflow execution.
+            self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+                wf_def.id,
+                wf_def.updated_at
+            )
 
     @profiler.trace('workflow-start')
     def start(self, input_dict, desc='', params=None):
@@ -115,17 +123,6 @@ class Workflow(object):
         elif state == states.CANCELLED:
             return self._cancel_workflow(msg)
 
-    @profiler.trace('workflow-on-task-complete')
-    def on_task_complete(self, task_ex):
-        """Handle task completion event.
-
-        :param task_ex: Task execution that's completed.
-        """
-
-        assert self.wf_ex
-
-        self._check_and_complete()
-
     def resume(self, env=None):
         """Resume workflow.
 
@@ -190,10 +187,10 @@ class Workflow(object):
             if states.is_completed(t_ex.state) and not t_ex.processed:
                 t_ex.processed = True
 
-        dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
-
-        if not cmds:
-            self._check_and_complete()
+        if cmds:
+            dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
+        else:
+            self.check_and_complete()
 
     @profiler.trace('workflow-lock')
     def lock(self):
@@ -276,13 +273,16 @@ class Workflow(object):
             parent_task_ex.state_info = None
             parent_task_ex.processed = False
 
-    def _check_and_complete(self):
+    @profiler.trace('workflow-check-and-complete')
+    def check_and_complete(self):
         if states.is_paused_or_completed(self.wf_ex.state):
             return
 
         # Workflow is not completed if there are any incomplete task
         # executions.
-        incomplete_tasks = wf_utils.find_incomplete_task_executions(self.wf_ex)
+        incomplete_tasks = db_api.get_incomplete_task_executions(
+            workflow_execution_id=self.wf_ex.id,
+        )
 
         if incomplete_tasks:
             return
- """ - - assert self.wf_ex - - self._check_and_complete() - def resume(self, env=None): """Resume workflow. @@ -190,10 +187,10 @@ class Workflow(object): if states.is_completed(t_ex.state) and not t_ex.processed: t_ex.processed = True - dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) - - if not cmds: - self._check_and_complete() + if cmds: + dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) + else: + self.check_and_complete() @profiler.trace('workflow-lock') def lock(self): @@ -276,13 +273,16 @@ class Workflow(object): parent_task_ex.state_info = None parent_task_ex.processed = False - def _check_and_complete(self): + @profiler.trace('workflow-check-and-complete') + def check_and_complete(self): if states.is_paused_or_completed(self.wf_ex.state): return # Workflow is not completed if there are any incomplete task # executions. - incomplete_tasks = wf_utils.find_incomplete_task_executions(self.wf_ex) + incomplete_tasks = db_api.get_incomplete_task_executions( + workflow_execution_id=self.wf_ex.id, + ) if incomplete_tasks: return diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index 77f016a7d..efa7337f9 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -37,7 +37,7 @@ _schedulers = {} def schedule_call(factory_method_path, target_method_name, - run_after, serializers=None, unique_key=None, **method_args): + run_after, serializers=None, key=None, **method_args): """Schedules call and lately invokes target_method. Add this call specification to DB, and then after run_after @@ -54,11 +54,8 @@ def schedule_call(factory_method_path, target_method_name, { "result": "mistral.utils.serializer.ResultSerializer"} Serializer for the object type must implement serializer interface in mistral/utils/serializer.py - :param unique_key: Unique key which in combination with 'processing' - flag restricts a number of delayed calls if it's passed. For example, - if we schedule two calls but pass the same unique key for them then - we won't get two of them in DB if both have same value of 'processing' - flag. + :param key: Key which can potentially be used for squashing similar + delayed calls. :param method_args: Target method keyword arguments. 
""" ctx_serializer = context.RpcContextSerializer( @@ -95,12 +92,12 @@ def schedule_call(factory_method_path, target_method_name, 'execution_time': execution_time, 'auth_context': ctx, 'serializers': serializers, - 'unique_key': unique_key, + 'key': key, 'method_arguments': method_args, 'processing': False } - db_api.insert_or_ignore_delayed_call(values) + db_api.create_delayed_call(values) class CallScheduler(periodic_task.PeriodicTasks): diff --git a/mistral/tests/unit/db/v2/test_insert_or_ignore.py b/mistral/tests/unit/db/v2/test_insert_or_ignore.py index ce12087ed..5ee4a054e 100644 --- a/mistral/tests/unit/db/v2/test_insert_or_ignore.py +++ b/mistral/tests/unit/db/v2/test_insert_or_ignore.py @@ -15,20 +15,39 @@ from oslo_config import cfg -from oslo_utils import timeutils from mistral.db.v2.sqlalchemy import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.tests.unit import base as test_base +WF_EX = { + 'id': '1', + 'spec': {}, + 'start_params': {'task': 'my_task1'}, + 'state': 'IDLE', + 'state_info': "Running...", + 'created_at': None, + 'updated_at': None, + 'context': None, + 'task_id': None, + 'trust_id': None, + 'description': None, + 'output': None +} -DELAYED_CALL = { - 'factory_method_path': 'my_factory_method', - 'target_method_name': 'my_target_method', - 'method_arguments': None, - 'serializers': None, - 'auth_context': None, - 'execution_time': timeutils.utcnow() + +TASK_EX = { + 'workflow_execution_id': '1', + 'workflow_name': 'my_wf', + 'name': 'my_task1', + 'spec': None, + 'action_spec': None, + 'state': 'IDLE', + 'tags': ['deployment'], + 'in_context': None, + 'runtime_context': None, + 'created_at': None, + 'updated_at': None } @@ -38,6 +57,8 @@ class InsertOrIgnoreTest(test_base.DbTestCase): cfg.CONF.set_default('auth_enable', True, group='pecan') + db_api.create_workflow_execution(WF_EX) + self.addCleanup( cfg.CONF.set_default, 'auth_enable', @@ -47,46 +68,46 @@ class InsertOrIgnoreTest(test_base.DbTestCase): def test_insert_or_ignore_without_conflicts(self): db_api.insert_or_ignore( - db_models.DelayedCall, - DELAYED_CALL.copy() + db_models.TaskExecution, + TASK_EX.copy() ) - delayed_calls = db_api.get_delayed_calls() + task_execs = db_api.get_task_executions() - self.assertEqual(1, len(delayed_calls)) + self.assertEqual(1, len(task_execs)) - delayed_call = delayed_calls[0] + task_ex = task_execs[0] - self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict()) + self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict()) def test_insert_or_ignore_with_conflicts(self): # Insert the first object. - values = DELAYED_CALL.copy() + values = TASK_EX.copy() values['unique_key'] = 'key' - db_api.insert_or_ignore(db_models.DelayedCall, values) + db_api.insert_or_ignore(db_models.TaskExecution, values) - delayed_calls = db_api.get_delayed_calls() + task_execs = db_api.get_task_executions() - self.assertEqual(1, len(delayed_calls)) + self.assertEqual(1, len(task_execs)) - delayed_call = delayed_calls[0] + task_ex = task_execs[0] - self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict()) + self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict()) # Insert the second object with the same unique key. # We must not get exceptions and new object must not be saved. 
-        values = DELAYED_CALL.copy()
+        values = TASK_EX.copy()
 
         values['unique_key'] = 'key'
 
-        db_api.insert_or_ignore(db_models.DelayedCall, values)
+        db_api.insert_or_ignore(db_models.TaskExecution, values)
 
-        delayed_calls = db_api.get_delayed_calls()
+        task_execs = db_api.get_task_executions()
 
-        self.assertEqual(1, len(delayed_calls))
+        self.assertEqual(1, len(task_execs))
 
-        delayed_call = delayed_calls[0]
+        task_ex = task_execs[0]
 
-        self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict())
+        self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict())
diff --git a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py
index f3a3964f4..a686568e2 100644
--- a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py
+++ b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py
@@ -2246,13 +2246,13 @@ class LockTest(SQLAlchemyTest):
 
         self.assertEqual('lock1', locks[0].name)
 
-        db_api.delete_named_lock('invalid_lock_name')
+        db_api.delete_named_lock('invalid_lock_id')
 
         locks = db_api.get_named_locks()
 
         self.assertEqual(1, len(locks))
 
-        db_api.delete_named_lock(locks[0].name)
+        db_api.delete_named_lock(locks[0].id)
 
         locks = db_api.get_named_locks()
diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py
index 3d131aa2a..524e5bb00 100644
--- a/mistral/tests/unit/engine/base.py
+++ b/mistral/tests/unit/engine/base.py
@@ -141,8 +141,8 @@ class EngineTestCase(base.DbTestCase):
 
         for w in wf_execs:
             print(
-                "\n%s [state=%s, state_info=%s, output=%s]" %
-                (w.name, w.state, w.state_info, w.output)
+                "\n%s (%s) [state=%s, state_info=%s, output=%s]" %
+                (w.name, w.id, w.state, w.state_info, w.output)
             )
 
             for t in w.task_executions:
diff --git a/mistral/tests/unit/engine/test_dataflow.py b/mistral/tests/unit/engine/test_dataflow.py
index 0ce5a78b3..a2323af21 100644
--- a/mistral/tests/unit/engine/test_dataflow.py
+++ b/mistral/tests/unit/engine/test_dataflow.py
@@ -163,6 +163,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
         self.assertDictEqual(exp_published_arr[0], task1.published)
         self.assertDictEqual(exp_published_arr[1], task2.published)
         self.assertDictEqual(exp_published_arr[2], task3.published)
+        self.assertIn(exp_published_arr[0]['progress'], notify_published_arr)
         self.assertIn(exp_published_arr[1]['progress'], notify_published_arr)
         self.assertIn(exp_published_arr[2]['progress'], notify_published_arr)
diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_scheduler.py
index d952e2546..15db8fb44 100644
--- a/mistral/tests/unit/services/test_scheduler.py
+++ b/mistral/tests/unit/services/test_scheduler.py
@@ -341,46 +341,3 @@ class SchedulerServiceTest(base.DbTestCase):
 
         db_api.get_delayed_call(calls[0].id)
         db_api.delete_delayed_call(calls[0].id)
-
-    def test_schedule_with_unique_key(self):
-        method_args = {'name': 'task', 'id': '321'}
-
-        key = 'my_unique_key'
-
-        scheduler.schedule_call(
-            None,
-            TARGET_METHOD_PATH,
-            DELAY,
-            unique_key=key,
-            **method_args
-        )
-
-        self.assertEqual(1, len(db_api.get_delayed_calls()))
-
-        # Schedule the call for the second time, number of calls
-        # must not change due to the same unique key.
-        scheduler.schedule_call(
-            None,
-            TARGET_METHOD_PATH,
-            DELAY,
-            unique_key=key,
-            **method_args
-        )
-
-        calls = db_api.get_delayed_calls()
-
-        self.assertEqual(1, len(calls))
-
-        # Now change 'processing' flag and make sure we can schedule
-        # one more call because DB constraint allows it.
-        db_api.update_delayed_call(calls[0].id, {'processing': True})
-
-        scheduler.schedule_call(
-            None,
-            TARGET_METHOD_PATH,
-            DELAY,
-            unique_key=key,
-            **method_args
-        )
-
-        self.assertEqual(2, len(db_api.get_delayed_calls()))
diff --git a/mistral/tests/unit/test_command_dispatcher.py b/mistral/tests/unit/test_command_dispatcher.py
new file mode 100644
index 000000000..2479a7278
--- /dev/null
+++ b/mistral/tests/unit/test_command_dispatcher.py
@@ -0,0 +1,75 @@
+# Copyright 2013 - Mirantis, Inc.
+# Copyright 2015 - StackStorm, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from mistral.engine import dispatcher
+from mistral.tests.unit import base
+from mistral.workflow import commands
+
+
+def _print_commands(cmds):
+    print("commands:")
+
+    for cmd in cmds:
+        if isinstance(cmd, commands.RunTask):
+            print("%s, %s, %s" % (type(cmd), cmd.is_waiting(), cmd.unique_key))
+        else:
+            print("%s" % type(cmd))
+
+
+class CommandDispatcherTest(base.BaseTest):
+    def setUp(self):
+        super(CommandDispatcherTest, self).setUp()
+
+    def test_rearrange_commands(self):
+        no_wait = commands.RunTask(None, None, None, None)
+
+        fail = commands.FailWorkflow(None, None, None, None)
+        succeed = commands.SucceedWorkflow(None, None, None, None)
+
+        wait1 = commands.RunTask(None, None, None, None)
+        wait1.wait = True
+        wait1.unique_key = 'wait1'
+
+        wait2 = commands.RunTask(None, None, None, None)
+        wait2.wait = True
+        wait2.unique_key = 'wait2'
+
+        wait3 = commands.RunTask(None, None, None, None)
+        wait3.wait = True
+        wait3.unique_key = 'wait3'
+
+        # 'set state' command is the first, others must be ignored.
+        initial = [fail, no_wait, wait1, wait3, wait2]
+        expected = [fail]
+
+        cmds = dispatcher._rearrange_commands(initial)
+
+        self.assertEqual(expected, cmds)
+
+        # 'set state' command is the last, tasks before it must be sorted.
+        initial = [no_wait, wait2, wait1, wait3, succeed]
+        expected = [no_wait, wait1, wait2, wait3, succeed]
+
+        cmds = dispatcher._rearrange_commands(initial)
+
+        self.assertEqual(expected, cmds)
+
+        # 'set state' command is in the middle, tasks before it must be
+        # sorted and the task after it must be ignored.
+        initial = [wait3, wait2, no_wait, succeed, wait1]
+        expected = [no_wait, wait2, wait3, succeed]
+
+        cmds = dispatcher._rearrange_commands(initial)
+
+        self.assertEqual(expected, cmds)
diff --git a/mistral/utils/yaql_utils.py b/mistral/utils/yaql_utils.py
index 0b5a6b564..122390fba 100644
--- a/mistral/utils/yaql_utils.py
+++ b/mistral/utils/yaql_utils.py
@@ -20,6 +20,8 @@ from mistral import utils
 from mistral.workflow import utils as wf_utils
 from oslo_serialization import jsonutils
 from stevedore import extension
+
+
 ROOT_CONTEXT = None
@@ -37,6 +39,7 @@ def get_yaql_context(data_context):
     if isinstance(data_context, dict):
         new_ctx['__env'] = data_context.get('__env')
         new_ctx['__execution'] = data_context.get('__execution')
+        new_ctx['__task_execution'] = data_context.get('__task_execution')
 
     return new_ctx
 
@@ -86,11 +89,18 @@ def task_(context, task_name):
 
     wf_ex = db_api.get_workflow_execution(context['__execution']['id'])
 
-    task_execs = wf_utils.find_task_executions_by_name(wf_ex, task_name)
+    # This section may not exist in the context if the expression is
+    # evaluated outside of a task scope.
+    cur_task = context['__task_execution']
 
-    # TODO(rakhmerov): Account for multiple executions (i.e. in case of
-    # cycles).
-    task_ex = task_execs[-1] if len(task_execs) > 0 else None
+    if cur_task and cur_task['name'] == task_name:
+        task_ex = db_api.get_task_execution(cur_task['id'])
+    else:
+        task_execs = wf_utils.find_task_executions_by_name(wf_ex, task_name)
+
+        # TODO(rakhmerov): Account for multiple executions (i.e. in case of
+        # cycles).
+        task_ex = task_execs[-1] if len(task_execs) > 0 else None
 
     if not task_ex:
         raise ValueError(
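An editorial sketch of the fast path this enables, using add_current_task_to_context() from the data_flow.py change below (the id and name are hypothetical): when an expression such as task('task1') is evaluated while 'task1' itself is running, the '__task_execution' entry lets task_() load the execution by id instead of scanning all task executions by name.

    from mistral.workflow import data_flow

    ctx = {}

    data_flow.add_current_task_to_context(ctx, 'abc-123', 'task1')

    # get_yaql_context() exposes this entry to YAQL as '__task_execution'.
    assert ctx['__task_execution'] == {'id': 'abc-123', 'name': 'task1'}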
diff --git a/mistral/workbook/v2/workflows.py b/mistral/workbook/v2/workflows.py
index 8b72da2fa..fe150cd36 100644
--- a/mistral/workbook/v2/workflows.py
+++ b/mistral/workbook/v2/workflows.py
@@ -130,6 +130,9 @@ class WorkflowSpec(base.BaseSpec):
     def get_tasks(self):
         return self._tasks
 
+    def get_task(self, name):
+        return self._tasks[name]
+
 
 class DirectWorkflowSpec(WorkflowSpec):
     _polymorphic_value = 'direct'
diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py
index 48fd088b3..f0498cc74 100644
--- a/mistral/workflow/data_flow.py
+++ b/mistral/workflow/data_flow.py
@@ -138,6 +138,15 @@ def evaluate_workflow_output(wf_spec, ctx):
     return output or ctx
 
 
+def add_current_task_to_context(ctx, task_id, task_name):
+    ctx['__task_execution'] = {
+        'id': task_id,
+        'name': task_name
+    }
+
+    return ctx
+
+
 def add_openstack_data_to_context(wf_ex):
     wf_ex.context = wf_ex.context or {}
diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py
index b5937313e..92092832c 100644
--- a/mistral/workflow/direct_workflow.py
+++ b/mistral/workflow/direct_workflow.py
@@ -113,6 +113,8 @@ class DirectWorkflowController(base.WorkflowController):
 
         cmds = []
 
+        ctx = data_flow.evaluate_task_outbound_context(task_ex)
+
         for t_n, params in self._find_next_tasks(task_ex):
             t_s = self.wf_spec.get_tasks()[t_n]
 
@@ -126,7 +128,7 @@ class DirectWorkflowController(base.WorkflowController):
                 self.wf_ex,
                 self.wf_spec,
                 t_s,
-                data_flow.evaluate_task_outbound_context(task_ex),
+                ctx,
                 params
             )
diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py
index 8b17554d2..ebeb076eb 100644
--- a/mistral/workflow/reverse_workflow.py
+++ b/mistral/workflow/reverse_workflow.py
@@ -40,7 +40,7 @@ class ReverseWorkflowController(base.WorkflowController):
 
     __workflow_type__ = "reverse"
 
-    def _find_next_commands(self, task_ex=None):
+    def _find_next_commands(self, task_ex):
         """Finds all tasks with resolved dependencies.
 
         This method finds all tasks with resolved dependencies and
diff --git a/mistral/workflow/utils.py b/mistral/workflow/utils.py
index ee145f107..66e8c2ce1 100644
--- a/mistral/workflow/utils.py
+++ b/mistral/workflow/utils.py
@@ -73,6 +73,12 @@ class ResultSerializer(serializers.Serializer):
         )
 
 
+# TODO(rakhmerov): Most of these 'find..' methods should be replaced
+# with corresponding methods on DB API because in most cases
+# we don't need to fetch the whole collection of task executions
+# from DB. We can just query only needed objects by needed criteria.
+# In practice this leads to a serious performance loss.
+
 def find_task_execution_not_state(wf_ex, task_spec, state):
     task_execs = [
         t for t in wf_ex.task_executions
@@ -126,11 +132,6 @@ def find_successful_task_executions(wf_ex):
     return find_task_executions_with_state(wf_ex, states.SUCCESS)
 
 
-def find_incomplete_task_executions(wf_ex):
-    return [t for t in wf_ex.task_executions
-            if not states.is_completed(t.state)]
-
-
 def find_error_task_executions(wf_ex):
     return find_task_executions_with_state(wf_ex, states.ERROR)