Add a futures type that can unify our future functionality

Move the currently existing green future executor and associated
code to a new futures types module so that it can be accessed from
this new location (TODO: deprecate the old location and link the
old to the new for one release so that we can remove the old link
in N + 1 release).

This unifies the API that the existing pool (thread or process) future
executors and the green thread pool future executor, and the newly added
synchronous executor (replacing the previous `make_completed_future`
function) provide so there usage is as seamless as possible.

Part of blueprint top-level-types

Change-Id: Ie5500eaa7f4425edb604b2dd13a15f82909a673b
This commit is contained in:
Joshua Harlow 2014-08-23 20:09:10 -07:00
parent c95a681a9f
commit b014fc7d48
12 changed files with 480 additions and 288 deletions

View File

@ -17,6 +17,11 @@ FSM
.. automodule:: taskflow.types.fsm .. automodule:: taskflow.types.fsm
Futures
=======
.. automodule:: taskflow.types.futures
Graph Graph
===== =====

View File

@ -16,10 +16,10 @@
import abc import abc
from concurrent import futures
import six import six
from taskflow import task as _task from taskflow import task as _task
from taskflow.types import futures
from taskflow.utils import async_utils from taskflow.utils import async_utils
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.utils import threading_utils from taskflow.utils import threading_utils
@ -94,19 +94,20 @@ class TaskExecutorBase(object):
class SerialTaskExecutor(TaskExecutorBase): class SerialTaskExecutor(TaskExecutorBase):
"""Execute task one after another.""" """Execute task one after another."""
def __init__(self):
self._executor = futures.SynchronousExecutor()
def execute_task(self, task, task_uuid, arguments, progress_callback=None): def execute_task(self, task, task_uuid, arguments, progress_callback=None):
return async_utils.make_completed_future( return self._executor.submit(_execute_task, task, arguments,
_execute_task(task, arguments, progress_callback)) progress_callback)
def revert_task(self, task, task_uuid, arguments, result, failures, def revert_task(self, task, task_uuid, arguments, result, failures,
progress_callback=None): progress_callback=None):
return async_utils.make_completed_future( return self._executor.submit(_revert_task, task, arguments, result,
_revert_task(task, arguments, result, failures, progress_callback)
failures, progress_callback))
def wait_for_any(self, fs, timeout=None): def wait_for_any(self, fs, timeout=None):
# NOTE(imelnikov): this executor returns only done futures. return async_utils.wait_for_any(fs, timeout)
return (fs, set())
class ParallelTaskExecutor(TaskExecutorBase): class ParallelTaskExecutor(TaskExecutorBase):

View File

@ -18,7 +18,7 @@ import logging
from taskflow.engines.action_engine import executor as ex from taskflow.engines.action_engine import executor as ex
from taskflow import states from taskflow import states
from taskflow.utils import async_utils from taskflow.types import futures
from taskflow.utils import misc from taskflow.utils import misc
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -31,6 +31,7 @@ class RetryAction(object):
self._storage = storage self._storage = storage
self._notifier = notifier self._notifier = notifier
self._walker_factory = walker_factory self._walker_factory = walker_factory
self._executor = futures.SynchronousExecutor()
def _get_retry_args(self, retry): def _get_retry_args(self, retry):
scope_walker = self._walker_factory(retry) scope_walker = self._walker_factory(retry)
@ -59,29 +60,50 @@ class RetryAction(object):
self._notifier.notify(state, details) self._notifier.notify(state, details)
def execute(self, retry): def execute(self, retry):
self.change_state(retry, states.RUNNING)
kwargs = self._get_retry_args(retry) def _execute_retry(kwargs):
try: try:
result = retry.execute(**kwargs) result = retry.execute(**kwargs)
except Exception: except Exception:
result = misc.Failure() result = misc.Failure()
return (retry, ex.EXECUTED, result)
def _on_done_callback(fut):
result = fut.result()[-1]
if isinstance(result, misc.Failure):
self.change_state(retry, states.FAILURE, result=result) self.change_state(retry, states.FAILURE, result=result)
else: else:
self.change_state(retry, states.SUCCESS, result=result) self.change_state(retry, states.SUCCESS, result=result)
return async_utils.make_completed_future((retry, ex.EXECUTED, result))
self.change_state(retry, states.RUNNING)
fut = self._executor.submit(_execute_retry,
self._get_retry_args(retry))
fut.add_done_callback(_on_done_callback)
return fut
def revert(self, retry): def revert(self, retry):
self.change_state(retry, states.REVERTING)
kwargs = self._get_retry_args(retry) def _execute_retry(kwargs, failures):
kwargs['flow_failures'] = self._storage.get_failures() kwargs['flow_failures'] = failures
try: try:
result = retry.revert(**kwargs) result = retry.revert(**kwargs)
except Exception: except Exception:
result = misc.Failure() result = misc.Failure()
return (retry, ex.REVERTED, result)
def _on_done_callback(fut):
result = fut.result()[-1]
if isinstance(result, misc.Failure):
self.change_state(retry, states.FAILURE) self.change_state(retry, states.FAILURE)
else: else:
self.change_state(retry, states.REVERTED) self.change_state(retry, states.REVERTED)
return async_utils.make_completed_future((retry, ex.REVERTED, result))
self.change_state(retry, states.REVERTING)
fut = self._executor.submit(_execute_retry,
self._get_retry_args(retry),
self._storage.get_failures())
fut.add_done_callback(_on_done_callback)
return fut
def on_failure(self, retry, atom, last_failure): def on_failure(self, retry, atom, last_failure):
self._storage.save_retry_failure(retry.name, atom.name, last_failure) self._storage.save_retry_failure(retry.name, atom.name, last_failure)

