From 4e78ef22494e4be5c3b82a8de3eaa6afe02759fd Mon Sep 17 00:00:00 2001 From: Mitsuru Kanabuchi Date: Thu, 17 Apr 2014 11:31:46 +0900 Subject: [PATCH] Add graceful stop function to ThreadGroup.stop Currently ThreadGroup.stop() would stop both timers and threads immediately. However, heat-engine should stop timers before threads finished for the purpose of graceful shutdown. The graceful shutdown for Heat is "do process exit after stack processing finished". Heat implemented the stack processing as threads. We should wait its finishing for graceful shutdown's purpose. On the one hand, Heat is using timers. The timers have the function of make another job threads. It means, we should stop timers before waiting threads for preventing another thread occur by timers. From the above, the appropriate order of Heat's graceful shutdown is: * stop timers for preventing new thread occur * wait for all threads to be finished * process exit However, currently ThreadGroup class doesn't have the function of graceful stop. So I propose the function of graceful stop. Change-Id: Id575674af95ae7ad88c00a2ac5d629ab0d0a9b46 Closes-bug: #1304244 --- openstack/common/threadgroup.py | 20 +++++++++++++++++++- tests/unit/test_threadgroup.py | 26 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/openstack/common/threadgroup.py b/openstack/common/threadgroup.py index 31097f1..0f4bbf8 100644 --- a/openstack/common/threadgroup.py +++ b/openstack/common/threadgroup.py @@ -85,7 +85,7 @@ class ThreadGroup(object): def thread_done(self, thread): self.threads.remove(thread) - def stop(self): + def _stop_threads(self): current = threading.current_thread() # Iterate over a copy of self.threads so thread_done doesn't @@ -99,6 +99,7 @@ class ThreadGroup(object): except Exception as ex: LOG.exception(ex) + def _stop_timers(self): for x in self.timers: try: x.stop() @@ -106,6 +107,23 @@ class ThreadGroup(object): LOG.exception(ex) self.timers = [] + def stop(self, graceful=False): + """stop function has the option of graceful=True/False. + + * In case of graceful=True, wait for all threads to be finished. + Never kill threads. + * In case of graceful=False, kill threads immediately. + """ + self._stop_timers() + if graceful: + # In case of graceful=True, wait for all threads to be + # finished, never kill threads + self.wait() + else: + # In case of graceful=False(Default), kill threads + # immediately + self._stop_threads() + def wait(self): for x in self.timers: try: diff --git a/tests/unit/test_threadgroup.py b/tests/unit/test_threadgroup.py index 08446e2..adbb6e5 100644 --- a/tests/unit/test_threadgroup.py +++ b/tests/unit/test_threadgroup.py @@ -17,6 +17,8 @@ Unit Tests for thread groups """ +import time + from oslotest import base as test_base from openstack.common import threadgroup @@ -44,3 +46,27 @@ class ThreadGroupTestCase(test_base.BaseTestCase): self.assertTrue(timer._running) self.assertEqual(('arg',), timer.args) self.assertEqual({'kwarg': 'kwarg'}, timer.kw) + + def test_stop_immediately(self): + + def foo(*args, **kwargs): + time.sleep(1) + start_time = time.time() + self.tg.add_thread(foo, 'arg', kwarg='kwarg') + self.tg.stop() + end_time = time.time() + + self.assertEqual(0, len(self.tg.threads)) + self.assertTrue(end_time - start_time < 1) + + def test_stop_gracefully(self): + + def foo(*args, **kwargs): + time.sleep(1) + start_time = time.time() + self.tg.add_thread(foo, 'arg', kwarg='kwarg') + self.tg.stop(True) + end_time = time.time() + + self.assertEqual(0, len(self.tg.threads)) + self.assertTrue(end_time - start_time >= 1)