Adding support to cleanup taskflow job details

ZooKeeper runs out of heap space because the data we save to it
never gets deleted. This patch allows us to enable a flag in the cue config
to remove taskflow job details from the ZooKeeper/persistence backend.

Closes-Bug: 1514559

Change-Id: I64e72a7f4c917ec076a6d22c20e5aa6d91f6f75d
This commit is contained in:
Abitha Palaniappan 2015-12-15 15:22:19 -08:00
parent 3866e6065d
commit d7168f57ce
4 changed files with 50 additions and 1 deletions

View File

@ -63,6 +63,10 @@ TF_OPTS = [
cfg.BoolOpt('cluster_node_anti_affinity',
help="Anti-affinity policy for cue cluster nodes",
default=False),
cfg.BoolOpt('cleanup_job_details',
help="Cleanup taskflow job details",
default=True),
]
opt_group = cfg.OptGroup(

View File

@ -19,6 +19,7 @@ import time
from oslo_config import cfg
from oslo_log import log as logging
from taskflow.conductors.backends import impl_executor
from taskflow.conductors import single_threaded
import cue.taskflow.client as tf_client
@ -30,6 +31,8 @@ LOG = logging.getLogger(__name__)
# Shared oslo.config configuration object.
CONF = cfg.CONF
# Engine type names this module lists as supported.
SUPPORTED_ENGINE_TYPES = ['serial', 'parallel']
# Event names taken from the executor conductor's EVENTS_EMITTED attribute.
events_emitted = impl_executor.ExecutorConductor.EVENTS_EMITTED
# Value of the [taskflow] cleanup_job_details option.
# NOTE(review): this is evaluated once, when the module is imported;
# presumably configuration loaded or overridden afterwards is not picked
# up here -- confirm, and consider reading the option at the point of use.
enable_cleanup = cfg.CONF.taskflow.cleanup_job_details
class ConductorService(object):
@ -156,6 +159,11 @@ class ConductorService(object):
wait_timeout=self._wait_timeout)
time.sleep(0.5)
if enable_cleanup:
conductor_notifier = self._conductor.notifier
conductor_notifier.register('job_consumed',
self.cleanup_job_details)
if threading.current_thread().name == 'MainThread':
t = threading.Thread(target=self._conductor.run)
t.start()
@ -211,3 +219,11 @@ class ConductorService(object):
def sighandler(self, signum, frame):
    """Signal handler: reset the tracked signals, then stop the service.

    Calls ``handle_signals`` with ``signal.SIG_DFL`` as the handler for
    every signal in ``self._signal_list`` and then calls ``stop``.

    :param signum: number of the received signal (unused here)
    :param frame: stack frame active at delivery time (unused here)
    """
    tracked_signals = self._signal_list
    default_handler = signal.SIG_DFL
    self.handle_signals(signals=tracked_signals, handler=default_handler)
    self.stop()
def cleanup_job_details(self, state, details):
    """Cleanup taskflow job details.

    Conductor notifier callback (registered for ``'job_consumed'``) that
    destroys the logbook belonging to the job carried in ``details``.

    :param state: event name the notifier fired with; the logbook is only
                  destroyed when it is one of ``events_emitted``
    :param details: mapping expected to carry ``'job'`` and
                    ``'persistence'`` entries
    :returns: None
    """
    details = details or {}
    job = details.get('job')
    persistence = details.get('persistence')

    # A notification without a job or a persistence backend has nothing
    # to clean up; return instead of raising AttributeError from inside
    # the notifier callback.
    if job is None or persistence is None:
        return

    if state not in events_emitted:
        return

    # The job may not have a logbook attached; nothing to destroy then.
    book = job.book
    if book is None:
        return

    persistence.get_connection().destroy_logbook(book.uuid)

View File

@ -17,6 +17,7 @@ import threading
import time
from oslo_utils import uuidutils
import taskflow.exceptions as exception
import taskflow.patterns.linear_flow as linear_flow
import taskflow.task
import zake.fake_client as zake_client
@ -111,3 +112,31 @@ class TaskflowServiceTest(base.FunctionalTestCase):
self.tf_service.stop()
self.tf_service.wait()
def test_cleanup_job_details(self):
    """Logbook of a posted job is removed once the service has run.

    Posts a job, checks its logbook can be fetched from the persistence
    backend, starts the taskflow service in a background thread and then
    expects fetching the same logbook to raise ``NotFound``.
    """
    job_args = {
        'test_arg': 5
    }
    tx_uuid = uuidutils.generate_uuid()
    job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid)
    persistence_connection = self.tf_client.persistence.get_connection()

    # Before the service runs, the logbook must be present.
    expected_logbook_uuid = job.book.uuid
    log_book = persistence_connection.get_logbook(expected_logbook_uuid)
    actual_logbook_uuid = log_book.uuid
    self.assertEqual(expected_logbook_uuid, actual_logbook_uuid,
                     "expected %s logbook uuid got %s" %
                     (expected_logbook_uuid, actual_logbook_uuid))

    t = threading.Thread(target=self.tf_service.start)
    t.start()
    try:
        # Presumably gives the conductor time to consume the job and run
        # the cleanup callback -- TODO confirm 2 seconds is sufficient.
        time.sleep(2)

        self.assertRaises(exception.NotFound,
                          persistence_connection.get_logbook,
                          expected_logbook_uuid)
    finally:
        # Always shut the service down, even when the assertion fails,
        # so the background thread is not left running.
        self.tf_service.stop()
        self.tf_service.wait()

View File

@ -27,7 +27,7 @@ paramiko>=1.13.0
posix-ipc
Jinja2>=2.8 # BSD License (3 clause)
taskflow>=1.16.0
taskflow>=1.25.0
kazoo>=2.2
tooz>=1.28.0 # Apache-2.0
#PyMySQL