From 2019213a622586caed5ecf6483aea26030a85b33 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 26 Jun 2015 15:16:22 -0700 Subject: [PATCH] Add future waiting helper module This code is useful for waiting on green or regular futures and ensuring this is done in a safe manner (both kinds can not be waited at on the same time, due to how the locking and event objects & strategies of both differ). Depends-On: Ie491d8a504903fb27dcb9610567e660b86669126 Change-Id: I44ae2d260c1ecbdbd45b00b26113f9f1d64db5b6 --- doc/source/api.rst | 8 ++ futurist/tests/test_waiters.py | 89 ++++++++++++++ futurist/waiters.py | 214 +++++++++++++++++++++++++++++++++ requirements.txt | 1 + 4 files changed, 312 insertions(+) create mode 100644 futurist/tests/test_waiters.py create mode 100644 futurist/waiters.py diff --git a/doc/source/api.rst b/doc/source/api.rst index 7d5f78f..38db6b7 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -45,3 +45,11 @@ Miscellaneous .. autoclass:: futurist.ExecutorStatistics :members: + +------- +Waiters +------- + +.. autofunction:: futurist.waiters.wait_for_any +.. autofunction:: futurist.waiters.wait_for_all +.. autoclass:: futurist.waiters.DoneAndNotDoneFutures diff --git a/futurist/tests/test_waiters.py b/futurist/tests/test_waiters.py new file mode 100644 index 0000000..e69b3d3 --- /dev/null +++ b/futurist/tests/test_waiters.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +import eventlet +import testscenarios + +import futurist +from futurist.tests import base +from futurist import waiters + + +# Module level functions need to be used since the process pool +# executor can not access instance or lambda level functions (since those +# are not pickleable). + +def mini_delay(use_eventlet_sleep=False): + if use_eventlet_sleep: + eventlet.sleep(0.1) + else: + time.sleep(0.1) + return 1 + + +class TestWaiters(testscenarios.TestWithScenarios, base.TestCase): + scenarios = [ + ('sync', {'executor_cls': futurist.SynchronousExecutor, + 'executor_kwargs': {}, 'use_eventlet_sleep': False}), + ('green_sync', {'executor_cls': futurist.SynchronousExecutor, + 'executor_kwargs': {'green': True}, + 'use_eventlet_sleep': True}), + ('green', {'executor_cls': futurist.GreenThreadPoolExecutor, + 'executor_kwargs': {}, 'use_eventlet_sleep': True}), + ('thread', {'executor_cls': futurist.ThreadPoolExecutor, + 'executor_kwargs': {}, 'use_eventlet_sleep': False}), + ('process', {'executor_cls': futurist.ProcessPoolExecutor, + 'executor_kwargs': {}, 'use_eventlet_sleep': False}), + ] + + def setUp(self): + super(TestWaiters, self).setUp() + self.executor = self.executor_cls(**self.executor_kwargs) + + def tearDown(self): + super(TestWaiters, self).tearDown() + self.executor.shutdown() + self.executor = None + + def test_wait_for_any(self): + fs = [] + for _i in range(0, 10): + fs.append(self.executor.submit( + mini_delay, use_eventlet_sleep=self.use_eventlet_sleep)) + all_done_fs = [] + total_fs = len(fs) + while len(all_done_fs) != total_fs: + done, not_done = waiters.wait_for_any(fs) + all_done_fs.extend(done) + fs = not_done + self.assertEqual(total_fs, sum(f.result() for f in all_done_fs)) + + def test_wait_for_all(self): + fs = [] + for _i in range(0, 10): + fs.append(self.executor.submit( + mini_delay, use_eventlet_sleep=self.use_eventlet_sleep)) + done_fs, not_done_fs = waiters.wait_for_all(fs) + self.assertEqual(len(fs), sum(f.result() for f in done_fs)) + self.assertEqual(0, len(not_done_fs)) + + def test_no_mixed_wait_for_any(self): + fs = [futurist.GreenFuture(), futurist.Future()] + self.assertRaises(RuntimeError, waiters.wait_for_any, fs) + + def test_no_mixed_wait_for_all(self): + fs = [futurist.GreenFuture(), futurist.Future()] + self.assertRaises(RuntimeError, waiters.wait_for_all, fs) diff --git a/futurist/waiters.py b/futurist/waiters.py new file mode 100644 index 0000000..24aeebc --- /dev/null +++ b/futurist/waiters.py @@ -0,0 +1,214 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack + +import collections +import contextlib + +from concurrent import futures +from concurrent.futures import _base +import six + +import futurist +from futurist import _utils + +try: + from eventlet.green import threading as greenthreading +except ImportError: + greenthreading = None + + +#: Named tuple returned from ``wait_for*`` calls. +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') + +_DONE_STATES = frozenset([ + _base.CANCELLED_AND_NOTIFIED, + _base.FINISHED, +]) + + +@contextlib.contextmanager +def _acquire_and_release_futures(fs): + # Do this to ensure that we always get the futures in the same order (aka + # always acquire the conditions in the same order, no matter what; a way + # to avoid dead-lock). + fs = sorted(fs, key=id) + with ExitStack() as stack: + for fut in fs: + stack.enter_context(fut._condition) + yield + + +def _ensure_eventlet(func): + """Decorator that verifies we have the needed eventlet components.""" + + @six.wraps(func) + def wrapper(*args, **kwargs): + if not _utils.EVENTLET_AVAILABLE or greenthreading is None: + raise RuntimeError('Eventlet is needed to wait on green futures') + return func(*args, **kwargs) + + return wrapper + + +def _wait_for(fs, no_green_return_when, on_all_green_cb, + caller_name, timeout=None): + green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture)) + if not green_fs: + done, not_done = futures.wait(fs, timeout=timeout, + return_when=no_green_return_when) + return DoneAndNotDoneFutures(done, not_done) + else: + non_green_fs = len(fs) - green_fs + if non_green_fs: + raise RuntimeError("Can not wait on %s green futures and %s" + " non-green futures in the same" + " `%s` call" % (green_fs, non_green_fs, + caller_name)) + else: + return on_all_green_cb(fs, timeout=timeout) + + +def wait_for_all(fs, timeout=None): + """Wait for all of the futures to complete. + + Works correctly with both green and non-green futures (but not both + together, since this can't be guaranteed to avoid dead-lock due to how + the waiting implementations are different when green threads are being + used). + + Returns pair (done futures, not done futures). + """ + return _wait_for(fs, futures.ALL_COMPLETED, _wait_for_all_green, + 'wait_for_all', timeout=timeout) + + +def wait_for_any(fs, timeout=None): + """Wait for one (**any**) of the futures to complete. + + Works correctly with both green and non-green futures (but not both + together, since this can't be guaranteed to avoid dead-lock due to how + the waiting implementations are different when green threads are being + used). + + Returns pair (done futures, not done futures). + """ + return _wait_for(fs, futures.FIRST_COMPLETED, _wait_for_any_green, + 'wait_for_any', timeout=timeout) + + +class _AllGreenWaiter(object): + """Provides the event that ``_wait_for_all_green`` blocks on.""" + + def __init__(self, pending): + self.event = greenthreading.Event() + self.lock = greenthreading.Lock() + self.pending = pending + + def _decrement_pending(self): + with self.lock: + self.pending -= 1 + if self.pending <= 0: + self.event.set() + + def add_result(self, future): + self._decrement_pending() + + def add_exception(self, future): + self._decrement_pending() + + def add_cancelled(self, future): + self._decrement_pending() + + +class _AnyGreenWaiter(object): + """Provides the event that ``_wait_for_any_green`` blocks on.""" + + def __init__(self): + self.event = greenthreading.Event() + + def add_result(self, future): + self.event.set() + + def add_exception(self, future): + self.event.set() + + def add_cancelled(self, future): + self.event.set() + + +def _partition_futures(fs): + done = set() + not_done = set() + for f in fs: + if f._state in _DONE_STATES: + done.add(f) + else: + not_done.add(f) + return done, not_done + + +def _create_and_install_waiters(fs, waiter_cls, *args, **kwargs): + waiter = waiter_cls(*args, **kwargs) + for f in fs: + f._waiters.append(waiter) + return waiter + + +@_ensure_eventlet +def _wait_for_all_green(fs, timeout=None): + if not fs: + return DoneAndNotDoneFutures(set(), set()) + + with _acquire_and_release_futures(fs): + done, not_done = _partition_futures(fs) + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) + waiter = _create_and_install_waiters(not_done, + _AllGreenWaiter, + len(not_done)) + waiter.event.wait(timeout) + for f in not_done: + f._waiters.remove(waiter) + + with _acquire_and_release_futures(fs): + done, not_done = _partition_futures(fs) + return DoneAndNotDoneFutures(done, not_done) + + +@_ensure_eventlet +def _wait_for_any_green(fs, timeout=None): + if not fs: + return DoneAndNotDoneFutures(set(), set()) + + with _acquire_and_release_futures(fs): + done, not_done = _partition_futures(fs) + if done: + return DoneAndNotDoneFutures(done, not_done) + waiter = _create_and_install_waiters(fs, _AnyGreenWaiter) + + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + + with _acquire_and_release_futures(fs): + done, not_done = _partition_futures(fs) + return DoneAndNotDoneFutures(done, not_done) diff --git a/requirements.txt b/requirements.txt index de9331a..ccf8c07 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ pbr<2.0,>=0.11 six>=1.9.0 monotonic>=0.1 # Apache-2.0 futures>=3.0;python_version=='2.7' or python_version=='2.6' +contextlib2>=0.4.0 # PSF License