62d1003ee0
* Evaluation of the final workflow context was an extremely memory expensive operation in case if we had a lot of task executions (which are last in their branches) to process and their inbound context were big (at least a few MB each). Mistral had to fetch the entire collection of completed tasks in order to do all required processing. This patch addresses this problem by using a batch query approach in conjunction with python generators that allow GC to destroy bathes of task executions that have already been processed. According to the tests (based on test_big_on_closures in test_direct_workflow.py module) memory footprint is reduced by 3 times (from 1.5 GB to 0.5 GB when the number of tasks is 500 and each inbound context has 40000 entries) whereas the total test run time increased only by ~1%. Change-Id: I1f016a9c0d3add35e2c5059a29a2eca00e45af42
180 lines
5.1 KiB
Python
180 lines
5.1 KiB
Python
# Copyright 2015 - Mirantis, Inc.
|
|
# Copyright 2015 - StackStorm, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
The intention of the module is providing various DB related lookup functions
|
|
for more convenient usage within the workflow engine.
|
|
|
|
Some of the functions may provide caching capabilities.
|
|
|
|
WARNING: Oftentimes, persistent objects returned by the methods in this
|
|
module won't be attached to the current DB SQLAlchemy session because
|
|
they are returned from the cache and therefore they need to be used
|
|
carefully without trying to do any lazy loading etc.
|
|
These objects are also not suitable for re-attaching them to a session
|
|
in order to update their persistent DB state.
|
|
Mostly, they are useful for doing any kind of fast lookups with in order
|
|
to make some decision based on their state.
|
|
"""
|
|
|
|
import threading
|
|
|
|
import cachetools
|
|
from oslo_config import cfg
|
|
|
|
from mistral.db.v2 import api as db_api
|
|
from mistral.workflow import states
|
|
|
|
|
|
CONF = cfg.CONF
|
|
|
|
|
|
def _create_lru_cache_for_workflow_execution(wf_ex_id):
|
|
return cachetools.LRUCache(maxsize=500)
|
|
|
|
# This is a two-level caching structure.
|
|
# First level: [<workflow execution id> -> <task execution cache>]
|
|
# Second level (task execution cache): [<task_name> -> <task executions>]
|
|
# The first level (by workflow execution id) allows to invalidate
|
|
# needed cache entry when the workflow gets completed.
|
|
_TASK_EX_CACHE = cachetools.LRUCache(
|
|
maxsize=100,
|
|
missing=_create_lru_cache_for_workflow_execution
|
|
)
|
|
|
|
_ACTION_DEF_CACHE = cachetools.TTLCache(
|
|
maxsize=1000,
|
|
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
|
|
)
|
|
|
|
_TASK_EX_CACHE_LOCK = threading.RLock()
|
|
_ACTION_DEF_CACHE_LOCK = threading.RLock()
|
|
|
|
|
|
def find_action_definition_by_name(action_name):
|
|
"""Find action definition name.
|
|
|
|
:param action_name: Action name.
|
|
:return: Action definition (possibly a cached value).
|
|
"""
|
|
with _ACTION_DEF_CACHE_LOCK:
|
|
action_definition = _ACTION_DEF_CACHE.get(action_name)
|
|
|
|
if action_definition:
|
|
return action_definition
|
|
|
|
action_definition = db_api.load_action_definition(action_name)
|
|
|
|
with _ACTION_DEF_CACHE_LOCK:
|
|
_ACTION_DEF_CACHE[action_name] = action_definition
|
|
|
|
return action_definition
|
|
|
|
|
|
def find_task_executions_by_name(wf_ex_id, task_name):
|
|
"""Finds task executions by workflow execution id and task name.
|
|
|
|
:param wf_ex_id: Workflow execution id.
|
|
:param task_name: Task name.
|
|
:return: Task executions (possibly a cached value).
|
|
"""
|
|
with _TASK_EX_CACHE_LOCK:
|
|
t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name)
|
|
|
|
if t_execs:
|
|
return t_execs
|
|
|
|
t_execs = db_api.get_task_executions(
|
|
workflow_execution_id=wf_ex_id,
|
|
name=task_name,
|
|
sort_keys=[] # disable sorting
|
|
)
|
|
|
|
# We can cache only finished tasks because they won't change.
|
|
all_finished = (
|
|
t_execs and
|
|
all([states.is_completed(t_ex.state) for t_ex in t_execs])
|
|
)
|
|
|
|
if all_finished:
|
|
with _TASK_EX_CACHE_LOCK:
|
|
_TASK_EX_CACHE[wf_ex_id][task_name] = t_execs
|
|
|
|
return t_execs
|
|
|
|
|
|
def find_task_executions_by_spec(wf_ex_id, task_spec):
|
|
return find_task_executions_by_name(wf_ex_id, task_spec.get_name())
|
|
|
|
|
|
def find_task_executions_by_specs(wf_ex_id, task_specs):
|
|
res = []
|
|
|
|
for t_s in task_specs:
|
|
res = res + find_task_executions_by_spec(wf_ex_id, t_s)
|
|
|
|
return res
|
|
|
|
|
|
def find_task_executions_with_state(wf_ex_id, state):
|
|
return db_api.get_task_executions(
|
|
workflow_execution_id=wf_ex_id,
|
|
state=state
|
|
)
|
|
|
|
|
|
def find_successful_task_executions(wf_ex_id):
|
|
return find_task_executions_with_state(wf_ex_id, states.SUCCESS)
|
|
|
|
|
|
def find_error_task_executions(wf_ex_id):
|
|
return find_task_executions_with_state(wf_ex_id, states.ERROR)
|
|
|
|
|
|
def find_cancelled_task_executions(wf_ex_id):
|
|
return find_task_executions_with_state(wf_ex_id, states.CANCELLED)
|
|
|
|
|
|
def find_completed_task_executions(wf_ex_id):
|
|
return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id)
|
|
|
|
|
|
def find_completed_task_executions_as_batches(wf_ex_id):
|
|
return db_api.get_completed_task_executions_as_batches(
|
|
workflow_execution_id=wf_ex_id
|
|
)
|
|
|
|
|
|
def get_task_execution_cache_size():
|
|
return len(_TASK_EX_CACHE)
|
|
|
|
|
|
def get_action_definition_cache_size():
|
|
return len(_ACTION_DEF_CACHE)
|
|
|
|
|
|
def invalidate_cached_task_executions(wf_ex_id):
|
|
with _TASK_EX_CACHE_LOCK:
|
|
if wf_ex_id in _TASK_EX_CACHE:
|
|
del _TASK_EX_CACHE[wf_ex_id]
|
|
|
|
|
|
def clear_caches():
|
|
with _TASK_EX_CACHE_LOCK:
|
|
_TASK_EX_CACHE.clear()
|
|
|
|
with _ACTION_DEF_CACHE_LOCK:
|
|
_ACTION_DEF_CACHE.clear()
|