Make ThreadPools deallocatable.

Currently, a ThreadPool acquires resources that last until process
exit. You can let the ThreadPool go out of scope, but that doesn't
terminate the worker threads or close file descriptors or anything.

This commit makes it so you can .terminate() a ThreadPool object and
get its resources back. Also, after you call .terminate(), trying to
use the ThreadPool raises an exception so you know you've goofed.

I have some internal code that could really use this, plus it makes
the unit test run not leak resources, which is nice.

Change-Id: Ibf7c6dc14c14f379421a79afb6c90a5e64b235fa
This commit is contained in:
Samuel Merritt 2015-01-07 16:50:58 -08:00
parent 4cdb51418c
commit f58f397cf6
3 changed files with 102 additions and 5 deletions
swift/common
test/unit/common

@ -111,6 +111,10 @@ class LockTimeout(MessageTimeout):
pass
class ThreadPoolDead(SwiftException):
pass
class RingBuilderError(SwiftException):
pass

@ -2737,6 +2737,7 @@ class ThreadPool(object):
self._run_queue = Queue()
self._result_queue = Queue()
self._threads = []
self._alive = True
if nthreads <= 0:
return
@ -2784,6 +2785,8 @@ class ThreadPool(object):
"""
while True:
item = work_queue.get()
if item is None:
break
ev, func, args, kwargs = item
try:
result = func(*args, **kwargs)
@ -2838,6 +2841,9 @@ class ThreadPool(object):
:returns: result of calling func
:raises: whatever func raises
"""
if not self._alive:
raise swift.common.exceptions.ThreadPoolDead()
if self.nthreads <= 0:
result = func(*args, **kwargs)
sleep()
@ -2882,11 +2888,38 @@ class ThreadPool(object):
:returns: result of calling func
:raises: whatever func raises
"""
if not self._alive:
raise swift.common.exceptions.ThreadPoolDead()
if self.nthreads <= 0:
return self._run_in_eventlet_tpool(func, *args, **kwargs)
else:
return self.run_in_thread(func, *args, **kwargs)
def terminate(self):
"""
Releases the threadpool's resources (OS threads, greenthreads, pipes,
etc.) and renders it unusable.
Don't call run_in_thread() or force_run_in_thread() after calling
terminate().
"""
self._alive = False
if self.nthreads <= 0:
return
for _junk in range(self.nthreads):
self._run_queue.put(None)
for thr in self._threads:
thr.join()
self._threads = []
self.nthreads = 0
greenthread.kill(self._consumer_coro)
self.rpipe.close()
os.close(self.wpipe)
def ismount(path):
"""

@ -28,6 +28,7 @@ import mock
import random
import re
import socket
import stat
import sys
import json
import math
@ -55,7 +56,7 @@ from mock import MagicMock, patch
from swift.common.exceptions import (Timeout, MessageTimeout,
ConnectionTimeout, LockTimeout,
ReplicationLockTimeout,
MimeInvalid)
MimeInvalid, ThreadPoolDead)
from swift.common import utils
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.swob import Request, Response
@ -3747,7 +3748,28 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
self.assertEquals(called, [12345])
class TestThreadpool(unittest.TestCase):
class TestThreadPool(unittest.TestCase):
def setUp(self):
self.tp = None
def tearDown(self):
if self.tp:
self.tp.terminate()
def _pipe_count(self):
# Counts the number of pipes that this process owns.
fd_dir = "/proc/%d/fd" % os.getpid()
def is_pipe(path):
try:
stat_result = os.stat(path)
return stat.S_ISFIFO(stat_result.st_mode)
except OSError:
return False
return len([fd for fd in os.listdir(fd_dir)
if is_pipe(os.path.join(fd_dir, fd))])
def _thread_id(self):
return threading.current_thread().ident
@ -3759,7 +3781,7 @@ class TestThreadpool(unittest.TestCase):
return int('fishcakes')
def test_run_in_thread_with_threads(self):
tp = utils.ThreadPool(1)
tp = self.tp = utils.ThreadPool(1)
my_id = self._thread_id()
other_id = tp.run_in_thread(self._thread_id)
@ -3778,7 +3800,7 @@ class TestThreadpool(unittest.TestCase):
def test_force_run_in_thread_with_threads(self):
# with nthreads > 0, force_run_in_thread looks just like run_in_thread
tp = utils.ThreadPool(1)
tp = self.tp = utils.ThreadPool(1)
my_id = self._thread_id()
other_id = tp.force_run_in_thread(self._thread_id)
@ -3828,7 +3850,7 @@ class TestThreadpool(unittest.TestCase):
def alpha():
return beta()
tp = utils.ThreadPool(1)
tp = self.tp = utils.ThreadPool(1)
try:
tp.run_in_thread(alpha)
except ZeroDivisionError:
@ -3846,6 +3868,44 @@ class TestThreadpool(unittest.TestCase):
self.assertEqual(tb_func[1], "run_in_thread")
self.assertEqual(tb_func[0], "test_preserving_stack_trace_from_thread")
def test_terminate(self):
initial_thread_count = threading.activeCount()
initial_pipe_count = self._pipe_count()
tp = utils.ThreadPool(4)
# do some work to ensure any lazy initialization happens
tp.run_in_thread(os.path.join, 'foo', 'bar')
tp.run_in_thread(os.path.join, 'baz', 'quux')
# 4 threads in the ThreadPool, plus one pipe for IPC; this also
# serves as a sanity check that we're actually allocating some
# resources to free later
self.assertEqual(initial_thread_count, threading.activeCount() - 4)
self.assertEqual(initial_pipe_count, self._pipe_count() - 2)
tp.terminate()
self.assertEqual(initial_thread_count, threading.activeCount())
self.assertEqual(initial_pipe_count, self._pipe_count())
def test_cant_run_after_terminate(self):
tp = utils.ThreadPool(0)
tp.terminate()
self.assertRaises(ThreadPoolDead, tp.run_in_thread, lambda: 1)
self.assertRaises(ThreadPoolDead, tp.force_run_in_thread, lambda: 1)
def test_double_terminate_doesnt_crash(self):
tp = utils.ThreadPool(0)
tp.terminate()
tp.terminate()
tp = utils.ThreadPool(1)
tp.terminate()
tp.terminate()
def test_terminate_no_threads_doesnt_crash(self):
tp = utils.ThreadPool(0)
tp.terminate()
class TestAuditLocationGenerator(unittest.TestCase):