From 1b0f0cddd620a3785017bb28d432cb0030b627d7 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Wed, 17 Aug 2016 16:39:13 +0700 Subject: [PATCH] Change execution mechanism for 'join' tasks * In order to avoid duplicates of the same 'join' tasks we now use named locks to exclusively create a task execution for 'join' tasks in the DB. The transaction that does this is always separate from the task completion logic and is very short. This is needed to eliminate DB contention on the same records of the task execution table. This is also the reason to use a separate mechanism such as named locks; additionally, it reduces the number of possible scenarios for getting into deadlocks, because task executions have too many different access patterns that could lead to deadlocks if we locked directly on their table records. So this approach guarantees that only one transaction creates a new task execution object for a 'join' task and schedules the 'refresh_task_state' job that checks 'join' completion. * Dropped the scheduler 'unique_key' column and its unique constraint because in practice it causes DB deadlocks (at least on MySQL) when the table is inserted into and updated simultaneously * Instead of the 'unique_key' column we added a non-unique 'key' column that can potentially be used by the scheduler itself for squashing delayed calls (not implemented yet) * Adjusted the Scheduler implementation and tests accordingly * Fixed the task() YAQL function to precisely resolve the task execution object when it's called for the current task. Previously the result depended on luck, and we were lucky enough that the tests were passing. * Increased the length of the 'unique_key' column for task executions to 250, which is close to the limit for string fields participating in unique constraints. Change-Id: Ib7aaa20c2c8834ab0f2d9c90457677c9edb62805 --- ...increate_task_execution_unique_key_size.py | 33 ++++++++ .../versions/019_change_scheduler_schema.py | 49 ++++++++++++ mistral/db/v2/api.py | 16 ++-- mistral/db/v2/sqlalchemy/api.py | 49 ++++++++---- mistral/db/v2/sqlalchemy/models.py | 21 +++-- mistral/engine/actions.py | 3 +- mistral/engine/dispatcher.py | 64 ++++++++++++++- .../engine/rpc_backend/oslo/oslo_server.py | 2 +- mistral/engine/task_handler.py | 63 ++++----------- mistral/engine/tasks.py | 77 +++++++++++-------- mistral/engine/workflow_handler.py | 44 ++++++----- mistral/engine/workflows.py | 42 +++++----- mistral/services/scheduler.py | 13 ++-- .../tests/unit/db/v2/test_insert_or_ignore.py | 73 +++++++++++------- .../unit/db/v2/test_sqlalchemy_db_api.py | 4 +- mistral/tests/unit/engine/base.py | 4 +- mistral/tests/unit/engine/test_dataflow.py | 1 + mistral/tests/unit/services/test_scheduler.py | 43 ----------- mistral/tests/unit/test_command_dispatcher.py | 75 ++++++++++++++++++ mistral/utils/yaql_utils.py | 18 ++++- mistral/workbook/v2/workflows.py | 3 + mistral/workflow/data_flow.py | 9 +++ mistral/workflow/direct_workflow.py | 4 +- mistral/workflow/reverse_workflow.py | 2 +- mistral/workflow/utils.py | 11 +-- 25 files changed, 469 insertions(+), 254 deletions(-) create mode 100644 mistral/db/sqlalchemy/migration/alembic_migrations/versions/018_increate_task_execution_unique_key_size.py create mode 100644 mistral/db/sqlalchemy/migration/alembic_migrations/versions/019_change_scheduler_schema.py create mode 100644 mistral/tests/unit/test_command_dispatcher.py diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/018_increate_task_execution_unique_key_size.py
b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/018_increate_task_execution_unique_key_size.py new file mode 100644 index 000000000..ee7868f64 --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/018_increate_task_execution_unique_key_size.py @@ -0,0 +1,33 @@ +# Copyright 2016 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""increate_task_execution_unique_key_size + +Revision ID: 018 +Revises: 017 +Create Date: 2016-08-17 17:47:30.325182 + +""" + +# revision identifiers, used by Alembic. +revision = '018' +down_revision = '017' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.alter_column('task_executions_v2', 'unique_key', type_=sa.String(250)) diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/019_change_scheduler_schema.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/019_change_scheduler_schema.py new file mode 100644 index 000000000..e53214572 --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/019_change_scheduler_schema.py @@ -0,0 +1,49 @@ +# Copyright 2016 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Change scheduler schema. + +Revision ID: 019 +Revises: 018 +Create Date: 2016-08-17 17:54:51.952949 + +""" + +# revision identifiers, used by Alembic. 
+revision = '019' +down_revision = '018' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.drop_index( + 'delayed_calls_v2_processing_execution_time', + table_name='delayed_calls_v2' + ) + op.drop_index('unique_key', table_name='delayed_calls_v2') + op.drop_column('delayed_calls_v2', 'unique_key') + + op.add_column( + 'delayed_calls_v2', + sa.Column('key', sa.String(length=250), nullable=True) + ) + op.create_index( + 'delayed_calls_v2_execution_time', + 'delayed_calls_v2', + ['execution_time'], + unique=False + ) diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index a9f6e1655..fe88d9afe 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -308,14 +308,14 @@ def get_task_executions(limit=None, marker=None, sort_keys=['created_at'], ) +def get_incomplete_task_executions(**kwargs): + return IMPL.get_incomplete_task_executions(**kwargs) + + def create_task_execution(values): return IMPL.create_task_execution(values) -def insert_or_ignore_task_execution(values): - return IMPL.insert_or_ignore_task_execution(values) - - def update_task_execution(id, values): return IMPL.update_task_execution(id, values) @@ -342,10 +342,6 @@ def create_delayed_call(values): return IMPL.create_delayed_call(values) -def insert_or_ignore_delayed_call(values): - return IMPL.insert_or_ignore_delayed_call(values) - - def delete_delayed_call(id): return IMPL.delete_delayed_call(id) @@ -533,8 +529,8 @@ def get_named_locks(limit=None, marker=None): return IMPL.get_named_locks(limit=limit, marker=marker) -def delete_named_lock(name): - return IMPL.delete_named_lock(name) +def delete_named_lock(lock_id): + return IMPL.delete_named_lock(lock_id) @contextlib.contextmanager diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 48925af92..3d51e8500 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -34,6 +34,7 @@ from mistral.db.v2.sqlalchemy import filters as db_filters from mistral.db.v2.sqlalchemy import models from mistral import exceptions as exc from mistral.services import security +from mistral import utils CONF = cfg.CONF @@ -796,6 +797,23 @@ def get_task_executions(**kwargs): return _get_task_executions(**kwargs) +def get_incomplete_task_executions(**kwargs): + query = b.model_query(models.TaskExecution) + + query = query.filter_by(**kwargs) + + query = query.filter( + sa.or_( + models.TaskExecution.state == 'IDLE', + models.TaskExecution.state == 'RUNNING', + models.TaskExecution.state == 'WAITING', + models.TaskExecution.state == 'DELAYED' + ) + ) + + return query.all() + + @b.session_aware() def create_task_execution(values, session=None): task_ex = models.TaskExecution() @@ -812,10 +830,6 @@ def create_task_execution(values, session=None): return task_ex -def insert_or_ignore_task_execution(values): - insert_or_ignore(models.TaskExecution, values.copy()) - - @b.session_aware() def update_task_execution(id, values, session=None): task_ex = get_task_execution(id) @@ -866,10 +880,6 @@ def create_delayed_call(values, session=None): return delayed_call -def insert_or_ignore_delayed_call(values): - insert_or_ignore(models.DelayedCall, values.copy()) - - @b.session_aware() def delete_delayed_call(id, session=None): delayed_call = get_delayed_call(id) @@ -1388,9 +1398,17 @@ def create_named_lock(name, session=None): # session may not immediately issue an SQL query to a database # and instead just schedule it whereas we need to make sure to # issue a query immediately. 
+ session.flush() + insert = models.NamedLock.__table__.insert() - session.execute(insert.values(name=name)) + lock_id = utils.generate_unicode_uuid() + + session.execute(insert.values(id=lock_id, name=name)) + + session.flush() + + return lock_id def get_named_locks(**kwargs): @@ -1398,22 +1416,27 @@ def get_named_locks(**kwargs): @b.session_aware() -def delete_named_lock(name, session=None): +def delete_named_lock(lock_id, session=None): # This method has to work without SQLAlchemy session because # session may not immediately issue an SQL query to a database # and instead just schedule it whereas we need to make sure to # issue a query immediately. + session.flush() + table = models.NamedLock.__table__ delete = table.delete() - session.execute(delete.where(table.c.name == name)) + session.execute(delete.where(table.c.id == lock_id)) + + session.flush() @contextlib.contextmanager def named_lock(name): + lock_id = None try: - create_named_lock(name) + lock_id = create_named_lock(name) yield finally: - delete_named_lock(name) + delete_named_lock(lock_id) diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index 3c84af5a5..fdfdcd091 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -227,7 +227,7 @@ class TaskExecution(Execution): # Main properties. action_spec = sa.Column(st.JsonLongDictType()) - unique_key = sa.Column(sa.String(200), nullable=True) + unique_key = sa.Column(sa.String(250), nullable=True) # Whether the task is fully processed (publishing and calculating commands # after it). It allows to simplify workflow controller implementations @@ -242,7 +242,7 @@ class TaskExecution(Execution): def executions(self): return ( self.action_executions - if self.spec.get('action') + if not self.spec.get('workflow') else self.workflow_executions ) @@ -328,26 +328,23 @@ class DelayedCall(mb.MistralModelBase): __tablename__ = 'delayed_calls_v2' - __table_args__ = ( - sa.Index( - '%s_processing_execution_time' % __tablename__, - 'processing', - 'execution_time' - ), - sa.UniqueConstraint('unique_key', 'processing') - ) - id = mb.id_column() factory_method_path = sa.Column(sa.String(200), nullable=True) target_method_name = sa.Column(sa.String(80), nullable=False) method_arguments = sa.Column(st.JsonDictType()) serializers = sa.Column(st.JsonDictType()) - unique_key = sa.Column(sa.String(80), nullable=True) + key = sa.Column(sa.String(250), nullable=True) auth_context = sa.Column(st.JsonDictType()) execution_time = sa.Column(sa.DateTime, nullable=False) processing = sa.Column(sa.Boolean, default=False, nullable=False) +sa.Index( + '%s_execution_time' % DelayedCall.__tablename__, + DelayedCall.execution_time +) + + class Environment(mb.MistralSecureModelBase): """Contains environment variables for workflow execution.""" diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 22db1d8ac..996d7542e 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -174,9 +174,10 @@ class Action(object): if prev_state != state: wf_trace.info( None, - "Action '%s' (%s) [%s -> %s, %s]" % + "Action '%s' (%s)(task=%s) [%s -> %s, %s]" % (self.action_ex.name, self.action_ex.id, + self.task_ex.name if self.task_ex else None, prev_state, state, _result_msg()) diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py index d5bed6521..32ad72c77 100644 --- a/mistral/engine/dispatcher.py +++ b/mistral/engine/dispatcher.py @@ -13,6 +13,7 @@ # See the License for the specific language governing 
permissions and # limitations under the License. +import functools from osprofiler import profiler from mistral import exceptions as exc @@ -20,6 +21,64 @@ from mistral.workflow import commands from mistral.workflow import states +def _compare_task_commands(a, b): + if not isinstance(a, commands.RunTask) or not a.is_waiting(): + return -1 + + if not isinstance(b, commands.RunTask) or not b.is_waiting(): + return 1 + + if a.unique_key == b.unique_key: + return 0 + + if a.unique_key < b.unique_key: + return -1 + + return 1 + + +def _rearrange_commands(cmds): + """Takes workflow commands and does required pre-processing. + + The main idea of the method is to sort task commands with 'waiting' + flag by 'unique_key' property in order guarantee the same locking + order for them in parallel transactions and thereby prevent deadlocks. + It also removes commands that don't make sense. For example, if + there are some commands after a command that changes a workflow state + then they must not be dispatched. + """ + + # Remove all 'noop' commands. + cmds = list(filter(lambda c: not isinstance(c, commands.Noop), cmds)) + + state_cmd_idx = -1 + state_cmd = None + + for i, cmd in enumerate(cmds): + if isinstance(cmd, commands.SetWorkflowState): + state_cmd_idx = i + state_cmd = cmd + + break + + # Find a position of a 'fail|succeed|pause' command + # and sort all task commands before it. + if state_cmd_idx < 0: + cmds.sort(key=functools.cmp_to_key(_compare_task_commands)) + + return cmds + elif state_cmd_idx == 0: + return cmds[0:1] + + res = cmds[0:state_cmd_idx] + + res.sort(key=functools.cmp_to_key(_compare_task_commands)) + + res.append(state_cmd) + + return res + + @profiler.trace('dispatcher-dispatch-commands') def dispatch_workflow_commands(wf_ex, wf_cmds): # TODO(rakhmerov): I don't like these imports but otherwise we have @@ -30,14 +89,11 @@ def dispatch_workflow_commands(wf_ex, wf_cmds): if not wf_cmds: return - for cmd in wf_cmds: + for cmd in _rearrange_commands(wf_cmds): if isinstance(cmd, (commands.RunTask, commands.RunExistingTask)): task_handler.run_task(cmd) elif isinstance(cmd, commands.SetWorkflowState): wf_handler.set_workflow_state(wf_ex, cmd.new_state, cmd.msg) - elif isinstance(cmd, commands.Noop): - # Do nothing. - pass else: raise exc.MistralError('Unsupported workflow command: %s' % cmd) diff --git a/mistral/engine/rpc_backend/oslo/oslo_server.py b/mistral/engine/rpc_backend/oslo/oslo_server.py index 399a87e9c..c3e54212e 100644 --- a/mistral/engine/rpc_backend/oslo/oslo_server.py +++ b/mistral/engine/rpc_backend/oslo/oslo_server.py @@ -48,7 +48,7 @@ class OsloRPCServer(rpc_base.RPCServer): rpc.get_transport(), target, self.endpoints, - executor='eventlet', + executor='blocking', serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) ) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 0fb9995d7..4a630f46a 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -72,20 +72,9 @@ def run_task(wf_cmd): return - if not task.task_ex: - # It is possible that task execution was not created - # (and therefore not associated with Task instance). - # For example, in case of 'join' that has already been - # created by a different transaction. In this case - # we should skip post completion scheduled checks. 
- return - - if task.is_waiting(): + if task.is_waiting() and (task.is_created() or task.is_state_changed()): _schedule_refresh_task_state(task.task_ex) - if task.is_completed(): - wf_handler.schedule_on_task_complete(task.task_ex) - @profiler.trace('task-handler-on-action-complete') def _on_action_complete(action_ex): @@ -128,9 +117,6 @@ def _on_action_complete(action_ex): return - if task.is_completed(): - wf_handler.schedule_on_task_complete(task_ex) - def fail_task(task_ex, msg): wf_spec = spec_parser.get_workflow_spec_by_execution_id( @@ -171,9 +157,6 @@ def continue_task(task_ex): return - if task.is_completed(): - wf_handler.schedule_on_task_complete(task_ex) - def complete_task(task_ex, state, state_info): wf_spec = spec_parser.get_workflow_spec_by_execution_id( @@ -200,15 +183,12 @@ def complete_task(task_ex, state, state_info): return - if task.is_completed(): - wf_handler.schedule_on_task_complete(task_ex) - -def _build_task_from_execution(wf_spec, task_ex, task_spec=None): +def _build_task_from_execution(wf_spec, task_ex): return _create_task( task_ex.workflow_execution, wf_spec, - task_spec or spec_parser.get_task_spec(task_ex.spec), + wf_spec.get_task(task_ex.name), task_ex.in_context, task_ex ) @@ -223,7 +203,8 @@ def _build_task_from_command(cmd): spec_parser.get_task_spec(cmd.task_ex.spec), cmd.ctx, task_ex=cmd.task_ex, - unique_key=cmd.task_ex.unique_key + unique_key=cmd.task_ex.unique_key, + waiting=cmd.task_ex.state == states.WAITING ) if cmd.reset: @@ -237,39 +218,26 @@ def _build_task_from_command(cmd): cmd.wf_spec, cmd.task_spec, cmd.ctx, - unique_key=cmd.unique_key + unique_key=cmd.unique_key, + waiting=cmd.is_waiting() ) - if cmd.is_waiting(): - task.defer() - return task raise exc.MistralError('Unsupported workflow command: %s' % cmd) def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None, - unique_key=None): + unique_key=None, waiting=False): if task_spec.get_with_items(): - return tasks.WithItemsTask( - wf_ex, - wf_spec, - task_spec, - ctx, - task_ex, - unique_key - ) + cls = tasks.WithItemsTask + else: + cls = tasks.RegularTask - return tasks.RegularTask( - wf_ex, - wf_spec, - task_spec, - ctx, - task_ex, - unique_key - ) + return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting) +@profiler.trace('task-handler-refresh-task-state') def _refresh_task_state(task_ex_id): with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) @@ -315,7 +283,6 @@ def _schedule_refresh_task_state(task_ex, delay=0): :param task_ex: Task execution. :param delay: Delay. 
- :return: """ key = 'th_c_t_s_a-%s' % task_ex.id @@ -323,7 +290,7 @@ def _schedule_refresh_task_state(task_ex, delay=0): None, _REFRESH_TASK_STATE_PATH, delay, - unique_key=key, + key=key, task_ex_id=task_ex.id ) @@ -366,7 +333,7 @@ def schedule_on_action_complete(action_ex, delay=0): None, _SCHEDULED_ON_ACTION_COMPLETE_PATH, delay, - unique_key=key, + key=key, action_ex_id=action_ex.id, wf_action=isinstance(action_ex, models.WorkflowExecution) ) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index caac96c53..180b8fd79 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -47,15 +47,17 @@ class Task(object): @profiler.trace('task-create') def __init__(self, wf_ex, wf_spec, task_spec, ctx, task_ex=None, - unique_key=None): + unique_key=None, waiting=False): self.wf_ex = wf_ex self.task_spec = task_spec self.ctx = ctx self.task_ex = task_ex self.wf_spec = wf_spec self.unique_key = unique_key - self.waiting = False + self.waiting = waiting self.reset_flag = False + self.created = False + self.state_changed = False def is_completed(self): return self.task_ex and states.is_completed(self.task_ex.state) @@ -63,6 +65,12 @@ class Task(object): def is_waiting(self): return self.waiting + def is_created(self): + return self.created + + def is_state_changed(self): + return self.state_changed + @abc.abstractmethod def on_action_complete(self, action_ex): """Handle action completion. @@ -82,21 +90,24 @@ class Task(object): This method puts task to a waiting state. """ - if not self.task_ex: - t_execs = db_api.get_task_executions( - workflow_execution_id=self.wf_ex.id, - name=self.task_spec.get_name() - ) + with db_api.named_lock(self.unique_key): + if not self.task_ex: + t_execs = db_api.get_task_executions( + workflow_execution_id=self.wf_ex.id, + unique_key=self.unique_key + ) - self.task_ex = t_execs[0] if t_execs else None + self.task_ex = t_execs[0] if t_execs else None - if not self.task_ex: - self._create_task_execution() + msg = 'Task is waiting.' - if self.task_ex: - self.set_state(states.WAITING, 'Task is deferred.') - - self.waiting = True + if not self.task_ex: + self._create_task_execution( + state=states.WAITING, + state_info=msg + ) + elif self.task_ex.state != states.WAITING: + self.set_state(states.WAITING, msg) def reset(self): self.reset_flag = True @@ -124,6 +135,8 @@ class Task(object): state_info) ) + self.state_changed = True + self.task_ex.state = state self.task_ex.state_info = state_info @@ -172,7 +185,7 @@ class Task(object): wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) # Calculate commands to process next. - cmds = wf_ctrl.continue_workflow(self.task_ex) + cmds = wf_ctrl.continue_workflow(task_ex=self.task_ex) # Mark task as processed after all decisions have been made # upon its completion. 
@@ -192,14 +205,20 @@ class Task(object): for p in policies.build_policies(policies_spec, self.wf_spec): p.after_task_complete(self.task_ex, self.task_spec) - def _create_task_execution(self, state=states.RUNNING): + def _create_task_execution(self, state=states.RUNNING, state_info=None): + task_id = utils.generate_unicode_uuid() + task_name = self.task_spec.get_name() + + data_flow.add_current_task_to_context(self.ctx, task_id, task_name) + values = { - 'id': utils.generate_unicode_uuid(), - 'name': self.task_spec.get_name(), + 'id': task_id, + 'name': task_name, 'workflow_execution_id': self.wf_ex.id, 'workflow_name': self.wf_ex.workflow_name, 'workflow_id': self.wf_ex.workflow_id, 'state': state, + 'state_info': state_info, 'spec': self.task_spec.to_dict(), 'unique_key': self.unique_key, 'in_context': self.ctx, @@ -208,23 +227,13 @@ class Task(object): 'project_id': self.wf_ex.project_id } - db_api.insert_or_ignore_task_execution(values) - - # Since 'insert_or_ignore' cannot return a valid count of updated - # rows the only reliable way to check if insert operation has created - # an object is try to load this object by just generated uuid. - task_ex = db_api.load_task_execution(values['id']) - - if not task_ex: - return False - - self.task_ex = task_ex + self.task_ex = db_api.create_task_execution(values) # Add to collection explicitly so that it's in a proper # state within the current session. self.wf_ex.task_executions.append(self.task_ex) - return True + self.created = True def _get_action_defaults(self): action_name = self.task_spec.get_action_name() @@ -262,12 +271,12 @@ class RegularTask(Task): self._run_existing() def _run_new(self): - if not self._create_task_execution(): - # Task with the same unique key has already been created. + if self.waiting: + self.defer() + return - if self.waiting: - return + self._create_task_execution() LOG.debug( 'Starting task [workflow=%s, task_spec=%s, init_state=%s]' % diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 1c47192bc..a66693eae 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -27,7 +27,9 @@ from mistral.workflow import states LOG = logging.getLogger(__name__) -_ON_TASK_COMPLETE_PATH = 'mistral.engine.workflow_handler._on_task_complete' +_CHECK_AND_COMPLETE_PATH = ( + 'mistral.engine.workflow_handler._check_and_complete' +) @profiler.trace('workflow-handler-start-workflow') @@ -38,6 +40,8 @@ def start_workflow(wf_identifier, wf_input, desc, params): wf.start(wf_input, desc=desc, params=params) + _schedule_check_and_complete(wf.wf_ex) + return wf.wf_ex @@ -73,13 +77,11 @@ def cancel_workflow(wf_ex, msg=None): stop_workflow(wf_ex, states.CANCELLED, msg) -@profiler.trace('workflow-handler-on-task-complete') -def _on_task_complete(task_ex_id): +@profiler.trace('workflow-handler-check-and-complete') +def _check_and_complete(wf_ex_id): # Note: This method can only be called via scheduler. 
with db_api.transaction(): - task_ex = db_api.get_task_execution(task_ex_id) - - wf_ex = task_ex.workflow_execution + wf_ex = db_api.get_workflow_execution(wf_ex_id) wf = workflows.Workflow( db_api.get_workflow_definition(wf_ex.workflow_id), @@ -87,11 +89,11 @@ def _on_task_complete(task_ex_id): ) try: - wf.on_task_complete(task_ex) + wf.check_and_complete() except exc.MistralException as e: msg = ( - "Failed to handle task completion [wf_ex=%s, task_ex=%s]:" - " %s\n%s" % (wf_ex, task_ex, e, tb.format_exc()) + "Failed to check and complete [wf_ex=%s]:" + " %s\n%s" % (wf_ex, e, tb.format_exc()) ) LOG.error(msg) @@ -105,7 +107,7 @@ def _on_task_complete(task_ex_id): # algorithm for increasing delay for rescheduling so that we don't # put too serious load onto scheduler. delay = 1 - schedule_on_task_complete(task_ex, delay) + _schedule_check_and_complete(wf_ex, delay) def pause_workflow(wf_ex, msg=None): @@ -128,6 +130,11 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None): wf.rerun(task_ex, reset=reset, env=env) + _schedule_check_and_complete(wf_ex) + + if wf_ex.task_execution_id: + _schedule_check_and_complete(wf_ex.task_execution.workflow_execution) + def resume_workflow(wf_ex, env=None): if not states.is_paused_or_idle(wf_ex.state): @@ -153,9 +160,9 @@ def set_workflow_state(wf_ex, state, msg=None): ) -@profiler.trace('workflow-handler-schedule-on-task-complete') -def schedule_on_task_complete(task_ex, delay=0): - """Schedules task completion check. +@profiler.trace('workflow-handler-schedule-check-and-complete') +def _schedule_check_and_complete(wf_ex, delay=0): + """Schedules workflow completion check. This method provides transactional decoupling of task completion from workflow completion check. It's needed in non-locking model in order to @@ -165,16 +172,17 @@ def schedule_on_task_complete(task_ex, delay=0): have in this case (time between transactions) whereas scheduler is a special component that is designed to be resistant to failures. - :param task_ex: Task execution. + :param wf_ex: Workflow execution. :param delay: Minimum amount of time before task completion check should be made. """ - key = 'wfh_on_t_c-%s' % task_ex.workflow_execution_id + # TODO(rakhmerov): update docstring + key = 'wfh_on_c_a_c-%s' % wf_ex.id scheduler.schedule_call( None, - _ON_TASK_COMPLETE_PATH, + _CHECK_AND_COMPLETE_PATH, delay, - unique_key=key, - task_ex_id=task_ex.id + key=key, + wf_ex_id=wf_ex.id ) diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index acf985ca6..3cde89651 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -56,10 +56,18 @@ class Workflow(object): def __init__(self, wf_def, wf_ex=None): self.wf_def = wf_def self.wf_ex = wf_ex - self.wf_spec = spec_parser.get_workflow_spec_by_definition_id( - wf_def.id, - wf_def.updated_at - ) + + if wf_ex: + # We're processing a workflow that's already in progress. + self.wf_spec = spec_parser.get_workflow_spec_by_execution_id( + wf_ex.id + ) + else: + # New workflow execution. + self.wf_spec = spec_parser.get_workflow_spec_by_definition_id( + wf_def.id, + wf_def.updated_at + ) @profiler.trace('workflow-start') def start(self, input_dict, desc='', params=None): @@ -115,17 +123,6 @@ class Workflow(object): elif state == states.CANCELLED: return self._cancel_workflow(msg) - @profiler.trace('workflow-on-task-complete') - def on_task_complete(self, task_ex): - """Handle task completion event. - - :param task_ex: Task execution that's completed. 
- """ - - assert self.wf_ex - - self._check_and_complete() - def resume(self, env=None): """Resume workflow. @@ -190,10 +187,10 @@ class Workflow(object): if states.is_completed(t_ex.state) and not t_ex.processed: t_ex.processed = True - dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) - - if not cmds: - self._check_and_complete() + if cmds: + dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) + else: + self.check_and_complete() @profiler.trace('workflow-lock') def lock(self): @@ -276,13 +273,16 @@ class Workflow(object): parent_task_ex.state_info = None parent_task_ex.processed = False - def _check_and_complete(self): + @profiler.trace('workflow-check-and-complete') + def check_and_complete(self): if states.is_paused_or_completed(self.wf_ex.state): return # Workflow is not completed if there are any incomplete task # executions. - incomplete_tasks = wf_utils.find_incomplete_task_executions(self.wf_ex) + incomplete_tasks = db_api.get_incomplete_task_executions( + workflow_execution_id=self.wf_ex.id, + ) if incomplete_tasks: return diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index 77f016a7d..efa7337f9 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -37,7 +37,7 @@ _schedulers = {} def schedule_call(factory_method_path, target_method_name, - run_after, serializers=None, unique_key=None, **method_args): + run_after, serializers=None, key=None, **method_args): """Schedules call and lately invokes target_method. Add this call specification to DB, and then after run_after @@ -54,11 +54,8 @@ def schedule_call(factory_method_path, target_method_name, { "result": "mistral.utils.serializer.ResultSerializer"} Serializer for the object type must implement serializer interface in mistral/utils/serializer.py - :param unique_key: Unique key which in combination with 'processing' - flag restricts a number of delayed calls if it's passed. For example, - if we schedule two calls but pass the same unique key for them then - we won't get two of them in DB if both have same value of 'processing' - flag. + :param key: Key which can potentially be used for squashing similar + delayed calls. :param method_args: Target method keyword arguments. 
""" ctx_serializer = context.RpcContextSerializer( @@ -95,12 +92,12 @@ def schedule_call(factory_method_path, target_method_name, 'execution_time': execution_time, 'auth_context': ctx, 'serializers': serializers, - 'unique_key': unique_key, + 'key': key, 'method_arguments': method_args, 'processing': False } - db_api.insert_or_ignore_delayed_call(values) + db_api.create_delayed_call(values) class CallScheduler(periodic_task.PeriodicTasks): diff --git a/mistral/tests/unit/db/v2/test_insert_or_ignore.py b/mistral/tests/unit/db/v2/test_insert_or_ignore.py index ce12087ed..5ee4a054e 100644 --- a/mistral/tests/unit/db/v2/test_insert_or_ignore.py +++ b/mistral/tests/unit/db/v2/test_insert_or_ignore.py @@ -15,20 +15,39 @@ from oslo_config import cfg -from oslo_utils import timeutils from mistral.db.v2.sqlalchemy import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.tests.unit import base as test_base +WF_EX = { + 'id': '1', + 'spec': {}, + 'start_params': {'task': 'my_task1'}, + 'state': 'IDLE', + 'state_info': "Running...", + 'created_at': None, + 'updated_at': None, + 'context': None, + 'task_id': None, + 'trust_id': None, + 'description': None, + 'output': None +} -DELAYED_CALL = { - 'factory_method_path': 'my_factory_method', - 'target_method_name': 'my_target_method', - 'method_arguments': None, - 'serializers': None, - 'auth_context': None, - 'execution_time': timeutils.utcnow() + +TASK_EX = { + 'workflow_execution_id': '1', + 'workflow_name': 'my_wf', + 'name': 'my_task1', + 'spec': None, + 'action_spec': None, + 'state': 'IDLE', + 'tags': ['deployment'], + 'in_context': None, + 'runtime_context': None, + 'created_at': None, + 'updated_at': None } @@ -38,6 +57,8 @@ class InsertOrIgnoreTest(test_base.DbTestCase): cfg.CONF.set_default('auth_enable', True, group='pecan') + db_api.create_workflow_execution(WF_EX) + self.addCleanup( cfg.CONF.set_default, 'auth_enable', @@ -47,46 +68,46 @@ class InsertOrIgnoreTest(test_base.DbTestCase): def test_insert_or_ignore_without_conflicts(self): db_api.insert_or_ignore( - db_models.DelayedCall, - DELAYED_CALL.copy() + db_models.TaskExecution, + TASK_EX.copy() ) - delayed_calls = db_api.get_delayed_calls() + task_execs = db_api.get_task_executions() - self.assertEqual(1, len(delayed_calls)) + self.assertEqual(1, len(task_execs)) - delayed_call = delayed_calls[0] + task_ex = task_execs[0] - self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict()) + self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict()) def test_insert_or_ignore_with_conflicts(self): # Insert the first object. - values = DELAYED_CALL.copy() + values = TASK_EX.copy() values['unique_key'] = 'key' - db_api.insert_or_ignore(db_models.DelayedCall, values) + db_api.insert_or_ignore(db_models.TaskExecution, values) - delayed_calls = db_api.get_delayed_calls() + task_execs = db_api.get_task_executions() - self.assertEqual(1, len(delayed_calls)) + self.assertEqual(1, len(task_execs)) - delayed_call = delayed_calls[0] + task_ex = task_execs[0] - self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict()) + self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict()) # Insert the second object with the same unique key. # We must not get exceptions and new object must not be saved. 
- values = DELAYED_CALL.copy() + values = TASK_EX.copy() values['unique_key'] = 'key' - db_api.insert_or_ignore(db_models.DelayedCall, values) + db_api.insert_or_ignore(db_models.TaskExecution, values) - delayed_calls = db_api.get_delayed_calls() + task_execs = db_api.get_task_executions() - self.assertEqual(1, len(delayed_calls)) + self.assertEqual(1, len(task_execs)) - delayed_call = delayed_calls[0] + task_ex = task_execs[0] - self._assert_dict_contains_subset(DELAYED_CALL, delayed_call.to_dict()) + self._assert_dict_contains_subset(TASK_EX, task_ex.to_dict()) diff --git a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py index f3a3964f4..a686568e2 100644 --- a/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py +++ b/mistral/tests/unit/db/v2/test_sqlalchemy_db_api.py @@ -2246,13 +2246,13 @@ class LockTest(SQLAlchemyTest): self.assertEqual('lock1', locks[0].name) - db_api.delete_named_lock('invalid_lock_name') + db_api.delete_named_lock('invalid_lock_id') locks = db_api.get_named_locks() self.assertEqual(1, len(locks)) - db_api.delete_named_lock(locks[0].name) + db_api.delete_named_lock(locks[0].id) locks = db_api.get_named_locks() diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 3d131aa2a..524e5bb00 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -141,8 +141,8 @@ class EngineTestCase(base.DbTestCase): for w in wf_execs: print( - "\n%s [state=%s, state_info=%s, output=%s]" % - (w.name, w.state, w.state_info, w.output) + "\n%s (%s) [state=%s, state_info=%s, output=%s]" % + (w.name, w.id, w.state, w.state_info, w.output) ) for t in w.task_executions: diff --git a/mistral/tests/unit/engine/test_dataflow.py b/mistral/tests/unit/engine/test_dataflow.py index 0ce5a78b3..a2323af21 100644 --- a/mistral/tests/unit/engine/test_dataflow.py +++ b/mistral/tests/unit/engine/test_dataflow.py @@ -163,6 +163,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase): self.assertDictEqual(exp_published_arr[0], task1.published) self.assertDictEqual(exp_published_arr[1], task2.published) self.assertDictEqual(exp_published_arr[2], task3.published) + self.assertIn(exp_published_arr[0]['progress'], notify_published_arr) self.assertIn(exp_published_arr[1]['progress'], notify_published_arr) self.assertIn(exp_published_arr[2]['progress'], notify_published_arr) diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_scheduler.py index d952e2546..15db8fb44 100644 --- a/mistral/tests/unit/services/test_scheduler.py +++ b/mistral/tests/unit/services/test_scheduler.py @@ -341,46 +341,3 @@ class SchedulerServiceTest(base.DbTestCase): db_api.get_delayed_call(calls[0].id) db_api.delete_delayed_call(calls[0].id) - - def test_schedule_with_unique_key(self): - method_args = {'name': 'task', 'id': '321'} - - key = 'my_unique_key' - - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - unique_key=key, - **method_args - ) - - self.assertEqual(1, len(db_api.get_delayed_calls())) - - # Schedule the call for the second time, number of calls - # must not change due to the same unique key. - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - unique_key=key, - **method_args - ) - - calls = db_api.get_delayed_calls() - - self.assertEqual(1, len(calls)) - - # Now change 'processing' flag and make sure we can schedule - # one more call because DB constraint allows it. 
- db_api.update_delayed_call(calls[0].id, {'processing': True}) - - scheduler.schedule_call( - None, - TARGET_METHOD_PATH, - DELAY, - unique_key=key, - **method_args - ) - - self.assertEqual(2, len(db_api.get_delayed_calls())) diff --git a/mistral/tests/unit/test_command_dispatcher.py b/mistral/tests/unit/test_command_dispatcher.py new file mode 100644 index 000000000..2479a7278 --- /dev/null +++ b/mistral/tests/unit/test_command_dispatcher.py @@ -0,0 +1,75 @@ +# Copyright 2013 - Mirantis, Inc. +# Copyright 2015 - StackStorm, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.engine import dispatcher +from mistral.tests.unit import base +from mistral.workflow import commands + + +def _print_commands(cmds): + print("commands:") + + for cmd in cmds: + if isinstance(cmd, commands.RunTask): + print("%s, %s, %s" % (type(cmd), cmd.is_waiting(), cmd.unique_key)) + else: + print("%s" % type(cmd)) + + +class CommandDispatcherTest(base.BaseTest): + def setUp(self): + super(CommandDispatcherTest, self).setUp() + + def test_rearrange_commands(self): + no_wait = commands.RunTask(None, None, None, None) + fail = commands.FailWorkflow(None, None, None, None) + succeed = commands.SucceedWorkflow(None, None, None, None) + + wait1 = commands.RunTask(None, None, None, None) + wait1.wait = True + wait1.unique_key = 'wait1' + + wait2 = commands.RunTask(None, None, None, None) + wait2.wait = True + wait2.unique_key = 'wait2' + + wait3 = commands.RunTask(None, None, None, None) + wait3.wait = True + wait3.unique_key = 'wait3' + + # 'set state' command is the first, others must be ignored. + initial = [fail, no_wait, wait1, wait3, wait2] + expected = [fail] + + cmds = dispatcher._rearrange_commands(initial) + + self.assertEqual(expected, cmds) + + # 'set state' command is the last, tasks before it must be sorted. + initial = [no_wait, wait2, wait1, wait3, succeed] + expected = [no_wait, wait1, wait2, wait3, succeed] + + cmds = dispatcher._rearrange_commands(initial) + + self.assertEqual(expected, cmds) + + # 'set state' command is in the middle, tasks before it must be sorted + # and the task after it must be ignored. 
+ initial = [wait3, wait2, no_wait, succeed, wait1] + expected = [no_wait, wait2, wait3, succeed] + + cmds = dispatcher._rearrange_commands(initial) + + self.assertEqual(expected, cmds) diff --git a/mistral/utils/yaql_utils.py b/mistral/utils/yaql_utils.py index 8e42c8464..666c4740e 100644 --- a/mistral/utils/yaql_utils.py +++ b/mistral/utils/yaql_utils.py @@ -19,6 +19,8 @@ from mistral.db.v2 import api as db_api from mistral.workflow import utils as wf_utils from oslo_serialization import jsonutils from stevedore import extension + + ROOT_CONTEXT = None @@ -36,6 +38,7 @@ def get_yaql_context(data_context): if isinstance(data_context, dict): new_ctx['__env'] = data_context.get('__env') new_ctx['__execution'] = data_context.get('__execution') + new_ctx['__task_execution'] = data_context.get('__task_execution') return new_ctx @@ -85,11 +88,18 @@ def task_(context, task_name): wf_ex = db_api.get_workflow_execution(context['__execution']['id']) - task_execs = wf_utils.find_task_executions_by_name(wf_ex, task_name) + # This section may not exist in a context if it's calculated not in + # task scope. + cur_task = context['__task_execution'] - # TODO(rakhmerov): Account for multiple executions (i.e. in case of - # cycles). - task_ex = task_execs[-1] if len(task_execs) > 0 else None + if cur_task and cur_task['name'] == task_name: + task_ex = db_api.get_task_execution(cur_task['id']) + else: + task_execs = wf_utils.find_task_executions_by_name(wf_ex, task_name) + + # TODO(rakhmerov): Account for multiple executions (i.e. in case of + # cycles). + task_ex = task_execs[-1] if len(task_execs) > 0 else None if not task_ex: raise ValueError( diff --git a/mistral/workbook/v2/workflows.py b/mistral/workbook/v2/workflows.py index 8b72da2fa..fe150cd36 100644 --- a/mistral/workbook/v2/workflows.py +++ b/mistral/workbook/v2/workflows.py @@ -130,6 +130,9 @@ class WorkflowSpec(base.BaseSpec): def get_tasks(self): return self._tasks + def get_task(self, name): + return self._tasks[name] + class DirectWorkflowSpec(WorkflowSpec): _polymorphic_value = 'direct' diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 48fd088b3..f0498cc74 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -138,6 +138,15 @@ def evaluate_workflow_output(wf_spec, ctx): return output or ctx +def add_current_task_to_context(ctx, task_id, task_name): + ctx['__task_execution'] = { + 'id': task_id, + 'name': task_name + } + + return ctx + + def add_openstack_data_to_context(wf_ex): wf_ex.context = wf_ex.context or {} diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index b5937313e..92092832c 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -113,6 +113,8 @@ class DirectWorkflowController(base.WorkflowController): cmds = [] + ctx = data_flow.evaluate_task_outbound_context(task_ex) + for t_n, params in self._find_next_tasks(task_ex): t_s = self.wf_spec.get_tasks()[t_n] @@ -126,7 +128,7 @@ class DirectWorkflowController(base.WorkflowController): self.wf_ex, self.wf_spec, t_s, - data_flow.evaluate_task_outbound_context(task_ex), + ctx, params ) diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index 8b17554d2..ebeb076eb 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -40,7 +40,7 @@ class ReverseWorkflowController(base.WorkflowController): __workflow_type__ = "reverse" - def _find_next_commands(self, task_ex=None): + def 
_find_next_commands(self, task_ex): """Finds all tasks with resolved dependencies. This method finds all tasks with resolved dependencies and diff --git a/mistral/workflow/utils.py b/mistral/workflow/utils.py index 06fe8720f..ef2962d6f 100644 --- a/mistral/workflow/utils.py +++ b/mistral/workflow/utils.py @@ -70,6 +70,12 @@ class ResultSerializer(serializers.Serializer): ) +# TODO(rakhmerov): Most of these 'find..' methods should be replaced +# with corresponding methods on DB API because in most cases +# we don't need to fetch the whole collection of task executions +# from DB. We can just query only needed objects by needed criteria. +# In practice it leads to serious performance loss. + def find_task_execution_not_state(wf_ex, task_spec, state): task_execs = [ t for t in wf_ex.task_executions @@ -123,11 +129,6 @@ def find_successful_task_executions(wf_ex): return find_task_executions_with_state(wf_ex, states.SUCCESS) -def find_incomplete_task_executions(wf_ex): - return [t for t in wf_ex.task_executions - if not states.is_completed(t.state)] - - def find_error_task_executions(wf_ex): return find_task_executions_with_state(wf_ex, states.ERROR)
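
Note on the named-lock approach described in the commit message: the pattern applied in Task.defer() is check-then-create under an exclusive lock keyed by the join task's unique_key, so exactly one caller creates the WAITING task execution and schedules the 'refresh_task_state' check. The standalone Python sketch below illustrates that pattern only; the lock registry, in-memory tables, and function names are hypothetical stand-ins for Mistral's DB-backed named locks, task execution table, and scheduler, not the project's actual API.

import threading

# Hypothetical in-process stand-ins for the DB-backed tables used by the patch.
_registry_lock = threading.Lock()
_named_locks = {}        # lock name -> threading.Lock (stands in for the named locks table)
_task_executions = {}    # unique_key -> task execution dict (stands in for task_executions_v2)
_scheduled_calls = []    # stands in for scheduler.schedule_call()


def _named_lock(name):
    # Return the lock associated with 'name', creating it on first use.
    with _registry_lock:
        return _named_locks.setdefault(name, threading.Lock())


def ensure_join_task_execution(unique_key, wf_ex_id):
    """Create the 'join' task execution exactly once.

    Every branch arriving at the join calls this, but only the first
    caller creates the record and schedules the completion check; later
    callers just observe the record that already exists.
    """
    with _named_lock(unique_key):
        task_ex = _task_executions.get(unique_key)

        if task_ex is None:
            task_ex = {
                'unique_key': unique_key,
                'workflow_execution_id': wf_ex_id,
                'state': 'WAITING',
                'state_info': 'Task is waiting.',
            }
            _task_executions[unique_key] = task_ex

            # Only the creator schedules the 'refresh_task_state' check.
            _scheduled_calls.append(('refresh_task_state', unique_key))

        return task_ex


if __name__ == '__main__':
    # Five branches race to create the same join task.
    threads = [
        threading.Thread(target=ensure_join_task_execution, args=('wf1-join', 'wf1'))
        for _ in range(5)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Exactly one task execution and one scheduled check exist.
    assert len(_task_executions) == 1
    assert len(_scheduled_calls) == 1

The real implementation serializes on a row in the named-locks table inside a short transaction that is separate from task completion, which is what lets the exclusion hold across multiple engine processes rather than within a single Python process as in this sketch.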