Move most of green helper classes -> '_green.py'
Instead of having the green executor(s) helper classes be inline with the rest of the other executors move its helper classes/objects into its own module and use them from there instead. This also moves the work item helper into '_utils.py' for similar/equivalent reasons. Change-Id: I028738bf7d64f97320450d63debbd5116b505a72
This commit is contained in:
parent
3d604eb332
commit
711e5bb9d5
@ -15,24 +15,13 @@
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from concurrent import futures as _futures
|
||||
from concurrent.futures import process as _process
|
||||
from concurrent.futures import thread as _thread
|
||||
import six
|
||||
|
||||
try:
|
||||
from eventlet import greenpool
|
||||
from eventlet import patcher as greenpatcher
|
||||
from eventlet import queue as greenqueue
|
||||
|
||||
from eventlet.green import threading as greenthreading
|
||||
except ImportError:
|
||||
greenpatcher, greenpool, greenqueue, greenthreading = (None, None,
|
||||
None, None)
|
||||
|
||||
from futurist import _green
|
||||
from futurist import _utils
|
||||
|
||||
|
||||
@ -227,56 +216,6 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
|
||||
return self._gatherer.submit(fn, *args, **kwargs)
|
||||
|
||||
|
||||
class _WorkItem(object):
|
||||
def __init__(self, future, fn, args, kwargs):
|
||||
self.future = future
|
||||
self.fn = fn
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def run(self):
|
||||
if not self.future.set_running_or_notify_cancel():
|
||||
return
|
||||
try:
|
||||
result = self.fn(*self.args, **self.kwargs)
|
||||
except BaseException:
|
||||
exc_type, exc_value, exc_tb = sys.exc_info()
|
||||
try:
|
||||
if six.PY2:
|
||||
self.future.set_exception_info(exc_value, exc_tb)
|
||||
else:
|
||||
self.future.set_exception(exc_value)
|
||||
finally:
|
||||
del(exc_type, exc_value, exc_tb)
|
||||
else:
|
||||
self.future.set_result(result)
|
||||
|
||||
|
||||
if _utils.EVENTLET_AVAILABLE:
|
||||
|
||||
class _GreenThreading(object):
|
||||
|
||||
@staticmethod
|
||||
def event_object(*args, **kwargs):
|
||||
return greenthreading.Event(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def lock_object(*args, **kwargs):
|
||||
return greenthreading.Lock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def rlock_object(*args, **kwargs):
|
||||
return greenthreading.RLock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def condition_object(*args, **kwargs):
|
||||
return greenthreading.Condition(*args, **kwargs)
|
||||
|
||||
_green_threading = _GreenThreading()
|
||||
else:
|
||||
_green_threading = None
|
||||
|
||||
|
||||
class SynchronousExecutor(_futures.Executor):
|
||||
"""Executor that uses the caller to execute calls synchronously.
|
||||
|
||||
@ -303,7 +242,7 @@ class SynchronousExecutor(_futures.Executor):
|
||||
' synchronous executor')
|
||||
self._shutoff = False
|
||||
if green:
|
||||
self.threading = _green_threading
|
||||
self.threading = _green.threading
|
||||
self._future_cls = GreenFuture
|
||||
else:
|
||||
self._future_cls = Future
|
||||
@ -342,36 +281,11 @@ class SynchronousExecutor(_futures.Executor):
|
||||
|
||||
def _submit(self, fn, *args, **kwargs):
|
||||
fut = self._future_cls()
|
||||
runner = _WorkItem(fut, fn, args, kwargs)
|
||||
runner = _utils.WorkItem(fut, fn, args, kwargs)
|
||||
runner.run()
|
||||
return fut
|
||||
|
||||
|
||||
class _GreenWorker(object):
|
||||
def __init__(self, work, work_queue):
|
||||
self.work = work
|
||||
self.work_queue = work_queue
|
||||
|
||||
def __call__(self):
|
||||
# Run our main piece of work.
|
||||
try:
|
||||
self.work.run()
|
||||
finally:
|
||||
# Consume any delayed work before finishing (this is how we finish
|
||||
# work that was to big for the pool size, but needs to be finished
|
||||
# no matter).
|
||||
while True:
|
||||
try:
|
||||
w = self.work_queue.get_nowait()
|
||||
except greenqueue.Empty:
|
||||
break
|
||||
else:
|
||||
try:
|
||||
w.run()
|
||||
finally:
|
||||
self.work_queue.task_done()
|
||||
|
||||
|
||||
class GreenFuture(Future):
|
||||
__doc__ = Future.__doc__
|
||||
|
||||
@ -384,8 +298,8 @@ class GreenFuture(Future):
|
||||
# functions will correctly yield to eventlet. If this is not done then
|
||||
# waiting on the future never actually causes the greenthreads to run
|
||||
# and thus you wait for infinity.
|
||||
if not greenpatcher.is_monkey_patched('threading'):
|
||||
self._condition = greenthreading.Condition()
|
||||
if not _green.is_monkey_patched('threading'):
|
||||
self._condition = _green.threading.condition_object()
|
||||
|
||||
|
||||
class GreenThreadPoolExecutor(_futures.Executor):
|
||||
@ -398,7 +312,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
|
||||
It gathers statistics about the submissions executed for post-analysis...
|
||||
"""
|
||||
|
||||
threading = _green_threading
|
||||
threading = _green.threading
|
||||
|
||||
def __init__(self, max_workers=1000, check_and_reject=None):
|
||||
"""Initializes a green thread pool executor.
|
||||
@ -423,10 +337,10 @@ class GreenThreadPoolExecutor(_futures.Executor):
|
||||
if max_workers <= 0:
|
||||
raise ValueError("Max workers must be greater than zero")
|
||||
self._max_workers = max_workers
|
||||
self._pool = greenpool.GreenPool(self._max_workers)
|
||||
self._delayed_work = greenqueue.Queue()
|
||||
self._pool = _green.Pool(self._max_workers)
|
||||
self._delayed_work = _green.Queue()
|
||||
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
|
||||
self._shutdown_lock = greenthreading.Lock()
|
||||
self._shutdown_lock = self.threading.lock_object()
|
||||
self._shutdown = False
|
||||
self._gatherer = _Gatherer(self._submit,
|
||||
self.threading.lock_object)
|
||||
@ -458,7 +372,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
|
||||
|
||||
def _submit(self, fn, *args, **kwargs):
|
||||
f = GreenFuture()
|
||||
work = _WorkItem(f, fn, args, kwargs)
|
||||
work = _utils.WorkItem(f, fn, args, kwargs)
|
||||
if not self._spin_up(work):
|
||||
self._delayed_work.put(work)
|
||||
return f
|
||||
@ -472,7 +386,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
|
||||
"""
|
||||
alive = self._pool.running() + self._pool.waiting()
|
||||
if alive < self._max_workers:
|
||||
self._pool.spawn_n(_GreenWorker(work, self._delayed_work))
|
||||
self._pool.spawn_n(_green.GreenWorker(work, self._delayed_work))
|
||||
return True
|
||||
return False
|
||||
|
||||
|
84
futurist/_green.py
Normal file
84
futurist/_green.py
Normal file
@ -0,0 +1,84 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from futurist import _utils
|
||||
|
||||
try:
|
||||
from eventlet import greenpool
|
||||
from eventlet import patcher as greenpatcher
|
||||
from eventlet import queue as greenqueue
|
||||
|
||||
from eventlet.green import threading as greenthreading
|
||||
except ImportError:
|
||||
greenpatcher, greenpool, greenqueue, greenthreading = (None, None,
|
||||
None, None)
|
||||
|
||||
|
||||
if _utils.EVENTLET_AVAILABLE:
|
||||
# Aliases that we use and only expose vs the whole of eventlet...
|
||||
Pool = greenpool.GreenPool
|
||||
Queue = greenqueue.Queue
|
||||
is_monkey_patched = greenpatcher.is_monkey_patched
|
||||
|
||||
class GreenThreading(object):
|
||||
|
||||
@staticmethod
|
||||
def event_object(*args, **kwargs):
|
||||
return greenthreading.Event(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def lock_object(*args, **kwargs):
|
||||
return greenthreading.Lock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def rlock_object(*args, **kwargs):
|
||||
return greenthreading.RLock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def condition_object(*args, **kwargs):
|
||||
return greenthreading.Condition(*args, **kwargs)
|
||||
|
||||
threading = GreenThreading()
|
||||
else:
|
||||
threading = None
|
||||
Pool = None
|
||||
Queue = None
|
||||
is_monkey_patched = lambda mod: False
|
||||
|
||||
|
||||
class GreenWorker(object):
|
||||
def __init__(self, work, work_queue):
|
||||
self.work = work
|
||||
self.work_queue = work_queue
|
||||
|
||||
def __call__(self):
|
||||
# Run our main piece of work.
|
||||
try:
|
||||
self.work.run()
|
||||
finally:
|
||||
# Consume any delayed work before finishing (this is how we finish
|
||||
# work that was to big for the pool size, but needs to be finished
|
||||
# no matter).
|
||||
while True:
|
||||
try:
|
||||
w = self.work_queue.get_nowait()
|
||||
except greenqueue.Empty:
|
||||
break
|
||||
else:
|
||||
try:
|
||||
w.run()
|
||||
finally:
|
||||
self.work_queue.task_done()
|
@ -20,6 +20,7 @@ import sys
|
||||
import traceback
|
||||
|
||||
from monotonic import monotonic as now # noqa
|
||||
import six
|
||||
|
||||
try:
|
||||
import eventlet as _eventlet # noqa
|
||||
@ -28,6 +29,31 @@ except ImportError:
|
||||
EVENTLET_AVAILABLE = False
|
||||
|
||||
|
||||
class WorkItem(object):
|
||||
def __init__(self, future, fn, args, kwargs):
|
||||
self.future = future
|
||||
self.fn = fn
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def run(self):
|
||||
if not self.future.set_running_or_notify_cancel():
|
||||
return
|
||||
try:
|
||||
result = self.fn(*self.args, **self.kwargs)
|
||||
except BaseException:
|
||||
exc_type, exc_value, exc_tb = sys.exc_info()
|
||||
try:
|
||||
if six.PY2:
|
||||
self.future.set_exception_info(exc_value, exc_tb)
|
||||
else:
|
||||
self.future.set_exception(exc_value)
|
||||
finally:
|
||||
del(exc_type, exc_value, exc_tb)
|
||||
else:
|
||||
self.future.set_result(result)
|
||||
|
||||
|
||||
class Failure(object):
|
||||
"""Object that captures a exception (and its associated information)."""
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user