futurist/futurist/_thread.py
Joshua Harlow dc0862510f Begin adding our own thread pool executor
To be able to add extended functionality without having
to (just yet) upstream those changes into the stdlib, especially
around thread pool execution and tracking, start adding our own
thread workers and the mechanics needed to clean them up
appropriately.

Change-Id: If9af0d905009435d91e7d8ac00005f4ca30bd987
2016-06-11 11:05:18 -07:00

147 lines
4.3 KiB
Python

# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import sys
import threading
import weakref
import six
from six.moves import queue as compat_queue
class Threading(object):
    """Namespace of factories for the stdlib ``threading`` primitives.

    Each factory forwards its positional and keyword arguments, untouched,
    to the matching ``threading`` constructor and returns the new object.
    """

    @staticmethod
    def event_object(*args, **kwargs):
        """Build and return a new ``threading.Event``."""
        make_event = threading.Event
        return make_event(*args, **kwargs)

    @staticmethod
    def lock_object(*args, **kwargs):
        """Build and return a new ``threading.Lock``."""
        make_lock = threading.Lock
        return make_lock(*args, **kwargs)

    @staticmethod
    def rlock_object(*args, **kwargs):
        """Build and return a new ``threading.RLock``."""
        make_rlock = threading.RLock
        return make_rlock(*args, **kwargs)

    @staticmethod
    def condition_object(*args, **kwargs):
        """Build and return a new ``threading.Condition``."""
        make_condition = threading.Condition
        return make_condition(*args, **kwargs)
# Workers that still have to be stopped and joined when the interpreter
# exits; keys are held weakly, so an entry alone does not keep a worker alive.
_to_be_cleaned = weakref.WeakKeyDictionary()

# Set to True by ``_clean_up`` so that workers notice that shutdown began.
_dying = False

if six.PY2:
    def join_thread(thread):
        """Join ``thread`` using ``sys.maxint`` as the timeout.

        This ensures joining responds to keyboard interrupts on python 2.
        """
        return thread.join(sys.maxint)
else:
    def join_thread(thread):
        """Join ``thread`` without a timeout.

        The timed-join workaround is not needed on python 3 or newer.
        """
        return thread.join()

# Sentinel put on a work queue to tell the workers reading from it to exit.
_TOMBSTONE = object()
class ThreadWorker(threading.Thread):
    """Daemon thread that runs work items pulled from a shared queue.

    The worker keeps only a weak reference to its owning executor and
    exits when it is told to stop, when the module starts dying, when the
    executor has been collected or when it receives the tombstone.
    """

    # Timeout handed to ``work_queue.get`` (seconds for a stdlib queue);
    # once it expires an idle worker re-checks whether it should die.
    MAX_IDLE_FOR = 1

    def __init__(self, executor, work_queue):
        """Create a (not yet started) worker bound to ``work_queue``.

        :param executor: owning executor; only weakly referenced
        :param work_queue: queue that work items (objects providing a
                           ``run`` method) and the tombstone arrive on
        """
        super(ThreadWorker, self).__init__()
        self.daemon = True
        self.idle = False
        self.should_stop = False
        self.work_queue = work_queue

        def _executor_collected(_dead_ref):
            # The owning executor went away; make sure the threads reading
            # this queue get told to shutdown (if they have not already).
            work_queue.put(_TOMBSTONE)

        self.executor_ref = weakref.ref(executor, _executor_collected)

    @classmethod
    def create_and_register(cls, executor, work_queue):
        """Build a worker and record it for clean up at interpreter exit.

        :returns: the newly created (not yet started) worker
        """
        worker = cls(executor, work_queue)
        # Registering here means that, on shutdown, any thread that still
        # exists gets stopped and waited on.
        #
        # TODO(harlowja): use a weakrefset in the future, as we don't
        # really care about the values...
        _to_be_cleaned[worker] = True
        return worker

    def _is_dying(self):
        """Return True when this worker should stop processing work."""
        if self.should_stop or _dying:
            return True
        # Only a temporary reference to the executor is taken (and it is
        # dropped right away), this avoids confusing the GC with cycles
        # since each executor references its known workers.
        return self.executor_ref() is None

    def _wait_for_work(self):
        """Block until an item arrives; return it, or the tombstone.

        The ``idle`` flag is True for as long as the worker is waiting. A
        ``None`` taken off the queue is ignored and the wait continues.
        """
        self.idle = True
        item = None
        while item is None:
            try:
                item = self.work_queue.get(True, self.MAX_IDLE_FOR)
            except compat_queue.Empty:
                # Nothing showed up in time; give up only when dying.
                item = _TOMBSTONE if self._is_dying() else None
        self.idle = False
        return item

    def stop(self, soon_as_possible=False):
        """Ask this worker to stop by queueing a tombstone.

        :param soon_as_possible: when true, also flag the worker so that it
                                 stops before its next wait; this will
                                 potentially leave unfinished work on
                                 queues
        """
        if soon_as_possible:
            self.should_stop = True
        self.work_queue.put(_TOMBSTONE)

    def run(self):
        """Run work items until dying or until a tombstone is received.

        An exception raised by a work item's ``run`` is not caught here.
        """
        while not self._is_dying():
            work = self._wait_for_work()
            try:
                if work is not _TOMBSTONE:
                    work.run()
                else:
                    # Put the tombstone back so that any other threads on
                    # the same queue also get it...
                    self.work_queue.put(_TOMBSTONE)
                    return
            finally:
                # Drop the reference so that the work item does not stay
                # referenced (by us) in tracebacks or similar...
                del work
def _clean_up():
    """Stop every still-registered worker and then wait for each to exit.

    Marks the module as dying, drains the ``_to_be_cleaned`` registry
    (asking each worker to stop as soon as possible) and finally joins the
    stopped workers one at a time.
    """
    global _dying
    _dying = True

    # Phase one: signal every worker before blocking on any one of them.
    stopped = []
    while _to_be_cleaned:
        entry = _to_be_cleaned.popitem()
        thread = entry[0]
        thread.stop(soon_as_possible=True)
        stopped.append(thread)

    # Phase two: join them (most recently stopped first).
    while stopped:
        thread = stopped.pop()
        try:
            join_thread(thread)
        finally:
            # Release our reference even when the join gets interrupted.
            del thread
# Have any still-registered workers stopped and joined at interpreter exit.
atexit.register(_clean_up)