Create a green executor & green future

So that projects which have *not* eventlet monkey
patched the threading module can still use the
multi-threaded engine (which takes an executor as
input), we provide an eventlet compatible executor
and future objects that work with that same
multi-threaded engine.

This executor also works in natively threaded
applications: the future's condition attribute is
replaced with a greened condition, which is needed
to make sure that greenthreads get to run while the
condition is being waited on.
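
For illustration, a minimal usage sketch of the executor added by
this change (the engine wiring is not shown here, and double() is
just a placeholder callable):

    from taskflow.utils import eventlet_utils as eu

    def double(x):
        # Placeholder work; any callable can be submitted.
        return x * 2

    # The returned futures behave like concurrent.futures futures,
    # but waiting on them yields to eventlet so the worker
    # greenthreads actually get to run.
    with eu.GreenExecutor(2) as executor:
        futures = [executor.submit(double, i) for i in range(3)]

    # Leaving the context manager shuts the executor down and waits
    # for the green pool, so the results are ready at this point.
    results = [f.result() for f in futures]  # [0, 2, 4]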

blueprint eventlet-engine

Change-Id: Ida9ce6183471ad6b323a3c9ca863561699e32ddc
Authored and committed by Joshua Harlow on 2013-10-04 19:56:10 -07:00
parent 0bc77eecfb
commit 38206d9c0c
3 changed files with 238 additions and 0 deletions


@@ -12,6 +12,8 @@ Babel>=1.3
stevedore>=0.10
# Backport for concurrent.futures which exists in 3.2+
futures>=2.1.3
# Only needed if the eventlet executor is used.
eventlet>=0.13.0
# NOTE(harlowja): if you want to be able to use the graph_utils
# export_graph_to_dot function you will need to uncomment the following.
# pydot>=1.0


@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import functools

from taskflow import test
from taskflow.utils import eventlet_utils as eu


class GreenExecutorTest(test.TestCase):
    def make_funcs(self, called, amount):

        def store_call(name):
            called[name] += 1

        for i in range(0, amount):
            yield functools.partial(store_call, name=int(i))

    def test_func_calls(self):
        called = collections.defaultdict(int)

        with eu.GreenExecutor(2) as e:
            for f in self.make_funcs(called, 2):
                e.submit(f)

        self.assertEquals(1, called[0])
        self.assertEquals(1, called[1])

    def test_no_construction(self):
        self.assertRaises(AssertionError, eu.GreenExecutor, 0)
        self.assertRaises(AssertionError, eu.GreenExecutor, -1)
        self.assertRaises(AssertionError, eu.GreenExecutor, "-1")

    def test_result_callback(self):
        called = collections.defaultdict(int)

        def call_back(future):
            called[future] += 1

        funcs = list(self.make_funcs(called, 1))
        with eu.GreenExecutor(2) as e:
            f = e.submit(funcs[0])
            f.add_done_callback(call_back)

        self.assertEquals(2, len(called))

    def test_exception_transfer(self):

        def blowup():
            raise IOError("Broke!")

        with eu.GreenExecutor(2) as e:
            f = e.submit(blowup)

        self.assertRaises(IOError, f.result)

    def test_result_transfer(self):

        def return_given(given):
            return given

        create_am = 50
        with eu.GreenExecutor(2) as e:
            futures = []
            for i in range(0, create_am):
                futures.append(e.submit(functools.partial(return_given, i)))

        self.assertEquals(create_am, len(futures))
        for i in range(0, create_am):
            result = futures[i].result()
            self.assertEquals(i, result)

    def test_func_cancellation(self):
        called = collections.defaultdict(int)

        futures = []
        with eu.GreenExecutor(2) as e:
            for func in self.make_funcs(called, 2):
                futures.append(e.submit(func))
            # Greenthreads don't start executing until we wait for them
            # to, since nothing here does IO, this will work out correctly.
            #
            # If something here did a blocking call, then eventlet could swap
            # one of the executors threads in, but nothing in this test does.
            for f in futures:
                self.assertFalse(f.running())
                f.cancel()

        self.assertEquals(0, len(called))
        for f in futures:
            self.assertTrue(f.cancelled())
            self.assertTrue(f.done())


@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import threading

from eventlet.green import threading as gthreading
from eventlet import greenpool
from eventlet import patcher
from eventlet import queue

from concurrent import futures

from taskflow.utils import lock_utils

LOG = logging.getLogger(__name__)

# NOTE(harlowja): this object signals to threads that they should stop
# working and rest in peace.
_TOMBSTONE = object()


class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)


class _Worker(object):
    def __init__(self, executor, work_queue, worker_id):
        self.executor = executor
        self.work_queue = work_queue
        self.worker_id = worker_id

    def __call__(self):
        try:
            while True:
                work = self.work_queue.get(block=True)
                if work is _TOMBSTONE:
                    # NOTE(harlowja): give notice to other workers (this is
                    # basically a chain of tombstone calls that will cause all
                    # the workers on the queue to eventually shut-down).
                    self.work_queue.put(_TOMBSTONE)
                    break
                else:
                    work.run()
        except BaseException:
            LOG.critical("Exception in worker %s of '%s'",
                         self.worker_id, self.executor, exc_info=True)


class _GreenFuture(futures.Future):
    def __init__(self):
        super(_GreenFuture, self).__init__()
        # NOTE(harlowja): replace the built-in condition with a greenthread
        # compatible one so that when getting the result of this future the
        # functions will correctly yield to eventlet. If this is not done then
        # waiting on the future never actually causes the greenthreads to run
        # and thus you wait for infinity.
        if not patcher.is_monkey_patched('threading'):
            self._condition = gthreading.Condition()


class GreenExecutor(futures.Executor):
    """A greenthread backed executor."""

    def __init__(self, max_workers):
        assert int(max_workers) > 0, 'Max workers must be greater than zero'
        self._max_workers = int(max_workers)
        self._pool = greenpool.GreenPool(self._max_workers)
        self._work_queue = queue.LightQueue()
        self._shutdown_lock = threading.RLock()
        self._shutdown = False

    @lock_utils.locked(lock='_shutdown_lock')
    def submit(self, fn, *args, **kwargs):
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        f = _GreenFuture()
        w = _WorkItem(f, fn, args, kwargs)
        self._work_queue.put(w)
        # Spin up any new workers (since they are spun up on demand and
        # not at executor initialization).
        self._spin_up()
        return f

    def _spin_up(self):
        cur_am = (self._pool.running() + self._pool.waiting())
        if cur_am < self._max_workers and cur_am < self._work_queue.qsize():
            # Spin up a new worker to do the work as we are behind.
            worker = _Worker(self, self._work_queue, cur_am + 1)
            self._pool.spawn(worker)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(_TOMBSTONE)
        if wait:
            self._pool.waitall()
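
As a rough sketch of the shutdown path above (using the same module
the tests import; do_work() is a placeholder): shutdown enqueues a
single tombstone, each worker re-enqueues it before exiting so every
worker eventually stops, and later submits are rejected.

    from taskflow.utils import eventlet_utils as eu

    def do_work():
        return 'done'

    executor = eu.GreenExecutor(4)
    fut = executor.submit(do_work)

    # Marks the executor as shut down, puts one _TOMBSTONE on the work
    # queue and waits for the green pool; each worker re-enqueues the
    # tombstone before breaking out of its loop.
    executor.shutdown(wait=True)

    print(fut.result())  # 'done'

    executor.submit(do_work)  # raises RuntimeError after shutdown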