From b014fc7d48969bd6812a11a5a0342c9324108876 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sat, 23 Aug 2014 20:09:10 -0700 Subject: [PATCH] Add a futures type that can unify our future functionality Move the currently existing green future executor and associated code to a new futures types module so that it can be accessed from this new location (TODO: deprecate the old location and link the old to the new for one release so that we can remove the old link in N + 1 release). This unifies the API that the existing pool (thread or process) future executors and the green thread pool future executor, and the newly added synchronous executor (replacing the previous `make_completed_future` function) provide so there usage is as seamless as possible. Part of blueprint top-level-types Change-Id: Ie5500eaa7f4425edb604b2dd13a15f82909a673b --- doc/source/types.rst | 5 + taskflow/engines/action_engine/executor.py | 17 +- .../engines/action_engine/retry_action.py | 62 +++-- taskflow/examples/resume_vm_boot.py | 7 +- .../persistence/backends/impl_sqlalchemy.py | 4 +- taskflow/tests/unit/test_engines.py | 8 +- taskflow/tests/unit/test_futures.py | 222 ++++++++++++++++++ taskflow/tests/unit/test_green_executor.py | 131 ----------- taskflow/tests/unit/test_suspend_flow.py | 7 +- taskflow/tests/unit/test_utils_async_utils.py | 45 ++-- .../eventlet_utils.py => types/futures.py} | 178 +++++++------- taskflow/utils/async_utils.py | 82 ++++++- 12 files changed, 480 insertions(+), 288 deletions(-) create mode 100644 taskflow/tests/unit/test_futures.py delete mode 100644 taskflow/tests/unit/test_green_executor.py rename taskflow/{utils/eventlet_utils.py => types/futures.py} (51%) diff --git a/doc/source/types.rst b/doc/source/types.rst index c628c1ce8..fb9580af0 100644 --- a/doc/source/types.rst +++ b/doc/source/types.rst @@ -17,6 +17,11 @@ FSM .. automodule:: taskflow.types.fsm +Futures +======= + +.. automodule:: taskflow.types.futures + Graph ===== diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 40a671eba..83da3b6bb 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -16,10 +16,10 @@ import abc -from concurrent import futures import six from taskflow import task as _task +from taskflow.types import futures from taskflow.utils import async_utils from taskflow.utils import misc from taskflow.utils import threading_utils @@ -94,19 +94,20 @@ class TaskExecutorBase(object): class SerialTaskExecutor(TaskExecutorBase): """Execute task one after another.""" + def __init__(self): + self._executor = futures.SynchronousExecutor() + def execute_task(self, task, task_uuid, arguments, progress_callback=None): - return async_utils.make_completed_future( - _execute_task(task, arguments, progress_callback)) + return self._executor.submit(_execute_task, task, arguments, + progress_callback) def revert_task(self, task, task_uuid, arguments, result, failures, progress_callback=None): - return async_utils.make_completed_future( - _revert_task(task, arguments, result, - failures, progress_callback)) + return self._executor.submit(_revert_task, task, arguments, result, + failures, progress_callback) def wait_for_any(self, fs, timeout=None): - # NOTE(imelnikov): this executor returns only done futures. - return (fs, set()) + return async_utils.wait_for_any(fs, timeout) class ParallelTaskExecutor(TaskExecutorBase): diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py index e4df5afa9..3bf6f491c 100644 --- a/taskflow/engines/action_engine/retry_action.py +++ b/taskflow/engines/action_engine/retry_action.py @@ -18,7 +18,7 @@ import logging from taskflow.engines.action_engine import executor as ex from taskflow import states -from taskflow.utils import async_utils +from taskflow.types import futures from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -31,6 +31,7 @@ class RetryAction(object): self._storage = storage self._notifier = notifier self._walker_factory = walker_factory + self._executor = futures.SynchronousExecutor() def _get_retry_args(self, retry): scope_walker = self._walker_factory(retry) @@ -59,29 +60,50 @@ class RetryAction(object): self._notifier.notify(state, details) def execute(self, retry): + + def _execute_retry(kwargs): + try: + result = retry.execute(**kwargs) + except Exception: + result = misc.Failure() + return (retry, ex.EXECUTED, result) + + def _on_done_callback(fut): + result = fut.result()[-1] + if isinstance(result, misc.Failure): + self.change_state(retry, states.FAILURE, result=result) + else: + self.change_state(retry, states.SUCCESS, result=result) + self.change_state(retry, states.RUNNING) - kwargs = self._get_retry_args(retry) - try: - result = retry.execute(**kwargs) - except Exception: - result = misc.Failure() - self.change_state(retry, states.FAILURE, result=result) - else: - self.change_state(retry, states.SUCCESS, result=result) - return async_utils.make_completed_future((retry, ex.EXECUTED, result)) + fut = self._executor.submit(_execute_retry, + self._get_retry_args(retry)) + fut.add_done_callback(_on_done_callback) + return fut def revert(self, retry): + + def _execute_retry(kwargs, failures): + kwargs['flow_failures'] = failures + try: + result = retry.revert(**kwargs) + except Exception: + result = misc.Failure() + return (retry, ex.REVERTED, result) + + def _on_done_callback(fut): + result = fut.result()[-1] + if isinstance(result, misc.Failure): + self.change_state(retry, states.FAILURE) + else: + self.change_state(retry, states.REVERTED) + self.change_state(retry, states.REVERTING) - kwargs = self._get_retry_args(retry) - kwargs['flow_failures'] = self._storage.get_failures() - try: - result = retry.revert(**kwargs) - except Exception: - result = misc.Failure() - self.change_state(retry, states.FAILURE) - else: - self.change_state(retry, states.REVERTED) - return async_utils.make_completed_future((retry, ex.REVERTED, result)) + fut = self._executor.submit(_execute_retry, + self._get_retry_args(retry), + self._storage.get_failures()) + fut.add_done_callback(_on_done_callback) + return fut def on_failure(self, retry, atom, last_failure): self._storage.save_retry_failure(retry.name, atom.name, last_failure) diff --git a/taskflow/examples/resume_vm_boot.py b/taskflow/examples/resume_vm_boot.py index 203cb8822..514f3336d 100644 --- a/taskflow/examples/resume_vm_boot.py +++ b/taskflow/examples/resume_vm_boot.py @@ -37,7 +37,8 @@ from taskflow.openstack.common import uuidutils from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow import task -from taskflow.utils import eventlet_utils as e_utils +from taskflow.types import futures +from taskflow.utils import async_utils from taskflow.utils import persistence_utils as p_utils import example_utils as eu # noqa @@ -236,8 +237,8 @@ with eu.get_backend() as backend: # Set up how we want our engine to run, serial, parallel... executor = None - if e_utils.EVENTLET_AVAILABLE: - executor = e_utils.GreenExecutor(5) + if async_utils.EVENTLET_AVAILABLE: + executor = futures.GreenThreadPoolExecutor(5) # Create/fetch a logbook that will track the workflows work. book = None diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 587d4d25a..29ab8c97c 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -37,7 +37,7 @@ from taskflow.persistence.backends import base from taskflow.persistence.backends.sqlalchemy import migration from taskflow.persistence.backends.sqlalchemy import models from taskflow.persistence import logbook -from taskflow.utils import eventlet_utils +from taskflow.utils import async_utils from taskflow.utils import misc @@ -249,7 +249,7 @@ class SQLAlchemyBackend(base.Backend): engine_args.update(conf.pop('engine_args', {})) engine = sa.create_engine(sql_connection, **engine_args) checkin_yield = conf.pop('checkin_yield', - eventlet_utils.EVENTLET_AVAILABLE) + async_utils.EVENTLET_AVAILABLE) if _as_bool(checkin_yield): sa.event.listen(engine, 'checkin', _thread_yield) if 'mysql' in e_url.drivername: diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 4ce291d19..0823002d6 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -17,7 +17,6 @@ import contextlib import threading -from concurrent import futures import testtools import taskflow.engines @@ -33,8 +32,9 @@ from taskflow import states from taskflow import task from taskflow import test from taskflow.tests import utils +from taskflow.types import futures from taskflow.types import graph as gr -from taskflow.utils import eventlet_utils as eu +from taskflow.utils import async_utils as au from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -590,7 +590,7 @@ class MultiThreadedEngineTest(EngineTaskTest, executor.shutdown(wait=True) -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') class ParallelEngineWithEventletTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, @@ -601,7 +601,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: - executor = eu.GreenExecutor() + executor = futures.GreenThreadPoolExecutor() return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', executor=executor) diff --git a/taskflow/tests/unit/test_futures.py b/taskflow/tests/unit/test_futures.py new file mode 100644 index 000000000..576b5eee8 --- /dev/null +++ b/taskflow/tests/unit/test_futures.py @@ -0,0 +1,222 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import functools +import threading +import time + +import testtools + +from taskflow import test +from taskflow.types import futures + +try: + from eventlet.green import threading as greenthreading + from eventlet.green import time as greentime + EVENTLET_AVAILABLE = True +except ImportError: + EVENTLET_AVAILABLE = False + + +def _noop(): + pass + + +def _blowup(): + raise IOError("Broke!") + + +def _return_given(given): + return given + + +def _return_one(): + return 1 + + +def _double(x): + return x * 2 + + +class _SimpleFuturesTestMixin(object): + # This exists to test basic functionality, mainly required to test the + # process executor which has a very restricted set of things it can + # execute (no lambda functions, no instance methods...) + def _make_executor(self, max_workers): + raise NotImplementedError("Not implemented") + + def test_invalid_workers(self): + self.assertRaises(ValueError, self._make_executor, -1) + self.assertRaises(ValueError, self._make_executor, 0) + + def test_exception_transfer(self): + with self._make_executor(2) as e: + f = e.submit(_blowup) + self.assertRaises(IOError, f.result) + + def test_accumulator(self): + created = [] + with self._make_executor(5) as e: + for _i in range(0, 10): + created.append(e.submit(_return_one)) + results = [f.result() for f in created] + self.assertEqual(10, sum(results)) + + def test_map(self): + count = [i for i in range(0, 100)] + with self._make_executor(5) as e: + results = list(e.map(_double, count)) + initial = sum(count) + self.assertEqual(2 * initial, sum(results)) + + def test_alive(self): + e = self._make_executor(1) + self.assertTrue(e.alive) + e.shutdown() + self.assertFalse(e.alive) + with self._make_executor(1) as e2: + self.assertTrue(e2.alive) + self.assertFalse(e2.alive) + + +class _FuturesTestMixin(_SimpleFuturesTestMixin): + def _delay(self, secs): + raise NotImplementedError("Not implemented") + + def _make_lock(self): + raise NotImplementedError("Not implemented") + + def _make_funcs(self, called, amount): + mutator = self._make_lock() + + def store_call(ident): + with mutator: + called[ident] += 1 + + for i in range(0, amount): + yield functools.partial(store_call, ident=i) + + def test_func_calls(self): + called = collections.defaultdict(int) + + with self._make_executor(2) as e: + for f in self._make_funcs(called, 2): + e.submit(f) + + self.assertEqual(1, called[0]) + self.assertEqual(1, called[1]) + + def test_result_callback(self): + called = collections.defaultdict(int) + mutator = self._make_lock() + + def callback(future): + with mutator: + called[future] += 1 + + funcs = list(self._make_funcs(called, 1)) + with self._make_executor(2) as e: + for func in funcs: + f = e.submit(func) + f.add_done_callback(callback) + + self.assertEqual(2, len(called)) + + def test_result_transfer(self): + create_am = 50 + with self._make_executor(2) as e: + fs = [] + for i in range(0, create_am): + fs.append(e.submit(functools.partial(_return_given, i))) + self.assertEqual(create_am, len(fs)) + for i in range(0, create_am): + result = fs[i].result() + self.assertEqual(i, result) + + def test_called_restricted_size(self): + called = collections.defaultdict(int) + + with self._make_executor(1) as e: + for f in self._make_funcs(called, 100): + e.submit(f) + + self.assertFalse(e.alive) + self.assertEqual(100, len(called)) + + +class ThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin): + def _make_executor(self, max_workers): + return futures.ThreadPoolExecutor(max_workers=max_workers) + + def _delay(self, secs): + time.sleep(secs) + + def _make_lock(self): + return threading.Lock() + + +class ProcessPoolExecutorTest(test.TestCase, _SimpleFuturesTestMixin): + def _make_executor(self, max_workers): + return futures.ProcessPoolExecutor(max_workers=max_workers) + + +class SynchronousExecutorTest(test.TestCase, _FuturesTestMixin): + def _make_executor(self, max_workers): + return futures.SynchronousExecutor() + + def _delay(self, secs): + time.sleep(secs) + + def _make_lock(self): + return threading.Lock() + + def test_invalid_workers(self): + pass + + +@testtools.skipIf(not EVENTLET_AVAILABLE, 'eventlet is not available') +class GreenThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin): + def _make_executor(self, max_workers): + return futures.GreenThreadPoolExecutor(max_workers=max_workers) + + def _delay(self, secs): + greentime.sleep(secs) + + def _make_lock(self): + return greenthreading.Lock() + + def test_cancellation(self): + called = collections.defaultdict(int) + + fs = [] + with self._make_executor(2) as e: + for func in self._make_funcs(called, 2): + fs.append(e.submit(func)) + # Greenthreads don't start executing until we wait for them + # to, since nothing here does IO, this will work out correctly. + # + # If something here did a blocking call, then eventlet could swap + # one of the executors threads in, but nothing in this test does. + for f in fs: + self.assertFalse(f.running()) + f.cancel() + + self.assertEqual(0, len(called)) + self.assertEqual(2, len(fs)) + for f in fs: + self.assertTrue(f.cancelled()) + self.assertTrue(f.done()) diff --git a/taskflow/tests/unit/test_green_executor.py b/taskflow/tests/unit/test_green_executor.py deleted file mode 100644 index eae523dc8..000000000 --- a/taskflow/tests/unit/test_green_executor.py +++ /dev/null @@ -1,131 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import functools - -import testtools - -from taskflow import test -from taskflow.utils import eventlet_utils as eu - - -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') -class GreenExecutorTest(test.TestCase): - def make_funcs(self, called, amount): - - def store_call(name): - called[name] += 1 - - for i in range(0, amount): - yield functools.partial(store_call, name=i) - - def test_func_calls(self): - called = collections.defaultdict(int) - - with eu.GreenExecutor(2) as e: - for f in self.make_funcs(called, 2): - e.submit(f) - - self.assertEqual(1, called[0]) - self.assertEqual(1, called[1]) - - def test_no_construction(self): - self.assertRaises(ValueError, eu.GreenExecutor, 0) - self.assertRaises(ValueError, eu.GreenExecutor, -1) - self.assertRaises(ValueError, eu.GreenExecutor, "-1") - - def test_result_callback(self): - called = collections.defaultdict(int) - - def callback(future): - called[future] += 1 - - funcs = list(self.make_funcs(called, 1)) - with eu.GreenExecutor(2) as e: - for func in funcs: - f = e.submit(func) - f.add_done_callback(callback) - - self.assertEqual(2, len(called)) - - def test_exception_transfer(self): - - def blowup(): - raise IOError("Broke!") - - with eu.GreenExecutor(2) as e: - f = e.submit(blowup) - - self.assertRaises(IOError, f.result) - - def test_result_transfer(self): - - def return_given(given): - return given - - create_am = 50 - with eu.GreenExecutor(2) as e: - fs = [] - for i in range(0, create_am): - fs.append(e.submit(functools.partial(return_given, i))) - - self.assertEqual(create_am, len(fs)) - for i in range(0, create_am): - result = fs[i].result() - self.assertEqual(i, result) - - def test_called_restricted_size(self): - called = collections.defaultdict(int) - - with eu.GreenExecutor(1) as e: - for f in self.make_funcs(called, 100): - e.submit(f) - self.assertEqual(99, e.amount_delayed) - - self.assertFalse(e.alive) - self.assertEqual(100, len(called)) - self.assertGreaterEqual(1, e.workers_created) - self.assertEqual(0, e.amount_delayed) - - def test_shutdown_twice(self): - e = eu.GreenExecutor(1) - self.assertTrue(e.alive) - e.shutdown() - self.assertFalse(e.alive) - e.shutdown() - self.assertFalse(e.alive) - - def test_func_cancellation(self): - called = collections.defaultdict(int) - - fs = [] - with eu.GreenExecutor(2) as e: - for func in self.make_funcs(called, 2): - fs.append(e.submit(func)) - # Greenthreads don't start executing until we wait for them - # to, since nothing here does IO, this will work out correctly. - # - # If something here did a blocking call, then eventlet could swap - # one of the executors threads in, but nothing in this test does. - for f in fs: - self.assertFalse(f.running()) - f.cancel() - - self.assertEqual(0, len(called)) - for f in fs: - self.assertTrue(f.cancelled()) - self.assertTrue(f.done()) diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py index 7b9875c9d..928f2bece 100644 --- a/taskflow/tests/unit/test_suspend_flow.py +++ b/taskflow/tests/unit/test_suspend_flow.py @@ -23,7 +23,8 @@ from taskflow.patterns import linear_flow as lf from taskflow import states from taskflow import test from taskflow.tests import utils -from taskflow.utils import eventlet_utils as eu +from taskflow.types import futures +from taskflow.utils import async_utils as au class SuspendingListener(lbase.ListenerBase): @@ -181,13 +182,13 @@ class MultiThreadedEngineTest(SuspendFlowTest, executor=executor) -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') class ParallelEngineWithEventletTest(SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: - executor = eu.GreenExecutor() + executor = futures.GreenThreadPoolExecutor() return taskflow.engines.load(flow, flow_detail=flow_detail, engine='parallel', backend=self.backend, diff --git a/taskflow/tests/unit/test_utils_async_utils.py b/taskflow/tests/unit/test_utils_async_utils.py index 0abf41078..7bb033b8d 100644 --- a/taskflow/tests/unit/test_utils_async_utils.py +++ b/taskflow/tests/unit/test_utils_async_utils.py @@ -14,12 +14,11 @@ # License for the specific language governing permissions and limitations # under the License. -from concurrent import futures import testtools from taskflow import test +from taskflow.types import futures from taskflow.utils import async_utils as au -from taskflow.utils import eventlet_utils as eu class WaitForAnyTestsMixin(object): @@ -29,7 +28,7 @@ class WaitForAnyTestsMixin(object): def foo(): pass - with self.executor_cls(2) as e: + with self._make_executor(2) as e: fs = [e.submit(foo), e.submit(foo)] # this test assumes that our foo will end within 10 seconds done, not_done = au.wait_for_any(fs, 10) @@ -53,34 +52,17 @@ class WaitForAnyTestsMixin(object): self.assertIs(done.pop(), f2) -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') class AsyncUtilsEventletTest(test.TestCase, WaitForAnyTestsMixin): - executor_cls = eu.GreenExecutor - is_green = True - - def test_add_result(self): - waiter = eu._GreenWaiter() - self.assertFalse(waiter.event.is_set()) - waiter.add_result(futures.Future()) - self.assertTrue(waiter.event.is_set()) - - def test_add_exception(self): - waiter = eu._GreenWaiter() - self.assertFalse(waiter.event.is_set()) - waiter.add_exception(futures.Future()) - self.assertTrue(waiter.event.is_set()) - - def test_add_cancelled(self): - waiter = eu._GreenWaiter() - self.assertFalse(waiter.event.is_set()) - waiter.add_cancelled(futures.Future()) - self.assertTrue(waiter.event.is_set()) + def _make_executor(self, max_workers): + return futures.GreenThreadPoolExecutor(max_workers=max_workers) class AsyncUtilsThreadedTest(test.TestCase, WaitForAnyTestsMixin): - executor_cls = futures.ThreadPoolExecutor + def _make_executor(self, max_workers): + return futures.ThreadPoolExecutor(max_workers=max_workers) class MakeCompletedFutureTest(test.TestCase): @@ -90,3 +72,16 @@ class MakeCompletedFutureTest(test.TestCase): future = au.make_completed_future(result) self.assertTrue(future.done()) self.assertIs(future.result(), result) + + def test_make_completed_future_exception(self): + result = IOError("broken") + future = au.make_completed_future(result, exception=True) + self.assertTrue(future.done()) + self.assertRaises(IOError, future.result) + self.assertIsNotNone(future.exception()) + + +class AsyncUtilsSynchronousTest(test.TestCase, + WaitForAnyTestsMixin): + def _make_executor(self, max_workers): + return futures.SynchronousExecutor() diff --git a/taskflow/utils/eventlet_utils.py b/taskflow/types/futures.py similarity index 51% rename from taskflow/utils/eventlet_utils.py rename to taskflow/types/futures.py index 335dd0179..194730e55 100644 --- a/taskflow/utils/eventlet_utils.py +++ b/taskflow/types/futures.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,9 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. -import logging - -from concurrent import futures +from concurrent import futures as _futures +from concurrent.futures import process as _process +from concurrent.futures import thread as _thread try: from eventlet.green import threading as greenthreading @@ -27,15 +27,45 @@ try: except ImportError: EVENTLET_AVAILABLE = False +from taskflow.utils import threading_utils as tu -from taskflow.utils import lock_utils -LOG = logging.getLogger(__name__) +# NOTE(harlowja): Allows for simpler access to this type... +Future = _futures.Future -_DONE_STATES = frozenset([ - futures._base.CANCELLED_AND_NOTIFIED, - futures._base.FINISHED, -]) + +class ThreadPoolExecutor(_thread.ThreadPoolExecutor): + """Executor that uses a thread pool to execute calls asynchronously. + + See: https://docs.python.org/dev/library/concurrent.futures.html + """ + def __init__(self, max_workers=None): + if max_workers is None: + max_workers = tu.get_optimal_thread_count() + super(ThreadPoolExecutor, self).__init__(max_workers=max_workers) + if self._max_workers <= 0: + raise ValueError("Max workers must be greater than zero") + + @property + def alive(self): + return not self._shutdown + + +class ProcessPoolExecutor(_process.ProcessPoolExecutor): + """Executor that uses a process pool to execute calls asynchronously. + + See: https://docs.python.org/dev/library/concurrent.futures.html + """ + def __init__(self, max_workers=None): + if max_workers is None: + max_workers = tu.get_optimal_thread_count() + super(ProcessPoolExecutor, self).__init__(max_workers=max_workers) + if self._max_workers <= 0: + raise ValueError("Max workers must be greater than zero") + + @property + def alive(self): + return not self._shutdown_thread class _WorkItem(object): @@ -56,7 +86,36 @@ class _WorkItem(object): self.future.set_result(result) -class _Worker(object): +class SynchronousExecutor(_futures.Executor): + """Executor that uses the caller to execute calls synchronously. + + This provides an interface to a caller that looks like an executor but + will execute the calls inside the caller thread instead of executing it + in a external process/thread for when this type of functionality is + useful to provide... + """ + + def __init__(self): + self._shutoff = False + + @property + def alive(self): + return not self._shutoff + + def shutdown(self, wait=True): + self._shutoff = True + + def submit(self, fn, *args, **kwargs): + if self._shutoff: + raise RuntimeError('Can not schedule new futures' + ' after being shutdown') + f = Future() + runner = _WorkItem(f, fn, args, kwargs) + runner.run() + return f + + +class _GreenWorker(object): def __init__(self, executor, work, work_queue): self.executor = executor self.work = work @@ -82,7 +141,7 @@ class _Worker(object): self.work_queue.task_done() -class GreenFuture(futures.Future): +class GreenFuture(Future): def __init__(self): super(GreenFuture, self).__init__() assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green future' @@ -95,96 +154,53 @@ class GreenFuture(futures.Future): self._condition = greenthreading.Condition() -class GreenExecutor(futures.Executor): - """A greenthread backed executor.""" +class GreenThreadPoolExecutor(_futures.Executor): + """Executor that uses a green thread pool to execute calls asynchronously. + + See: https://docs.python.org/dev/library/concurrent.futures.html + and http://eventlet.net/doc/modules/greenpool.html for information on + how this works. + """ def __init__(self, max_workers=1000): assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor' - self._max_workers = int(max_workers) - if self._max_workers <= 0: - raise ValueError('Max workers must be greater than zero') + if max_workers <= 0: + raise ValueError("Max workers must be greater than zero") + self._max_workers = max_workers self._pool = greenpool.GreenPool(self._max_workers) self._delayed_work = greenqueue.Queue() self._shutdown_lock = greenthreading.Lock() self._shutdown = False - self._workers_created = 0 - - @property - def workers_created(self): - return self._workers_created - - @property - def amount_delayed(self): - return self._delayed_work.qsize() @property def alive(self): return not self._shutdown - @lock_utils.locked(lock='_shutdown_lock') def submit(self, fn, *args, **kwargs): - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') - f = GreenFuture() - work = _WorkItem(f, fn, args, kwargs) - if not self._spin_up(work): - self._delayed_work.put(work) - return f + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('Can not schedule new futures' + ' after being shutdown') + f = GreenFuture() + work = _WorkItem(f, fn, args, kwargs) + if not self._spin_up(work): + self._delayed_work.put(work) + return f def _spin_up(self, work): alive = self._pool.running() + self._pool.waiting() if alive < self._max_workers: - self._pool.spawn_n(_Worker(self, work, self._delayed_work)) - self._workers_created += 1 + self._pool.spawn_n(_GreenWorker(self, work, self._delayed_work)) return True return False def shutdown(self, wait=True): with self._shutdown_lock: - self._shutdown = True - if wait: + if not self._shutdown: + self._shutdown = True + shutoff = True + else: + shutoff = False + if wait and shutoff: self._pool.waitall() self._delayed_work.join() - - -class _GreenWaiter(object): - """Provides the event that wait_for_any() blocks on.""" - def __init__(self): - self.event = greenthreading.Event() - - def add_result(self, future): - self.event.set() - - def add_exception(self, future): - self.event.set() - - def add_cancelled(self, future): - self.event.set() - - -def _partition_futures(fs): - """Partitions the input futures into done and not done lists.""" - done = set() - not_done = set() - for f in fs: - if f._state in _DONE_STATES: - done.add(f) - else: - not_done.add(f) - return (done, not_done) - - -def wait_for_any(fs, timeout=None): - assert EVENTLET_AVAILABLE, ('eventlet is needed to wait on green futures') - with futures._base._AcquireFutures(fs): - (done, not_done) = _partition_futures(fs) - if done: - return (done, not_done) - waiter = _GreenWaiter() - for f in fs: - f._waiters.append(waiter) - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - with futures._base._AcquireFutures(fs): - return _partition_futures(fs) diff --git a/taskflow/utils/async_utils.py b/taskflow/utils/async_utils.py index 6d280dfef..b055a27bd 100644 --- a/taskflow/utils/async_utils.py +++ b/taskflow/utils/async_utils.py @@ -14,9 +14,32 @@ # License for the specific language governing permissions and limitations # under the License. -from concurrent import futures +from concurrent import futures as _futures +from concurrent.futures import _base -from taskflow.utils import eventlet_utils as eu +try: + from eventlet.green import threading as greenthreading + EVENTLET_AVAILABLE = True +except ImportError: + EVENTLET_AVAILABLE = False + +from taskflow.types import futures + + +_DONE_STATES = frozenset([ + _base.CANCELLED_AND_NOTIFIED, + _base.FINISHED, +]) + + +def make_completed_future(result, exception=False): + """Make a future completed with a given result.""" + future = futures.Future() + if exception: + future.set_exception(result) + else: + future.set_result(result) + return future def wait_for_any(fs, timeout=None): @@ -29,10 +52,10 @@ def wait_for_any(fs, timeout=None): Returns pair (done futures, not done futures). """ - green_fs = sum(1 for f in fs if isinstance(f, eu.GreenFuture)) + green_fs = sum(1 for f in fs if isinstance(f, futures.GreenFuture)) if not green_fs: - return tuple(futures.wait(fs, timeout=timeout, - return_when=futures.FIRST_COMPLETED)) + return tuple(_futures.wait(fs, timeout=timeout, + return_when=_futures.FIRST_COMPLETED)) else: non_green_fs = len(fs) - green_fs if non_green_fs: @@ -40,11 +63,48 @@ def wait_for_any(fs, timeout=None): " non-green futures in the same `wait_for_any`" " call" % (green_fs, non_green_fs)) else: - return eu.wait_for_any(fs, timeout=timeout) + return _wait_for_any_green(fs, timeout=timeout) -def make_completed_future(result): - """Make with completed with given result.""" - future = futures.Future() - future.set_result(result) - return future +class _GreenWaiter(object): + """Provides the event that wait_for_any() blocks on.""" + def __init__(self): + self.event = greenthreading.Event() + + def add_result(self, future): + self.event.set() + + def add_exception(self, future): + self.event.set() + + def add_cancelled(self, future): + self.event.set() + + +def _wait_for_any_green(fs, timeout=None): + assert EVENTLET_AVAILABLE, 'eventlet is needed to wait on green futures' + + def _partition_futures(fs): + done = set() + not_done = set() + for f in fs: + if f._state in _DONE_STATES: + done.add(f) + else: + not_done.add(f) + return (done, not_done) + + with _base._AcquireFutures(fs): + (done, not_done) = _partition_futures(fs) + if done: + return (done, not_done) + waiter = _GreenWaiter() + for f in fs: + f._waiters.append(waiter) + + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + + with _base._AcquireFutures(fs): + return _partition_futures(fs)