Merge "Bump futurist and remove waiting code in taskflow"
This commit is contained in:
		| @@ -16,6 +16,8 @@ | ||||
|  | ||||
| import functools | ||||
|  | ||||
| from futurist import waiters | ||||
|  | ||||
| from taskflow.engines.action_engine.actions import retry as ra | ||||
| from taskflow.engines.action_engine.actions import task as ta | ||||
| from taskflow.engines.action_engine import analyzer as an | ||||
| @@ -26,7 +28,6 @@ from taskflow.engines.action_engine import scopes as sc | ||||
| from taskflow import flow as flow_type | ||||
| from taskflow import states as st | ||||
| from taskflow import task | ||||
| from taskflow.utils import async_utils | ||||
| from taskflow.utils import misc | ||||
|  | ||||
|  | ||||
| @@ -114,7 +115,7 @@ class Runtime(object): | ||||
|  | ||||
|     @misc.cachedproperty | ||||
|     def runner(self): | ||||
|         return ru.Runner(self, async_utils.wait_for_any) | ||||
|         return ru.Runner(self, waiters.wait_for_any) | ||||
|  | ||||
|     @misc.cachedproperty | ||||
|     def completer(self): | ||||
|   | ||||
| @@ -14,56 +14,8 @@ | ||||
| #    License for the specific language governing permissions and limitations | ||||
| #    under the License. | ||||
|  | ||||
| import futurist | ||||
| import testtools | ||||
|  | ||||
| from taskflow import test | ||||
| from taskflow.utils import async_utils as au | ||||
| from taskflow.utils import eventlet_utils as eu | ||||
|  | ||||
|  | ||||
| class WaitForAnyTestsMixin(object): | ||||
|     timeout = 0.001 | ||||
|  | ||||
|     def test_waits_and_finishes(self): | ||||
|         def foo(): | ||||
|             pass | ||||
|  | ||||
|         with self._make_executor(2) as e: | ||||
|             fs = [e.submit(foo), e.submit(foo)] | ||||
|             # this test assumes that our foo will end within 10 seconds | ||||
|             done, not_done = au.wait_for_any(fs, 10) | ||||
|             self.assertIn(len(done), (1, 2)) | ||||
|             self.assertTrue(any(f in done for f in fs)) | ||||
|  | ||||
|     def test_not_done_futures(self): | ||||
|         fs = [futurist.Future(), futurist.Future()] | ||||
|         done, not_done = au.wait_for_any(fs, self.timeout) | ||||
|         self.assertEqual(len(done), 0) | ||||
|         self.assertEqual(len(not_done), 2) | ||||
|  | ||||
|     def test_mixed_futures(self): | ||||
|         f1 = futurist.Future() | ||||
|         f2 = futurist.Future() | ||||
|         f2.set_result(1) | ||||
|         done, not_done = au.wait_for_any([f1, f2], self.timeout) | ||||
|         self.assertEqual(len(done), 1) | ||||
|         self.assertEqual(len(not_done), 1) | ||||
|         self.assertIs(not_done.pop(), f1) | ||||
|         self.assertIs(done.pop(), f2) | ||||
|  | ||||
|  | ||||
| @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') | ||||
| class AsyncUtilsEventletTest(test.TestCase, | ||||
|                              WaitForAnyTestsMixin): | ||||
|     def _make_executor(self, max_workers): | ||||
|         return futurist.GreenThreadPoolExecutor(max_workers=max_workers) | ||||
|  | ||||
|  | ||||
| class AsyncUtilsThreadedTest(test.TestCase, | ||||
|                              WaitForAnyTestsMixin): | ||||
|     def _make_executor(self, max_workers): | ||||
|         return futurist.ThreadPoolExecutor(max_workers=max_workers) | ||||
|  | ||||
|  | ||||
| class MakeCompletedFutureTest(test.TestCase): | ||||
| @@ -73,9 +25,3 @@ class MakeCompletedFutureTest(test.TestCase): | ||||
|         future = au.make_completed_future(result) | ||||
|         self.assertTrue(future.done()) | ||||
|         self.assertIs(future.result(), result) | ||||
|  | ||||
|  | ||||
| class AsyncUtilsSynchronousTest(test.TestCase, | ||||
|                                 WaitForAnyTestsMixin): | ||||
|     def _make_executor(self, max_workers): | ||||
|         return futurist.SynchronousExecutor() | ||||
|   | ||||
| @@ -15,6 +15,7 @@ | ||||
| #    under the License. | ||||
|  | ||||
| import futurist | ||||
| from futurist import waiters | ||||
| from oslo_utils import uuidutils | ||||
|  | ||||
| from taskflow.engines.action_engine import executor as base_executor | ||||
| @@ -24,7 +25,6 @@ from taskflow.engines.worker_based import server as worker_server | ||||
| from taskflow import test | ||||
| from taskflow.tests import utils as test_utils | ||||
| from taskflow.types import failure | ||||
| from taskflow.utils import async_utils | ||||
| from taskflow.utils import threading_utils | ||||
|  | ||||
|  | ||||
| @@ -78,7 +78,7 @@ class TestPipeline(test.TestCase): | ||||
|         progress_callback = lambda *args, **kwargs: None | ||||
|         f = executor.execute_task(t, uuidutils.generate_uuid(), {}, | ||||
|                                   progress_callback=progress_callback) | ||||
|         async_utils.wait_for_any([f]) | ||||
|         waiters.wait_for_any([f]) | ||||
|  | ||||
|         event, result = f.result() | ||||
|         self.assertEqual(1, result) | ||||
| @@ -94,7 +94,7 @@ class TestPipeline(test.TestCase): | ||||
|         progress_callback = lambda *args, **kwargs: None | ||||
|         f = executor.execute_task(t, uuidutils.generate_uuid(), {}, | ||||
|                                   progress_callback=progress_callback) | ||||
|         async_utils.wait_for_any([f]) | ||||
|         waiters.wait_for_any([f]) | ||||
|  | ||||
|         action, result = f.result() | ||||
|         self.assertIsInstance(result, failure.Failure) | ||||
|   | ||||
| @@ -14,20 +14,7 @@ | ||||
| #    License for the specific language governing permissions and limitations | ||||
| #    under the License. | ||||
|  | ||||
| from concurrent import futures as _futures | ||||
| from concurrent.futures import _base | ||||
| import futurist | ||||
| from oslo_utils import importutils | ||||
|  | ||||
| greenthreading = importutils.try_import('eventlet.green.threading') | ||||
|  | ||||
| from taskflow.utils import eventlet_utils as eu | ||||
|  | ||||
|  | ||||
| _DONE_STATES = frozenset([ | ||||
|     _base.CANCELLED_AND_NOTIFIED, | ||||
|     _base.FINISHED, | ||||
| ]) | ||||
|  | ||||
|  | ||||
| def make_completed_future(result): | ||||
| @@ -35,78 +22,3 @@ def make_completed_future(result): | ||||
|     future = futurist.Future() | ||||
|     future.set_result(result) | ||||
|     return future | ||||
|  | ||||
|  | ||||
| def wait_for_any(fs, timeout=None): | ||||
|     """Wait for one of the futures to complete. | ||||
|  | ||||
|     Works correctly with both green and non-green futures (but not both | ||||
|     together, since this can't be guaranteed to avoid dead-lock due to how | ||||
|     the waiting implementations are different when green threads are being | ||||
|     used). | ||||
|  | ||||
|     Returns pair (done futures, not done futures). | ||||
|     """ | ||||
|     # TODO(harlowja): remove this when | ||||
|     # https://review.openstack.org/#/c/196269/ is merged and is made | ||||
|     # available. | ||||
|     green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture)) | ||||
|     if not green_fs: | ||||
|         return _futures.wait(fs, | ||||
|                              timeout=timeout, | ||||
|                              return_when=_futures.FIRST_COMPLETED) | ||||
|     else: | ||||
|         non_green_fs = len(fs) - green_fs | ||||
|         if non_green_fs: | ||||
|             raise RuntimeError("Can not wait on %s green futures and %s" | ||||
|                                " non-green futures in the same `wait_for_any`" | ||||
|                                " call" % (green_fs, non_green_fs)) | ||||
|         else: | ||||
|             return _wait_for_any_green(fs, timeout=timeout) | ||||
|  | ||||
|  | ||||
| class _GreenWaiter(object): | ||||
|     """Provides the event that wait_for_any() blocks on.""" | ||||
|     def __init__(self): | ||||
|         self.event = greenthreading.Event() | ||||
|  | ||||
|     def add_result(self, future): | ||||
|         self.event.set() | ||||
|  | ||||
|     def add_exception(self, future): | ||||
|         self.event.set() | ||||
|  | ||||
|     def add_cancelled(self, future): | ||||
|         self.event.set() | ||||
|  | ||||
|  | ||||
| def _partition_futures(fs): | ||||
|     done = set() | ||||
|     not_done = set() | ||||
|     for f in fs: | ||||
|         if f._state in _DONE_STATES: | ||||
|             done.add(f) | ||||
|         else: | ||||
|             not_done.add(f) | ||||
|     return done, not_done | ||||
|  | ||||
|  | ||||
| def _wait_for_any_green(fs, timeout=None): | ||||
|     eu.check_for_eventlet(RuntimeError('Eventlet is needed to wait on' | ||||
|                                        ' green futures')) | ||||
|  | ||||
|     with _base._AcquireFutures(fs): | ||||
|         done, not_done = _partition_futures(fs) | ||||
|         if done: | ||||
|             return _base.DoneAndNotDoneFutures(done, not_done) | ||||
|         waiter = _GreenWaiter() | ||||
|         for f in fs: | ||||
|             f._waiters.append(waiter) | ||||
|  | ||||
|     waiter.event.wait(timeout) | ||||
|     for f in fs: | ||||
|         f._waiters.remove(waiter) | ||||
|  | ||||
|     with _base._AcquireFutures(fs): | ||||
|         done, not_done = _partition_futures(fs) | ||||
|         return _base.DoneAndNotDoneFutures(done, not_done) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jenkins
					Jenkins