348 lines
11 KiB
Python
348 lines
11 KiB
Python
import eventlet.hubs.hub
|
|
import greenlet
|
|
import logging
|
|
import signal
|
|
import sys
|
|
# Fetch the socket and threading modules through eventlet.patcher.original()
# so that the rest of this module works with the original (non-patched)
# implementations even when eventlet monkey-patching is active.
socket = eventlet.patcher.original('socket')
threading = eventlet.patcher.original('threading')

# Module-level logger, named after the package.
logger = logging.getLogger('aioeventlet')
|
|
|
|
# Use the standard asyncio module when it can be imported, otherwise fall
# back to trollius under the same "asyncio" name.  In both cases, bind
# "socketpair": on Windows it comes from the windows_utils helper of the
# selected package, elsewhere it is the original socket.socketpair.
try:
    import asyncio

    if sys.platform == 'win32':
        from asyncio.windows_utils import socketpair
    else:
        socketpair = socket.socketpair
except ImportError:
    import trollius as asyncio

    if sys.platform == 'win32':
        from trollius.windows_utils import socketpair
    else:
        socketpair = socket.socketpair
|
|
|
|
# When eventlet has monkey-patched "socket", point the asyncio/trollius
# internals back at the original socket and threading modules.
if eventlet.patcher.is_monkey_patched('socket'):
    # trollius must call the original socket and threading functions.
    # Examples: socket.socket(), socket.socketpair(),
    # threading.current_thread().
    asyncio.base_events.socket = socket
    asyncio.base_events.threading = threading
    if hasattr(threading, 'get_ident'):
        asyncio.base_events._get_thread_ident = threading.get_ident
    else:
        # Python 2
        asyncio.base_events._get_thread_ident = threading._get_ident
    asyncio.events.threading = threading
    if sys.platform == 'win32':
        asyncio.windows_events.socket = socket
        asyncio.windows_utils.socket = socket
    else:
        asyncio.unix_events.socket = socket
        asyncio.unix_events.threading = threading
    # FIXME: patch also trollius.py3_ssl

    # BaseDefaultEventLoopPolicy._Local must inherit from threading.local
    # of the original threading module, not the patched threading module
    class _Local(threading.local):
        # Same class attributes and defaults as the policy's own _Local
        # is expected to carry -- NOTE(review): confirm against the
        # asyncio/trollius version in use.
        _loop = None
        _set_called = False

    asyncio.events.BaseDefaultEventLoopPolicy._Local = _Local
|
|
|
|
# Short aliases for the selector event masks (asyncio side) ...
_EVENT_READ = asyncio.selectors.EVENT_READ
_EVENT_WRITE = asyncio.selectors.EVENT_WRITE
# ... and for the listener types of the eventlet hub.
_HUB_READ = eventlet.hubs.hub.READ
_HUB_WRITE = eventlet.hubs.hub.WRITE

# Eventlet 0.15 or newer?  Detected by the presence of a "mark_as_closed"
# attribute on the hub's noop listener; it selects which hub.add() call
# signature is used when FD listeners are registered.
_EVENTLET15 = hasattr(eventlet.hubs.hub.noop, 'mark_as_closed')
|
|
|
|
|
|
class _TpoolExecutor(object):
    """Executor-like object that runs callables through eventlet.tpool.

    Only submit() and shutdown() are provided.  The future returned by
    submit() is already completed when submit() returns.
    """

    def __init__(self, loop):
        # Imported here so that eventlet.tpool is only loaded when an
        # executor is actually created.
        import eventlet.tpool
        self._tpool = eventlet.tpool
        self._loop = loop

    def submit(self, fn, *args, **kwargs):
        """Run fn(*args, **kwargs) via tpool.execute() and wrap the outcome.

        Return an asyncio Future bound to the loop given to the constructor;
        it holds either the return value of the call or the exception
        (subclass of Exception) that the call raised.
        """
        future = asyncio.Future(loop=self._loop)
        try:
            outcome = self._tpool.execute(fn, *args, **kwargs)
        except Exception as error:
            future.set_exception(error)
            return future
        future.set_result(outcome)
        return future

    def shutdown(self, wait=True):
        """Call tpool.killall(); the *wait* argument is accepted but unused."""
        self._tpool.killall()