View File

@ -37,7 +37,8 @@ from taskflow.openstack.common import uuidutils
from taskflow.patterns import graph_flow as gf from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow import task from taskflow import task
from taskflow.utils import eventlet_utils as e_utils from taskflow.types import futures
from taskflow.utils import async_utils
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
import example_utils as eu # noqa import example_utils as eu # noqa
@ -236,8 +237,8 @@ with eu.get_backend() as backend:
# Set up how we want our engine to run, serial, parallel... # Set up how we want our engine to run, serial, parallel...
executor = None executor = None
if e_utils.EVENTLET_AVAILABLE: if async_utils.EVENTLET_AVAILABLE:
executor = e_utils.GreenExecutor(5) executor = futures.GreenThreadPoolExecutor(5)
# Create/fetch a logbook that will track the workflows work. # Create/fetch a logbook that will track the workflows work.
book = None book = None

View File

@ -37,7 +37,7 @@ from taskflow.persistence.backends import base
from taskflow.persistence.backends.sqlalchemy import migration from taskflow.persistence.backends.sqlalchemy import migration
from taskflow.persistence.backends.sqlalchemy import models from taskflow.persistence.backends.sqlalchemy import models
from taskflow.persistence import logbook from taskflow.persistence import logbook
from taskflow.utils import eventlet_utils from taskflow.utils import async_utils
from taskflow.utils import misc from taskflow.utils import misc
@ -249,7 +249,7 @@ class SQLAlchemyBackend(base.Backend):
engine_args.update(conf.pop('engine_args', {})) engine_args.update(conf.pop('engine_args', {}))
engine = sa.create_engine(sql_connection, **engine_args) engine = sa.create_engine(sql_connection, **engine_args)
checkin_yield = conf.pop('checkin_yield', checkin_yield = conf.pop('checkin_yield',
eventlet_utils.EVENTLET_AVAILABLE) async_utils.EVENTLET_AVAILABLE)
if _as_bool(checkin_yield): if _as_bool(checkin_yield):
sa.event.listen(engine, 'checkin', _thread_yield) sa.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in e_url.drivername: if 'mysql' in e_url.drivername:

View File

