Merge "Make greenexecutor not keep greenthreads active"
@@ -31,7 +31,7 @@ class GreenExecutorTest(test.TestCase):
             called[name] += 1
 
         for i in range(0, amount):
-            yield functools.partial(store_call, name=int(i))
+            yield functools.partial(store_call, name=i)
 
     def test_func_calls(self):
         called = collections.defaultdict(int)

@@ -44,20 +44,21 @@ class GreenExecutorTest(test.TestCase):
         self.assertEqual(1, called[1])
 
     def test_no_construction(self):
-        self.assertRaises(AssertionError, eu.GreenExecutor, 0)
-        self.assertRaises(AssertionError, eu.GreenExecutor, -1)
-        self.assertRaises(AssertionError, eu.GreenExecutor, "-1")
+        self.assertRaises(ValueError, eu.GreenExecutor, 0)
+        self.assertRaises(ValueError, eu.GreenExecutor, -1)
+        self.assertRaises(ValueError, eu.GreenExecutor, "-1")
 
     def test_result_callback(self):
         called = collections.defaultdict(int)
 
-        def call_back(future):
+        def callback(future):
             called[future] += 1
 
         funcs = list(self.make_funcs(called, 1))
         with eu.GreenExecutor(2) as e:
-            f = e.submit(funcs[0])
-            f.add_done_callback(call_back)
+            for func in funcs:
+                f = e.submit(func)
+                f.add_done_callback(callback)
 
         self.assertEqual(2, len(called))

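For readers less familiar with the futures callback contract exercised by test_result_callback above: a done callback receives the completed future itself, which is why the test keys `called` by the future object. A minimal sketch against the stock concurrent.futures API (plain threads, not the green executor in this change):

from concurrent import futures


def on_done(future):
    # The executor hands the finished future back to the callback.
    print("result was:", future.result())


with futures.ThreadPoolExecutor(max_workers=2) as executor:
    f = executor.submit(pow, 2, 10)
    f.add_done_callback(on_done)  # eventually prints "result was: 1024"
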
@@ -87,6 +88,27 @@ class GreenExecutorTest(test.TestCase):
             result = fs[i].result()
             self.assertEqual(i, result)
 
+    def test_called_restricted_size(self):
+        called = collections.defaultdict(int)
+
+        with eu.GreenExecutor(1) as e:
+            for f in self.make_funcs(called, 100):
+                e.submit(f)
+            self.assertEqual(99, e.amount_delayed)
+
+        self.assertFalse(e.alive)
+        self.assertEqual(100, len(called))
+        self.assertGreaterEqual(1, e.workers_created)
+        self.assertEqual(0, e.amount_delayed)
+
+    def test_shutdown_twice(self):
+        e = eu.GreenExecutor(1)
+        self.assertTrue(e.alive)
+        e.shutdown()
+        self.assertFalse(e.alive)
+        e.shutdown()
+        self.assertFalse(e.alive)
+
     def test_func_cancellation(self):
         called = collections.defaultdict(int)

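The added test_called_restricted_size pins down the new accounting: with a one-greenthread executor the first submit spawns the only worker, and every later submit is parked on the delayed queue, which that worker drains before the executor finishes shutting down. A usage sketch of the same behaviour follows; the import path is an assumption inferred from the `from taskflow.utils import lock_utils` context line further down, and the commented values mirror what the test asserts. The remaining hunks move from the tests to the executor implementation itself.

import collections

# Assumed module path; the test file only shows the alias `eu`.
from taskflow.utils import eventlet_utils as eu

called = collections.defaultdict(int)


def record(name):
    called[name] += 1


with eu.GreenExecutor(1) as executor:
    for i in range(100):
        executor.submit(record, name=i)
    # One worker greenthread, everything else parked as delayed work.
    print(executor.workers_created, executor.amount_delayed)  # 1 99

# Leaving the with-block shuts down and drains the backlog.
print(len(called), executor.amount_delayed)  # 100 0
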
@@ -15,15 +15,14 @@
 # under the License.
 
 import logging
-import threading
 
 from concurrent import futures
 
 try:
-    from eventlet.green import threading as green_threading
+    from eventlet.green import threading as greenthreading
     from eventlet import greenpool
-    from eventlet import patcher
-    from eventlet import queue
+    from eventlet import patcher as greenpatcher
+    from eventlet import queue as greenqueue
     EVENTLET_AVAILABLE = True
 except ImportError:
     EVENTLET_AVAILABLE = False

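The import block now aliases every eventlet module it pulls in (greenthreading, greenpatcher, greenqueue) so the green variants cannot be mistaken for their stdlib namesakes, and the stdlib threading import goes away along with the RLock it served. The try/except ImportError guard is the usual optional-dependency pattern; a small self-contained sketch of it (the make_lock helper is illustrative, not part of taskflow):

try:
    # Alias the green module so it cannot shadow the stdlib name.
    from eventlet.green import threading as greenthreading
    EVENTLET_AVAILABLE = True
except ImportError:
    EVENTLET_AVAILABLE = False


def make_lock():
    # Hypothetical helper: prefer a green lock, fall back to the stdlib one.
    if EVENTLET_AVAILABLE:
        return greenthreading.Lock()
    import threading
    return threading.Lock()
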
@@ -33,10 +32,6 @@ from taskflow.utils import lock_utils
 
 LOG = logging.getLogger(__name__)
 
-# NOTE(harlowja): this object signals to threads that they should stop
-# working and rest in peace.
-_TOMBSTONE = object()
-
 _DONE_STATES = frozenset([
     futures._base.CANCELLED_AND_NOTIFIED,
     futures._base.FINISHED,

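The deleted _TOMBSTONE object was a poison pill: a unique sentinel pushed onto the shared work queue to tell long-running workers to stop, with each worker re-queueing it so the next one also exits. Since workers in this change no longer loop on a shared queue forever, the sentinel has nothing left to do. For context, a minimal stdlib rendition of the retired pattern:

import queue
import threading

_TOMBSTONE = object()  # unique sentinel; no submitted item can be it
work_queue = queue.Queue()


def worker():
    while True:
        item = work_queue.get()
        if item is _TOMBSTONE:
            # Re-queue the pill so the other workers shut down too.
            work_queue.put(_TOMBSTONE)
            break
        item()


threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
work_queue.put(lambda: print("working"))
work_queue.put(_TOMBSTONE)
for t in threads:
    t.join()
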
@@ -62,26 +57,29 @@ class _WorkItem(object):
 
 class _Worker(object):
-    def __init__(self, executor, work_queue, worker_id):
+    def __init__(self, executor, work, work_queue):
         self.executor = executor
+        self.work = work
         self.work_queue = work_queue
-        self.worker_id = worker_id
 
     def __call__(self):
+        # Run our main piece of work.
         try:
+            self.work.run()
+        finally:
+            # Consume any delayed work before finishing (this is how we finish
+            # work that was to big for the pool size, but needs to be finished
+            # no matter).
             while True:
-                work = self.work_queue.get(block=True)
-                if work is _TOMBSTONE:
-                    # NOTE(harlowja): give notice to other workers (this is
-                    # basically a chain of tombstone calls that will cause all
-                    # the workers on the queue to eventually shut-down).
-                    self.work_queue.put(_TOMBSTONE)
+                try:
+                    w = self.work_queue.get_nowait()
+                except greenqueue.Empty:
                     break
                 else:
-                    work.run()
-        except BaseException:
-            LOG.critical("Exception in worker %s of '%s'",
-                         self.worker_id, self.executor, exc_info=True)
+                    try:
+                        w.run()
+                    finally:
+                        self.work_queue.task_done()
 
 
 class GreenFuture(futures.Future):

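This is the heart of the change: a worker is now spawned for exactly one work item, runs it, and then opportunistically drains whatever has piled up on the shared delayed queue before exiting, so no greenthread is left blocked on get() waiting for work that may never come. The same drain-then-exit shape, sketched with stdlib threads and queues standing in for greenthreads and eventlet queues:

import queue
import threading

backlog = queue.Queue()  # plays the role of the executor's _delayed_work


def worker(first_job):
    # Run the job this worker was spawned for, then sweep the backlog so the
    # worker exits instead of idling on a blocking get().
    try:
        first_job()
    finally:
        while True:
            try:
                job = backlog.get_nowait()
            except queue.Empty:
                break
            else:
                try:
                    job()
                finally:
                    backlog.task_done()


backlog.put(lambda: print("delayed job"))
t = threading.Thread(target=worker, args=(lambda: print("primary job"),))
t.start()
t.join()
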
@@ -93,8 +91,8 @@ class GreenFuture(futures.Future):
         # functions will correctly yield to eventlet. If this is not done then
         # waiting on the future never actually causes the greenthreads to run
         # and thus you wait for infinity.
-        if not patcher.is_monkey_patched('threading'):
-            self._condition = green_threading.Condition()
+        if not greenpatcher.is_monkey_patched('threading'):
+            self._condition = greenthreading.Condition()
 
 
 class GreenExecutor(futures.Executor):

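GreenFuture itself only changes to use the new aliases, but the idea it implements is worth spelling out: unless threading has already been monkey patched, the future's internal condition variable is swapped for an eventlet one so that waiting on the future yields to other greenthreads instead of blocking the hub forever. A small demonstration, assuming eventlet is installed and relying on the same private _condition attribute the diff touches:

import eventlet
from concurrent import futures
from eventlet.green import threading as greenthreading
from eventlet import patcher as greenpatcher


class GreenFuture(futures.Future):
    def __init__(self):
        super(GreenFuture, self).__init__()
        # Same trick as above: only swap the condition when threading was not
        # already monkey patched (in which case it is green anyway).
        if not greenpatcher.is_monkey_patched('threading'):
            self._condition = greenthreading.Condition()


f = GreenFuture()
eventlet.spawn_n(lambda: f.set_result(42))
# With a stdlib condition this wait could block the hub; the green condition
# lets result() yield until the producer greenthread has run.
print(f.result())
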
@@ -102,44 +100,59 @@ class GreenExecutor(futures.Executor):
 
     def __init__(self, max_workers=1000):
         assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor'
-        assert int(max_workers) > 0, 'Max workers must be greater than zero'
         self._max_workers = int(max_workers)
+        if self._max_workers <= 0:
+            raise ValueError('Max workers must be greater than zero')
         self._pool = greenpool.GreenPool(self._max_workers)
-        self._work_queue = queue.LightQueue()
-        self._shutdown_lock = threading.RLock()
+        self._delayed_work = greenqueue.Queue()
+        self._shutdown_lock = greenthreading.Lock()
         self._shutdown = False
+        self._workers_created = 0
 
+    @property
+    def workers_created(self):
+        return self._workers_created
+
+    @property
+    def amount_delayed(self):
+        return self._delayed_work.qsize()
+
+    @property
+    def alive(self):
+        return not self._shutdown
+
     @lock_utils.locked(lock='_shutdown_lock')
     def submit(self, fn, *args, **kwargs):
         if self._shutdown:
             raise RuntimeError('cannot schedule new futures after shutdown')
         f = GreenFuture()
-        w = _WorkItem(f, fn, args, kwargs)
-        self._work_queue.put(w)
-        # Spin up any new workers (since they are spun up on demand and
-        # not at executor initialization).
-        self._spin_up()
+        work = _WorkItem(f, fn, args, kwargs)
+        if not self._spin_up(work):
+            self._delayed_work.put(work)
         return f
 
-    def _spin_up(self):
-        cur_am = (self._pool.running() + self._pool.waiting())
-        if cur_am < self._max_workers and cur_am < self._work_queue.qsize():
-            # Spin up a new worker to do the work as we are behind.
-            worker = _Worker(self, self._work_queue, cur_am + 1)
-            self._pool.spawn(worker)
+    def _spin_up(self, work):
+        alive = self._pool.running() + self._pool.waiting()
+        if alive < self._max_workers:
+            self._pool.spawn_n(_Worker(self, work, self._delayed_work))
+            self._workers_created += 1
+            return True
+        return False
 
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True
-            self._work_queue.put(_TOMBSTONE)
         if wait:
             self._pool.waitall()
+            # NOTE(harlowja): Fixed in eventlet 0.15 (remove when able to use)
+            if not self._delayed_work.empty():
+                self._delayed_work.join()
 
 
 class _GreenWaiter(object):
     """Provides the event that wait_for_any() blocks on."""
     def __init__(self):
-        self.event = green_threading.Event()
+        self.event = greenthreading.Event()
 
     def add_result(self, future):
         self.event.set()

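Finally, note that exiting the with-block used throughout the tests is just futures.Executor.__exit__ calling shutdown(wait=True), which now also waits for the delayed-work queue to be joined, and test_shutdown_twice shows that shutdown() is safe to call repeatedly. A short sketch, under the same assumed import path as the earlier examples:

from taskflow.utils import eventlet_utils as eu  # assumed module path

executor = eu.GreenExecutor(1)
executor.submit(print, "hello")
executor.shutdown(wait=True)  # waits for the pool and any delayed work
executor.shutdown()           # idempotent; the executor simply stays down
assert not executor.alive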