Make greenexecutor not keep greenthreads active

Instead of keeping all greenthreads active even
when they are not being used, only keep greenthreads
active while there is active work to complete.

This saves resources (each greenthread takes up
memory) and allows a pool to shrink and grow in
a more dynamic fashion.

Fixes bug 1339406

Change-Id: Idc8ab8447045915a0ffbaf21fa5c4bdb7a9e3593
This commit is contained in:
Joshua Harlow
2014-07-09 13:57:48 -07:00
parent e4810f0d31
commit 0fae765bdc
2 changed files with 80 additions and 45 deletions

View File

@@ -31,7 +31,7 @@ class GreenExecutorTest(test.TestCase):
called[name] += 1
for i in range(0, amount):
yield functools.partial(store_call, name=int(i))
yield functools.partial(store_call, name=i)
def test_func_calls(self):
called = collections.defaultdict(int)
@@ -44,20 +44,21 @@ class GreenExecutorTest(test.TestCase):
self.assertEqual(1, called[1])
def test_no_construction(self):
self.assertRaises(AssertionError, eu.GreenExecutor, 0)
self.assertRaises(AssertionError, eu.GreenExecutor, -1)
self.assertRaises(AssertionError, eu.GreenExecutor, "-1")
self.assertRaises(ValueError, eu.GreenExecutor, 0)
self.assertRaises(ValueError, eu.GreenExecutor, -1)
self.assertRaises(ValueError, eu.GreenExecutor, "-1")
def test_result_callback(self):
called = collections.defaultdict(int)
def call_back(future):
def callback(future):
called[future] += 1
funcs = list(self.make_funcs(called, 1))
with eu.GreenExecutor(2) as e:
f = e.submit(funcs[0])
f.add_done_callback(call_back)
for func in funcs:
f = e.submit(func)
f.add_done_callback(callback)
self.assertEqual(2, len(called))
@@ -87,6 +88,27 @@ class GreenExecutorTest(test.TestCase):
result = fs[i].result()
self.assertEqual(i, result)
def test_called_restricted_size(self):
called = collections.defaultdict(int)
with eu.GreenExecutor(1) as e:
for f in self.make_funcs(called, 100):
e.submit(f)
self.assertEqual(99, e.amount_delayed)
self.assertFalse(e.alive)
self.assertEqual(100, len(called))
self.assertGreaterEqual(1, e.workers_created)
self.assertEqual(0, e.amount_delayed)
def test_shutdown_twice(self):
e = eu.GreenExecutor(1)
self.assertTrue(e.alive)
e.shutdown()
self.assertFalse(e.alive)
e.shutdown()
self.assertFalse(e.alive)
def test_func_cancellation(self):
called = collections.defaultdict(int)

View File

@@ -15,15 +15,14 @@
# under the License.
import logging
import threading
from concurrent import futures
try:
from eventlet.green import threading as green_threading
from eventlet.green import threading as greenthreading
from eventlet import greenpool
from eventlet import patcher
from eventlet import queue
from eventlet import patcher as greenpatcher
from eventlet import queue as greenqueue
EVENTLET_AVAILABLE = True
except ImportError:
EVENTLET_AVAILABLE = False
@@ -33,10 +32,6 @@ from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__)
# NOTE(harlowja): this object signals to threads that they should stop
# working and rest in peace.
_TOMBSTONE = object()
_DONE_STATES = frozenset([
futures._base.CANCELLED_AND_NOTIFIED,
futures._base.FINISHED,
@@ -62,26 +57,29 @@ class _WorkItem(object):
class _Worker(object):
def __init__(self, executor, work_queue, worker_id):
def __init__(self, executor, work, work_queue):
self.executor = executor
self.work = work
self.work_queue = work_queue
self.worker_id = worker_id
def __call__(self):
# Run our main piece of work.
try:
self.work.run()
finally:
# Consume any delayed work before finishing (this is how we finish
# work that was too big for the pool size, but needs to be finished
# no matter).
while True:
work = self.work_queue.get(block=True)
if work is _TOMBSTONE:
# NOTE(harlowja): give notice to other workers (this is
# basically a chain of tombstone calls that will cause all
# the workers on the queue to eventually shut-down).
self.work_queue.put(_TOMBSTONE)
try:
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
else:
work.run()
except BaseException:
LOG.critical("Exception in worker %s of '%s'",
self.worker_id, self.executor, exc_info=True)
try:
w.run()
finally:
self.work_queue.task_done()
class GreenFuture(futures.Future):
@@ -93,8 +91,8 @@ class GreenFuture(futures.Future):
# functions will correctly yield to eventlet. If this is not done then
# waiting on the future never actually causes the greenthreads to run
# and thus you wait for infinity.
if not patcher.is_monkey_patched('threading'):
self._condition = green_threading.Condition()
if not greenpatcher.is_monkey_patched('threading'):
self._condition = greenthreading.Condition()
class GreenExecutor(futures.Executor):
@@ -102,44 +100,59 @@ class GreenExecutor(futures.Executor):
def __init__(self, max_workers=1000):
assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor'
assert int(max_workers) > 0, 'Max workers must be greater than zero'
self._max_workers = int(max_workers)
if self._max_workers <= 0:
raise ValueError('Max workers must be greater than zero')
self._pool = greenpool.GreenPool(self._max_workers)
self._work_queue = queue.LightQueue()
self._shutdown_lock = threading.RLock()
self._delayed_work = greenqueue.Queue()
self._shutdown_lock = greenthreading.Lock()
self._shutdown = False
self._workers_created = 0
@property
def workers_created(self):
return self._workers_created
@property
def amount_delayed(self):
return self._delayed_work.qsize()
@property
def alive(self):
return not self._shutdown
@lock_utils.locked(lock='_shutdown_lock')
def submit(self, fn, *args, **kwargs):
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = GreenFuture()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
# Spin up any new workers (since they are spun up on demand and
# not at executor initialization).
self._spin_up()
work = _WorkItem(f, fn, args, kwargs)
if not self._spin_up(work):
self._delayed_work.put(work)
return f
def _spin_up(self):
cur_am = (self._pool.running() + self._pool.waiting())
if cur_am < self._max_workers and cur_am < self._work_queue.qsize():
# Spin up a new worker to do the work as we are behind.
worker = _Worker(self, self._work_queue, cur_am + 1)
self._pool.spawn(worker)
def _spin_up(self, work):
alive = self._pool.running() + self._pool.waiting()
if alive < self._max_workers:
self._pool.spawn_n(_Worker(self, work, self._delayed_work))
self._workers_created += 1
return True
return False
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
self._work_queue.put(_TOMBSTONE)
if wait:
self._pool.waitall()
# NOTE(harlowja): Fixed in eventlet 0.15 (remove when able to use)
if not self._delayed_work.empty():
self._delayed_work.join()
class _GreenWaiter(object):
"""Provides the event that wait_for_any() blocks on."""
def __init__(self):
self.event = green_threading.Event()
self.event = greenthreading.Event()
def add_result(self, future):
self.event.set()