@ -17,7 +17,6 @@
import contextlib import contextlib
import threading import threading
from concurrent import futures
import testtools import testtools
import taskflow.engines import taskflow.engines
@ -33,8 +32,9 @@ from taskflow import states
from taskflow import task from taskflow import task
from taskflow import test from taskflow import test
from taskflow.tests import utils from taskflow.tests import utils
from taskflow.types import futures
from taskflow.types import graph as gr from taskflow.types import graph as gr
from taskflow.utils import eventlet_utils as eu from taskflow.utils import async_utils as au
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
@ -590,7 +590,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
executor.shutdown(wait=True) executor.shutdown(wait=True)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(EngineTaskTest, class ParallelEngineWithEventletTest(EngineTaskTest,
EngineLinearFlowTest, EngineLinearFlowTest,
EngineParallelFlowTest, EngineParallelFlowTest,
@ -601,7 +601,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
def _make_engine(self, flow, flow_detail=None, executor=None): def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None: if executor is None:
executor = eu.GreenExecutor() executor = futures.GreenThreadPoolExecutor()
return taskflow.engines.load(flow, flow_detail=flow_detail, return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel', backend=self.backend, engine='parallel',
executor=executor) executor=executor)

View File

@ -0,0 +1,222 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import threading
import time
import testtools
from taskflow import test
from taskflow.types import futures
try:
from eventlet.green import threading as greenthreading
from eventlet.green import time as greentime
EVENTLET_AVAILABLE = True
except ImportError:
EVENTLET_AVAILABLE = False
def _noop():
pass
def _blowup():
raise IOError("Broke!")
def _return_given(given):
return given
def _return_one():
return 1
def _double(x):
return x * 2
class _SimpleFuturesTestMixin(object):
# This exists to test basic functionality, mainly required to test the
# process executor which has a very restricted set of things it can
# execute (no lambda functions, no instance methods...)
def _make_executor(self, max_workers):
raise NotImplementedError("Not implemented")
def test_invalid_workers(self):
self.assertRaises(ValueError, self._make_executor, -1)
self.assertRaises(ValueError, self._make_executor, 0)
def test_exception_transfer(self):
with self._make_executor(2) as e:
f = e.submit(_blowup)
self.assertRaises(IOError, f.result)
def test_accumulator(self):
created = []
with self._make_executor(5) as e:
for _i in range(0, 10):
created.append(e.submit(_return_one))
results = [f.result() for f in created]
self.assertEqual(10, sum(results))
def test_map(self):
count = [i for i in range(0, 100)]
with self._make_executor(5) as e:
results = list(e.map(_double, count))
initial = sum(count)
self.assertEqual(2 * initial, sum(results))
def test_alive(self):
e = self._make_executor(1)
self.assertTrue(e.alive)
e.shutdown()
self.assertFalse(e.alive)
with self._make_executor(1) as e2:
self.assertTrue(e2.alive)
self.assertFalse(e2.alive)
class _FuturesTestMixin(_SimpleFuturesTestMixin):
def _delay(self, secs):
raise NotImplementedError("Not implemented")
def _make_lock(self):
raise NotImplementedError("Not implemented")
def _make_funcs(self, called, amount):
mutator = self._make_lock()
def store_call(ident):
with mutator:
called[ident] += 1
for i in range(0, amount):
yield functools.partial(store_call, ident=i)
def test_func_calls(self):
called = collections.defaultdict(int)
with self._make_executor(2) as e:
for f in self._make_funcs(called, 2):
e.submit(f)
self.assertEqual(1, called[0])
self.assertEqual(1, called[1])
def test_result_callback(self):
called = collections.defaultdict(int)
mutator = self._make_lock()
def callback(future):
with mutator:
called[future] += 1
funcs = list(self._make_funcs(called, 1))
with self._make_executor(2) as e:
for func in funcs:
f = e.submit(func)
f.add_done_callback(callback)
self.assertEqual(2, len(called))
def test_result_transfer(self):
create_am = 50
with self._make_executor(2) as e:
fs = []
for i in range(0, create_am):
fs.append(e.submit(functools.partial(_return_given, i)))
self.assertEqual(create_am, len(fs))
for i in range(0, create_am):
result = fs[i].result()
self.assertEqual(i, result)
def test_called_restricted_size(self):
called = collections.defaultdict(int)
with self._make_executor(1) as e:
for f in self._make_funcs(called, 100):
e.submit(f)
self.assertFalse(e.alive)
self.assertEqual(100, len(called))
class ThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin):
def _make_executor(self, max_workers):
return futures.ThreadPoolExecutor(max_workers=max_workers)
def _delay(self, secs):
time.sleep(secs)
def _make_lock(self):
return threading.Lock()
class ProcessPoolExecutorTest(test.TestCase, _SimpleFuturesTestMixin):
def _make_executor(self, max_workers):
return futures.ProcessPoolExecutor(max_workers=max_workers)
class SynchronousExecutorTest(test.TestCase, _FuturesTestMixin):
def _make_executor(self, max_workers):
return futures.SynchronousExecutor()
def _delay(self, secs):
time.sleep(secs)
def _make_lock(self):
return threading.Lock()
def test_invalid_workers(self):
pass
@testtools.skipIf(not EVENTLET_AVAILABLE, 'eventlet is not available')
class GreenThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin):
def _make_executor(self, max_workers):
return futures.GreenThreadPoolExecutor(max_workers=max_workers)
def _delay(self, secs):
greentime.sleep(secs)
def _make_lock(self):
return greenthreading.Lock()
def test_cancellation(self):
called = collections.defaultdict(int)
fs = []
with self._make_executor(2) as e:
for func in self._make_funcs(called, 2):
fs.append(e.submit(func))
# Greenthreads don't start executing until we wait for them
# to, since nothing here does IO, this will work out correctly.
#
# If something here did a blocking call, then eventlet could swap
# one of the executors threads in, but nothing in this test does.
for f in fs:
self.assertFalse(f.running())
f.cancel()
self.assertEqual(0, len(called))
self.assertEqual(2, len(fs))
for f in fs:
self.assertTrue(f.cancelled())
self.assertTrue(f.done())

