Add graceful stop function to ThreadGroup.stop
Currently ThreadGroup.stop() would stop both timers and threads immediately. However, heat-engine should stop timers before threads finished for the purpose of graceful shutdown. The graceful shutdown for Heat is "do process exit after stack processing finished". Heat implemented the stack processing as threads. We should wait its finishing for graceful shutdown's purpose. On the one hand, Heat is using timers. The timers have the function of make another job threads. It means, we should stop timers before waiting threads for preventing another thread occur by timers. From the above, the appropriate order of Heat's graceful shutdown is: * stop timers for preventing new thread occur * wait for all threads to be finished * process exit However, currently ThreadGroup class doesn't have the function of graceful stop. So I propose the function of graceful stop. Change-Id: Id575674af95ae7ad88c00a2ac5d629ab0d0a9b46 Closes-bug: #1304244
This commit is contained in:
@@ -85,7 +85,7 @@ class ThreadGroup(object):
|
||||
def thread_done(self, thread):
|
||||
self.threads.remove(thread)
|
||||
|
||||
def stop(self):
|
||||
def _stop_threads(self):
|
||||
current = threading.current_thread()
|
||||
|
||||
# Iterate over a copy of self.threads so thread_done doesn't
|
||||
@@ -99,6 +99,7 @@ class ThreadGroup(object):
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
|
||||
def _stop_timers(self):
|
||||
for x in self.timers:
|
||||
try:
|
||||
x.stop()
|
||||
@@ -106,6 +107,23 @@ class ThreadGroup(object):
|
||||
LOG.exception(ex)
|
||||
self.timers = []
|
||||
|
||||
def stop(self, graceful=False):
|
||||
"""stop function has the option of graceful=True/False.
|
||||
|
||||
* In case of graceful=True, wait for all threads to be finished.
|
||||
Never kill threads.
|
||||
* In case of graceful=False, kill threads immediately.
|
||||
"""
|
||||
self._stop_timers()
|
||||
if graceful:
|
||||
# In case of graceful=True, wait for all threads to be
|
||||
# finished, never kill threads
|
||||
self.wait()
|
||||
else:
|
||||
# In case of graceful=False(Default), kill threads
|
||||
# immediately
|
||||
self._stop_threads()
|
||||
|
||||
def wait(self):
|
||||
for x in self.timers:
|
||||
try:
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
Unit Tests for thread groups
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from oslotest import base as test_base
|
||||
|
||||
from openstack.common import threadgroup
|
||||
@@ -44,3 +46,27 @@ class ThreadGroupTestCase(test_base.BaseTestCase):
|
||||
self.assertTrue(timer._running)
|
||||
self.assertEqual(('arg',), timer.args)
|
||||
self.assertEqual({'kwarg': 'kwarg'}, timer.kw)
|
||||
|
||||
def test_stop_immediately(self):
|
||||
|
||||
def foo(*args, **kwargs):
|
||||
time.sleep(1)
|
||||
start_time = time.time()
|
||||
self.tg.add_thread(foo, 'arg', kwarg='kwarg')
|
||||
self.tg.stop()
|
||||
end_time = time.time()
|
||||
|
||||
self.assertEqual(0, len(self.tg.threads))
|
||||
self.assertTrue(end_time - start_time < 1)
|
||||
|
||||
def test_stop_gracefully(self):
|
||||
|
||||
def foo(*args, **kwargs):
|
||||
time.sleep(1)
|
||||
start_time = time.time()
|
||||
self.tg.add_thread(foo, 'arg', kwarg='kwarg')
|
||||
self.tg.stop(True)
|
||||
end_time = time.time()
|
||||
|
||||
self.assertEqual(0, len(self.tg.threads))
|
||||
self.assertTrue(end_time - start_time >= 1)
|
||||
|
||||
Reference in New Issue
Block a user