From 84b8e92acc142d9cf13c44104cf66edb08b4a9de Mon Sep 17 00:00:00 2001 From: Mike Fedosin Date: Thu, 18 Apr 2019 12:16:15 +0200 Subject: [PATCH] Get rid of lookup utils With the new joining mechanism lookup utils for task executions become useless and can be removed from the codebase. Now all requests to the database will be performed directly from a workflow controller. Action definition cache was moved to mistral/engine/actions.py, because it's needed only there. Change-Id: If0d4403f5c61883ecfec4cfa14b98cc39aae5618 --- mistral/engine/actions.py | 45 +++- mistral/engine/workflows.py | 35 ++-- mistral/tests/unit/base.py | 4 +- ...lookup_utils.py => test_action_caching.py} | 77 +------ mistral/workflow/base.py | 12 +- mistral/workflow/direct_workflow.py | 13 +- mistral/workflow/lookup_utils.py | 192 ------------------ mistral/workflow/reverse_workflow.py | 14 +- 8 files changed, 73 insertions(+), 319 deletions(-) rename mistral/tests/unit/engine/{test_lookup_utils.py => test_action_caching.py} (53%) delete mode 100644 mistral/workflow/lookup_utils.py diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 8e50b5596..9485047be 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -14,7 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import threading + import abc +import cachetools from oslo_config import cfg from oslo_log import log as logging from osprofiler import profiler @@ -34,12 +37,42 @@ from mistral.services import security from mistral import utils from mistral.utils import wf_trace from mistral.workflow import data_flow -from mistral.workflow import lookup_utils from mistral.workflow import states from mistral_lib import actions as ml_actions LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +_ACTION_DEF_CACHE = cachetools.TTLCache( + maxsize=1000, + ttl=CONF.engine.action_definition_cache_time # 60 seconds by default +) + +_ACTION_DEF_CACHE_LOCK = threading.RLock() + + +def _find_action_definition_by_name(action_name): + """Find action definition name. + + :param action_name: Action name. + :return: Action definition (possibly a cached value). + """ + with _ACTION_DEF_CACHE_LOCK: + action_def = _ACTION_DEF_CACHE.get(action_name) + + if action_def: + return action_def + + action_def = db_api.load_action_definition(action_name) + + with _ACTION_DEF_CACHE_LOCK: + _ACTION_DEF_CACHE[action_name] = ( + action_def.get_clone() if action_def else None + ) + + return action_def @six.add_metaclass(abc.ABCMeta) @@ -393,7 +426,7 @@ class AdHocAction(PythonAction): wf_ctx=None): self.action_spec = spec_parser.get_action_spec(action_def.spec) - base_action_def = lookup_utils.find_action_definition_by_name( + base_action_def = _find_action_definition_by_name( self.action_spec.get_base() ) @@ -671,14 +704,10 @@ def resolve_action_definition(action_spec_name, wf_name=None, action_full_name = "%s.%s" % (wb_name, action_spec_name) - action_db = lookup_utils.find_action_definition_by_name( - action_full_name - ) + action_db = _find_action_definition_by_name(action_full_name) if not action_db: - action_db = lookup_utils.find_action_definition_by_name( - action_spec_name - ) + action_db = _find_action_definition_by_name(action_spec_name) if not action_db: raise exc.InvalidActionException( diff --git 
a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 998dd6f52..0a3ce8a1d 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -40,7 +40,6 @@ from mistral.utils import wf_trace from mistral.workflow import base as wf_base from mistral.workflow import commands from mistral.workflow import data_flow -from mistral.workflow import lookup_utils from mistral.workflow import states from mistral_lib import actions as ml_actions @@ -227,11 +226,6 @@ class Workflow(object): assert self.wf_ex - # Since some lookup utils functions may use cache for completed tasks - # we need to clean caches to make sure that stale objects can't be - # retrieved. - lookup_utils.clear_caches() - wf_service.update_workflow_execution_env(self.wf_ex, env) self._recursive_rerun() @@ -401,10 +395,6 @@ class Workflow(object): self.wf_ex.accepted = states.is_completed(state) if states.is_completed(state): - # No need to keep task executions of this workflow in the - # lookup cache anymore. - lookup_utils.invalidate_cached_task_executions(self.wf_ex.id) - triggers.on_workflow_complete(self.wf_ex) return True @@ -618,13 +608,13 @@ def _get_environment(params): def _build_fail_info_message(wf_ctrl, wf_ex): # Try to find where error is exactly. - failed_tasks = sorted( - filter( - lambda t_ex: not wf_ctrl.is_error_handled_for(t_ex), - lookup_utils.find_error_task_executions(wf_ex.id) - ), - key=lambda t: t.name - ) + failed_tasks = [ + t_ex for t_ex in db_api.get_task_executions( + workflow_execution_id=wf_ex.id, + state=states.ERROR, + sort_keys=['name'] + ) if not wf_ctrl.is_error_handled_for(t_ex) + ] msg = ('Failure caused by error in tasks: %s\n' % ', '.join([t.name for t in failed_tasks])) @@ -659,10 +649,13 @@ def _build_fail_info_message(wf_ctrl, wf_ex): def _build_cancel_info_message(wf_ctrl, wf_ex): # Try to find where cancel is exactly. 
- cancelled_tasks = sorted( - lookup_utils.find_cancelled_task_executions(wf_ex.id), - key=lambda t: t.name - ) + cancelled_tasks = [ + t_ex for t_ex in db_api.get_task_executions( + workflow_execution_id=wf_ex.id, + state=states.CANCELLED, + sort_keys=['name'] + ) + ] return ( 'Cancelled tasks: %s' % ', '.join([t.name for t in cancelled_tasks]) diff --git a/mistral/tests/unit/base.py b/mistral/tests/unit/base.py index 8f00b68f4..67fdc5375 100644 --- a/mistral/tests/unit/base.py +++ b/mistral/tests/unit/base.py @@ -30,13 +30,13 @@ from mistral import context as auth_context from mistral.db.sqlalchemy import base as db_sa_base from mistral.db.sqlalchemy import sqlite_lock from mistral.db.v2 import api as db_api +from mistral.engine import actions from mistral.lang import parser as spec_parser from mistral.services import action_manager from mistral.services import security from mistral.tests.unit import config as test_config from mistral.utils import inspect_utils as i_utils from mistral import version -from mistral.workflow import lookup_utils RESOURCES_PATH = 'tests/resources/' LOG = logging.getLogger(__name__) @@ -279,7 +279,7 @@ class DbTestCase(BaseTest): action_manager.sync_db() def _clean_db(self): - lookup_utils.clear_caches() + actions._ACTION_DEF_CACHE.clear() contexts = [ get_context(default=False), diff --git a/mistral/tests/unit/engine/test_lookup_utils.py b/mistral/tests/unit/engine/test_action_caching.py similarity index 53% rename from mistral/tests/unit/engine/test_lookup_utils.py rename to mistral/tests/unit/engine/test_action_caching.py index 35e9cfdd4..fb7b41aea 100644 --- a/mistral/tests/unit/engine/test_lookup_utils.py +++ b/mistral/tests/unit/engine/test_action_caching.py @@ -18,11 +18,10 @@ import cachetools from oslo_config import cfg from mistral.db.v2 import api as db_api +from mistral.engine import actions from mistral.services import actions as action_service from mistral.services import workflows as wf_service from 
mistral.tests.unit.engine import base -from mistral.workflow import lookup_utils -from mistral.workflow import states # Use the set_default method to set value otherwise in certain test cases # the change in value is not permanent. @@ -30,60 +29,6 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') class LookupUtilsTest(base.EngineTestCase): - def test_task_execution_cache_invalidation(self): - wf_text = """--- - version: '2.0' - - wf: - tasks: - task1: - action: std.noop - on-success: join_task - - task2: - action: std.noop - on-success: join_task - - join_task: - join: all - on-success: task4 - - task4: - action: std.noop - pause-before: true - """ - - wf_service.create_workflows(wf_text) - - # Start workflow. - wf_ex = self.engine.start_workflow('wf') - - self.await_workflow_paused(wf_ex.id) - - with db_api.transaction(): - # Note: We need to reread execution to access related tasks. - wf_ex = db_api.get_workflow_execution(wf_ex.id) - - tasks = wf_ex.task_executions - - self.assertEqual(4, len(tasks)) - - self._assert_single_item(tasks, name='task1', state=states.SUCCESS) - self._assert_single_item(tasks, name='task2', state=states.SUCCESS) - self._assert_single_item(tasks, name='join_task', state=states.SUCCESS) - self._assert_single_item(tasks, name='task4', state=states.IDLE) - - # Expecting one cache entry because we know that 'join' operation - # uses cached lookups and the workflow is not finished yet. - self.assertEqual(1, lookup_utils.get_task_execution_cache_size()) - - self.engine.resume_workflow(wf_ex.id) - - self.await_workflow_success(wf_ex.id) - - # Expecting that the cache size is 0 because the workflow has - # finished and invalidated corresponding cache entry. 
- self.assertEqual(0, lookup_utils.get_task_execution_cache_size()) def test_action_definition_cache_ttl(self): action = """--- @@ -135,7 +80,7 @@ class LookupUtilsTest(base.EngineTestCase): ttl=5 # 5 seconds ) cache_patch = mock.patch.object( - lookup_utils, '_ACTION_DEF_CACHE', new_cache) + actions, '_ACTION_DEF_CACHE', new_cache) cache_patch.start() self.addCleanup(cache_patch.stop) @@ -145,24 +90,24 @@ class LookupUtilsTest(base.EngineTestCase): self.await_workflow_paused(wf_ex.id) # Check that 'action1' 'echo' and 'noop' are cached. - self.assertEqual(3, lookup_utils.get_action_definition_cache_size()) - self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE) - self.assertIn('std.noop', lookup_utils._ACTION_DEF_CACHE) - self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE) + self.assertEqual(3, len(actions._ACTION_DEF_CACHE)) + self.assertIn('action1', actions._ACTION_DEF_CACHE) + self.assertIn('std.noop', actions._ACTION_DEF_CACHE) + self.assertIn('std.echo', actions._ACTION_DEF_CACHE) # Wait some time until cache expires self._await( - lambda: lookup_utils.get_action_definition_cache_size() == 0, + lambda: len(actions._ACTION_DEF_CACHE) == 0, fail_message="No triggers were found" ) - self.assertEqual(0, lookup_utils.get_action_definition_cache_size()) + self.assertEqual(0, len(actions._ACTION_DEF_CACHE)) self.engine.resume_workflow(wf_ex.id) self.await_workflow_success(wf_ex.id) # Check all actions are cached again. 
- self.assertEqual(2, lookup_utils.get_action_definition_cache_size()) - self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE) - self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE) + self.assertEqual(2, len(actions._ACTION_DEF_CACHE)) + self.assertIn('action1', actions._ACTION_DEF_CACHE) + self.assertIn('std.echo', actions._ACTION_DEF_CACHE) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 1df67e060..282ca298a 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -26,7 +26,6 @@ from mistral.lang import parser as spec_parser from mistral import utils as u from mistral.workflow import commands from mistral.workflow import data_flow -from mistral.workflow import lookup_utils from mistral.workflow import states @@ -203,9 +202,7 @@ class WorkflowController(object): :return: True if there is one or more tasks in cancelled state. """ - t_execs = lookup_utils.find_cancelled_task_executions(self.wf_ex.id) - - return len(t_execs) > 0 + return len(self._get_task_executions(state=states.CANCELLED)) > 0 @abc.abstractmethod def evaluate_workflow_final_context(self): @@ -252,14 +249,9 @@ class WorkflowController(object): return [] # Add all tasks in IDLE state. 
- idle_tasks = lookup_utils.find_task_executions_with_state( - self.wf_ex.id, - states.IDLE - ) - return [ commands.RunExistingTask(self.wf_ex, self.wf_spec, t) - for t in idle_tasks + for t in self._get_task_executions(state=states.IDLE) ] def _is_paused_or_completed(self): diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index a646fe4a6..ec8b893a3 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -22,7 +22,6 @@ from mistral import utils from mistral.workflow import base from mistral.workflow import commands from mistral.workflow import data_flow -from mistral.workflow import lookup_utils from mistral.workflow import states @@ -198,7 +197,7 @@ class DirectWorkflowController(base.WorkflowController): return bool(self.wf_spec.get_on_error_clause(task_ex.name)) def all_errors_handled(self): - cnt = lookup_utils.find_task_executions_count( + cnt = db_api.get_task_executions_count( workflow_execution_id=self.wf_ex.id, state=states.ERROR, error_handled=False @@ -207,7 +206,7 @@ class DirectWorkflowController(base.WorkflowController): return cnt == 0 def _find_end_task_executions_as_batches(self): - batches = lookup_utils.find_completed_task_executions_as_batches( + batches = db_api.get_completed_task_executions_as_batches( workflow_execution_id=self.wf_ex.id, has_next_tasks=False ) @@ -500,7 +499,6 @@ class DirectWorkflowController(base.WorkflowController): if self._is_conditional_transition(in_task_ex, in_task_spec) and \ not hasattr(in_task_ex, "in_context"): in_task_ex = db_api.get_task_execution(in_task_ex.id) - t_execs_cache[in_task_ex.name] = in_task_ex # [(task name, params, event name), ...] 
next_tasks_tuples = self._find_next_tasks(in_task_ex) @@ -515,11 +513,7 @@ class DirectWorkflowController(base.WorkflowController): def _find_task_execution_by_name(self, t_name): # Note: in case of 'join' completion check it's better to initialize # the entire task_executions collection to avoid too many DB queries. - - t_execs = lookup_utils.find_task_executions_by_name( - self.wf_ex.id, - t_name - ) + t_execs = self._get_task_executions(name=t_name) # TODO(rakhmerov): Temporary hack. See the previous comment. return t_execs[-1] if t_execs else None @@ -560,7 +554,6 @@ class DirectWorkflowController(base.WorkflowController): if self._is_conditional_transition(t_ex, task_spec) and \ not hasattr(t_ex, "in_context"): t_ex = db_api.get_task_execution(t_ex.id) - t_execs_cache[t_ex.name] = t_ex if t_name in self._find_next_task_names(t_ex): return True, depth diff --git a/mistral/workflow/lookup_utils.py b/mistral/workflow/lookup_utils.py deleted file mode 100644 index 5ded7be5f..000000000 --- a/mistral/workflow/lookup_utils.py +++ /dev/null @@ -1,192 +0,0 @@ -# Copyright 2015 - Mirantis, Inc. -# Copyright 2015 - StackStorm, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -The intention of the module is providing various DB related lookup functions - for more convenient usage within the workflow engine. - -Some of the functions may provide caching capabilities. 
- -WARNING: Oftentimes, persistent objects returned by the methods in this -module won't be attached to the current DB SQLAlchemy session because -they are returned from the cache and therefore they need to be used -carefully without trying to do any lazy loading etc. -These objects are also not suitable for re-attaching them to a session -in order to update their persistent DB state. -Mostly, they are useful for doing any kind of fast lookups with in order -to make some decision based on their state. -""" - -import threading - -import cachetools -from oslo_config import cfg - -from mistral.db.v2 import api as db_api -from mistral.workflow import states - - -CONF = cfg.CONF - - -def _create_workflow_execution_cache(): - return cachetools.LRUCache(maxsize=500) - - -# This is a two-level caching structure. -# First level: [ -> ] -# Second level (task execution cache): [ -> ] -# The first level (by workflow execution id) allows to invalidate -# needed cache entry when the workflow gets completed. -_TASK_EX_CACHE = cachetools.LRUCache(maxsize=100) - - -_ACTION_DEF_CACHE = cachetools.TTLCache( - maxsize=1000, - ttl=CONF.engine.action_definition_cache_time # 60 seconds by default -) - -_TASK_EX_CACHE_LOCK = threading.RLock() -_ACTION_DEF_CACHE_LOCK = threading.RLock() - - -def find_action_definition_by_name(action_name): - """Find action definition name. - - :param action_name: Action name. - :return: Action definition (possibly a cached value). - """ - with _ACTION_DEF_CACHE_LOCK: - action_def = _ACTION_DEF_CACHE.get(action_name) - - if action_def: - return action_def - - action_def = db_api.load_action_definition(action_name) - - with _ACTION_DEF_CACHE_LOCK: - _ACTION_DEF_CACHE[action_name] = ( - action_def.get_clone() if action_def else None - ) - - return action_def - - -def find_task_executions_by_name(wf_ex_id, task_name): - """Finds task executions by workflow execution id and task name. - - :param wf_ex_id: Workflow execution id. - :param task_name: Task name. 
- :return: Task executions (possibly a cached value). The returned list - may contain task execution clones not bound to the DB session. - """ - with _TASK_EX_CACHE_LOCK: - if wf_ex_id in _TASK_EX_CACHE: - wf_ex_cache = _TASK_EX_CACHE[wf_ex_id] - else: - wf_ex_cache = _create_workflow_execution_cache() - - _TASK_EX_CACHE[wf_ex_id] = wf_ex_cache - - t_execs = wf_ex_cache.get(task_name) - - if t_execs: - return t_execs - - t_execs = db_api.get_task_executions( - workflow_execution_id=wf_ex_id, - name=task_name, - sort_keys=[] # disable sorting - ) - - t_execs = [t_ex.get_clone() for t_ex in t_execs] - - # We can cache only finished tasks because they won't change. - all_finished = ( - t_execs and - all([states.is_completed(t_ex.state) for t_ex in t_execs]) - ) - - if all_finished: - with _TASK_EX_CACHE_LOCK: - wf_ex_cache[task_name] = t_execs - - return t_execs - - -def find_task_executions_by_spec(wf_ex_id, task_spec): - return find_task_executions_by_name(wf_ex_id, task_spec.get_name()) - - -def find_task_executions_by_specs(wf_ex_id, task_specs): - res = [] - - for t_s in task_specs: - res = res + find_task_executions_by_spec(wf_ex_id, t_s) - - return res - - -def find_task_executions_with_state(wf_ex_id, state): - return db_api.get_task_executions( - workflow_execution_id=wf_ex_id, - state=state - ) - - -def find_successful_task_executions(wf_ex_id): - return find_task_executions_with_state(wf_ex_id, states.SUCCESS) - - -def find_error_task_executions(wf_ex_id): - return find_task_executions_with_state(wf_ex_id, states.ERROR) - - -def find_cancelled_task_executions(wf_ex_id): - return find_task_executions_with_state(wf_ex_id, states.CANCELLED) - - -def find_completed_task_executions(wf_ex_id): - return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id) - - -def find_completed_task_executions_as_batches(**kwargs): - return db_api.get_completed_task_executions_as_batches(**kwargs) - - -def find_task_executions_count(**kwargs): - return 
db_api.get_task_executions_count(**kwargs) - - -def get_task_execution_cache_size(): - return len(_TASK_EX_CACHE) - - -def get_action_definition_cache_size(): - return len(_ACTION_DEF_CACHE) - - -def invalidate_cached_task_executions(wf_ex_id): - with _TASK_EX_CACHE_LOCK: - if wf_ex_id in _TASK_EX_CACHE: - del _TASK_EX_CACHE[wf_ex_id] - - -def clear_caches(): - with _TASK_EX_CACHE_LOCK: - _TASK_EX_CACHE.clear() - - with _ACTION_DEF_CACHE_LOCK: - _ACTION_DEF_CACHE.clear() diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index 6efb2ce5a..21e3a4af7 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -19,7 +19,6 @@ from mistral import exceptions as exc from mistral.workflow import base from mistral.workflow import commands from mistral.workflow import data_flow -from mistral.workflow import lookup_utils from mistral.workflow import states @@ -89,10 +88,8 @@ class ReverseWorkflowController(base.WorkflowController): return [t_ex for t_ex in t_execs if t_ex.state == states.SUCCESS] def evaluate_workflow_final_context(self): - task_execs = lookup_utils.find_task_executions_by_spec( - self.wf_ex.id, - self._get_target_task_specification() - ) + task_name = self._get_target_task_specification().get_name() + task_execs = self._get_task_executions(name=task_name) # NOTE: For reverse workflow there can't be multiple # executions for one task. @@ -114,9 +111,7 @@ class ReverseWorkflowController(base.WorkflowController): return task_ex.state != states.ERROR def all_errors_handled(self): - task_execs = lookup_utils.find_error_task_executions(self.wf_ex.id) - - return len(task_execs) == 0 + return len(self._get_task_executions(state=states.ERROR)) == 0 def _find_task_specs_with_satisfied_dependencies(self): """Given a target task name finds tasks with no dependencies. 
@@ -139,8 +134,7 @@ class ReverseWorkflowController(base.WorkflowController): ] def _is_satisfied_task(self, task_spec): - if lookup_utils.find_task_executions_by_spec( - self.wf_ex.id, task_spec): + if self._get_task_executions(name=task_spec.get_name()): return False if not self.wf_spec.get_task_requires(task_spec):