View File

@ -1,131 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import testtools
from taskflow import test
from taskflow.utils import eventlet_utils as eu
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class GreenExecutorTest(test.TestCase):
def make_funcs(self, called, amount):
def store_call(name):
called[name] += 1
for i in range(0, amount):
yield functools.partial(store_call, name=i)
def test_func_calls(self):
called = collections.defaultdict(int)
with eu.GreenExecutor(2) as e:
for f in self.make_funcs(called, 2):
e.submit(f)
self.assertEqual(1, called[0])
self.assertEqual(1, called[1])
def test_no_construction(self):
self.assertRaises(ValueError, eu.GreenExecutor, 0)
self.assertRaises(ValueError, eu.GreenExecutor, -1)
self.assertRaises(ValueError, eu.GreenExecutor, "-1")
def test_result_callback(self):
called = collections.defaultdict(int)
def callback(future):
called[future] += 1
funcs = list(self.make_funcs(called, 1))
with eu.GreenExecutor(2) as e:
for func in funcs:
f = e.submit(func)
f.add_done_callback(callback)
self.assertEqual(2, len(called))
def test_exception_transfer(self):
def blowup():
raise IOError("Broke!")
with eu.GreenExecutor(2) as e:
f = e.submit(blowup)
self.assertRaises(IOError, f.result)
def test_result_transfer(self):
def return_given(given):
return given
create_am = 50
with eu.GreenExecutor(2) as e:
fs = []
for i in range(0, create_am):
fs.append(e.submit(functools.partial(return_given, i)))
self.assertEqual(create_am, len(fs))
for i in range(0, create_am):
result = fs[i].result()
self.assertEqual(i, result)
def test_called_restricted_size(self):
called = collections.defaultdict(int)
with eu.GreenExecutor(1) as e:
for f in self.make_funcs(called, 100):
e.submit(f)
self.assertEqual(99, e.amount_delayed)
self.assertFalse(e.alive)
self.assertEqual(100, len(called))
self.assertGreaterEqual(1, e.workers_created)
self.assertEqual(0, e.amount_delayed)
def test_shutdown_twice(self):
e = eu.GreenExecutor(1)
self.assertTrue(e.alive)
e.shutdown()
self.assertFalse(e.alive)
e.shutdown()
self.assertFalse(e.alive)
def test_func_cancellation(self):
called = collections.defaultdict(int)
fs = []
with eu.GreenExecutor(2) as e:
for func in self.make_funcs(called, 2):
fs.append(e.submit(func))
# Greenthreads don't start executing until we wait for them
# to, since nothing here does IO, this will work out correctly.
#
# If something here did a blocking call, then eventlet could swap
# one of the executors threads in, but nothing in this test does.
for f in fs:
self.assertFalse(f.running())
f.cancel()
self.assertEqual(0, len(called))
for f in fs:
self.assertTrue(f.cancelled())
self.assertTrue(f.done())

View File

