diff --git a/requirements.txt b/requirements.txt index e1e7ee37..03a3957f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,8 @@ Babel>=1.3 stevedore>=0.10 # Backport for concurrent.futures which exists in 3.2+ futures>=2.1.3 +# Only needed if the eventlet executor is used. +eventlet>=0.13.0 # NOTE(harlowja): if you want to be able to use the graph_utils # export_graph_to_dot function you will need to uncomment the following. # pydot>=1.0 diff --git a/taskflow/tests/unit/test_green_executor.py b/taskflow/tests/unit/test_green_executor.py new file mode 100644 index 00000000..4532c607 --- /dev/null +++ b/taskflow/tests/unit/test_green_executor.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
import collections
import functools

from taskflow import test

from taskflow.utils import eventlet_utils as eu


class GreenExecutorTest(test.TestCase):
    """Unit tests for the greenthread backed ``eu.GreenExecutor``."""

    def make_funcs(self, called, amount):
        """Yield ``amount`` callables that each record one call.

        The i-th callable yielded increments ``called[i]`` when invoked.
        """

        def store_call(name):
            called[name] += 1

        for i in range(amount):
            yield functools.partial(store_call, name=i)

    def test_func_calls(self):
        """Every submitted function runs exactly once."""
        called = collections.defaultdict(int)

        with eu.GreenExecutor(2) as e:
            for f in self.make_funcs(called, 2):
                e.submit(f)

        self.assertEqual(1, called[0])
        self.assertEqual(1, called[1])

    def test_no_construction(self):
        """Non-positive worker counts are rejected at construction."""
        self.assertRaises(AssertionError, eu.GreenExecutor, 0)
        self.assertRaises(AssertionError, eu.GreenExecutor, -1)
        self.assertRaises(AssertionError, eu.GreenExecutor, "-1")

    def test_result_callback(self):
        """A done callback attached to the future gets invoked."""
        called = collections.defaultdict(int)

        def call_back(future):
            called[future] += 1

        funcs = list(self.make_funcs(called, 1))
        with eu.GreenExecutor(2) as e:
            f = e.submit(funcs[0])
            f.add_done_callback(call_back)

        # One entry from the submitted function (key 0) and one from the
        # callback (keyed by the future itself).
        self.assertEqual(2, len(called))

    def test_exception_transfer(self):
        """An exception raised by the function resurfaces via result()."""

        def blowup():
            raise IOError("Broke!")

        with eu.GreenExecutor(2) as e:
            f = e.submit(blowup)

        self.assertRaises(IOError, f.result)

    def test_result_transfer(self):
        """Each future returns the value produced by its own function."""

        def return_given(given):
            return given

        create_am = 50
        with eu.GreenExecutor(2) as e:
            futures = [e.submit(functools.partial(return_given, i))
                       for i in range(create_am)]

        self.assertEqual(create_am, len(futures))
        for i, future in enumerate(futures):
            self.assertEqual(i, future.result())

    def test_func_cancellation(self):
        """Futures cancelled before they start never run their function."""
        called = collections.defaultdict(int)

        futures = []
        with eu.GreenExecutor(2) as e:
            for func in self.make_funcs(called, 2):
                futures.append(e.submit(func))
            # Greenthreads don't start executing until we wait for them
            # to; since nothing here does IO, this will work out correctly.
            #
            # If something here did a blocking call, then eventlet could
            # swap one of the executor's threads in, but nothing in this
            # test does.
            for f in futures:
                self.assertFalse(f.running())
                f.cancel()

        self.assertEqual(0, len(called))
        for f in futures:
            self.assertTrue(f.cancelled())
            self.assertTrue(f.done())


# Patch metadata for the next file (kept verbatim, as comments):
#   diff --git a/taskflow/utils/eventlet_utils.py b/taskflow/utils/eventlet_utils.py
#   new file mode 100644
#   index 00000000..4a8129d1
#   --- /dev/null
#   +++ b/taskflow/utils/eventlet_utils.py
#   @@ -0,0 +1,127 @@

# -*- coding: utf-8 -*-

# vim: tabstop=4 shiftwidth=4 softtabstop=4

#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import threading

from eventlet.green import threading as gthreading