|
|
|
|
|
|
class _Selector(asyncio.selectors._BaseSelectorImpl):
    """Selector fed by FD listeners registered on an eventlet hub.

    Each registered file descriptor gets a read and/or write listener on
    the hub.  The listeners record readiness in ``_notified``; select()
    returns what has been recorded, waiting on an eventlet Event when
    nothing is pending yet.
    """

    def __init__(self, loop, hub):
        super(_Selector, self).__init__()
        # fd => bitmask of events notified since the last _read_events()
        self._notified = {}
        self._loop = loop
        self._hub = hub
        # eventlet.event.Event() used by FD notifiers to wake up select();
        # None while no select() call is waiting
        self._event = None

    def close(self):
        """Unregister every registered file object, then close the base."""
        # Copy the keys first: unregister() mutates the map
        keys = list(self.get_map().values())
        for key in keys:
            self.unregister(key.fd)
        super(_Selector, self).close()

    def _add(self, fd, event):
        """Install the hub listener for one event (read or write) of fd."""
        if event == _EVENT_READ:
            event_type = _HUB_READ
            func = self._notify_read
        else:
            event_type = _HUB_WRITE
            func = self._notify_write

        if _EVENTLET15:
            # Newer hub.add() signature: two extra arguments are required
            self._hub.add(event_type, fd, func, self._throwback, None)
        else:
            self._hub.add(event_type, fd, func)

    def register(self, fileobj, events, data=None):
        """Register fileobj and add one hub listener per requested event.

        Return the selector key created by the base class.
        """
        key = super(_Selector, self).register(fileobj, events, data)
        if events & _EVENT_READ:
            self._add(key.fd, _EVENT_READ)
        if events & _EVENT_WRITE:
            self._add(key.fd, _EVENT_WRITE)
        return key

    def _remove(self, fd, event):
        """Remove the hub listener for one event of fd, if there is one."""
        if event == _EVENT_READ:
            event_type = _HUB_READ
        else:
            event_type = _HUB_WRITE
        try:
            listener = self._hub.listeners[event_type][fd]
        except KeyError:
            # No listener of this type for the fd: nothing to remove
            pass
        else:
            self._hub.remove(listener)

    def unregister(self, fileobj):
        """Unregister fileobj and remove its hub listeners.

        Return the selector key removed by the base class.
        """
        key = super(_Selector, self).unregister(fileobj)
        self._remove(key.fd, _EVENT_READ)
        self._remove(key.fd, _EVENT_WRITE)
        # Forget readiness recorded for the old registration, so that it is
        # not reported for a later registration reusing the same fd number
        self._notified.pop(key.fd, None)
        return key

    def _notify(self, fd, event):
        """Record that *event* happened on fd and wake up select()."""
        if fd in self._notified:
            self._notified[fd] |= event
        else:
            self._notified[fd] = event
        if self._event is not None and not self._event.ready():
            # wakeup the select() method
            self._event.send("ready")

    def _notify_read(self, fd):
        """Hub callback for read listeners."""
        self._notify(fd, _EVENT_READ)

    def _notify_write(self, fd):
        """Hub callback for write listeners."""
        self._notify(fd, _EVENT_WRITE)

    def _throwback(self, fd):
        """Extra callback handed to hub.add() on eventlet >= 0.15; no-op."""
        # FIXME: do something with the FD in this case?
        pass

    def _read_events(self):
        """Return and clear the pending notifications.

        The result is a list of (key, events) pairs where events is the
        notified mask restricted to the events the key is registered for.
        """
        notified = self._notified
        self._notified = {}
        ready = []
        for fd, events in notified.items():
            try:
                key = self.get_key(fd)
            except KeyError:
                # The fd is no longer registered: drop the notification
                # instead of failing the whole select() call
                continue
            ready.append((key, events & key.events))
        return ready

    def select(self, timeout):
        """Return the list of ready (key, events) pairs.

        Pending notifications are returned immediately.  Otherwise wait
        until a hub listener reports an event or, when *timeout* is not
        None, until *timeout* seconds have elapsed; the list may then be
        empty.
        """
        events = self._read_events()
        if events:
            return events

        event = eventlet.event.Event()
        self._event = event
        timer = None
        try:
            if timeout is not None:
                def timeout_cb():
                    if event.ready():
                        return
                    event.send('timeout')

                # Scheduled on the current hub, as eventlet.spawn_after()
                # does, but keeping the timer so that it can be cancelled
                timer = eventlet.hubs.get_hub().schedule_call_global(
                    timeout, timeout_cb)

            # Blocks until _notify() or timeout_cb() sends the event
            event.wait()
            return self._read_events()
        finally:
            self._event = None
            if timer is not None:
                # Do not leave a stale timer behind when a notification
                # arrived first; harmless if the timer already fired
                timer.cancel()