@ -23,7 +23,8 @@ from taskflow.patterns import linear_flow as lf
from taskflow import states from taskflow import states
from taskflow import test from taskflow import test
from taskflow.tests import utils from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu from taskflow.types import futures
from taskflow.utils import async_utils as au
class SuspendingListener(lbase.ListenerBase): class SuspendingListener(lbase.ListenerBase):
@ -181,13 +182,13 @@ class MultiThreadedEngineTest(SuspendFlowTest,
executor=executor) executor=executor)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(SuspendFlowTest, class ParallelEngineWithEventletTest(SuspendFlowTest,
test.TestCase): test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None): def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None: if executor is None:
executor = eu.GreenExecutor() executor = futures.GreenThreadPoolExecutor()
return taskflow.engines.load(flow, flow_detail=flow_detail, return taskflow.engines.load(flow, flow_detail=flow_detail,
engine='parallel', engine='parallel',
backend=self.backend, backend=self.backend,

View File

@ -14,12 +14,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from concurrent import futures
import testtools import testtools
from taskflow import test from taskflow import test
from taskflow.types import futures
from taskflow.utils import async_utils as au from taskflow.utils import async_utils as au
from taskflow.utils import eventlet_utils as eu
class WaitForAnyTestsMixin(object): class WaitForAnyTestsMixin(object):
@ -29,7 +28,7 @@ class WaitForAnyTestsMixin(object):
def foo(): def foo():
pass pass
with self.executor_cls(2) as e: with self._make_executor(2) as e:
fs = [e.submit(foo), e.submit(foo)] fs = [e.submit(foo), e.submit(foo)]
# this test assumes that our foo will end within 10 seconds # this test assumes that our foo will end within 10 seconds
done, not_done = au.wait_for_any(fs, 10) done, not_done = au.wait_for_any(fs, 10)
@ -53,34 +52,17 @@ class WaitForAnyTestsMixin(object):
self.assertIs(done.pop(), f2) self.assertIs(done.pop(), f2)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available')
class AsyncUtilsEventletTest(test.TestCase, class AsyncUtilsEventletTest(test.TestCase,
WaitForAnyTestsMixin): WaitForAnyTestsMixin):
executor_cls = eu.GreenExecutor def _make_executor(self, max_workers):
is_green = True return futures.GreenThreadPoolExecutor(max_workers=max_workers)
def test_add_result(self):
waiter = eu._GreenWaiter()
self.assertFalse(waiter.event.is_set())
waiter.add_result(futures.Future())
self.assertTrue(waiter.event.is_set())
def test_add_exception(self):
waiter = eu._GreenWaiter()
self.assertFalse(waiter.event.is_set())
waiter.add_exception(futures.Future())
self.assertTrue(waiter.event.is_set())
def test_add_cancelled(self):
waiter = eu._GreenWaiter()
self.assertFalse(waiter.event.is_set())
waiter.add_cancelled(futures.Future())
self.assertTrue(waiter.event.is_set())
class AsyncUtilsThreadedTest(test.TestCase, class AsyncUtilsThreadedTest(test.TestCase,
WaitForAnyTestsMixin): WaitForAnyTestsMixin):
executor_cls = futures.ThreadPoolExecutor def _make_executor(self, max_workers):
return futures.ThreadPoolExecutor(max_workers=max_workers)
class MakeCompletedFutureTest(test.TestCase): class MakeCompletedFutureTest(test.TestCase):
@ -90,3 +72,16 @@ class MakeCompletedFutureTest(test.TestCase):
future = au.make_completed_future(result) future = au.make_completed_future(result)
self.assertTrue(future.done()) self.assertTrue(future.done())
self.assertIs(future.result(), result) self.assertIs(future.result(), result)
def test_make_completed_future_exception(self):
result = IOError("broken")
future = au.make_completed_future(result, exception=True)
self.assertTrue(future.done())
self.assertRaises(IOError, future.result)
self.assertIsNotNone(future.exception())
class AsyncUtilsSynchronousTest(test.TestCase,
WaitForAnyTestsMixin):
def _make_executor(self, max_workers):
return futures.SynchronousExecutor()

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. # Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -14,9 +14,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import logging from concurrent import futures as _futures
from concurrent.futures import process as _process
from concurrent import futures from concurrent.futures import thread as _thread
try: try:
from eventlet.green import threading as greenthreading from eventlet.green import threading as greenthreading
@ -27,15 +27,45 @@ try:
except ImportError: except ImportError:
EVENTLET_AVAILABLE = False EVENTLET_AVAILABLE = False
from taskflow.utils import threading_utils as tu
from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__) # NOTE(harlowja): Allows for simpler access to this type...
Future = _futures.Future
_DONE_STATES = frozenset([
futures._base.CANCELLED_AND_NOTIFIED, class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
futures._base.FINISHED, """Executor that uses a thread pool to execute calls asynchronously.
])
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
def __init__(self, max_workers=None):
if max_workers is None:
max_workers = tu.get_optimal_thread_count()
super(ThreadPoolExecutor, self).__init__(max_workers=max_workers)
if self._max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
@property
def alive(self):
return not self._shutdown
class ProcessPoolExecutor(_process.ProcessPoolExecutor):
"""Executor that uses a process pool to execute calls asynchronously.
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
def __init__(self, max_workers=None):
if max_workers is None:
max_workers = tu.get_optimal_thread_count()
super(ProcessPoolExecutor, self).__init__(max_workers=max_workers)
if self._max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
@property
def alive(self):
return not self._shutdown_thread
class _WorkItem(object): class _WorkItem(object):
@ -56,7 +86,36 @@ class _WorkItem(object):
self.future.set_result(result) self.future.set_result(result)
class _Worker(object): class SynchronousExecutor(_futures.Executor):
"""Executor that uses the caller to execute calls synchronously.
This provides an interface to a caller that looks like an executor but
will execute the calls inside the caller thread instead of executing it
in a external process/thread for when this type of functionality is
useful to provide...
"""
def __init__(self):
self._shutoff = False
@property
def alive(self):
return not self._shutoff
def shutdown(self, wait=True):
self._shutoff = True
def submit(self, fn, *args, **kwargs):
if self._shutoff:
raise RuntimeError('Can not schedule new futures'
' after being shutdown')
f = Future()
runner = _WorkItem(f, fn, args, kwargs)
runner.run()
return f
class _GreenWorker(object):
def __init__(self, executor, work, work_queue): def __init__(self, executor, work, work_queue):
self.executor = executor self.executor = executor
self.work = work self.work = work
@ -82,7 +141,7 @@ class _Worker(object):
self.work_queue.task_done() self.work_queue.task_done()
class GreenFuture(futures.Future): class GreenFuture(Future):
def __init__(self): def __init__(self):
super(GreenFuture, self).__init__() super(GreenFuture, self).__init__()
assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green future' assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green future'
@ -95,36 +154,33 @@ class GreenFuture(futures.Future):
self._condition = greenthreading.Condition() self._condition = greenthreading.Condition()
class GreenExecutor(futures.Executor): class GreenThreadPoolExecutor(_futures.Executor):
"""A greenthread backed executor.""" """Executor that uses a green thread pool to execute calls asynchronously.
See: https://docs.python.org/dev/library/concurrent.futures.html
and http://eventlet.net/doc/modules/greenpool.html for information on
how this works.
"""
def __init__(self, max_workers=1000): def __init__(self, max_workers=1000):
assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor' assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor'
self._max_workers = int(max_workers) if max_workers <= 0:
if self._max_workers <= 0: raise ValueError("Max workers must be greater than zero")
raise ValueError('Max workers must be greater than zero') self._max_workers = max_workers
self._pool = greenpool.GreenPool(self._max_workers) self._pool = greenpool.GreenPool(self._max_workers)
self._delayed_work = greenqueue.Queue() self._delayed_work = greenqueue.Queue()
self._shutdown_lock = greenthreading.Lock() self._shutdown_lock = greenthreading.Lock()
self._shutdown = False self._shutdown = False
self._workers_created = 0
@property
def workers_created(self):
return self._workers_created
@property
def amount_delayed(self):
return self._delayed_work.qsize()
@property @property
def alive(self): def alive(self):
return not self._shutdown return not self._shutdown
@lock_utils.locked(lock='_shutdown_lock')
def submit(self, fn, *args, **kwargs): def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown: if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown') raise RuntimeError('Can not schedule new futures'
' after being shutdown')
f = GreenFuture() f = GreenFuture()
work = _WorkItem(f, fn, args, kwargs) work = _WorkItem(f, fn, args, kwargs)
if not self._spin_up(work): if not self._spin_up(work):
@ -134,57 +190,17 @@ class GreenExecutor(futures.Executor):
def _spin_up(self, work): def _spin_up(self, work):
alive = self._pool.running() + self._pool.waiting() alive = self._pool.running() + self._pool.waiting()
if alive < self._max_workers: if alive < self._max_workers:
self._pool.spawn_n(_Worker(self, work, self._delayed_work)) self._pool.spawn_n(_GreenWorker(self, work, self._delayed_work))
self._workers_created += 1
return True return True
return False return False
def shutdown(self, wait=True): def shutdown(self, wait=True):
with self._shutdown_lock: with self._shutdown_lock:
if not self._shutdown:
self._shutdown = True self._shutdown = True
if wait: shutoff = True
else:
shutoff = False
if wait and shutoff:
self._pool.waitall() self._pool.waitall()
self._delayed_work.join() self._delayed_work.join()
class _GreenWaiter(object):
"""Provides the event that wait_for_any() blocks on."""
def __init__(self):
self.event = greenthreading.Event()
def add_result(self, future):
self.event.set()
def add_exception(self, future):
self.event.set()
def add_cancelled(self, future):
self.event.set()
def _partition_futures(fs):
"""Partitions the input futures into done and not done lists."""
done = set()
not_done = set()
for f in fs:
if f._state in _DONE_STATES:
done.add(f)
else:
not_done.add(f)
return (done, not_done)
def wait_for_any(fs, timeout=None):
assert EVENTLET_AVAILABLE, ('eventlet is needed to wait on green futures')
with futures._base._AcquireFutures(fs):
(done, not_done) = _partition_futures(fs)
if done:
return (done, not_done)
waiter = _GreenWaiter()
for f in fs:
f._waiters.append(waiter)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
with futures._base._AcquireFutures(fs):
return _partition_futures(fs)

View File

@ -14,9 +14,32 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from concurrent import futures from concurrent import futures as _futures
from concurrent.futures import _base
from taskflow.utils import eventlet_utils as eu try:
from eventlet.green import threading as greenthreading
EVENTLET_AVAILABLE = True
except ImportError:
EVENTLET_AVAILABLE = False
from taskflow.types import futures
_DONE_STATES = frozenset([
_base.CANCELLED_AND_NOTIFIED,
_base.FINISHED,
])
def make_completed_future(result, exception=False):
"""Make a future completed with a given result."""
future = futures.Future()
if exception:
future.set_exception(result)
else:
future.set_result(result)
return future
def wait_for_any(fs, timeout=None): def wait_for_any(fs, timeout=None):
@ -29,10 +52,10 @@ def wait_for_any(fs, timeout=None):
Returns pair (done futures, not done futures). Returns pair (done futures, not done futures).
""" """
green_fs = sum(1 for f in fs if isinstance(f, eu.GreenFuture)) green_fs = sum(1 for f in fs if isinstance(f, futures.GreenFuture))
if not green_fs: if not green_fs:
return tuple(futures.wait(fs, timeout=timeout, return tuple(_futures.wait(fs, timeout=timeout,
return_when=futures.FIRST_COMPLETED)) return_when=_futures.FIRST_COMPLETED))
else: else:
non_green_fs = len(fs) - green_fs non_green_fs = len(fs) - green_fs
if non_green_fs: if non_green_fs:
@ -40,11 +63,48 @@ def wait_for_any(fs, timeout=None):
" non-green futures in the same `wait_for_any`" " non-green futures in the same `wait_for_any`"
" call" % (green_fs, non_green_fs)) " call" % (green_fs, non_green_fs))
else: else:
return eu.wait_for_any(fs, timeout=timeout) return _wait_for_any_green(fs, timeout=timeout)
def make_completed_future(result): class _GreenWaiter(object):
"""Make with completed with given result.""" """Provides the event that wait_for_any() blocks on."""
future = futures.Future() def __init__(self):
future.set_result(result) self.event = greenthreading.Event()
return future
def add_result(self, future):
self.event.set()
def add_exception(self, future):
self.event.set()
def add_cancelled(self, future):
self.event.set()
def _wait_for_any_green(fs, timeout=None):
assert EVENTLET_AVAILABLE, 'eventlet is needed to wait on green futures'
def _partition_futures(fs):
done = set()
not_done = set()
for f in fs:
if f._state in _DONE_STATES:
done.add(f)
else:
not_done.add(f)
return (done, not_done)
with _base._AcquireFutures(fs):
(done, not_done) = _partition_futures(fs)
if done:
return (done, not_done)
waiter = _GreenWaiter()
for f in fs:
f._waiters.append(waiter)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
with _base._AcquireFutures(fs):
return _partition_futures(fs)