Moved the code in the futures package to concurrent.futures as per PEP 3148; unified the codebase to support both Python 2 and 3 in a single tree; added support to Python 2.5; added tox.ini for easy testing with multiple Python versions

This commit is contained in:
Alex Gr?nholm
2010-12-18 06:01:09 +00:00
parent 837ac01ca1
commit 43539ed75f
25 changed files with 295 additions and 2106 deletions

3
concurrent/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

View File

@@ -0,0 +1,18 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from concurrent.futures._base import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed)
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor

View File

@@ -1,14 +1,19 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import collections
from __future__ import with_statement
import functools
import logging
import threading
import time
try:
from collections import namedtuple
except ImportError:
from concurrent.futures._compat import namedtuple
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
@@ -40,7 +45,7 @@ _STATE_TO_DESCRIPTION_MAP = {
}
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("futures")
LOGGER = logging.getLogger("concurrent.futures")
STDERR_HANDLER = logging.StreamHandler()
LOGGER.addHandler(STDERR_HANDLER)
@@ -227,7 +232,7 @@ def as_completed(fs, timeout=None):
for f in fs:
f._waiters.remove(waiter)
DoneAndNotDoneFutures = collections.namedtuple(
DoneAndNotDoneFutures = namedtuple(
'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the futures in the given sequence to complete.

View File

@@ -0,0 +1,107 @@
import sys
#if sys.version_info >= (2, 6):
# from collections import namedtuple
#else:
## Copied from Python 2.6 standard library
from keyword import iskeyword as _iskeyword
from operator import itemgetter as _itemgetter
_sys = sys
def namedtuple(typename, field_names, verbose=False):
"""Returns a new subclass of tuple with named fields.
>>> Point = namedtuple('Point', 'x y')
>>> Point.__doc__ # docstring for the new class
'Point(x, y)'
>>> p = Point(11, y=22) # instantiate with positional args or keywords
>>> p[0] + p[1] # indexable like a plain tuple
33
>>> x, y = p # unpack like a regular tuple
>>> x, y
(11, 22)
>>> p.x + p.y # fields also accessable by name
33
>>> d = p._asdict() # convert to a dictionary
>>> d['x']
11
>>> Point(**d) # convert from a dictionary
Point(x=11, y=22)
>>> p._replace(x=100) # _replace() is like str.replace() but targets named fields
Point(x=100, y=22)
"""
# Parse and validate the field names. Validation serves two purposes,
# generating informative error messages and preventing template injection attacks.
if isinstance(field_names, basestring):
field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas
field_names = tuple(map(str, field_names))
for name in (typename,) + field_names:
if not all(c.isalnum() or c=='_' for c in name):
raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name)
if _iskeyword(name):
raise ValueError('Type names and field names cannot be a keyword: %r' % name)
if name[0].isdigit():
raise ValueError('Type names and field names cannot start with a number: %r' % name)
seen_names = set()
for name in field_names:
if name.startswith('_'):
raise ValueError('Field names cannot start with an underscore: %r' % name)
if name in seen_names:
raise ValueError('Encountered duplicate field name: %r' % name)
seen_names.add(name)
# Create and fill-in the class template
numfields = len(field_names)
argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes
reprtxt = ', '.join('%s=%%r' % name for name in field_names)
dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names))
template = '''class %(typename)s(tuple):
'%(typename)s(%(argtxt)s)' \n
__slots__ = () \n
_fields = %(field_names)r \n
def __new__(_cls, %(argtxt)s):
return _tuple.__new__(_cls, (%(argtxt)s)) \n
@classmethod
def _make(cls, iterable, new=tuple.__new__, len=len):
'Make a new %(typename)s object from a sequence or iterable'
result = new(cls, iterable)
if len(result) != %(numfields)d:
raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result))
return result \n
def __repr__(self):
return '%(typename)s(%(reprtxt)s)' %% self \n
def _asdict(t):
'Return a new dict which maps field names to their values'
return {%(dicttxt)s} \n
def _replace(_self, **kwds):
'Return a new %(typename)s object replacing specified fields with new values'
result = _self._make(map(kwds.pop, %(field_names)r, _self))
if kwds:
raise ValueError('Got unexpected field names: %%r' %% kwds.keys())
return result \n
def __getnewargs__(self):
return tuple(self) \n\n''' % locals()
for i, name in enumerate(field_names):
template += ' %s = _property(_itemgetter(%d))\n' % (name, i)
if verbose:
print template
# Execute the template string in a temporary namespace and
# support tracing utilities by setting a value for frame.f_globals['__name__']
namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
_property=property, _tuple=tuple)
try:
exec template in namespace
except SyntaxError, e:
raise SyntaxError(e.message + ':\n' + template)
result = namespace[typename]
# For pickling to work, the __module__ variable needs to be set to the frame
# where the named tuple is created. Bypass this step in enviroments where
# sys._getframe is not defined (Jython for example).
if hasattr(_sys, '_getframe'):
result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__')
return result

View File

@@ -43,14 +43,21 @@ Process #1..n:
_ResultItems in "Request Q"
"""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from __future__ import with_statement
import atexit
from futures import _base
import queue
import multiprocessing
import threading
import weakref
import sys
from concurrent.futures import _base
try:
import queue
except ImportError:
import Queue as queue
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
@@ -137,7 +144,8 @@ def _process_worker(call_queue, result_queue, shutdown):
else:
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
except BaseException:
e = sys.exc_info()[1]
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
else:

View File

@@ -3,13 +3,20 @@
"""Implements ThreadPoolExecutor."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from __future__ import with_statement
import atexit
from futures import _base
import queue
import threading
import weakref
import sys
from concurrent.futures import _base
try:
import queue
except ImportError:
import Queue as queue
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
@@ -63,7 +70,8 @@ class _WorkItem(object):
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
except BaseException:
e = sys.exc_info()[1]
self.future.set_exception(e)
else:
self.future.set_result(result)
@@ -84,7 +92,7 @@ def _worker(executor_reference, work_queue):
del executor
else:
work_item.run()
except BaseException as e:
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):