|
|
|
|
|
|
class EventLoop(asyncio.SelectorEventLoop):
    """Selector event loop whose selector is driven by an eventlet hub."""

    def __init__(self):
        # Greenthread currently executing run_forever(), None otherwise
        self._greenthread = None

        # Store a reference to the hub to ensure
        # that we always use the same hub
        self._hub = eventlet.hubs.get_hub()

        selector = _Selector(self, self._hub)

        super(EventLoop, self).__init__(selector=selector)

        # Force a call to set_debug() to set hub.debug_blocking
        self.set_debug(self.get_debug())

        if eventlet.patcher.is_monkey_patched('thread'):
            self._default_executor = _TpoolExecutor(self)

    def stop(self):
        """Stop the loop and wake up the selector."""
        super(EventLoop, self).stop()
        # selector.select() is running: write into the self-pipe to wake up
        # the selector
        self._write_to_self()

    def _wake_selector_if_waiting(self):
        """Write into the self-pipe if selector.select() is waiting."""
        if self._selector is not None and self._selector._event:
            # selector.select() is running: write into the self-pipe to
            # wake up the selector
            self._write_to_self()

    def call_soon(self, callback, *args, **kwargs):
        """Schedule callback like the base class, then wake the selector.

        Keyword arguments (for example ``context`` on asyncio versions
        that support it) are forwarded unchanged to the base class.
        Return the handle created by the base class.
        """
        handle = super(EventLoop, self).call_soon(callback, *args, **kwargs)
        self._wake_selector_if_waiting()
        return handle

    def call_at(self, when, callback, *args, **kwargs):
        """Schedule callback at *when* like the base class, then wake the
        selector.

        Keyword arguments (for example ``context`` on asyncio versions
        that support it) are forwarded unchanged to the base class.
        Return the handle created by the base class.
        """
        handle = super(EventLoop, self).call_at(when, callback, *args,
                                                **kwargs)
        self._wake_selector_if_waiting()
        return handle

    def set_debug(self, debug):
        """Set the debug mode of the loop and mirror it on the hub."""
        super(EventLoop, self).set_debug(debug)

        self._hub.debug_exceptions = debug

        # Detect blocking eventlet functions. The feature is implemented with
        # signal.alarm() which is not available on Windows. Signal handlers
        # can only be set from the main thread. So detecting blocking
        # functions cannot be used on Windows nor from a thread different
        # than the main thread.
        self._hub.debug_blocking = (
            debug
            and (sys.platform != 'win32')
            and isinstance(threading.current_thread(), threading._MainThread))

        if (self._hub.debug_blocking
                and hasattr(self, 'slow_callback_duration')):
            self._hub.debug_blocking_resolution = self.slow_callback_duration

    def run_forever(self):
        """Run the loop, remembering which greenthread is running it."""
        self._greenthread = eventlet.getcurrent()
        try:
            super(EventLoop, self).run_forever()
        finally:
            if self._hub.debug_blocking:
                # eventlet event loop is still running: cancel the current
                # detection of blocking tasks
                signal.alarm(0)
            self._greenthread = None

    def time(self):
        """Return the current time according to the hub's clock."""
        return self._hub.clock()
