diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 38998b8c..2841968a 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -16,6 +16,8 @@ import functools +from futurist import waiters + from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine import analyzer as an @@ -26,7 +28,6 @@ from taskflow.engines.action_engine import scopes as sc from taskflow import flow as flow_type from taskflow import states as st from taskflow import task -from taskflow.utils import async_utils from taskflow.utils import misc @@ -114,7 +115,7 @@ class Runtime(object): @misc.cachedproperty def runner(self): - return ru.Runner(self, async_utils.wait_for_any) + return ru.Runner(self, waiters.wait_for_any) @misc.cachedproperty def completer(self): diff --git a/taskflow/tests/unit/test_utils_async_utils.py b/taskflow/tests/unit/test_utils_async_utils.py index 1f8b0119..bd8b9a6b 100644 --- a/taskflow/tests/unit/test_utils_async_utils.py +++ b/taskflow/tests/unit/test_utils_async_utils.py @@ -14,56 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. -import futurist -import testtools - from taskflow import test from taskflow.utils import async_utils as au -from taskflow.utils import eventlet_utils as eu - - -class WaitForAnyTestsMixin(object): - timeout = 0.001 - - def test_waits_and_finishes(self): - def foo(): - pass - - with self._make_executor(2) as e: - fs = [e.submit(foo), e.submit(foo)] - # this test assumes that our foo will end within 10 seconds - done, not_done = au.wait_for_any(fs, 10) - self.assertIn(len(done), (1, 2)) - self.assertTrue(any(f in done for f in fs)) - - def test_not_done_futures(self): - fs = [futurist.Future(), futurist.Future()] - done, not_done = au.wait_for_any(fs, self.timeout) - self.assertEqual(len(done), 0) - self.assertEqual(len(not_done), 2) - - def test_mixed_futures(self): - f1 = futurist.Future() - f2 = futurist.Future() - f2.set_result(1) - done, not_done = au.wait_for_any([f1, f2], self.timeout) - self.assertEqual(len(done), 1) - self.assertEqual(len(not_done), 1) - self.assertIs(not_done.pop(), f1) - self.assertIs(done.pop(), f2) - - -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') -class AsyncUtilsEventletTest(test.TestCase, - WaitForAnyTestsMixin): - def _make_executor(self, max_workers): - return futurist.GreenThreadPoolExecutor(max_workers=max_workers) - - -class AsyncUtilsThreadedTest(test.TestCase, - WaitForAnyTestsMixin): - def _make_executor(self, max_workers): - return futurist.ThreadPoolExecutor(max_workers=max_workers) class MakeCompletedFutureTest(test.TestCase): @@ -73,9 +25,3 @@ class MakeCompletedFutureTest(test.TestCase): future = au.make_completed_future(result) self.assertTrue(future.done()) self.assertIs(future.result(), result) - - -class AsyncUtilsSynchronousTest(test.TestCase, - WaitForAnyTestsMixin): - def _make_executor(self, max_workers): - return futurist.SynchronousExecutor() diff --git a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py index a2075763..56740159 100644 --- a/taskflow/tests/unit/worker_based/test_pipeline.py +++ b/taskflow/tests/unit/worker_based/test_pipeline.py @@ -15,6 +15,7 @@ # under the License. import futurist +from futurist import waiters from oslo_utils import uuidutils from taskflow.engines.action_engine import executor as base_executor @@ -24,7 +25,6 @@ from taskflow.engines.worker_based import server as worker_server from taskflow import test from taskflow.tests import utils as test_utils from taskflow.types import failure -from taskflow.utils import async_utils from taskflow.utils import threading_utils @@ -78,7 +78,7 @@ class TestPipeline(test.TestCase): progress_callback = lambda *args, **kwargs: None f = executor.execute_task(t, uuidutils.generate_uuid(), {}, progress_callback=progress_callback) - async_utils.wait_for_any([f]) + waiters.wait_for_any([f]) event, result = f.result() self.assertEqual(1, result) @@ -94,7 +94,7 @@ class TestPipeline(test.TestCase): progress_callback = lambda *args, **kwargs: None f = executor.execute_task(t, uuidutils.generate_uuid(), {}, progress_callback=progress_callback) - async_utils.wait_for_any([f]) + waiters.wait_for_any([f]) action, result = f.result() self.assertIsInstance(result, failure.Failure) diff --git a/taskflow/utils/async_utils.py b/taskflow/utils/async_utils.py index c4b114b8..cc24d215 100644 --- a/taskflow/utils/async_utils.py +++ b/taskflow/utils/async_utils.py @@ -14,20 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from concurrent import futures as _futures -from concurrent.futures import _base import futurist -from oslo_utils import importutils - -greenthreading = importutils.try_import('eventlet.green.threading') - -from taskflow.utils import eventlet_utils as eu - - -_DONE_STATES = frozenset([ - _base.CANCELLED_AND_NOTIFIED, - _base.FINISHED, -]) def make_completed_future(result): @@ -35,78 +22,3 @@ def make_completed_future(result): future = futurist.Future() future.set_result(result) return future - - -def wait_for_any(fs, timeout=None): - """Wait for one of the futures to complete. - - Works correctly with both green and non-green futures (but not both - together, since this can't be guaranteed to avoid dead-lock due to how - the waiting implementations are different when green threads are being - used). - - Returns pair (done futures, not done futures). - """ - # TODO(harlowja): remove this when - # https://review.openstack.org/#/c/196269/ is merged and is made - # available. - green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture)) - if not green_fs: - return _futures.wait(fs, - timeout=timeout, - return_when=_futures.FIRST_COMPLETED) - else: - non_green_fs = len(fs) - green_fs - if non_green_fs: - raise RuntimeError("Can not wait on %s green futures and %s" - " non-green futures in the same `wait_for_any`" - " call" % (green_fs, non_green_fs)) - else: - return _wait_for_any_green(fs, timeout=timeout) - - -class _GreenWaiter(object): - """Provides the event that wait_for_any() blocks on.""" - def __init__(self): - self.event = greenthreading.Event() - - def add_result(self, future): - self.event.set() - - def add_exception(self, future): - self.event.set() - - def add_cancelled(self, future): - self.event.set() - - -def _partition_futures(fs): - done = set() - not_done = set() - for f in fs: - if f._state in _DONE_STATES: - done.add(f) - else: - not_done.add(f) - return done, not_done - - -def _wait_for_any_green(fs, timeout=None): - eu.check_for_eventlet(RuntimeError('Eventlet is needed to wait on' - ' green futures')) - - with _base._AcquireFutures(fs): - done, not_done = _partition_futures(fs) - if done: - return _base.DoneAndNotDoneFutures(done, not_done) - waiter = _GreenWaiter() - for f in fs: - f._waiters.append(waiter) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - with _base._AcquireFutures(fs): - done, not_done = _partition_futures(fs) - return _base.DoneAndNotDoneFutures(done, not_done)