View File

@@ -1,11 +1,17 @@
"""Compare the speed of downloading URLs sequentially vs. using futures."""
import datetime
import functools
import futures.thread
import time
import timeit
import urllib2
import sys
try:
from urllib2 import urlopen
except ImportError:
from urllib.request import urlopen
from concurrent.futures import (as_completed, ThreadPoolExecutor,
ProcessPoolExecutor)
URLS = ['http://www.google.com/',
'http://www.apple.com/',
@@ -20,7 +26,8 @@ URLS = ['http://www.google.com/',
'http://www.blogger.com/']
def load_url(url, timeout):
return urllib2.urlopen(url, timeout=timeout).read()
kwargs = {'timeout': timeout} if sys.version_info >= (2, 6) else {}
return urlopen(url, **kwargs).read()
def download_urls_sequential(urls, timeout=60):
url_to_content = {}
@@ -37,7 +44,7 @@ def download_urls_with_executor(urls, executor, timeout=60):
future_to_url = dict((executor.submit(load_url, url, timeout), url)
for url in urls)
for future in futures.as_completed(future_to_url):
for future in as_completed(future_to_url):
try:
url_to_content[future_to_url[future]] = future.result()
except:
@@ -52,17 +59,16 @@ def main():
('processes',
functools.partial(download_urls_with_executor,
URLS,
futures.ProcessPoolExecutor(10))),
ProcessPoolExecutor(10))),
('threads',
functools.partial(download_urls_with_executor,
URLS,
futures.ThreadPoolExecutor(10)))]:
print name.ljust(12),
ThreadPoolExecutor(10)))]:
sys.stdout.write('%s: ' % name.ljust(12))
start = time.time()
url_map = fn()
print '%.2f seconds (%d of %d downloaded)' % (time.time() - start,
len(url_map),
len(URLS))
sys.stdout.write('%.2f seconds (%d of %d downloaded)\n' %
(time.time() - start, len(url_map), len(URLS)))
if __name__ == '__main__':
main()

View File

@@ -1,11 +1,11 @@
:mod:`futures` --- Asynchronous computation
:mod:`concurrent.futures` --- Asynchronous computation
===========================================
.. module:: futures
.. module:: concurrent.futures
:synopsis: Execute computations asynchronously using threads or processes.
The :mod:`futures` module provides a high-level interface for asynchronously
executing callables.
The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.
The asynchronous execution can be be performed by threads using
:class:`ThreadPoolExecutor` or seperate processes using
@@ -120,7 +120,7 @@ ThreadPoolExecutor Example
^^^^^^^^^^^^^^^^^^^^^^^^^^
::
import futures
from concurrent import futures
import urllib.request
URLS = ['http://www.foxnews.com/',

24
futures/__init__.py Normal file
View File

@@ -0,0 +1,24 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
import warnings
from concurrent.futures import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed,
ProcessPoolExecutor,
ThreadPoolExecutor)
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
warnings.warn('The futures package has been deprecated. '
'Use the concurrent.futures package instead.',
DeprecationWarning)

1
futures/process.py Normal file
View File

@@ -0,0 +1 @@
from concurrent.futures import ProcessPoolExecutor

1
futures/thread.py Normal file
View File

@@ -0,0 +1 @@
from concurrent.futures import ThreadPoolExecutor

View File

@@ -1,6 +1,9 @@
import futures
from __future__ import with_statement
import math
import time
import sys
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
PRIMES = [
112272535095293,
@@ -25,23 +28,23 @@ def sequential():
return list(map(is_prime, PRIMES))
def with_process_pool_executor():
with futures.ProcessPoolExecutor(10) as executor:
with ProcessPoolExecutor(10) as executor:
return list(executor.map(is_prime, PRIMES))
def with_thread_pool_executor():
with futures.ThreadPoolExecutor(10) as executor:
with ThreadPoolExecutor(10) as executor:
return list(executor.map(is_prime, PRIMES))
def main():
for name, fn in [('sequential', sequential),
('processes', with_process_pool_executor),
('threads', with_thread_pool_executor)]:
print('%s: ' % name.ljust(12), end='')
sys.stdout.write('%s: ' % name.ljust(12))
start = time.time()
if fn() != [True] * len(PRIMES):
print('failed')
sys.stdout.write('failed\n')
else:
print('%.2f seconds' % (time.time() - start))
sys.stdout.write('%.2f seconds\n' % (time.time() - start))
if __name__ == '__main__':
main()

View File

@@ -1,18 +0,0 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from futures._base import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed)
from futures.process import ProcessPoolExecutor
from futures.thread import ThreadPoolExecutor

View File