from eventlet import greenpool
from eventlet import patcher
from eventlet import queue

from concurrent import futures

from taskflow.utils import lock_utils

LOG = logging.getLogger(__name__)

# NOTE(harlowja): this object signals to threads that they should stop
# working and rest in peace.
_TOMBSTONE = object()


class _WorkItem(object):
    """A callable, its arguments and the future that receives its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Call the function and record its result/exception on the future.

        Nothing is called when the future reports that it was cancelled
        before it could be marked as running.
        """
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            # Hand any failure over to whoever inspects the future instead
            # of letting it escape into the worker loop.
            self.future.set_exception(e)
        else:
            self.future.set_result(result)


class _Worker(object):
    """Callable that runs work items from a queue until told to stop."""

    def __init__(self, executor, work_queue, worker_id):
        self.executor = executor
        self.work_queue = work_queue
        self.worker_id = worker_id

    def __call__(self):
        """Consume and run queued work until the tombstone is received."""
        try:
            while True:
                work = self.work_queue.get(block=True)
                if work is _TOMBSTONE:
                    # NOTE(harlowja): give notice to other workers (this is
                    # basically a chain of tombstone calls that will cause
                    # all the workers on the queue to eventually shut-down).
                    self.work_queue.put(_TOMBSTONE)
                    break
                else:
                    work.run()
        except BaseException:
            # The worker stops after logging; the failure is not re-raised.
            LOG.critical("Exception in worker %s of '%s'",
                         self.worker_id, self.executor, exc_info=True)


class _GreenFuture(futures.Future):
    """A future whose waiting primitive cooperates with eventlet."""

    def __init__(self):
        super(_GreenFuture, self).__init__()
        # NOTE(harlowja): replace the built-in condition with a greenthread
        # compatible one so that when getting the result of this future the
        # functions will correctly yield to eventlet. If this is not done
        # then waiting on the future never actually causes the greenthreads
        # to run and thus you wait for infinity.
        if not patcher.is_monkey_patched('threading'):
            self._condition = gthreading.Condition()


class GreenExecutor(futures.Executor):
    """A greenthread backed executor.

    Workers are not created up front; ``submit`` spawns them on demand, up
    to ``max_workers`` of them, and they all pull from one shared queue.
    """

    def __init__(self, max_workers):
        """Create the executor.

        :param max_workers: upper bound on the number of workers; anything
                            ``int()`` accepts, must convert to a value > 0.
        """
        max_workers = int(max_workers)
        assert max_workers > 0, 'Max workers must be greater than zero'
        self._max_workers = max_workers
        self._pool = greenpool.GreenPool(self._max_workers)
        self._work_queue = queue.LightQueue()
        self._shutdown_lock = threading.RLock()
        self._shutdown = False

    def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` and return a future for it.

        :raises RuntimeError: if the executor has already been shut down.
        """
        # Take the lock explicitly, the same way shutdown() does, so the
        # shutdown check and the enqueue cannot interleave with shutdown().
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after'
                                   ' shutdown')
            f = _GreenFuture()
            w = _WorkItem(f, fn, args, kwargs)
            self._work_queue.put(w)
            # Spin up any new workers (since they are spun up on demand and
            # not at executor initialization).
            self._spin_up()
            return f

    def _spin_up(self):
        """Spawn one more worker when below the limit and behind on work."""
        cur_am = (self._pool.running() + self._pool.waiting())
        if cur_am < self._max_workers and cur_am < self._work_queue.qsize():
            # Spin up a new worker to do the work as we are behind.
            worker = _Worker(self, self._work_queue, cur_am + 1)
            self._pool.spawn(worker)

    def shutdown(self, wait=True):
        """Stop accepting work and tell the workers to finish up.

        Safe to call repeatedly: only the first call enqueues the tombstone,
        later calls just (optionally) wait again.

        :param wait: when true, wait on the pool (``waitall``) before
                     returning.
        """
        with self._shutdown_lock:
            if not self._shutdown:
                self._shutdown = True
                # One tombstone is enough; each worker re-queues it for the
                # next one before exiting.
                self._work_queue.put(_TOMBSTONE)
        if wait:
            self._pool.waitall()