Redid the port from scratch using Python 3.2.5 as base

Alex Grönholm 2013-06-24 01:20:47 +03:00
parent c0d77fd5bd
commit ecc0a67bb4
8 changed files with 515 additions and 598 deletions

.hgignore Normal file (+11)

@@ -0,0 +1,11 @@
+syntax: glob
+*.egg-info
+syntax: regexp
+^\.tox$
+syntax: regexp
+^\.project$
+syntax: regexp
+^\.pydevproject$
+syntax: regexp
+^dist$


@@ -1,3 +1,9 @@
+2.1.4
+=====
+
+- Ported the library again from Python 3.2.5 to get the latest bug fixes
+
 2.1.3
 =====


@@ -2,7 +2,6 @@
 # Licensed to PSF under a Contributor Agreement.
 
 from __future__ import with_statement
-import functools
 import logging
 import threading
 import time
@@ -46,8 +45,6 @@ _STATE_TO_DESCRIPTION_MAP = {
 # Logger for internal use by the futures package.
 LOGGER = logging.getLogger("concurrent.futures")
-STDERR_HANDLER = logging.StreamHandler()
-LOGGER.addHandler(STDERR_HANDLER)
 
 class Error(Exception):
     """Base class for all future-related exceptions."""
@@ -119,11 +116,14 @@ class _AllCompletedWaiter(_Waiter):
     def __init__(self, num_pending_calls, stop_on_exception):
         self.num_pending_calls = num_pending_calls
         self.stop_on_exception = stop_on_exception
+        self.lock = threading.Lock()
         super(_AllCompletedWaiter, self).__init__()
 
     def _decrement_pending_calls(self):
-        if self.num_pending_calls == len(self.finished_futures):
-            self.event.set()
+        with self.lock:
+            self.num_pending_calls -= 1
+            if not self.num_pending_calls:
+                self.event.set()
 
     def add_result(self, future):
         super(_AllCompletedWaiter, self).add_result(future)
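
The rewritten _decrement_pending_calls counts down under a per-waiter lock instead of comparing the pending count against the list of finished futures, so two futures completing at the same moment cannot race past the final decrement. A self-contained sketch of the same countdown pattern (the CountdownEvent class is illustrative, not part of the library):

    import threading

    class CountdownEvent(object):
        """Sets an event exactly once, after a fixed number of completions."""

        def __init__(self, count):
            self.count = count
            self.lock = threading.Lock()
            self.event = threading.Event()

        def mark_done(self):
            with self.lock:             # serialize concurrent decrements
                self.count -= 1
                if not self.count:      # the last completion fires the event
                    self.event.set()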
@@ -523,7 +523,7 @@ class Executor(object):
         """Returns a iterator equivalent to map(fn, iter).
 
         Args:
-            fn: A callable that will take take as many arguments as there are
+            fn: A callable that will take as many arguments as there are
                 passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
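
For context, Executor.map takes one iterable per positional argument of the callable and yields results in input order. A short usage sketch against the public API (worker count, inputs, and the 5-second timeout are arbitrary):

    from concurrent import futures

    def add(x, y):
        return x + y

    with futures.ThreadPoolExecutor(max_workers=4) as executor:
        # Raises TimeoutError if all results are not available within 5 seconds.
        for total in executor.map(add, [1, 2, 3], [10, 20, 30], timeout=5):
            print(total)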


@@ -73,28 +73,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 # workers to exit when their work queues are empty and then waits until the
 # threads/processes finish.
 
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    >>> ...    t = ThreadPoolExecutor(max_workers=5)
-    >>> ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()
 
 # Controls how many more calls than processes will be queued in the call queue.
 # A smaller number will mean that processes spend more time idle waiting for
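
The atexit hook now wakes each queue management thread with a None sentinel before joining it, and the thread-to-queue mapping lives in a WeakKeyDictionary so entries vanish once a thread object is collected. A reduced, self-contained sketch of the same wake-then-join idea (not the library code itself):

    import threading
    import weakref
    try:
        import queue                 # Python 3
    except ImportError:
        import Queue as queue        # Python 2

    _threads_queues = weakref.WeakKeyDictionary()

    def _worker(q):
        while True:
            item = q.get()
            if item is None:         # sentinel: shut down cleanly
                return

    def _python_exit():
        items = list(_threads_queues.items())
        for t, q in items:           # wake every worker first...
            q.put(None)
        for t, q in items:           # ...then wait for all of them to exit
            t.join()

    q = queue.Queue()
    t = threading.Thread(target=_worker, args=(q,))
    t.start()
    _threads_queues[t] = q
    _python_exit()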
@@ -122,10 +111,10 @@ class _CallItem(object):
         self.args = args
         self.kwargs = kwargs
 
-def _process_worker(call_queue, result_queue, shutdown):
+def _process_worker(call_queue, result_queue):
     """Evaluates calls from call_queue and places the results in result_queue.
 
-    This worker is run in a seperate process.
+    This worker is run in a separate process.
 
     Args:
         call_queue: A multiprocessing.Queue of _CallItems that will be read and
@@ -136,21 +125,20 @@ def _process_worker(call_queue, result_queue, shutdown):
            worker that it should exit when call_queue is empty.
     """
     while True:
-        try:
-            call_item = call_queue.get(block=True, timeout=0.1)
-        except queue.Empty:
-            if shutdown.is_set():
-                return
-        else:
-            try:
-                r = call_item.fn(*call_item.args, **call_item.kwargs)
-            except BaseException:
-                e = sys.exc_info()[1]
-                result_queue.put(_ResultItem(call_item.work_id,
-                                             exception=e))
-            else:
-                result_queue.put(_ResultItem(call_item.work_id,
-                                             result=r))
+        call_item = call_queue.get(block=True)
+        if call_item is None:
+            # Wake up queue management thread
+            result_queue.put(None)
+            return
+        try:
+            r = call_item.fn(*call_item.args, **call_item.kwargs)
+        except BaseException:
+            e = sys.exc_info()[1]
+            result_queue.put(_ResultItem(call_item.work_id,
+                                         exception=e))
+        else:
+            result_queue.put(_ResultItem(call_item.work_id,
+                                         result=r))
 
 def _add_call_item_to_queue(pending_work_items,
                             work_ids,
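
The worker loop now blocks on the call queue with no timeout and exits on a None sentinel, echoing the sentinel onto the result queue so the queue management thread wakes up as well. A small standalone illustration of that protocol with plain multiprocessing queues (the worker function and payload here are made up for the example):

    import multiprocessing

    def worker(call_queue, result_queue):
        while True:
            item = call_queue.get(block=True)
            if item is None:             # sentinel: echo it back and exit
                result_queue.put(None)
                return
            fn, args = item
            result_queue.put(fn(*args))

    if __name__ == '__main__':
        calls, results = multiprocessing.Queue(), multiprocessing.Queue()
        p = multiprocessing.Process(target=worker, args=(calls, results))
        p.start()
        calls.put((pow, (2, 10)))
        calls.put(None)                  # ask the worker to shut down
        print(results.get())             # 1024
        print(results.get())             # None (the echoed sentinel)
        p.join()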
@@ -189,13 +177,12 @@ def _add_call_item_to_queue(pending_work_items,
             del pending_work_items[work_id]
             continue
 
-def _queue_manangement_worker(executor_reference,
-                              processes,
-                              pending_work_items,
-                              work_ids_queue,
-                              call_queue,
-                              result_queue,
-                              shutdown_process_event):
+def _queue_management_worker(executor_reference,
+                             processes,
+                             pending_work_items,
+                             work_ids_queue,
+                             call_queue,
+                             result_queue):
     """Manages the communication between this process and the worker processes.
 
     This function is run in a local thread.
@@ -213,37 +200,19 @@ def _queue_manangement_worker(executor_reference,
             derived from _WorkItems for processing by the process workers.
         result_queue: A multiprocessing.Queue of _ResultItems generated by the
             process workers.
-        shutdown_process_event: A multiprocessing.Event used to signal the
-            process workers that they should exit when their work queue is
-            empty.
     """
+    nb_shutdown_processes = [0]
+    def shutdown_one_process():
+        """Tell a worker to terminate, which will in turn wake us again"""
+        call_queue.put(None)
+        nb_shutdown_processes[0] += 1
     while True:
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
 
-        try:
-            result_item = result_queue.get(block=True, timeout=0.1)
-        except queue.Empty:
-            executor = executor_reference()
-            # No more work items can be added if:
-            #   - The interpreter is shutting down OR
-            #   - The executor that owns this worker has been collected OR
-            #   - The executor that owns this worker has been shutdown.
-            if _shutdown or executor is None or executor._shutdown_thread:
-                # Since no new work items can be added, it is safe to shutdown
-                # this thread if there are no pending work items.
-                if not pending_work_items:
-                    shutdown_process_event.set()
-
-                    # If .join() is not called on the created processes then
-                    # some multiprocessing.Queue methods may deadlock on Mac OS
-                    # X.
-                    for p in processes:
-                        p.join()
-                    return
-            del executor
-        else:
+        result_item = result_queue.get(block=True)
+        if result_item is not None:
             work_item = pending_work_items[result_item.work_id]
             del pending_work_items[result_item.work_id]
@@ -251,6 +220,51 @@ def _queue_manangement_worker(executor_reference,
                 work_item.future.set_exception(result_item.exception)
             else:
                 work_item.future.set_result(result_item.result)
+        # Check whether we should start shutting down.
+        executor = executor_reference()
+        # No more work items can be added if:
+        #   - The interpreter is shutting down OR
+        #   - The executor that owns this worker has been collected OR
+        #   - The executor that owns this worker has been shutdown.
+        if _shutdown or executor is None or executor._shutdown_thread:
+            # Since no new work items can be added, it is safe to shutdown
+            # this thread if there are no pending work items.
+            if not pending_work_items:
+                while nb_shutdown_processes[0] < len(processes):
+                    shutdown_one_process()
+                # If .join() is not called on the created processes then
+                # some multiprocessing.Queue methods may deadlock on Mac OS
+                # X.
+                for p in processes:
+                    p.join()
+                call_queue.close()
+                return
+        del executor
+
+_system_limits_checked = False
+_system_limited = None
+def _check_system_limits():
+    global _system_limits_checked, _system_limited
+    if _system_limits_checked:
+        if _system_limited:
+            raise NotImplementedError(_system_limited)
+    _system_limits_checked = True
+    try:
+        import os
+        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
+    except (AttributeError, ValueError):
+        # sysconf not available or setting not available
+        return
+    if nsems_max == -1:
+        # indetermine limit, assume that limit is determined
+        # by available memory only
+        return
+    if nsems_max >= 256:
+        # minimum number of semaphores available
+        # according to POSIX
+        return
+    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+    raise NotImplementedError(_system_limited)
+
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None):
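
_check_system_limits() is new in this port: where the platform reports fewer than the 256 semaphores POSIX requires, constructing a ProcessPoolExecutor now raises NotImplementedError up front instead of failing obscurely later. Callers that must run on such systems can fall back to threads; a hedged sketch of one way to do that (make_executor is illustrative, not part of the library):

    from concurrent import futures

    def make_executor(max_workers=4):
        """Prefer a process pool, but fall back to threads on platforms
        that cannot provide enough semaphores for ProcessPoolExecutor."""
        try:
            return futures.ProcessPoolExecutor(max_workers=max_workers)
        except NotImplementedError:
            return futures.ThreadPoolExecutor(max_workers=max_workers)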
@@ -261,7 +275,7 @@ class ProcessPoolExecutor(_base.Executor):
                 execute the given calls. If None or not given then as many
                 worker processes will be created as the machine has processors.
         """
-        _remove_dead_thread_references()
+        _check_system_limits()
 
         if max_workers is None:
             self._max_workers = multiprocessing.cpu_count()
@@ -280,33 +294,34 @@ class ProcessPoolExecutor(_base.Executor):
         # Shutdown is a two-step process.
         self._shutdown_thread = False
-        self._shutdown_process_event = multiprocessing.Event()
         self._shutdown_lock = threading.Lock()
         self._queue_count = 0
         self._pending_work_items = {}
 
     def _start_queue_management_thread(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the queue management thread.
+        def weakref_cb(_, q=self._result_queue):
+            q.put(None)
         if self._queue_management_thread is None:
             self._queue_management_thread = threading.Thread(
-                    target=_queue_manangement_worker,
-                    args=(weakref.ref(self),
+                    target=_queue_management_worker,
+                    args=(weakref.ref(self, weakref_cb),
                           self._processes,
                           self._pending_work_items,
                           self._work_ids,
                           self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
-            _thread_references.add(weakref.ref(self._queue_management_thread))
+            _threads_queues[self._queue_management_thread] = self._result_queue
 
     def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_workers):
             p = multiprocessing.Process(
                     target=_process_worker,
                     args=(self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             p.start()
             self._processes.add(p)
@@ -321,6 +336,8 @@ class ProcessPoolExecutor(_base.Executor):
             self._pending_work_items[self._queue_count] = w
             self._work_ids.put(self._queue_count)
             self._queue_count += 1
+            # Wake up queue management thread
+            self._result_queue.put(None)
 
             self._start_queue_management_thread()
             self._adjust_process_count()
@@ -330,15 +347,16 @@ class ProcessPoolExecutor(_base.Executor):
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown_thread = True
-        if wait:
-            if self._queue_management_thread:
+        if self._queue_management_thread:
+            # Wake up queue management thread
+            self._result_queue.put(None)
+            if wait:
                 self._queue_management_thread.join()
         # To reduce the risk of openning too many files, remove references to
         # objects that use file descriptors.
         self._queue_management_thread = None
         self._call_queue = None
         self._result_queue = None
-        self._shutdown_process_event = None
         self._processes = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
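
shutdown() now wakes the queue management thread with a None on the result queue whenever the thread exists, and only joins it when wait=True, so pending work still drains in the background after a non-blocking shutdown. Public usage is unchanged; a minimal sketch (the square function and inputs are arbitrary):

    from concurrent import futures

    def square(x):
        return x * x

    if __name__ == '__main__':
        executor = futures.ProcessPoolExecutor(max_workers=2)
        futs = [executor.submit(square, n) for n in range(4)]
        print([f.result() for f in futs])      # [0, 1, 4, 9]
        # Blocks until the management thread has been woken and has exited.
        executor.shutdown(wait=True)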


@@ -32,28 +32,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 # workers to exit when their work queues are empty and then waits until the
 # threads finish.
 
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
 
 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    ...    t = ThreadPoolExecutor(max_workers=5)
-    ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()
 
 atexit.register(_python_exit)
@@ -79,19 +68,20 @@ class _WorkItem(object):
 def _worker(executor_reference, work_queue):
     try:
         while True:
-            try:
-                work_item = work_queue.get(block=True, timeout=0.1)
-            except queue.Empty:
-                executor = executor_reference()
-                # Exit if:
-                #   - The interpreter is shutting down OR
-                #   - The executor that owns the worker has been collected OR
-                #   - The executor that owns the worker has been shutdown.
-                if _shutdown or executor is None or executor._shutdown:
-                    return
-                del executor
-            else:
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
                 work_item.run()
+                continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The interpreter is shutting down OR
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if _shutdown or executor is None or executor._shutdown:
+                # Notice other workers
+                work_queue.put(None)
+                return
+            del executor
     except BaseException:
         _base.LOGGER.critical('Exception in worker', exc_info=True)
@@ -103,8 +93,6 @@ class ThreadPoolExecutor(_base.Executor):
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
         """
-        _remove_dead_thread_references()
-
         self._max_workers = max_workers
         self._work_queue = queue.Queue()
         self._threads = set()
@@ -125,19 +113,25 @@ class ThreadPoolExecutor(_base.Executor):
     submit.__doc__ = _base.Executor.submit.__doc__
 
     def _adjust_thread_count(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the worker threads.
+        def weakref_cb(_, q=self._work_queue):
+            q.put(None)
         # TODO(bquinlan): Should avoid creating new threads if there are more
         # idle threads than items in the work queue.
         if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
-                                 args=(weakref.ref(self), self._work_queue))
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue))
             t.daemon = True
             t.start()
             self._threads.add(t)
-            _thread_references.add(weakref.ref(t))
+            _threads_queues[t] = self._work_queue
 
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True
+            self._work_queue.put(None)
         if wait:
             for t in self._threads:
                 t.join()
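
Worker threads are now woken through two routes: shutdown() pushes a None sentinel onto the work queue, and a weakref callback pushes the same sentinel if the executor is simply dropped without ever being shut down. A stripped-down sketch of the weakref-callback mechanism (the Owner class stands in for the executor and is illustrative only):

    import weakref
    try:
        import queue                 # Python 3
    except ImportError:
        import Queue as queue        # Python 2

    class Owner(object):
        pass

    work_queue = queue.Queue()

    def weakref_cb(_, q=work_queue):
        # Runs when the referent is garbage collected; wakes a blocked worker.
        q.put(None)

    owner = Owner()
    ref = weakref.ref(owner, weakref_cb)

    del owner                        # collected immediately under CPython
    print(work_queue.get())          # None: the sentinel left by weakref_cb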


@@ -11,7 +11,7 @@ except ImportError:
     from distutils.core import setup
 
 setup(name='futures',
-      version='2.1.3',
+      version='2.1.4',
       description='Backport of the concurrent.futures package from Python 3.2',
       author='Brian Quinlan',
       author_email='brian@sweetapp.com',

File diff suppressed because it is too large


@@ -1,11 +1,8 @@
 [tox]
-envlist = py25,py26,py27,py31
+envlist = py26,py27,py31
 
 [testenv]
 commands={envpython} test_futures.py []
 
-#[testenv:py24]
-#deps=multiprocessing
-#
-#[testenv:py25]
-#deps=multiprocessing
+[testenv:py26]
+deps=unittest2