@@ -1,337 +0,0 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ProcessPoolExecutor.
The follow diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue
Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
WorkItem from the "Work Items" dict: if the work item has been cancelled then
it is simply removed from the dict, otherwise it is repackaged as a
_CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
"Work Items" dict and deletes the dict entry
Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
_ResultItems in "Request Q"
"""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import atexit
import _base
import Queue
import multiprocessing
import threading
import weakref
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
_thread_references = set()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
for thread_reference in _thread_references:
thread = thread_reference()
if thread is not None:
thread.join()
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
>>> ... t = ThreadPoolExecutor(max_workers=5)
>>> ... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
self.work_id = work_id
self.exception = exception
self.result = result
class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
self.work_id = work_id
self.fn = fn
self.args = args
self.kwargs = kwargs
def _process_worker(call_queue, result_queue, shutdown):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a seperate process.
Args:
call_queue: A multiprocessing.Queue of _CallItems that will be read and
evaluated by the worker.
result_queue: A multiprocessing.Queue of _ResultItems that will written
to by the worker.
shutdown: A multiprocessing.Event that will be set as a signal to the
worker that it should exit when call_queue is empty.
"""
while True:
try:
call_item = call_queue.get(block=True, timeout=0.1)
except Queue.Empty:
if shutdown.is_set():
return
else:
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
else:
result_queue.put(_ResultItem(call_item.work_id,
result=r))
def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
"""Fills call_queue with _WorkItems from pending_work_items.
This function never blocks.
Args:
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
are consumed and the corresponding _WorkItems from
pending_work_items are transformed into _CallItems and put in
call_queue.
call_queue: A multiprocessing.Queue that will be filled with _CallItems
derived from _WorkItems.
"""
while True:
if call_queue.full():
return
try:
work_id = work_ids.get(block=False)
except Queue.Empty:
return
else:
work_item = pending_work_items[work_id]
if work_item.future.set_running_or_notify_cancel():
call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
else:
del pending_work_items[work_id]
continue
def _queue_manangement_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue,
shutdown_process_event):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
Args:
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
this thread. Used to determine if the ProcessPoolExecutor has been
garbage collected and that this function can exit.
process: A list of the multiprocessing.Process instances used as
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
call_queue: A multiprocessing.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
shutdown_process_event: A multiprocessing.Event used to signal the
process workers that they should exit when their work queue is
empty.
"""
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
try:
result_item = result_queue.get(block=True, timeout=0.1)
except Queue.Empty:
executor = executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread:
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
shutdown_process_event.set()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for p in processes:
p.join()
return
del executor
else:
work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
Args:
max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
"""
_remove_dead_thread_references()
if max_workers is None:
self._max_workers = multiprocessing.cpu_count()
else:
self._max_workers = max_workers
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
self._result_queue = multiprocessing.Queue()
self._work_ids = Queue.Queue()
self._queue_management_thread = None
self._processes = set()
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_process_event = multiprocessing.Event()
self._shutdown_lock = threading.Lock()
self._queue_count = 0
self._pending_work_items = {}
def _start_queue_management_thread(self):
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
target=_queue_manangement_worker,
args=(weakref.ref(self),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue,
self._shutdown_process_event))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_thread_references.add(weakref.ref(self._queue_management_thread))
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue,
self._shutdown_process_event))
p.start()
self._processes.add(p)
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
self._start_queue_management_thread()
self._adjust_process_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True
if wait:
if self._queue_management_thread:
self._queue_management_thread.join()
# To reduce the risk of openning too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
self._call_queue = None
self._result_queue = None
self._shutdown_process_event = None
self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
atexit.register(_python_exit)

View File

