Begin adding our own thread pool executor

To be able to add extended functionality without having
to (just yet) upstream those changes into the stdlib, especially
around thread pool execution and tracking start to add our own
thread workers and the needed mechanics to clean them up
appropriately.

Change-Id: If9af0d905009435d91e7d8ac00005f4ca30bd987
This commit is contained in:
Joshua Harlow 2016-06-01 18:50:10 -07:00 committed by Joshua Harlow
parent 8166a435f7
commit dc0862510f
3 changed files with 191 additions and 37 deletions

View File

@ -19,10 +19,12 @@ import threading
from concurrent import futures as _futures
from concurrent.futures import process as _process
from concurrent.futures import thread as _thread
import six
from six.moves import queue as compat_queue
from futurist import _green
from futurist import _thread
from futurist import _utils
@ -37,25 +39,6 @@ class RejectedSubmission(Exception):
Future = _futures.Future
class _Threading(object):
@staticmethod
def event_object(*args, **kwargs):
return threading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return threading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return threading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return threading.Condition(*args, **kwargs)
class _Gatherer(object):
def __init__(self, submit_func, lock_factory, start_before_submit=False):
self._submit_func = submit_func
@ -116,7 +99,7 @@ class _Gatherer(object):
return fut
class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
class ThreadPoolExecutor(_futures.Executor):
"""Executor that uses a thread pool to execute calls asynchronously.
It gathers statistics about the submissions executed for post-analysis...
@ -124,13 +107,13 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
threading = _Threading()
threading = _thread.Threading()
def __init__(self, max_workers=None, check_and_reject=None):
"""Initializes a thread pool executor.
:param max_workers: maximum number of workers that can be
simulatenously active at the same time, further
simultaneously active at the same time, further
submitted work will be queued up when this limit
is reached.
:type max_workers: int
@ -146,21 +129,15 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
"""
if max_workers is None:
max_workers = _utils.get_optimal_thread_count()
super(ThreadPoolExecutor, self).__init__(max_workers=max_workers)
if self._max_workers <= 0:
if max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
# NOTE(harlowja): this replaces the parent classes non-reentrant lock
# with a reentrant lock so that we can correctly call into the check
# and reject lock, and that it will block correctly if another
# submit call is done during that...
self._max_workers = max_workers
self._work_queue = compat_queue.Queue()
self._shutdown_lock = threading.RLock()
self._shutdown = False
self._workers = []
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
self._gatherer = _Gatherer(
# Since our submit will use this gatherer we have to reference
# the parent submit, bound to this instance (which is what we
# really want to use anyway).
super(ThreadPoolExecutor, self).submit,
self.threading.lock_object)
self._gatherer = _Gatherer(self._submit, self.threading.lock_object)
@property
def statistics(self):
@ -172,6 +149,35 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
"""Accessor to determine if the executor is alive/active."""
return not self._shutdown
def _maybe_spin_up(self):
"""Spin up a worker if needed."""
if (not self._workers or
(len(self._workers) < self._max_workers and not
# Do more advanced idle checks in the future....
any(w.idle for w in self._workers))):
w = _thread.ThreadWorker.create_and_register(
self, self._work_queue)
# Always save it before we start (so that even if we fail
# starting it we can correctly join on it).
self._workers.append(w)
w.start()
def shutdown(self, wait=True):
with self._shutdown_lock:
if not self._shutdown:
self._shutdown = True
for w in self._workers:
w.stop()
if wait:
for w in self._workers:
_thread.join_thread(w)
def _submit(self, fn, *args, **kwargs):
f = Future()
self._maybe_spin_up()
self._work_queue.put(_utils.WorkItem(f, fn, args, kwargs))
return f
def submit(self, fn, *args, **kwargs):
"""Submit some work to be executed (and gather statistics)."""
with self._shutdown_lock:
@ -190,7 +196,7 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
threading = _Threading()
threading = _thread.Threading()
def __init__(self, max_workers=None):
if max_workers is None:
@ -231,7 +237,7 @@ class SynchronousExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _Threading()
threading = _thread.Threading()
def __init__(self, green=False, run_work_func=lambda work: work.run()):
"""Synchronous executor constructor.

146
futurist/_thread.py Normal file
View File

@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import sys
import threading
import weakref
import six
from six.moves import queue as compat_queue
class Threading(object):
@staticmethod
def event_object(*args, **kwargs):
return threading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return threading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return threading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return threading.Condition(*args, **kwargs)
_to_be_cleaned = weakref.WeakKeyDictionary()
_dying = False
if six.PY2:
# This ensures joining responds to keyboard interrupts.
join_thread = lambda thread: thread.join(sys.maxint)
else:
# Not needed on py3 or newer...
join_thread = lambda thread: thread.join()
_TOMBSTONE = object()
class ThreadWorker(threading.Thread):
MAX_IDLE_FOR = 1
def __init__(self, executor, work_queue):
super(ThreadWorker, self).__init__()
self.work_queue = work_queue
self.should_stop = False
self.idle = False
self.daemon = True
# Ensure that when the owning executor gets cleaned up that these
# threads also get shutdown (if they were not already shutdown).
self.executor_ref = weakref.ref(
executor, lambda _obj: work_queue.put(_TOMBSTONE))
@classmethod
def create_and_register(cls, executor, work_queue):
w = cls(executor, work_queue)
# Ensure that on shutdown, if threads still exist that we get
# around to cleaning them up and waiting for them to correctly stop.
#
# TODO(harlowja): use a weakrefset in the future, as we don't
# really care about the values...
_to_be_cleaned[w] = True
return w
def _is_dying(self):
if self.should_stop or _dying:
return True
executor = self.executor_ref()
if executor is None:
return True
# Avoid confusing the GC with cycles (since each executor
# references its known workers)...
del executor
return False
def _wait_for_work(self):
self.idle = True
work = None
while work is None:
try:
work = self.work_queue.get(True, self.MAX_IDLE_FOR)
except compat_queue.Empty:
if self._is_dying():
work = _TOMBSTONE
self.idle = False
return work
def stop(self, soon_as_possible=False):
if soon_as_possible:
# This will potentially leave unfinished work on queues.
self.should_stop = True
self.work_queue.put(_TOMBSTONE)
def run(self):
while not self._is_dying():
work = self._wait_for_work()
try:
if work is _TOMBSTONE:
# Ensure any other threads on the same queue also get
# the tombstone object...
self.work_queue.put(_TOMBSTONE)
return
else:
work.run()
finally:
# Avoid any potential (self) references to the work item
# in tracebacks or similar...
del work
def _clean_up():
"""Ensure all threads that were created were destroyed cleanly."""
global _dying
_dying = True
threads_to_wait_for = []
while _to_be_cleaned:
worker, _work_val = _to_be_cleaned.popitem()
worker.stop(soon_as_possible=True)
threads_to_wait_for.append(worker)
while threads_to_wait_for:
worker = threads_to_wait_for.pop()
try:
join_thread(worker)
finally:
del worker
atexit.register(_clean_up)

View File

@ -31,6 +31,8 @@ except ImportError:
class WorkItem(object):
"""A thing to be executed by a executor."""
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn