From 086a3d43fa734dae61ed40912cde701912318dc9 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Wed, 15 Feb 2017 18:16:57 +0700 Subject: [PATCH] Fix memory leak related to cached lookups * It turned out that Mistral used a lot of memory because it used cached DB lookups for task executions (only task executions in a terminal state get cached) and the maximum size of the cache was too big, 20000 entries. One task execution in certain cases may take a lot of memory (e.g. several megabytes) so 20000 objects make memory footprint huge. Additionally, when a workflow completes we need to invalidate corresponding task executions in the cache. This didn't happen before this patch. * This patch fixes the aforementioned issues by using partial invalidation of the cache and setting smaller cache size. * Fixed "Starting workflow .." log statement to not print the entire structure of the workflow definition into the workflow log, only its name and input parameters * Minor style fixes Change-Id: I0ee300f631a4bdfa2f618c2a10048267f27b3345 Closes-bug: #1664864 --- mistral/api/controllers/v2/resources.py | 2 +- mistral/engine/default_engine.py | 1 - mistral/engine/workflows.py | 13 ++- mistral/tests/unit/base.py | 2 +- .../tests/unit/engine/test_lookup_utils.py | 82 +++++++++++++++++++ mistral/workflow/lookup_utils.py | 43 +++++++--- 6 files changed, 127 insertions(+), 16 deletions(-) create mode 100644 mistral/tests/unit/engine/test_lookup_utils.py diff --git a/mistral/api/controllers/v2/resources.py b/mistral/api/controllers/v2/resources.py index 402babff..0069b532 100644 --- a/mistral/api/controllers/v2/resources.py +++ b/mistral/api/controllers/v2/resources.py @@ -354,7 +354,7 @@ class ActionExecution(resource.Resource): output = types.jsontype created_at = wtypes.text updated_at = wtypes.text - params = types.jsontype + params = types.jsontype # TODO(rakhmerov): What is this?? 
@classmethod def sample(cls): diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 8773c595..d1e7a9bb 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -38,7 +38,6 @@ class DefaultEngine(base.Engine): @profiler.trace('engine-start-workflow') def start_workflow(self, wf_identifier, wf_input, description='', **params): - with db_api.transaction(): wf_ex = wf_handler.start_workflow( wf_identifier, diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 4fcfbcd3..fd73b237 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -81,7 +81,11 @@ class Workflow(object): assert not self.wf_ex - wf_trace.info(self.wf_ex, "Starting workflow: %s" % self.wf_def) + wf_trace.info( + self.wf_ex, + "Starting workflow [name=%s, input=%s]" % + (self.wf_def.name, utils.cut(input_dict)) + ) # TODO(rakhmerov): This call implicitly changes input_dict! Fix it! # After fix we need to move validation after adding risky fields. @@ -147,7 +151,7 @@ class Workflow(object): # Since some lookup utils functions may use cache for completed tasks # we need to clean caches to make sure that stale objects can't be # retrieved. - lookup_utils.clean_caches() + lookup_utils.clear_caches() wf_service.update_workflow_execution_env(self.wf_ex, env) @@ -258,6 +262,11 @@ class Workflow(object): # only if it completed successfully or failed. self.wf_ex.accepted = states.is_completed(state) + if states.is_completed(state): + # No need to keep task executions of this workflow in the + # lookup cache anymore. 
+ lookup_utils.invalidate_cached_task_executions(self.wf_ex.id) + if recursive and self.wf_ex.task_execution_id: parent_task_ex = db_api.get_task_execution( self.wf_ex.task_execution_id diff --git a/mistral/tests/unit/base.py b/mistral/tests/unit/base.py index eef902b2..a94a666c 100644 --- a/mistral/tests/unit/base.py +++ b/mistral/tests/unit/base.py @@ -248,7 +248,7 @@ class DbTestCase(BaseTest): action_manager.sync_db() def _clean_db(self): - lookup_utils.clean_caches() + lookup_utils.clear_caches() contexts = [ get_context(default=False), diff --git a/mistral/tests/unit/engine/test_lookup_utils.py b/mistral/tests/unit/engine/test_lookup_utils.py new file mode 100644 index 00000000..720d8331 --- /dev/null +++ b/mistral/tests/unit/engine/test_lookup_utils.py @@ -0,0 +1,82 @@ +# Copyright 2017 - Nokia Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from mistral.db.v2 import api as db_api +from mistral.services import workflows as wf_service +from mistral.tests.unit.engine import base +from mistral.workflow import lookup_utils +from mistral.workflow import states + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. 
+cfg.CONF.set_default('auth_enable', False, group='pecan') + + +class LookupUtilsTest(base.EngineTestCase): + def test_task_execution_cache_invalidation(self): + wf_text = """--- + version: '2.0' + + wf: + tasks: + task1: + action: std.noop + on-success: join_task + + task2: + action: std.noop + on-success: join_task + + join_task: + join: all + on-success: task4 + + task4: + action: std.noop + pause-before: true + """ + + wf_service.create_workflows(wf_text) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', {}) + + self.await_workflow_paused(wf_ex.id) + + with db_api.transaction(): + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + tasks = wf_ex.task_executions + + self.assertEqual(4, len(tasks)) + + self._assert_single_item(tasks, name='task1', state=states.SUCCESS) + self._assert_single_item(tasks, name='task2', state=states.SUCCESS) + self._assert_single_item(tasks, name='join_task', state=states.SUCCESS) + self._assert_single_item(tasks, name='task4', state=states.IDLE) + + # Expecting one cache entry because we know that 'join' operation + # uses cached lookups and the workflow is not finished yet. + self.assertEqual(1, lookup_utils.get_task_execution_cache_size()) + + self.engine.resume_workflow(wf_ex.id) + + self.await_workflow_success(wf_ex.id) + + # Expecting that the cache size is 0 because the workflow has + # finished and invalidated corresponding cache entry. 
+ self.assertEqual(0, lookup_utils.get_task_execution_cache_size()) diff --git a/mistral/workflow/lookup_utils.py b/mistral/workflow/lookup_utils.py index 82d6bd8c..54709d5f 100644 --- a/mistral/workflow/lookup_utils.py +++ b/mistral/workflow/lookup_utils.py @@ -35,8 +35,21 @@ import threading from mistral.db.v2 import api as db_api from mistral.workflow import states -_TASK_EXECUTIONS_CACHE_LOCK = threading.RLock() -_TASK_EXECUTIONS_CACHE = cachetools.LRUCache(maxsize=20000) + +def _create_lru_cache_for_workflow_execution(wf_ex_id): + return cachetools.LRUCache(maxsize=500) + +# This is a two-level caching structure. +# First level: [ -> ] +# Second level (task execution cache): [ -> ] +# The first level (by workflow execution id) allows to invalidate +# needed cache entry when the workflow gets completed. +_TASK_EX_CACHE = cachetools.LRUCache( + maxsize=100, + missing=_create_lru_cache_for_workflow_execution +) + +_CACHE_LOCK = threading.RLock() def find_task_executions_by_name(wf_ex_id, task_name): @@ -46,10 +59,8 @@ def find_task_executions_by_name(wf_ex_id, task_name): :param task_name: Task name. :return: Task executions (possibly a cached value). 
""" - cache_key = (wf_ex_id, task_name) - - with _TASK_EXECUTIONS_CACHE_LOCK: - t_execs = _TASK_EXECUTIONS_CACHE.get(cache_key) + with _CACHE_LOCK: + t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name) if t_execs: return t_execs @@ -66,8 +77,8 @@ def find_task_executions_by_name(wf_ex_id, task_name): ) if all_finished: - with _TASK_EXECUTIONS_CACHE_LOCK: - _TASK_EXECUTIONS_CACHE[cache_key] = t_execs + with _CACHE_LOCK: + _TASK_EX_CACHE[wf_ex_id][task_name] = t_execs return t_execs @@ -108,6 +119,16 @@ def find_completed_tasks(wf_ex_id): return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id) -def clean_caches(): - with _TASK_EXECUTIONS_CACHE_LOCK: - _TASK_EXECUTIONS_CACHE.clear() +def get_task_execution_cache_size(): + return len(_TASK_EX_CACHE) + + +def invalidate_cached_task_executions(wf_ex_id): + with _CACHE_LOCK: + if wf_ex_id in _TASK_EX_CACHE: + del _TASK_EX_CACHE[wf_ex_id] + + +def clear_caches(): + with _CACHE_LOCK: + _TASK_EX_CACHE.clear()