diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 8065b4cee..1e40f5905 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -99,19 +99,13 @@ class TaskExecutorBase(object): class SerialTaskExecutor(TaskExecutorBase): """Execute task one after another.""" - @staticmethod - def _completed_future(result): - future = futures.Future() - future.set_result(result) - return future - def execute_task(self, task, arguments, progress_callback=_noop): - return self._completed_future( + return async_utils.make_completed_future( _execute_task(task, arguments, progress_callback)) def revert_task(self, task, arguments, result, failures, progress_callback=_noop): - return self._completed_future( + return async_utils.make_completed_future( _revert_task(task, arguments, result, failures, progress_callback)) diff --git a/taskflow/tests/unit/test_green_executor.py b/taskflow/tests/unit/test_green_executor.py index 85104e3bf..3a9833b19 100644 --- a/taskflow/tests/unit/test_green_executor.py +++ b/taskflow/tests/unit/test_green_executor.py @@ -20,10 +20,7 @@ import collections import functools import testtools -from concurrent import futures - from taskflow import test -from taskflow.utils import async_utils as au from taskflow.utils import eventlet_utils as eu @@ -111,35 +108,3 @@ class GreenExecutorTest(test.TestCase): for f in fs: self.assertTrue(f.cancelled()) self.assertTrue(f.done()) - - -class WaitForAnyTestCase(test.TestCase): - - @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') - def test_green_waits_and_finishes(self): - def foo(): - pass - - e = eu.GreenExecutor() - - f1 = e.submit(foo) - f2 = e.submit(foo) - # this test assumes that our foo will end within 10 seconds - done, not_done = au.wait_for_any([f1, f2], 10) - self.assertIn(len(done), (1, 2)) - self.assertTrue(any((f1 in done, f2 in done))) - - def test_threaded_waits_and_finishes(self): - def foo(): - pass - - e = futures.ThreadPoolExecutor(2) - try: - f1 = e.submit(foo) - f2 = e.submit(foo) - # this test assumes that our foo will end within 10 seconds - done, not_done = au.wait_for_any([f1, f2], 10) - self.assertIn(len(done), (1, 2)) - self.assertTrue(any((f1 in done, f2 in done))) - finally: - e.shutdown() diff --git a/taskflow/tests/unit/test_utils_async_utils.py b/taskflow/tests/unit/test_utils_async_utils.py new file mode 100644 index 000000000..36d369f9b --- /dev/null +++ b/taskflow/tests/unit/test_utils_async_utils.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from concurrent import futures + +from taskflow import test + +from taskflow.utils import async_utils as au +from taskflow.utils import eventlet_utils as eu + + +class WaitForAnyTestCase(test.TestCase): + + @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') + def test_green_waits_and_finishes(self): + def foo(): + pass + + e = eu.GreenExecutor() + + f1 = e.submit(foo) + f2 = e.submit(foo) + # this test assumes that our foo will end within 10 seconds + done, not_done = au.wait_for_any([f1, f2], 10) + self.assertIn(len(done), (1, 2)) + self.assertTrue(any((f1 in done, f2 in done))) + + def test_threaded_waits_and_finishes(self): + def foo(): + pass + + e = futures.ThreadPoolExecutor(2) + try: + f1 = e.submit(foo) + f2 = e.submit(foo) + # this test assumes that our foo will end within 10 seconds + done, not_done = au.wait_for_any([f1, f2], 10) + self.assertIn(len(done), (1, 2)) + self.assertTrue(any((f1 in done, f2 in done))) + finally: + e.shutdown() + + +class MakeCompletedFutureTestCase(test.TestCase): + + def test_make_completed_future(self): + result = object() + future = au.make_completed_future(result) + self.assertTrue(future.done()) + self.assertIs(future.result(), result) diff --git a/taskflow/utils/async_utils.py b/taskflow/utils/async_utils.py index 69aca88d9..9a3d5fcdb 100644 --- a/taskflow/utils/async_utils.py +++ b/taskflow/utils/async_utils.py @@ -70,4 +70,11 @@ def wait_for_any(fs, timeout=None): with futures._base._AcquireFutures(fs): done = _done_futures(fs) - return done, set(fs) - done + return done, set(fs) - done + + +def make_completed_future(result): + """Make with completed with given result""" + future = futures.Future() + future.set_result(result) + return future