From d7168f57cee43b8cef87139fdafbefd750e5b973 Mon Sep 17 00:00:00 2001 From: Abitha Palaniappan Date: Tue, 15 Dec 2015 15:22:19 -0800 Subject: [PATCH] Adding support to cleanup taskflow job details zookeeper runs out of heap space as the data we save to it never gets deleted.this patch allows us to enable a flag in cue-config to remove taskflow jobdetails from zookeeper/persistence backend. Closes-Bug: 1514559 Change-Id: I64e72a7f4c917ec076a6d22c20e5aa6d91f6f75d --- cue/taskflow/__init__.py | 4 +++ cue/taskflow/service.py | 16 ++++++++++ cue/tests/functional/taskflow/test_service.py | 29 +++++++++++++++++++ requirements.txt | 2 +- 4 files changed, 50 insertions(+), 1 deletion(-) diff --git a/cue/taskflow/__init__.py b/cue/taskflow/__init__.py index f6347b5d..7ec993a8 100644 --- a/cue/taskflow/__init__.py +++ b/cue/taskflow/__init__.py @@ -63,6 +63,10 @@ TF_OPTS = [ cfg.BoolOpt('cluster_node_anti_affinity', help="Anti-affinity policy for cue cluster nodes", default=False), + + cfg.BoolOpt('cleanup_job_details', + help="Cleanup taskflow job details", + default=True), ] opt_group = cfg.OptGroup( diff --git a/cue/taskflow/service.py b/cue/taskflow/service.py index b367570d..eac590e6 100644 --- a/cue/taskflow/service.py +++ b/cue/taskflow/service.py @@ -19,6 +19,7 @@ import time from oslo_config import cfg from oslo_log import log as logging +from taskflow.conductors.backends import impl_executor from taskflow.conductors import single_threaded import cue.taskflow.client as tf_client @@ -30,6 +31,8 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF SUPPORTED_ENGINE_TYPES = ['serial', 'parallel'] +events_emitted = impl_executor.ExecutorConductor.EVENTS_EMITTED +enable_cleanup = cfg.CONF.taskflow.cleanup_job_details class ConductorService(object): @@ -156,6 +159,11 @@ class ConductorService(object): wait_timeout=self._wait_timeout) time.sleep(0.5) + if enable_cleanup: + conductor_notifier = self._conductor.notifier + conductor_notifier.register('job_consumed', + self.cleanup_job_details) + if threading.current_thread().name == 'MainThread': t = threading.Thread(target=self._conductor.run) t.start() @@ -211,3 +219,11 @@ class ConductorService(object): def sighandler(self, signum, frame): self.handle_signals(signals=self._signal_list, handler=signal.SIG_DFL) self.stop() + + def cleanup_job_details(self, state, details): + """Cleanup taskflow job details.""" + + job = details.get('job') + persistence = details.get('persistence') + if state in events_emitted: + persistence.get_connection().destroy_logbook(job.book.uuid) diff --git a/cue/tests/functional/taskflow/test_service.py b/cue/tests/functional/taskflow/test_service.py index af0f15f0..9bf89ed8 100644 --- a/cue/tests/functional/taskflow/test_service.py +++ b/cue/tests/functional/taskflow/test_service.py @@ -17,6 +17,7 @@ import threading import time from oslo_utils import uuidutils +import taskflow.exceptions as exception import taskflow.patterns.linear_flow as linear_flow import taskflow.task import zake.fake_client as zake_client @@ -111,3 +112,31 @@ class TaskflowServiceTest(base.FunctionalTestCase): self.tf_service.stop() self.tf_service.wait() + + def test_cleanup_job_details(self): + job_args = { + 'test_arg': 5 + } + tx_uuid = uuidutils.generate_uuid() + + job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid) + persistence_connection = self.tf_client.persistence.get_connection() + expected_logbook_uuid = job.book.uuid + + log_book = persistence_connection.get_logbook(job.book.uuid) + actual_logbook_uuid = log_book._uuid + + self.assertEqual(expected_logbook_uuid, actual_logbook_uuid, + "expected %s logbook uuid got %s" % + (expected_logbook_uuid, actual_logbook_uuid)) + + t = threading.Thread(target=self.tf_service.start) + t.start() + time.sleep(2) + + self.assertRaises(exception.NotFound, + persistence_connection.get_logbook, + job.book.uuid) + + self.tf_service.stop() + self.tf_service.wait() diff --git a/requirements.txt b/requirements.txt index dddbf387..811c3ba7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,7 +27,7 @@ paramiko>=1.13.0 posix-ipc Jinja2>=2.8 # BSD License (3 clause) -taskflow>=1.16.0 +taskflow>=1.25.0 kazoo>=2.2 tooz>=1.28.0 # Apache-2.0 #PyMySQL