Files
deb-python-taskflow/taskflow/utils/eventlet_utils.py
Ivan A. Melnikov 2409a46295 Add wait_for_any method to eventlet utils
This change introduces a new utility function in eventlet utils, named
wait_for_any. It waits on one or more futures until at least one of
them completes. This is a simplified version of concurrent.futures.wait
adapted to work with our green futures.

Change-Id: I2ab50c4e108ea1e9c81f618bd8fa8a8156bb695b
2013-12-19 12:28:23 +02:00

186 lines
6.0 KiB
Python

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
from concurrent import futures
try:
from eventlet.green import threading as gthreading
from eventlet import greenpool
from eventlet import patcher
from eventlet import queue
EVENTLET_AVAILABLE = True
except ImportError:
EVENTLET_AVAILABLE = False
from taskflow.utils import lock_utils
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# NOTE(harlowja): this object signals to threads that they should stop
# working and rest in peace. Workers recognize it by identity (``is``), so
# it can never be confused with a real work item.
_TOMBSTONE = object()
class _WorkItem(object):
    """Pairs a callable and its arguments with the future reporting on it."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs

    def run(self):
        """Call the stored function and publish its outcome on the future.

        Nothing is executed when the future was cancelled before it could
        be started.
        """
        if self.future.set_running_or_notify_cancel():
            try:
                outcome = self.fn(*self.args, **self.kwargs)
            except BaseException as exc:
                # Hand every failure to whoever is waiting on the future
                # instead of letting it escape into the worker loop.
                self.future.set_exception(exc)
            else:
                self.future.set_result(outcome)
class _Worker(object):
    """Callable that drains a shared work queue until it sees the tombstone."""

    def __init__(self, executor, work_queue, worker_id):
        self.executor = executor
        self.work_queue = work_queue
        self.worker_id = worker_id

    def __call__(self):
        """Run queued work items one after another until told to stop."""
        try:
            item = self.work_queue.get(block=True)
            while item is not _TOMBSTONE:
                item.run()
                item = self.work_queue.get(block=True)
            # NOTE(harlowja): give notice to other workers (this is
            # basically a chain of tombstone calls that will cause all
            # the workers on the queue to eventually shut-down).
            self.work_queue.put(_TOMBSTONE)
        except BaseException:
            LOG.critical("Exception in worker %s of '%s'",
                         self.worker_id, self.executor, exc_info=True)
class _GreenFuture(futures.Future):
    """Future whose blocking waits cooperate with eventlet greenthreads."""

    def __init__(self):
        super(_GreenFuture, self).__init__()
        if patcher.is_monkey_patched('threading'):
            # The condition created by the base class is used unchanged
            # when eventlet has monkey patched the threading module.
            return
        # NOTE(harlowja): replace the built-in condition with a greenthread
        # compatible one so that when getting the result of this future the
        # functions will correctly yield to eventlet. If this is not done then
        # waiting on the future never actually causes the greenthreads to run
        # and thus you wait for infinity.
        self._condition = gthreading.Condition()
class GreenExecutor(futures.Executor):
    """A greenthread backed executor."""

    def __init__(self, max_workers=1000):
        """Create the executor; workers are only spawned once work arrives.

        :param max_workers: upper bound on concurrently alive workers, must
                            convert to an integer greater than zero.
        """
        assert EVENTLET_AVAILABLE, 'eventlet is needed to use GreenExecutor'
        worker_limit = int(max_workers)
        assert worker_limit > 0, 'Max workers must be greater than zero'
        self._max_workers = worker_limit
        self._pool = greenpool.GreenPool(worker_limit)
        self._work_queue = queue.LightQueue()
        self._shutdown_lock = threading.RLock()
        self._shutdown = False

    # NOTE(review): the decorator presumably holds ``self._shutdown_lock``
    # for the duration of the call; ``lock_utils.locked`` is defined
    # elsewhere, confirm there.
    @lock_utils.locked(lock='_shutdown_lock')
    def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :raises RuntimeError: if the executor has already been shut down.
        """
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        future = _GreenFuture()
        self._work_queue.put(_WorkItem(future, fn, args, kwargs))
        # Spin up any new workers (since they are spun up on demand and
        # not at executor initialization).
        self._spin_up()
        return future

    def _spin_up(self):
        """Spawn one more worker when the pool is behind the queued work."""
        alive = self._pool.running() + self._pool.waiting()
        if alive >= self._max_workers:
            return
        if alive >= self._work_queue.qsize():
            return
        # Spin up a new worker to do the work as we are behind.
        self._pool.spawn(_Worker(self, self._work_queue, alive + 1))

    def shutdown(self, wait=True):
        """Refuse new submissions and tell the workers to stop.

        :param wait: when true, block until the pool reports that all of
                     its greenthreads have finished.
        """
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(_TOMBSTONE)
        if not wait:
            return
        self._pool.waitall()
class _FirstCompletedWaiter(object):
    """Provides the event that wait_for_any() blocks on."""

    def __init__(self, is_green):
        if not is_green:
            self.event = threading.Event()
        else:
            assert EVENTLET_AVAILABLE, 'eventlet is needed to use this feature'
            self.event = gthreading.Event()
        self.finished_futures = []

    def _record(self, future):
        """Remember the completed future and wake up whoever is waiting."""
        self.finished_futures.append(future)
        self.event.set()

    # The three notification hooks below handle every kind of completion
    # in exactly the same way.
    def add_result(self, future):
        self._record(future)

    def add_exception(self, future):
        self._record(future)

    def add_cancelled(self, future):
        self._record(future)
def _done_futures(fs):
    """Return the set of futures from ``fs`` that are already done.

    A future counts as done when its state is FINISHED or
    CANCELLED_AND_NOTIFIED.
    """
    terminal_states = (futures._base.CANCELLED_AND_NOTIFIED,
                       futures._base.FINISHED)
    finished = set()
    for fut in fs:
        if fut._state in terminal_states:
            finished.add(fut)
    return finished
def wait_for_any(fs, timeout=None):
    """Wait for one of the futures to complete.

    Works correctly with both green and non-green futures.

    :param fs: iterable of futures to wait on. It is consumed exactly once,
               so one-shot iterators (for example generators) are accepted.
    :param timeout: maximum number of seconds to block, or None to block
                    until at least one future is done.
    :returns: pair (done, not_done) of sets. ``done`` is empty when the
              timeout expires first or when no futures were given.
    """
    # Snapshot the input since it is traversed several times below.
    fs = list(fs)
    if not fs:
        # With nothing to wait on no future could ever set the event, so
        # waiting (with no timeout) would block forever.
        return set(), set()
    with futures._base._AcquireFutures(fs):
        done = _done_futures(fs)
        if done:
            return done, set(fs) - done
        is_green = any(isinstance(f, _GreenFuture) for f in fs)
        waiter = _FirstCompletedWaiter(is_green)
        for f in fs:
            f._waiters.append(waiter)
    try:
        waiter.event.wait(timeout)
    finally:
        # Always detach the waiter (even if the wait was interrupted) so it
        # does not pile up on the futures. Do it under each future's
        # condition since a completing future walks its waiter list while
        # holding that same condition.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
    with futures._base._AcquireFutures(fs):
        done = _done_futures(fs)
    return done, set(fs) - done