Use futures wait() when possible

Instead of always using our custom future wait
functionality, only use that functionality if there
are green futures and in other cases just use the
future wait() function instead.

Change-Id: I1eadcf53eb4b5f47b9543965610bfe04fec52e70
This commit is contained in:
Joshua Harlow
2014-05-11 09:12:55 -07:00
parent 19ab3392cb
commit 7655ae02ce
5 changed files with 71 additions and 75 deletions

View File

@@ -99,7 +99,7 @@ class SerialTaskExecutor(TaskExecutorBase):
def wait_for_any(self, fs, timeout=None):
# NOTE(imelnikov): this executor returns only done futures.
return fs, []
return (fs, set())
class ParallelTaskExecutor(TaskExecutorBase):

View File

@@ -58,10 +58,10 @@ class FutureGraphAction(object):
def _schedule(self, nodes):
"""Schedule a group of nodes for execution."""
futures = []
futures = set()
for node in nodes:
try:
futures.append(self._schedule_node(node))
futures.add(self._schedule_node(node))
except Exception:
# Immediately stop scheduling future work so that we can
# exit execution early (rather than later) if a single task
@@ -83,7 +83,7 @@ class FutureGraphAction(object):
if self.is_running():
not_done, failures = self._schedule(next_nodes)
else:
not_done, failures = ([], [])
not_done, failures = (set(), [])
# Run!
#
@@ -129,7 +129,7 @@ class FutureGraphAction(object):
# Recheck incase someone suspended it.
if self.is_running():
more_not_done, failures = self._schedule(next_nodes)
not_done.extend(more_not_done)
not_done.update(more_not_done)
if failures:
misc.Failure.reraise_if_any(failures)

View File

@@ -54,40 +54,34 @@ class WaitForAnyTestsMixin(object):
self.assertIs(done.pop(), f2)
class WaiterTestsMixin(object):
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class AsyncUtilsEventletTest(test.TestCase,
WaitForAnyTestsMixin):
executor_cls = eu.GreenExecutor
is_green = True
def test_add_result(self):
waiter = au._Waiter(self.is_green)
waiter = eu._GreenWaiter()
self.assertFalse(waiter.event.is_set())
waiter.add_result(futures.Future())
self.assertTrue(waiter.event.is_set())
def test_add_exception(self):
waiter = au._Waiter(self.is_green)
waiter = eu._GreenWaiter()
self.assertFalse(waiter.event.is_set())
waiter.add_exception(futures.Future())
self.assertTrue(waiter.event.is_set())
def test_add_cancelled(self):
waiter = au._Waiter(self.is_green)
waiter = eu._GreenWaiter()
self.assertFalse(waiter.event.is_set())
waiter.add_cancelled(futures.Future())
self.assertTrue(waiter.event.is_set())
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class AsyncUtilsEventletTest(test.TestCase,
WaitForAnyTestsMixin,
WaiterTestsMixin):
executor_cls = eu.GreenExecutor
is_green = True
class AsyncUtilsThreadedTest(test.TestCase,
WaitForAnyTestsMixin,
WaiterTestsMixin):
WaitForAnyTestsMixin):
executor_cls = futures.ThreadPoolExecutor
is_green = False
class MakeCompletedFutureTest(test.TestCase):

View File

@@ -14,71 +14,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from concurrent import futures
from taskflow.utils import eventlet_utils as eu
DONE_STATES = frozenset([
futures._base.CANCELLED_AND_NOTIFIED,
futures._base.FINISHED,
])
class _Waiter(object):
"""Provides the event that wait_for_any() blocks on."""
def __init__(self, is_green):
if is_green:
assert eu.EVENTLET_AVAILABLE, ('eventlet is needed to use this'
' feature')
self.event = eu.green_threading.Event()
else:
self.event = threading.Event()
def add_result(self, future):
self.event.set()
def add_exception(self, future):
self.event.set()
def add_cancelled(self, future):
self.event.set()
def _partition_futures(fs):
"""Partitions the input futures into done and not done lists."""
done = []
not_done = []
for f in fs:
if f._state in DONE_STATES:
done.append(f)
else:
not_done.append(f)
return (done, not_done)
def wait_for_any(fs, timeout=None):
"""Wait for one of the futures to complete.
Works correctly with both green and non-green futures.
Returns pair (done, not_done).
"""
with futures._base._AcquireFutures(fs):
(done, not_done) = _partition_futures(fs)
if done:
return (done, not_done)
is_green = any(isinstance(f, eu.GreenFuture) for f in fs)
waiter = _Waiter(is_green)
for f in fs:
f._waiters.append(waiter)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
with futures._base._AcquireFutures(fs):
return _partition_futures(fs)
any_green = any(isinstance(f, eu.GreenFuture) for f in fs)
if any_green:
return eu.wait_for_any(fs, timeout=timeout)
else:
return tuple(futures.wait(fs, timeout=timeout,
return_when=futures.FIRST_COMPLETED))
def make_completed_future(result):

View File

@@ -37,6 +37,11 @@ LOG = logging.getLogger(__name__)
# working and rest in peace.
_TOMBSTONE = object()
_DONE_STATES = frozenset([
futures._base.CANCELLED_AND_NOTIFIED,
futures._base.FINISHED,
])
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
@@ -82,6 +87,7 @@ class _Worker(object):
class GreenFuture(futures.Future):
def __init__(self):
super(GreenFuture, self).__init__()
assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green future'
# NOTE(harlowja): replace the built-in condition with a greenthread
# compatible one so that when getting the result of this future the
# functions will correctly yield to eventlet. If this is not done then
@@ -95,7 +101,7 @@ class GreenExecutor(futures.Executor):
"""A greenthread backed executor."""
def __init__(self, max_workers=1000):
assert EVENTLET_AVAILABLE, 'eventlet is needed to use GreenExecutor'
assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor'
assert int(max_workers) > 0, 'Max workers must be greater than zero'
self._max_workers = int(max_workers)
self._pool = greenpool.GreenPool(self._max_workers)
@@ -128,3 +134,46 @@ class GreenExecutor(futures.Executor):
self._work_queue.put(_TOMBSTONE)
if wait:
self._pool.waitall()
class _GreenWaiter(object):
"""Provides the event that wait_for_any() blocks on."""
def __init__(self):
self.event = green_threading.Event()
def add_result(self, future):
self.event.set()
def add_exception(self, future):
self.event.set()
def add_cancelled(self, future):
self.event.set()
def _partition_futures(fs):
"""Partitions the input futures into done and not done lists."""
done = set()
not_done = set()
for f in fs:
if f._state in _DONE_STATES:
done.add(f)
else:
not_done.add(f)
return (done, not_done)
def wait_for_any(fs, timeout=None):
assert EVENTLET_AVAILABLE, ('eventlet is needed to wait on green futures')
with futures._base._AcquireFutures(fs):
(done, not_done) = _partition_futures(fs)
if done:
return (done, not_done)
waiter = _GreenWaiter()
for f in fs:
f._waiters.append(waiter)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
with futures._base._AcquireFutures(fs):
return _partition_futures(fs)