Fix memory leak related to cached lookups

* It turned out that Mistral used a lot of memory because it
  used cached DB lookups for task executions (only task executions
  in a terminal state get cached) and the maximum size of the cache
  was too big: 20000 entries. A single task execution can in some
  cases take a lot of memory (e.g. several megabytes), so 20000
  cached objects made the memory footprint huge. Additionally, when
  a workflow completes we need to invalidate the corresponding task
  executions in the cache, which didn't happen before this patch.
* This patch fixes the aforementioned issues by partially
  invalidating the cache (per workflow execution) and setting a
  smaller cache size (see the sketch after this list).
* Fixed the "Starting workflow ..." log statement so that it prints
  only the workflow name and input parameters instead of the entire
  workflow definition structure
* Minor style fixes
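
For illustration only (not part of the patch), a minimal sketch of the
two-level cache idea, with simplified, hypothetical helper names; the
real implementation, which builds the sub-caches via a cachetools
'missing' factory, is in mistral.workflow.lookup_utils in the diff below:

    import threading

    import cachetools

    # First level: workflow execution id -> per-workflow task cache.
    # Second level: task name -> task executions.
    _TASK_EX_CACHE = cachetools.LRUCache(maxsize=100)
    _CACHE_LOCK = threading.RLock()

    def cache_task_executions(wf_ex_id, task_name, t_execs):
        # Hypothetical helper: store a lookup result in the sub-cache
        # that belongs to one workflow execution.
        with _CACHE_LOCK:
            if wf_ex_id not in _TASK_EX_CACHE:
                _TASK_EX_CACHE[wf_ex_id] = cachetools.LRUCache(maxsize=500)

            _TASK_EX_CACHE[wf_ex_id][task_name] = t_execs

    def invalidate_cached_task_executions(wf_ex_id):
        # Deleting the first-level entry drops all cached task
        # executions of the completed workflow at once.
        with _CACHE_LOCK:
            if wf_ex_id in _TASK_EX_CACHE:
                del _TASK_EX_CACHE[wf_ex_id]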

Change-Id: I0ee300f631a4bdfa2f618c2a10048267f27b3345
Closes-Bug: #1664864
Renat Akhmerov 2017-02-15 18:16:57 +07:00
parent 6e5cbd3b5a
commit 086a3d43fa
6 changed files with 127 additions and 16 deletions


@@ -354,7 +354,7 @@ class ActionExecution(resource.Resource):
     output = types.jsontype
     created_at = wtypes.text
     updated_at = wtypes.text
-    params = types.jsontype
+    params = types.jsontype  # TODO(rakhmerov): What is this??

     @classmethod
     def sample(cls):


@@ -38,7 +38,6 @@ class DefaultEngine(base.Engine):
     @profiler.trace('engine-start-workflow')
     def start_workflow(self, wf_identifier, wf_input, description='',
                        **params):
-
         with db_api.transaction():
             wf_ex = wf_handler.start_workflow(
                 wf_identifier,


@@ -81,7 +81,11 @@ class Workflow(object):
         assert not self.wf_ex

-        wf_trace.info(self.wf_ex, "Starting workflow: %s" % self.wf_def)
+        wf_trace.info(
+            self.wf_ex,
+            "Starting workflow [name=%s, input=%s]" %
+            (self.wf_def.name, utils.cut(input_dict))
+        )

         # TODO(rakhmerov): This call implicitly changes input_dict! Fix it!
         # After fix we need to move validation after adding risky fields.
@@ -147,7 +151,7 @@ class Workflow(object):
         # Since some lookup utils functions may use cache for completed tasks
         # we need to clean caches to make sure that stale objects can't be
         # retrieved.
-        lookup_utils.clean_caches()
+        lookup_utils.clear_caches()

         wf_service.update_workflow_execution_env(self.wf_ex, env)
@@ -258,6 +262,11 @@ class Workflow(object):
         # only if it completed successfully or failed.
         self.wf_ex.accepted = states.is_completed(state)

+        if states.is_completed(state):
+            # No need to keep task executions of this workflow in the
+            # lookup cache anymore.
+            lookup_utils.invalidate_cached_task_executions(self.wf_ex.id)
+
         if recursive and self.wf_ex.task_execution_id:
             parent_task_ex = db_api.get_task_execution(
                 self.wf_ex.task_execution_id


@@ -248,7 +248,7 @@ class DbTestCase(BaseTest):
         action_manager.sync_db()

     def _clean_db(self):
-        lookup_utils.clean_caches()
+        lookup_utils.clear_caches()

         contexts = [
             get_context(default=False),


@@ -0,0 +1,82 @@
+# Copyright 2017 - Nokia Networks.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_config import cfg
+
+from mistral.db.v2 import api as db_api
+from mistral.services import workflows as wf_service
+from mistral.tests.unit.engine import base
+from mistral.workflow import lookup_utils
+from mistral.workflow import states
+
+
+# Use the set_default method to set value otherwise in certain test cases
+# the change in value is not permanent.
+cfg.CONF.set_default('auth_enable', False, group='pecan')
+
+
+class LookupUtilsTest(base.EngineTestCase):
+    def test_task_execution_cache_invalidation(self):
+        wf_text = """---
+        version: '2.0'
+
+        wf:
+          tasks:
+            task1:
+              action: std.noop
+              on-success: join_task
+
+            task2:
+              action: std.noop
+              on-success: join_task
+
+            join_task:
+              join: all
+              on-success: task4
+
+            task4:
+              action: std.noop
+              pause-before: true
+        """
+
+        wf_service.create_workflows(wf_text)
+
+        # Start workflow.
+        wf_ex = self.engine.start_workflow('wf', {})
+
+        self.await_workflow_paused(wf_ex.id)
+
+        with db_api.transaction():
+            # Note: We need to reread execution to access related tasks.
+            wf_ex = db_api.get_workflow_execution(wf_ex.id)
+
+            tasks = wf_ex.task_executions
+
+        self.assertEqual(4, len(tasks))
+
+        self._assert_single_item(tasks, name='task1', state=states.SUCCESS)
+        self._assert_single_item(tasks, name='task2', state=states.SUCCESS)
+        self._assert_single_item(tasks, name='join_task', state=states.SUCCESS)
+        self._assert_single_item(tasks, name='task4', state=states.IDLE)
+
+        # Expecting one cache entry because we know that 'join' operation
+        # uses cached lookups and the workflow is not finished yet.
+        self.assertEqual(1, lookup_utils.get_task_execution_cache_size())
+
+        self.engine.resume_workflow(wf_ex.id)
+
+        self.await_workflow_success(wf_ex.id)
+
+        # Expecting that the cache size is 0 because the workflow has
+        # finished and invalidated corresponding cache entry.
+        self.assertEqual(0, lookup_utils.get_task_execution_cache_size())
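
Note that get_task_execution_cache_size() returns len(_TASK_EX_CACHE),
i.e. the number of first-level (per-workflow) entries rather than the
number of cached task executions, which is why the single running
workflow above accounts for exactly one entry even though it caches
lookups for several tasks.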


@@ -35,8 +35,21 @@ import threading
 from mistral.db.v2 import api as db_api
 from mistral.workflow import states

-_TASK_EXECUTIONS_CACHE_LOCK = threading.RLock()
-
-_TASK_EXECUTIONS_CACHE = cachetools.LRUCache(maxsize=20000)
+
+def _create_lru_cache_for_workflow_execution(wf_ex_id):
+    return cachetools.LRUCache(maxsize=500)
+
+
+# This is a two-level caching structure.
+# First level: [<workflow execution id> -> <task execution cache>]
+# Second level (task execution cache): [<task_name> -> <task executions>]
+# The first level (by workflow execution id) makes it possible to
+# invalidate the corresponding cache entry when the workflow completes.
+_TASK_EX_CACHE = cachetools.LRUCache(
+    maxsize=100,
+    missing=_create_lru_cache_for_workflow_execution
+)
+
+_CACHE_LOCK = threading.RLock()


 def find_task_executions_by_name(wf_ex_id, task_name):
@@ -46,10 +59,8 @@ def find_task_executions_by_name(wf_ex_id, task_name):
     :param task_name: Task name.
     :return: Task executions (possibly a cached value).
     """
-    cache_key = (wf_ex_id, task_name)
-
-    with _TASK_EXECUTIONS_CACHE_LOCK:
-        t_execs = _TASK_EXECUTIONS_CACHE.get(cache_key)
+    with _CACHE_LOCK:
+        t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name)

     if t_execs:
         return t_execs
@@ -66,8 +77,8 @@ def find_task_executions_by_name(wf_ex_id, task_name):
     )

     if all_finished:
-        with _TASK_EXECUTIONS_CACHE_LOCK:
-            _TASK_EXECUTIONS_CACHE[cache_key] = t_execs
+        with _CACHE_LOCK:
+            _TASK_EX_CACHE[wf_ex_id][task_name] = t_execs

     return t_execs
@@ -108,6 +119,16 @@ def find_completed_tasks(wf_ex_id):
     return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id)


-def clean_caches():
-    with _TASK_EXECUTIONS_CACHE_LOCK:
-        _TASK_EXECUTIONS_CACHE.clear()
+def get_task_execution_cache_size():
+    return len(_TASK_EX_CACHE)
+
+
+def invalidate_cached_task_executions(wf_ex_id):
+    with _CACHE_LOCK:
+        if wf_ex_id in _TASK_EX_CACHE:
+            del _TASK_EX_CACHE[wf_ex_id]
+
+
+def clear_caches():
+    with _CACHE_LOCK:
+        _TASK_EX_CACHE.clear()
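
For background on the 'missing' argument used above: in the cachetools
releases contemporary with this patch (the parameter was later deprecated
and removed in cachetools 4.0), indexing a cache with an absent key calls
the 'missing' callable with that key and stores its result, so
_TASK_EX_CACHE[wf_ex_id] creates the per-workflow sub-cache on first
access. A minimal standalone demonstration, assuming such a cachetools
version:

    import cachetools

    cache = cachetools.LRUCache(
        maxsize=2,
        missing=lambda key: cachetools.LRUCache(maxsize=500)
    )

    sub = cache['wf-1']   # miss: the factory runs and its result is
                          # stored under 'wf-1' before being returned
    sub['task1'] = ['t_ex_1']

    assert cache['wf-1'] is sub   # hit: the same sub-cache is returned
    assert len(cache) == 1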