|
|
|
|
|
|
class EventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    """Default event loop policy with EventLoop set as its loop factory."""

    # Class used to build new event loops
    _loop_factory = EventLoop
|
|
|
|
|
|
def wrap_greenthread(gt, loop=None):
    """Return a Future tracking an eventlet GreenThread or a greenlet.

    The ``run`` attribute of *gt* is replaced by a wrapper which stores the
    result of the greenthread, or the exception (subclass of Exception) it
    raised, into the returned Future.

    *gt* must be wrapped before it starts: RuntimeError is raised if it is
    running or already finished, and TypeError if it is not a greenlet.
    For a plain greenlet, the ``run`` attribute must be set, otherwise
    RuntimeError is raised.

    If *loop* is None, the loop returned by asyncio.get_event_loop() is
    used for the Future.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    fut = asyncio.Future(loop=loop)

    # Validate the argument: it must be a greenlet which has not started
    if not isinstance(gt, greenlet.greenlet):
        raise TypeError("greenthread or greenlet request, not %s"
                        % type(gt))
    if gt:
        raise RuntimeError("wrap_greenthread: the greenthread is running")
    if gt.dead:
        raise RuntimeError("wrap_greenthread: the greenthread already finished")

    if not isinstance(gt, eventlet.greenthread.GreenThread):
        # Plain greenlet: the return value of run() is the result
        try:
            target = gt.run
        except AttributeError:
            raise RuntimeError("wrap_greenthread: the run attribute "
                               "of the greenlet is not set")

        def run_and_report(*args, **kw):
            try:
                outcome = target(*args, **kw)
            except Exception as error:
                fut.set_exception(error)
            else:
                fut.set_result(outcome)

        gt.run = run_and_report
        return fut

    # GreenThread: the result is fetched with wait() once run() returned
    green_main = gt.run

    def main_and_report(*args, **kw):
        try:
            green_main(*args, **kw)
        except Exception as error:
            fut.set_exception(error)
        else:
            fut.set_result(gt.wait())

    gt.run = main_and_report
    return fut
|
|
|
|
|
|
def yield_future(future, loop=None):
    """Wait for a future, a task, or a coroutine object from a greenthread.

    Yield control to other eligible eventlet coroutines until the future is
    done (finished successfully or failed with an exception).

    Return the result or raise the exception of the future.

    The function must not be called from the greenthread
    running the aioeventlet event loop: RuntimeError is raised in this case.
    """
    # "async" is a reserved word on recent Python versions, so writing
    # asyncio.async(...) is a syntax error there.  Prefer ensure_future()
    # and only look up the legacy name, through getattr(), when
    # ensure_future() is missing.
    ensure_future = getattr(asyncio, 'ensure_future', None)
    if ensure_future is None:
        ensure_future = getattr(asyncio, 'async')
    future = ensure_future(future, loop=loop)

    if future._loop._greenthread == eventlet.getcurrent():
        raise RuntimeError("yield_future() must not be called from "
                           "the greenthread of the aioeventlet event loop")

    # Event used to hand the outcome of the future over to this greenthread
    event = eventlet.event.Event()

    def done(fut):
        try:
            result = fut.result()
        except Exception as exc:
            event.send_exception(exc)
        else:
            event.send(result)

    future.add_done_callback(done)
    # Blocks this greenthread until done() sends the result or exception
    return event.wait()
|