Get rid of lookup utils

With the new joining mechanism lookup utils for task executions
become useless and can be removed from the codebase.
Now all requests to the database will be perforemed directly from
a workflow controller.

Action defention cache was moved to mistral/engine/actions.py,
because it's needed only there.

Change-Id: If0d4403f5c61883ecfec4cfa14b98cc39aae5618
This commit is contained in:
Mike Fedosin 2019-04-18 12:16:15 +02:00
parent fefdcd77cd
commit 84b8e92acc
8 changed files with 73 additions and 319 deletions

View File

@ -14,7 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import abc
import cachetools
from oslo_config import cfg
from oslo_log import log as logging
from osprofiler import profiler
@ -34,12 +37,42 @@ from mistral.services import security
from mistral import utils
from mistral.utils import wf_trace
from mistral.workflow import data_flow
from mistral.workflow import lookup_utils
from mistral.workflow import states
from mistral_lib import actions as ml_actions
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
_ACTION_DEF_CACHE = cachetools.TTLCache(
maxsize=1000,
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
)
_ACTION_DEF_CACHE_LOCK = threading.RLock()
def _find_action_definition_by_name(action_name):
"""Find action definition name.
:param action_name: Action name.
:return: Action definition (possibly a cached value).
"""
with _ACTION_DEF_CACHE_LOCK:
action_def = _ACTION_DEF_CACHE.get(action_name)
if action_def:
return action_def
action_def = db_api.load_action_definition(action_name)
with _ACTION_DEF_CACHE_LOCK:
_ACTION_DEF_CACHE[action_name] = (
action_def.get_clone() if action_def else None
)
return action_def
@six.add_metaclass(abc.ABCMeta)
@ -393,7 +426,7 @@ class AdHocAction(PythonAction):
wf_ctx=None):
self.action_spec = spec_parser.get_action_spec(action_def.spec)
base_action_def = lookup_utils.find_action_definition_by_name(
base_action_def = _find_action_definition_by_name(
self.action_spec.get_base()
)
@ -671,14 +704,10 @@ def resolve_action_definition(action_spec_name, wf_name=None,
action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = lookup_utils.find_action_definition_by_name(
action_full_name
)
action_db = _find_action_definition_by_name(action_full_name)
if not action_db:
action_db = lookup_utils.find_action_definition_by_name(
action_spec_name
)
action_db = _find_action_definition_by_name(action_spec_name)
if not action_db:
raise exc.InvalidActionException(

View File

@ -40,7 +40,6 @@ from mistral.utils import wf_trace
from mistral.workflow import base as wf_base
from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import lookup_utils
from mistral.workflow import states
from mistral_lib import actions as ml_actions
@ -227,11 +226,6 @@ class Workflow(object):
assert self.wf_ex
# Since some lookup utils functions may use cache for completed tasks
# we need to clean caches to make sure that stale objects can't be
# retrieved.
lookup_utils.clear_caches()
wf_service.update_workflow_execution_env(self.wf_ex, env)
self._recursive_rerun()
@ -401,10 +395,6 @@ class Workflow(object):
self.wf_ex.accepted = states.is_completed(state)
if states.is_completed(state):
# No need to keep task executions of this workflow in the
# lookup cache anymore.
lookup_utils.invalidate_cached_task_executions(self.wf_ex.id)
triggers.on_workflow_complete(self.wf_ex)
return True
@ -618,13 +608,13 @@ def _get_environment(params):
def _build_fail_info_message(wf_ctrl, wf_ex):
# Try to find where error is exactly.
failed_tasks = sorted(
filter(
lambda t_ex: not wf_ctrl.is_error_handled_for(t_ex),
lookup_utils.find_error_task_executions(wf_ex.id)
),
key=lambda t: t.name
)
failed_tasks = [
t_ex for t_ex in db_api.get_task_executions(
workflow_execution_id=wf_ex.id,
state=states.ERROR,
sort_keys=['name']
) if not wf_ctrl.is_error_handled_for(t_ex)
]
msg = ('Failure caused by error in tasks: %s\n' %
', '.join([t.name for t in failed_tasks]))
@ -659,10 +649,13 @@ def _build_fail_info_message(wf_ctrl, wf_ex):
def _build_cancel_info_message(wf_ctrl, wf_ex):
# Try to find where cancel is exactly.
cancelled_tasks = sorted(
lookup_utils.find_cancelled_task_executions(wf_ex.id),
key=lambda t: t.name
cancelled_tasks = [
t_ex for t_ex in db_api.get_task_executions(
workflow_execution_id=wf_ex.id,
state=states.CANCELLED,
sort_keys=['name']
)
]
return (
'Cancelled tasks: %s' % ', '.join([t.name for t in cancelled_tasks])

View File

@ -30,13 +30,13 @@ from mistral import context as auth_context
from mistral.db.sqlalchemy import base as db_sa_base
from mistral.db.sqlalchemy import sqlite_lock
from mistral.db.v2 import api as db_api
from mistral.engine import actions
from mistral.lang import parser as spec_parser
from mistral.services import action_manager
from mistral.services import security
from mistral.tests.unit import config as test_config
from mistral.utils import inspect_utils as i_utils
from mistral import version
from mistral.workflow import lookup_utils
RESOURCES_PATH = 'tests/resources/'
LOG = logging.getLogger(__name__)
@ -279,7 +279,7 @@ class DbTestCase(BaseTest):
action_manager.sync_db()
def _clean_db(self):
lookup_utils.clear_caches()
actions._ACTION_DEF_CACHE.clear()
contexts = [
get_context(default=False),

View File

@ -18,11 +18,10 @@ import cachetools
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.engine import actions
from mistral.services import actions as action_service
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
from mistral.workflow import lookup_utils
from mistral.workflow import states
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
@ -30,60 +29,6 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
class LookupUtilsTest(base.EngineTestCase):
def test_task_execution_cache_invalidation(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.noop
on-success: join_task
task2:
action: std.noop
on-success: join_task
join_task:
join: all
on-success: task4
task4:
action: std.noop
pause-before: true
"""
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_paused(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self.assertEqual(4, len(tasks))
self._assert_single_item(tasks, name='task1', state=states.SUCCESS)
self._assert_single_item(tasks, name='task2', state=states.SUCCESS)
self._assert_single_item(tasks, name='join_task', state=states.SUCCESS)
self._assert_single_item(tasks, name='task4', state=states.IDLE)
# Expecting one cache entry because we know that 'join' operation
# uses cached lookups and the workflow is not finished yet.
self.assertEqual(1, lookup_utils.get_task_execution_cache_size())
self.engine.resume_workflow(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Expecting that the cache size is 0 because the workflow has
# finished and invalidated corresponding cache entry.
self.assertEqual(0, lookup_utils.get_task_execution_cache_size())
def test_action_definition_cache_ttl(self):
action = """---
@ -135,7 +80,7 @@ class LookupUtilsTest(base.EngineTestCase):
ttl=5 # 5 seconds
)
cache_patch = mock.patch.object(
lookup_utils, '_ACTION_DEF_CACHE', new_cache)
actions, '_ACTION_DEF_CACHE', new_cache)
cache_patch.start()
self.addCleanup(cache_patch.stop)
@ -145,24 +90,24 @@ class LookupUtilsTest(base.EngineTestCase):
self.await_workflow_paused(wf_ex.id)
# Check that 'action1' 'echo' and 'noop' are cached.
self.assertEqual(3, lookup_utils.get_action_definition_cache_size())
self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.noop', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE)
self.assertEqual(3, len(actions._ACTION_DEF_CACHE))
self.assertIn('action1', actions._ACTION_DEF_CACHE)
self.assertIn('std.noop', actions._ACTION_DEF_CACHE)
self.assertIn('std.echo', actions._ACTION_DEF_CACHE)
# Wait some time until cache expires
self._await(
lambda: lookup_utils.get_action_definition_cache_size() == 0,
lambda: len(actions._ACTION_DEF_CACHE) == 0,
fail_message="No triggers were found"
)
self.assertEqual(0, lookup_utils.get_action_definition_cache_size())
self.assertEqual(0, len(actions._ACTION_DEF_CACHE))
self.engine.resume_workflow(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Check all actions are cached again.
self.assertEqual(2, lookup_utils.get_action_definition_cache_size())
self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE)
self.assertEqual(2, len(actions._ACTION_DEF_CACHE))
self.assertIn('action1', actions._ACTION_DEF_CACHE)
self.assertIn('std.echo', actions._ACTION_DEF_CACHE)

View File

@ -26,7 +26,6 @@ from mistral.lang import parser as spec_parser
from mistral import utils as u
from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import lookup_utils
from mistral.workflow import states
@ -203,9 +202,7 @@ class WorkflowController(object):
:return: True if there is one or more tasks in cancelled state.
"""
t_execs = lookup_utils.find_cancelled_task_executions(self.wf_ex.id)
return len(t_execs) > 0
return len(self._get_task_executions(state=states.CANCELLED)) > 0
@abc.abstractmethod
def evaluate_workflow_final_context(self):
@ -252,14 +249,9 @@ class WorkflowController(object):
return []
# Add all tasks in IDLE state.
idle_tasks = lookup_utils.find_task_executions_with_state(
self.wf_ex.id,
states.IDLE
)
return [
commands.RunExistingTask(self.wf_ex, self.wf_spec, t)
for t in idle_tasks
for t in self._get_task_executions(state=states.IDLE)
]
def _is_paused_or_completed(self):

View File

@ -22,7 +22,6 @@ from mistral import utils
from mistral.workflow import base
from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import lookup_utils
from mistral.workflow import states
@ -198,7 +197,7 @@ class DirectWorkflowController(base.WorkflowController):
return bool(self.wf_spec.get_on_error_clause(task_ex.name))
def all_errors_handled(self):
cnt = lookup_utils.find_task_executions_count(
cnt = db_api.get_task_executions_count(
workflow_execution_id=self.wf_ex.id,
state=states.ERROR,
error_handled=False
@ -207,7 +206,7 @@ class DirectWorkflowController(base.WorkflowController):
return cnt == 0
def _find_end_task_executions_as_batches(self):
batches = lookup_utils.find_completed_task_executions_as_batches(
batches = db_api.get_completed_task_executions_as_batches(
workflow_execution_id=self.wf_ex.id,
has_next_tasks=False
)
@ -500,7 +499,6 @@ class DirectWorkflowController(base.WorkflowController):
if self._is_conditional_transition(in_task_ex, in_task_spec) and \
not hasattr(in_task_ex, "in_context"):
in_task_ex = db_api.get_task_execution(in_task_ex.id)
t_execs_cache[in_task_ex.name] = in_task_ex
# [(task name, params, event name), ...]
next_tasks_tuples = self._find_next_tasks(in_task_ex)
@ -515,11 +513,7 @@ class DirectWorkflowController(base.WorkflowController):
def _find_task_execution_by_name(self, t_name):
# Note: in case of 'join' completion check it's better to initialize
# the entire task_executions collection to avoid too many DB queries.
t_execs = lookup_utils.find_task_executions_by_name(
self.wf_ex.id,
t_name
)
t_execs = self._get_task_executions(name=t_name)
# TODO(rakhmerov): Temporary hack. See the previous comment.
return t_execs[-1] if t_execs else None
@ -560,7 +554,6 @@ class DirectWorkflowController(base.WorkflowController):
if self._is_conditional_transition(t_ex, task_spec) and \
not hasattr(t_ex, "in_context"):
t_ex = db_api.get_task_execution(t_ex.id)
t_execs_cache[t_ex.name] = t_ex
if t_name in self._find_next_task_names(t_ex):
return True, depth

View File

@ -1,192 +0,0 @@
# Copyright 2015 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The intention of the module is providing various DB related lookup functions
for more convenient usage within the workflow engine.
Some of the functions may provide caching capabilities.
WARNING: Oftentimes, persistent objects returned by the methods in this
module won't be attached to the current DB SQLAlchemy session because
they are returned from the cache and therefore they need to be used
carefully without trying to do any lazy loading etc.
These objects are also not suitable for re-attaching them to a session
in order to update their persistent DB state.
Mostly, they are useful for doing any kind of fast lookups with in order
to make some decision based on their state.
"""
import threading
import cachetools
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.workflow import states
CONF = cfg.CONF
def _create_workflow_execution_cache():
return cachetools.LRUCache(maxsize=500)
# This is a two-level caching structure.
# First level: [<workflow execution id> -> <task execution cache>]
# Second level (task execution cache): [<task_name> -> <task executions>]
# The first level (by workflow execution id) allows to invalidate
# needed cache entry when the workflow gets completed.
_TASK_EX_CACHE = cachetools.LRUCache(maxsize=100)
_ACTION_DEF_CACHE = cachetools.TTLCache(
maxsize=1000,
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
)
_TASK_EX_CACHE_LOCK = threading.RLock()
_ACTION_DEF_CACHE_LOCK = threading.RLock()
def find_action_definition_by_name(action_name):
"""Find action definition name.
:param action_name: Action name.
:return: Action definition (possibly a cached value).
"""
with _ACTION_DEF_CACHE_LOCK:
action_def = _ACTION_DEF_CACHE.get(action_name)
if action_def:
return action_def
action_def = db_api.load_action_definition(action_name)
with _ACTION_DEF_CACHE_LOCK:
_ACTION_DEF_CACHE[action_name] = (
action_def.get_clone() if action_def else None
)
return action_def
def find_task_executions_by_name(wf_ex_id, task_name):
"""Finds task executions by workflow execution id and task name.
:param wf_ex_id: Workflow execution id.
:param task_name: Task name.
:return: Task executions (possibly a cached value). The returned list
may contain task execution clones not bound to the DB session.
"""
with _TASK_EX_CACHE_LOCK:
if wf_ex_id in _TASK_EX_CACHE:
wf_ex_cache = _TASK_EX_CACHE[wf_ex_id]
else:
wf_ex_cache = _create_workflow_execution_cache()
_TASK_EX_CACHE[wf_ex_id] = wf_ex_cache
t_execs = wf_ex_cache.get(task_name)
if t_execs:
return t_execs
t_execs = db_api.get_task_executions(
workflow_execution_id=wf_ex_id,
name=task_name,
sort_keys=[] # disable sorting
)
t_execs = [t_ex.get_clone() for t_ex in t_execs]
# We can cache only finished tasks because they won't change.
all_finished = (
t_execs and
all([states.is_completed(t_ex.state) for t_ex in t_execs])
)
if all_finished:
with _TASK_EX_CACHE_LOCK:
wf_ex_cache[task_name] = t_execs
return t_execs
def find_task_executions_by_spec(wf_ex_id, task_spec):
return find_task_executions_by_name(wf_ex_id, task_spec.get_name())
def find_task_executions_by_specs(wf_ex_id, task_specs):
res = []
for t_s in task_specs:
res = res + find_task_executions_by_spec(wf_ex_id, t_s)
return res
def find_task_executions_with_state(wf_ex_id, state):
return db_api.get_task_executions(
workflow_execution_id=wf_ex_id,
state=state
)
def find_successful_task_executions(wf_ex_id):
return find_task_executions_with_state(wf_ex_id, states.SUCCESS)
def find_error_task_executions(wf_ex_id):
return find_task_executions_with_state(wf_ex_id, states.ERROR)
def find_cancelled_task_executions(wf_ex_id):
return find_task_executions_with_state(wf_ex_id, states.CANCELLED)
def find_completed_task_executions(wf_ex_id):
return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id)
def find_completed_task_executions_as_batches(**kwargs):
return db_api.get_completed_task_executions_as_batches(**kwargs)
def find_task_executions_count(**kwargs):
return db_api.get_task_executions_count(**kwargs)
def get_task_execution_cache_size():
return len(_TASK_EX_CACHE)
def get_action_definition_cache_size():
return len(_ACTION_DEF_CACHE)
def invalidate_cached_task_executions(wf_ex_id):
with _TASK_EX_CACHE_LOCK:
if wf_ex_id in _TASK_EX_CACHE:
del _TASK_EX_CACHE[wf_ex_id]
def clear_caches():
with _TASK_EX_CACHE_LOCK:
_TASK_EX_CACHE.clear()
with _ACTION_DEF_CACHE_LOCK:
_ACTION_DEF_CACHE.clear()

View File

@ -19,7 +19,6 @@ from mistral import exceptions as exc
from mistral.workflow import base
from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import lookup_utils
from mistral.workflow import states
@ -89,10 +88,8 @@ class ReverseWorkflowController(base.WorkflowController):
return [t_ex for t_ex in t_execs if t_ex.state == states.SUCCESS]
def evaluate_workflow_final_context(self):
task_execs = lookup_utils.find_task_executions_by_spec(
self.wf_ex.id,
self._get_target_task_specification()
)
task_name = self._get_target_task_specification().get_name()
task_execs = self._get_task_executions(name=task_name)
# NOTE: For reverse workflow there can't be multiple
# executions for one task.
@ -114,9 +111,7 @@ class ReverseWorkflowController(base.WorkflowController):
return task_ex.state != states.ERROR
def all_errors_handled(self):
task_execs = lookup_utils.find_error_task_executions(self.wf_ex.id)
return len(task_execs) == 0
return len(self._get_task_executions(state=states.ERROR)) == 0
def _find_task_specs_with_satisfied_dependencies(self):
"""Given a target task name finds tasks with no dependencies.
@ -139,8 +134,7 @@ class ReverseWorkflowController(base.WorkflowController):
]
def _is_satisfied_task(self, task_spec):
if lookup_utils.find_task_executions_by_spec(
self.wf_ex.id, task_spec):
if self._get_task_executions(name=task_spec.get_name()):
return False
if not self.wf_spec.get_task_requires(task_spec):