@@ -1,136 +0,0 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ThreadPoolExecutor."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import atexit
import _base
import Queue
import threading
import weakref
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
_thread_references = set()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
for thread_reference in _thread_references:
thread = thread_reference()
if thread is not None:
thread.join()
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
... t = ThreadPoolExecutor(max_workers=5)
... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
atexit.register(_python_exit)
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
else:
self.future.set_result(result)
def _worker(executor_reference, work_queue):
try:
while True:
try:
work_item = work_queue.get(block=True, timeout=0.1)
except Queue.Empty:
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
del executor
else:
work_item.run()
except BaseException as e:
_base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers):
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
"""
_remove_dead_thread_references()
self._max_workers = max_workers
self._work_queue = Queue.Queue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
args=(weakref.ref(self), self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
_thread_references.add(weakref.ref(t))
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
if wait:
for t in self._threads:
t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__

View File

@@ -1,47 +0,0 @@
import futures
import math
import time
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
117450548693743,
993960000099397]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def sequential():
return list(map(is_prime, PRIMES))
def with_process_pool_executor():
with futures.ProcessPoolExecutor(10) as executor:
return list(executor.map(is_prime, PRIMES))
def with_thread_pool_executor():
with futures.ThreadPoolExecutor(10) as executor:
return list(executor.map(is_prime, PRIMES))
def main():
for name, fn in [('sequential', sequential),
('processes', with_process_pool_executor),
('threads', with_thread_pool_executor)]:
print name.ljust(12),
start = time.time()
if fn() != [True] * len(PRIMES):
print 'failed'
else:
print '%.2f seconds' % (time.time() - start)
if __name__ == '__main__':
main()

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env python
from distutils.core import setup
setup(name='futures',
version='2.0',
description='Java-style futures implementation in Python 2.x',
author='Brian Quinlan',
author_email='brian@sweetapp.com',
url='http://code.google.com/p/pythonfutures',
download_url='http://pypi.python.org/pypi/futures3/',
packages=['futures'],
license='BSD',
classifiers=['License :: OSI Approved :: BSD License',
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Programming Language :: Python :: 2']
)

View File

@@ -1,68 +0,0 @@
"""Compare the speed of downloading URLs sequentially vs. using futures."""
import datetime
import functools
import futures.thread
import time
import timeit
import urllib.request
URLS = ['http://www.google.com/',
'http://www.apple.com/',
'http://www.ibm.com',
'http://www.thisurlprobablydoesnotexist.com',
'http://www.slashdot.org/',
'http://www.python.org/',
'http://www.bing.com/',
'http://www.facebook.com/',
'http://www.yahoo.com/',
'http://www.youtube.com/',
'http://www.blogger.com/']
def load_url(url, timeout):
return urllib.request.urlopen(url, timeout=timeout).read()
def download_urls_sequential(urls, timeout=60):
url_to_content = {}
for url in urls:
try:
url_to_content[url] = load_url(url, timeout=timeout)
except:
pass
return url_to_content
def download_urls_with_executor(urls, executor, timeout=60):
try:
url_to_content = {}
future_to_url = dict((executor.submit(load_url, url, timeout), url)
for url in urls)
for future in futures.as_completed(future_to_url):
try:
url_to_content[future_to_url[future]] = future.result()
except:
pass
return url_to_content
finally:
executor.shutdown()
def main():
for name, fn in [('sequential',
functools.partial(download_urls_sequential, URLS)),
('processes',
functools.partial(download_urls_with_executor,
URLS,
futures.ProcessPoolExecutor(10))),
('threads',
functools.partial(download_urls_with_executor,
URLS,
futures.ThreadPoolExecutor(10)))]:
print('%s: ' % name.ljust(12), end='')
start = time.time()
url_map = fn()
print('%.2f seconds (%d of %d downloaded)' % (time.time() - start,
len(url_map),
len(URLS)))
if __name__ == '__main__':
main()

View File

@@ -1,18 +0,0 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from futures._base import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed)
from futures.process import ProcessPoolExecutor
from futures.thread import ThreadPoolExecutor

View File

@@ -1,569 +0,0 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import collections
import functools
import logging
import threading
import time
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'
_FUTURE_STATES = [
PENDING,
RUNNING,
CANCELLED,
CANCELLED_AND_NOTIFIED,
FINISHED
]
_STATE_TO_DESCRIPTION_MAP = {
PENDING: "pending",
RUNNING: "running",
CANCELLED: "cancelled",
CANCELLED_AND_NOTIFIED: "cancelled",
FINISHED: "finished"
}
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("futures")
STDERR_HANDLER = logging.StreamHandler()
LOGGER.addHandler(STDERR_HANDLER)
class Error(Exception):
"""Base class for all future-related exceptions."""
pass
class CancelledError(Error):
"""The Future was cancelled."""
pass
class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass
class _Waiter(object):
"""Provides the event that wait() and as_completed() block on."""
def __init__(self):
self.event = threading.Event()
self.finished_futures = []
def add_result(self, future):
self.finished_futures.append(future)
def add_exception(self, future):
self.finished_futures.append(future)
def add_cancelled(self, future):
self.finished_futures.append(future)
class _AsCompletedWaiter(_Waiter):
"""Used by as_completed()."""
def __init__(self):
super(_AsCompletedWaiter, self).__init__()
self.lock = threading.Lock()
def add_result(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_result(future)
self.event.set()
def add_exception(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_exception(future)
self.event.set()
def add_cancelled(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_cancelled(future)
self.event.set()
class _FirstCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_COMPLETED)."""
def add_result(self, future):
super().add_result(future)
self.event.set()
def add_exception(self, future):
super().add_exception(future)
self.event.set()
def add_cancelled(self, future):
super().add_cancelled(future)
self.event.set()
class _AllCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
def __init__(self, num_pending_calls, stop_on_exception):
self.num_pending_calls = num_pending_calls
self.stop_on_exception = stop_on_exception
super().__init__()
def _decrement_pending_calls(self):
self.num_pending_calls -= 1
if not self.num_pending_calls:
self.event.set()
def add_result(self, future):
super().add_result(future)
self._decrement_pending_calls()
def add_exception(self, future):
super().add_exception(future)
if self.stop_on_exception:
self.event.set()
else:
self._decrement_pending_calls()
def add_cancelled(self, future):
super().add_cancelled(future)
self._decrement_pending_calls()
class _AcquireFutures(object):
"""A context manager that does an ordered acquire of Future conditions."""
def __init__(self, futures):
self.futures = sorted(futures, key=id)
def __enter__(self):
for future in self.futures:
future._condition.acquire()
def __exit__(self, *args):
for future in self.futures:
future._condition.release()
def _create_and_install_waiters(fs, return_when):
if return_when == _AS_COMPLETED:
waiter = _AsCompletedWaiter()
elif return_when == FIRST_COMPLETED:
waiter = _FirstCompletedWaiter()
else:
pending_count = sum(
f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
if return_when == FIRST_EXCEPTION:
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
elif return_when == ALL_COMPLETED:
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
else:
raise ValueError("Invalid return condition: %r" % return_when)
for f in fs:
f._waiters.append(waiter)
return waiter
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
iterate over.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator that yields the given Futures as they complete (finished or
cancelled).
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
"""
if timeout is not None:
end_time = timeout + time.time()
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = set(fs) - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try:
for future in finished:
yield future
while pending:
if timeout is None:
wait_timeout = None
else:
wait_timeout = end_time - time.time()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
waiter.event.wait(wait_timeout)
with waiter.lock:
finished = waiter.finished_futures
waiter.finished_futures = []
waiter.event.clear()
for future in finished:
yield future
pending.remove(future)
finally:
for f in fs:
f._waiters.remove(waiter)
DoneAndNotDoneFutures = collections.namedtuple(
'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the futures in the given sequence to complete.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
wait upon.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
return_when: Indicates when this function should return. The options
are:
FIRST_COMPLETED - Return when any future finishes or is
cancelled.
FIRST_EXCEPTION - Return when any future finishes by raising an
exception. If no future raises an exception
then it is equivalent to ALL_COMPLETED.
ALL_COMPLETED - Return when all futures finish or are cancelled.
Returns:
A named 2-tuple of sets. The first set, named 'done', contains the
futures that completed (is finished or cancelled) before the wait
completed. The second set, named 'not_done', contains uncompleted
futures.
"""
with _AcquireFutures(fs):
done = set(f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
not_done = set(fs) - done
if (return_when == FIRST_COMPLETED) and done:
return DoneAndNotDoneFutures(done, not_done)
elif (return_when == FIRST_EXCEPTION) and done:
if any(f for f in done
if not f.cancelled() and f.exception() is not None):
return DoneAndNotDoneFutures(done, not_done)
if len(done) == len(fs):
return DoneAndNotDoneFutures(done, not_done)
waiter = _create_and_install_waiters(fs, return_when)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, set(fs) - done)
class Future(object):
"""Represents the result of an asynchronous computation."""
def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._waiters = []
self._done_callbacks = []
def _invoke_callbacks(self):
for callback in self._done_callbacks:
try:
callback(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)
def __repr__(self):
with self._condition:
if self._state == FINISHED:
if self._exception:
return '<Future at %s state=%s raised %s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
return '<Future at %s state=%s returned %s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
return '<Future at %s state=%s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state])
def cancel(self):
"""Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future
cannot be cancelled if it is running or has already completed.
"""
with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
return True
self._state = CANCELLED
self._condition.notify_all()
self._invoke_callbacks()
return True
def cancelled(self):
"""Return True if the future has cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
def running(self):
"""Return True if the future is currently executing."""
with self._condition:
return self._state == RUNNING
def done(self):
"""Return True of the future was cancelled or finished executing."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
def __get_result(self):
if self._exception:
raise self._exception
else:
return self._result
def add_done_callback(self, fn):
"""Attaches a callable that will be called when the future finishes.
Args:
fn: A callable that will be called with this future as its only
argument when the future completes or is cancelled. The callable
will always be called by a thread in the same process in which
it was added. If the future has already completed or been
cancelled then the callable will be called immediately. These
callables are called in the order that they were added.
"""
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
fn(self)
def result(self, timeout=None):
"""Return the result of the call that the future represents.
Args:
timeout: The number of seconds to wait for the result if the future
isn't done. If None, then there is no limit on the wait time.
Returns:
The result of the call that the future represents.
Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
Exception: If the call raised then that exception will be raised.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
Args:
timeout: The number of seconds to wait for the exception if the
future isn't done. If None, then there is no limit on the wait
time.
Returns:
The exception raised by the call that the future represents or None
if the call completed without raising.
Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
else:
raise TimeoutError()
# The following methods should only be used by Executors and in tests.
def set_running_or_notify_cancel(self):
"""Mark the future as running or process any cancel notifications.
Should only be used by Executor implementations and unit tests.
If the future has been cancelled (cancel() was called and returned
True) then any threads waiting on the future completing (though calls
to as_completed() or wait()) are notified and False is returned.
If the future was not cancelled then it is put in the running state
(future calls to running() will return True) and True is returned.
This method should be called by Executor implementations before
executing the work associated with this future. If this method returns
False then the work should not be executed.
Returns:
False if the Future was cancelled, True otherwise.
Raises:
RuntimeError: if this method was already called or if set_result()
or set_exception() was called.
"""
with self._condition:
if self._state == CANCELLED:
self._state = CANCELLED_AND_NOTIFIED
for waiter in self._waiters:
waiter.add_cancelled(self)
# self._condition.notify_all() is not necessary because
# self.cancel() triggers a notification.
return False
elif self._state == PENDING:
self._state = RUNNING
return True
else:
LOGGER.critical('Future %s in unexpected state: %s',
id(self.future),
self.future._state)
raise RuntimeError('Future in unexpected state')
def set_result(self, result):
"""Sets the return value of work associated with the future.
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
self._result = result
self._state = FINISHED
for waiter in self._waiters:
waiter.add_result(self)
self._condition.notify_all()
self._invoke_callbacks()
def set_exception(self, exception):
"""Sets the result of the future as being the given exception.
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
self._exception = exception
self._state = FINISHED
for waiter in self._waiters:
waiter.add_exception(self)
self._condition.notify_all()
self._invoke_callbacks()
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Returns:
A Future representing the given call.
"""
raise NotImplementedError()
def map(self, fn, *iterables, timeout=None):
"""Returns a iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
try:
for future in fs:
if timeout is None:
yield future.result()
else:
yield future.result(end_time - time.time())
finally:
for future in fs:
future.cancel()
def shutdown(self, wait=True):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
methods can be called after this one.
Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
executor have been reclaimed.
"""
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env python3
from distutils.core import setup
setup(name='futures3',
version='1.0',
description='Java-style futures implementation in Python 3.x',
author='Brian Quinlan',
author_email='brian@sweetapp.com',
url='http://code.google.com/p/pythonfutures',
download_url='http://pypi.python.org/pypi/futures3/',
packages=['futures'],
license='BSD',
classifiers=['License :: OSI Approved :: BSD License',
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Programming Language :: Python :: 3']
)

View File

@@ -1,819 +0,0 @@
import io
import logging
import multiprocessing
import sys
import threading
import test.support
import time
import unittest
if sys.platform.startswith('win'):
import ctypes
import ctypes.wintypes
import futures
from futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
LOGGER, STDERR_HANDLER, wait)
import futures.process
def create_future(state=PENDING, exception=None, result=None):
f = Future()
f._state = state
f._exception = exception
f._result = result
return f
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
def mul(x, y):
return x * y
class Call(object):
"""A call that can be submitted to a future.Executor for testing.
The call signals when it is called and waits for an event before finishing.
"""
CALL_LOCKS = {}
def _create_event(self):
if sys.platform.startswith('win'):
class SECURITY_ATTRIBUTES(ctypes.Structure):
_fields_ = [("nLength", ctypes.wintypes.DWORD),
("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
("bInheritHandle", ctypes.wintypes.BOOL)]
s = SECURITY_ATTRIBUTES()
s.nLength = ctypes.sizeof(s)
s.lpSecurityDescriptor = None
s.bInheritHandle = True
handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
True,
False,
None)
assert handle is not None
return handle
else:
event = multiprocessing.Event()
self.CALL_LOCKS[id(event)] = event
return id(event)
def _wait_on_event(self, handle):
if sys.platform.startswith('win'):
r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
assert r == 0
else:
self.CALL_LOCKS[handle].wait()
def _signal_event(self, handle):
if sys.platform.startswith('win'):
r = ctypes.windll.kernel32.SetEvent(handle)
assert r != 0
else:
self.CALL_LOCKS[handle].set()
def __init__(self, manual_finish=False, result=42):
self._called_event = self._create_event()
self._can_finish = self._create_event()
self._result = result
if not manual_finish:
self._signal_event(self._can_finish)
def wait_on_called(self):
self._wait_on_event(self._called_event)
def set_can(self):
self._signal_event(self._can_finish)
def __call__(self):
self._signal_event(self._called_event)
self._wait_on_event(self._can_finish)
return self._result
def close(self):
self.set_can()
if sys.platform.startswith('win'):
ctypes.windll.kernel32.CloseHandle(self._called_event)
ctypes.windll.kernel32.CloseHandle(self._can_finish)
else:
del self.CALL_LOCKS[self._called_event]
del self.CALL_LOCKS[self._can_finish]
class ExceptionCall(Call):
def __call__(self):
self._signal_event(self._called_event)
self._wait_on_event(self._can_finish)
raise ZeroDivisionError()
class MapCall(Call):
def __init__(self, result=42):
super().__init__(manual_finish=True, result=result)
def __call__(self, manual_finish):
if manual_finish:
super().__call__()
return self._result
class ExecutorShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
self.executor.submit,
pow, 2, 5)
def _start_some_futures(self):
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
call3 = Call(manual_finish=True)
try:
self.executor.submit(call1)
self.executor.submit(call2)
self.executor.submit(call3)
call1.wait_on_called()
call2.wait_on_called()
call3.wait_on_called()
call1.set_can()
call2.set_can()
call3.set_can()
finally:
call1.close()
call2.close()
call3.close()
class ThreadPoolShutdownTest(ExecutorShutdownTest):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=5)
def tearDown(self):
self.executor.shutdown(wait=True)
def test_threads_terminate(self):
self._start_some_futures()
self.assertEqual(len(self.executor._threads), 3)
self.executor.shutdown()
for t in self.executor._threads:
t.join()
def test_context_manager_shutdown(self):
with futures.ThreadPoolExecutor(max_workers=5) as e:
executor = e
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for t in executor._threads:
t.join()
def test_del_shutdown(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
for t in threads:
t.join()
class ProcessPoolShutdownTest(ExecutorShutdownTest):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=5)
def tearDown(self):
self.executor.shutdown(wait=True)
def test_processes_terminate(self):
self._start_some_futures()
self.assertEqual(len(self.executor._processes), 5)
processes = self.executor._processes
self.executor.shutdown()
for p in processes:
p.join()
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(max_workers=5) as e:
executor = e
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for p in self.executor._processes:
p.join()
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(max_workers=5)
list(executor.map(abs, range(-5, 5)))
queue_management_thread = executor._queue_management_thread
processes = executor._processes
del executor
queue_management_thread.join()
for p in processes:
p.join()
class WaitTests(unittest.TestCase):
def test_first_completed(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
done, not_done = futures.wait(
[CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)
self.assertEquals(set([future1]), done)
self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
finally:
call1.close()
call2.close()
def test_first_completed_one_already_completed(self):
call1 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)
self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
self.assertEquals(set([future1]), pending)
finally:
call1.close()
def test_first_exception(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call2.set_can()
call1 = Call(manual_finish=True)
call2 = ExceptionCall(manual_finish=True)
call3 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
future3 = self.executor.submit(call3)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)
self.assertEquals(set([future1, future2]), finished)
self.assertEquals(set([future3]), pending)
finally:
call1.close()
call2.close()
call3.close()
def test_first_exception_some_already_complete(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call1 = ExceptionCall(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2],
return_when=futures.FIRST_EXCEPTION)
self.assertEquals(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished)
self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
finally:
call1.close()
call2.close()
def test_first_exception_one_already_failed(self):
call1 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
finished, pending = futures.wait(
[EXCEPTION_FUTURE, future1],
return_when=futures.FIRST_EXCEPTION)
self.assertEquals(set([EXCEPTION_FUTURE]), finished)
self.assertEquals(set([future1]), pending)
finally:
call1.close()
def test_all_completed(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call2.set_can()
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[future1, future2],
return_when=futures.ALL_COMPLETED)
self.assertEquals(set([future1, future2]), finished)
self.assertEquals(set(), pending)
finally:
call1.close()
call2.close()
def test_all_completed_some_already_completed(self):
def wait_test():
while not future1._waiters:
pass
future4.cancel()
call1.set_can()
call2.set_can()
call3.set_can()
self.assertLessEqual(
futures.process.EXTRA_QUEUED_CALLS,
1,
'this test assumes that future4 will be cancelled before it is '
'queued to run - which might not be the case if '
'ProcessPoolExecutor is too aggresive in scheduling futures')
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
call3 = Call(manual_finish=True)
call4 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
future3 = self.executor.submit(call3)
future4 = self.executor.submit(call4)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2, future3, future4],
return_when=futures.ALL_COMPLETED)
self.assertEquals(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2, future3, future4]),
finished)
self.assertEquals(set(), pending)
finally:
call1.close()
call2.close()
call3.close()
call4.close()
def test_timeout(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=1,
return_when=futures.ALL_COMPLETED)
self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEquals(set([future2]), pending)
finally:
call1.close()
call2.close()
class ThreadPoolWaitTests(WaitTests):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ProcessPoolWaitTests(WaitTests):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class AsCompletedTests(unittest.TestCase):
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
def test_no_timeout(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call2.set_can()
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
completed = set(futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]))
self.assertEquals(set(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]),
completed)
finally:
call1.close()
call2.close()
def test_zero_timeout(self):
call1 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
completed_futures = set()
try:
for future in futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1],
timeout=0):
completed_futures.add(future)
except futures.TimeoutError:
pass
self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE]),
completed_futures)
finally:
call1.close()
class ThreadPoolAsCompletedTests(AsCompletedTests):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ProcessPoolAsCompletedTests(AsCompletedTests):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEquals(256, future.result())
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEquals(16, future.result())
def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10))))
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
self.assertEqual(i.__next__(), (0, 1))
self.assertRaises(ZeroDivisionError, i.__next__)
def test_map_timeout(self):
results = []
timeout_call = MapCall()
try:
try:
for i in self.executor.map(timeout_call,
[False, False, True],
timeout=1):
results.append(i)
except futures.TimeoutError:
pass
else:
self.fail('expected TimeoutError')
finally:
timeout_call.close()
self.assertEquals([42, 42], results)
class ThreadPoolExecutorTest(ExecutorTest):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ProcessPoolExecutorTest(ExecutorTest):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):
callback_result = None
def fn(callback_future):
nonlocal callback_result
callback_result = callback_future.result()
f = Future()
f.add_done_callback(fn)
f.set_result(5)
self.assertEquals(5, callback_result)
def test_done_callback_with_exception(self):
callback_exception = None
def fn(callback_future):
nonlocal callback_exception
callback_exception = callback_future.exception()
f = Future()
f.add_done_callback(fn)
f.set_exception(Exception('test'))
self.assertEquals(('test',), callback_exception.args)
def test_done_callback_with_cancel(self):
was_cancelled = None
def fn(callback_future):
nonlocal was_cancelled
was_cancelled = callback_future.cancelled()
f = Future()
f.add_done_callback(fn)
self.assertTrue(f.cancel())
self.assertTrue(was_cancelled)
def test_done_callback_raises(self):
LOGGER.removeHandler(STDERR_HANDLER)
logging_stream = io.StringIO()
handler = logging.StreamHandler(logging_stream)
LOGGER.addHandler(handler)
try:
raising_was_called = False
fn_was_called = False
def raising_fn(callback_future):
nonlocal raising_was_called
raising_was_called = True
raise Exception('doh!')
def fn(callback_future):
nonlocal fn_was_called
fn_was_called = True
f = Future()
f.add_done_callback(raising_fn)
f.add_done_callback(fn)
f.set_result(5)
self.assertTrue(raising_was_called)
self.assertTrue(fn_was_called)
self.assertIn('Exception: doh!', logging_stream.getvalue())
finally:
LOGGER.removeHandler(handler)
LOGGER.addHandler(STDERR_HANDLER)
def test_done_callback_already_successful(self):
callback_result = None
def fn(callback_future):
nonlocal callback_result
callback_result = callback_future.result()
f = Future()
f.set_result(5)
f.add_done_callback(fn)
self.assertEquals(5, callback_result)
def test_done_callback_already_failed(self):
callback_exception = None
def fn(callback_future):
nonlocal callback_exception
callback_exception = callback_future.exception()
f = Future()
f.set_exception(Exception('test'))
f.add_done_callback(fn)
self.assertEquals(('test',), callback_exception.args)
def test_done_callback_already_cancelled(self):
was_cancelled = None
def fn(callback_future):
nonlocal was_cancelled
was_cancelled = callback_future.cancelled()
f = Future()
self.assertTrue(f.cancel())
f.add_done_callback(fn)
self.assertTrue(was_cancelled)
def test_repr(self):
self.assertRegexpMatches(repr(PENDING_FUTURE),
'<Future at 0x[0-9a-f]+ state=pending>')
self.assertRegexpMatches(repr(RUNNING_FUTURE),
'<Future at 0x[0-9a-f]+ state=running>')
self.assertRegexpMatches(repr(CANCELLED_FUTURE),
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegexpMatches(
repr(EXCEPTION_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished raised IOError>')
self.assertRegexpMatches(
repr(SUCCESSFUL_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished returned int>')
def test_cancel(self):
f1 = create_future(state=PENDING)
f2 = create_future(state=RUNNING)
f3 = create_future(state=CANCELLED)
f4 = create_future(state=CANCELLED_AND_NOTIFIED)
f5 = create_future(state=FINISHED, exception=IOError())
f6 = create_future(state=FINISHED, result=5)
self.assertTrue(f1.cancel())
self.assertEquals(f1._state, CANCELLED)
self.assertFalse(f2.cancel())
self.assertEquals(f2._state, RUNNING)
self.assertTrue(f3.cancel())
self.assertEquals(f3._state, CANCELLED)
self.assertTrue(f4.cancel())
self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
self.assertFalse(f5.cancel())
self.assertEquals(f5._state, FINISHED)
self.assertFalse(f6.cancel())
self.assertEquals(f6._state, FINISHED)
def test_cancelled(self):
self.assertFalse(PENDING_FUTURE.cancelled())
self.assertFalse(RUNNING_FUTURE.cancelled())
self.assertTrue(CANCELLED_FUTURE.cancelled())
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
self.assertFalse(EXCEPTION_FUTURE.cancelled())
self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
def test_done(self):
self.assertFalse(PENDING_FUTURE.done())
self.assertFalse(RUNNING_FUTURE.done())
self.assertTrue(CANCELLED_FUTURE.done())
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
self.assertTrue(EXCEPTION_FUTURE.done())
self.assertTrue(SUCCESSFUL_FUTURE.done())
def test_running(self):
self.assertFalse(PENDING_FUTURE.running())
self.assertTrue(RUNNING_FUTURE.running())
self.assertFalse(CANCELLED_FUTURE.running())
self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
self.assertFalse(EXCEPTION_FUTURE.running())
self.assertFalse(SUCCESSFUL_FUTURE.running())
def test_result_with_timeout(self):
self.assertRaises(futures.TimeoutError,
PENDING_FUTURE.result, timeout=0)
self.assertRaises(futures.TimeoutError,
RUNNING_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
def test_result_with_success(self):
# TODO(brian@sweetapp.com): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
f1.set_result(42)
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertEquals(f1.result(timeout=5), 42)
def test_result_with_cancel(self):
# TODO(brian@sweetapp.com): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
f1.cancel()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertRaises(futures.CancelledError, f1.result, timeout=5)
def test_exception_with_timeout(self):
self.assertRaises(futures.TimeoutError,
PENDING_FUTURE.exception, timeout=0)
self.assertRaises(futures.TimeoutError,
RUNNING_FUTURE.exception, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_FUTURE.exception, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
IOError))
self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
def test_exception_with_success(self):
def notification():
# Wait until the main thread is waiting for the exception.
time.sleep(1)
with f1._condition:
f1._state = FINISHED
f1._exception = IOError()
f1._condition.notify_all()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
def test_main():
test.support.run_unittest(ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
ProcessPoolWaitTests,
ThreadPoolWaitTests,
ProcessPoolAsCompletedTests,
ThreadPoolAsCompletedTests,
FutureTests,
ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
if __name__ == "__main__":
test_main()

32
setup.py Executable file
View File

@@ -0,0 +1,32 @@
#!/usr/bin/env python
import sys
extras = {}
try:
from setuptools import setup
if sys.version_info < (2, 6):
extras['install_requires'] = ['multiprocessing']
except:
from distutils.core import setup
setup(name='futures',
version='2.1',
description='Backport of the concurrent.futures package from Python 3.2',
author='Brian Quinlan',
author_email='brian@sweetapp.com',
maintainer='Alex Gronholm',
maintainer_email='alex.gronholm+pypi@nextday.fi',
url='http://code.google.com/p/pythonfutures',
download_url='http://pypi.python.org/pypi/futures3/',
packages=['futures', 'concurrent.futures'],
license='BSD',
classifiers=['License :: OSI Approved :: BSD License',
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Programming Language :: Python :: 2.5',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.1'],
**extras
)

View File

@@ -1,22 +1,33 @@
from __future__ import with_statement
import logging
import multiprocessing
import re
import StringIO
import sys
import threading
from test import test_support
import time
import unittest
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
try:
from test.test_support import run_unittest
except ImportError:
from test.support import run_unittest
if sys.version_info < (3, 0):
next = lambda x: x.next()
if sys.platform.startswith('win'):
import ctypes
import ctypes.wintypes
import futures
from futures._base import (
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
LOGGER, STDERR_HANDLER, wait)
import futures.process
LOGGER, STDERR_HANDLER)
def create_future(state=PENDING, exception=None, result=None):
f = Future()
@@ -539,9 +550,9 @@ class ExecutorTest(unittest.TestCase):
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.next(), (0, 1))
self.assertEqual(i.next(), (0, 1))
self.assertRaises(ZeroDivisionError, i.next)
self.assertEqual(next(i), (0, 1))
self.assertEqual(next(i), (0, 1))
self.assertRaises(ZeroDivisionError, next, i)
def test_map_timeout(self):
results = []
@@ -608,7 +619,7 @@ class FutureTests(unittest.TestCase):
def test_done_callback_raises(self):
LOGGER.removeHandler(STDERR_HANDLER)
logging_stream = StringIO.StringIO()
logging_stream = StringIO()
handler = logging.StreamHandler(logging_stream)
LOGGER.addHandler(handler)
try:
@@ -679,7 +690,6 @@ class FutureTests(unittest.TestCase):
'<Future at 0x[0-9a-f]+L? state=finished returned int>',
repr(SUCCESSFUL_FUTURE)))
def test_cancel(self):
f1 = create_future(state=PENDING)
f2 = create_future(state=RUNNING)
@@ -797,15 +807,15 @@ class FutureTests(unittest.TestCase):
self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
def test_main():
test_support.run_unittest(ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
ProcessPoolWaitTests,
ThreadPoolWaitTests,
ProcessPoolAsCompletedTests,
ThreadPoolAsCompletedTests,
FutureTests,
ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
run_unittest(ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
ProcessPoolWaitTests,
ThreadPoolWaitTests,
ProcessPoolAsCompletedTests,
ThreadPoolAsCompletedTests,
FutureTests,
ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
if __name__ == "__main__":
test_main()

11
tox.ini Normal file
View File

@@ -0,0 +1,11 @@
[tox]
envlist = py25,py26,py27,py31
[testenv]
commands={envpython} test_futures.py []
#[testenv:py24]
#deps=multiprocessing
#
#[testenv:py25]
#deps=multiprocessing