226 lines
6.6 KiB
Python
226 lines
6.6 KiB
Python
##
|
|
# Copyright (c) 2013 Yury Selivanov
|
|
# License: Apache 2.0
|
|
##
|
|
|
|
"""greenio package allows to compose greenlets and asyncio coroutines."""
|
|
|
|
__all__ = ['task', 'yield_from']
|
|
|
|
|
|
import greenlet
|
|
import sys
|
|
|
|
try:
|
|
import asyncio
|
|
except ImportError:
|
|
asyncio = None
|
|
try:
|
|
import trollius
|
|
except ImportError:
|
|
trollius = None
|
|
if asyncio is None:
|
|
raise
|
|
if asyncio is None:
|
|
asyncio = trollius
|
|
|
|
def _create_task(coro, loop):
|
|
if loop is None:
|
|
loop = asyncio.get_event_loop()
|
|
if hasattr(loop, 'create_task'):
|
|
return loop.create_task(coro)
|
|
else:
|
|
return GreenTask(coro, loop=loop)
|
|
|
|
|
|
if trollius is not None:
|
|
def _async(future, loop):
|
|
# trollius iscoroutine() accepts trollius and asyncio coroutine
|
|
# objects
|
|
if trollius.iscoroutine(future):
|
|
return _create_task(future, loop)
|
|
else:
|
|
return future
|
|
else:
|
|
def _async(future, loop):
|
|
if asyncio.iscoroutine(future):
|
|
return _create_task(future, loop)
|
|
else:
|
|
return future
|
|
|
|
|
|
_FUTURE_CLASSES = (asyncio.Future,)
|
|
if trollius is not None and trollius is not asyncio:
|
|
_FUTURE_CLASSES += (trollius.Future,)
|
|
|
|
import sys
|
|
|
|
|
|
class _LoopGreenlet(greenlet.greenlet):
|
|
"""Main greenlet (analog to main thread) for the event-loop.
|
|
|
|
It's a policy task to provide event-loop implementation with
|
|
its "run_*" methods executed in _LoopGreenlet context"""
|
|
|
|
|
|
class _TaskGreenlet(greenlet.greenlet):
|
|
"""Each task (and its subsequent coroutines) decorated with
|
|
``@greenio.task`` is executed in this greenlet"""
|
|
|
|
|
|
class _GreenTaskMixin(object):
|
|
def __init__(self, *args, **kwargs):
|
|
self._greenlet = None
|
|
super(_GreenTaskMixin, self).__init__(*args, **kwargs)
|
|
|
|
def _step(self, value=None, exc=None):
|
|
if self._greenlet is None:
|
|
# Means that the task is not currently in a suspended greenlet
|
|
# waiting for results for "yield_from"
|
|
ovr = super(_GreenTaskMixin, self)._step
|
|
self._greenlet = _TaskGreenlet(ovr)
|
|
|
|
# Store a reference to the current task for "yield_from"
|
|
self._greenlet.task = self
|
|
|
|
# Now invoke overloaded "Task._step" in "_TaskGreenlet"
|
|
result = self._greenlet.switch(value, exc)
|
|
|
|
# If "result" is "_YIELDED" it means that the "yield_from"
|
|
# method was called
|
|
if result is not _YIELDED:
|
|
# And if not - then task jumped out of greenlet without
|
|
# calling "yield_from"
|
|
self._greenlet.task = None
|
|
self._greenlet = None
|
|
else:
|
|
self.__class__._current_tasks.pop(self._loop)
|
|
else:
|
|
# The task is in the greenlet, that means that we have a result
|
|
# for the "yield_from"
|
|
|
|
self.__class__._current_tasks[self._loop] = self
|
|
|
|
if exc is not None:
|
|
if hasattr(exc, '__traceback__'):
|
|
tb = exc.__traceback__
|
|
else:
|
|
tb = sys.exc_info()[2]
|
|
result = self._greenlet.throw(
|
|
type(exc), exc, tb)
|
|
else:
|
|
result = self._greenlet.switch(value)
|
|
|
|
# Again, if "result" is "_YIELDED" then we just called "yield_from"
|
|
# again
|
|
if result is not _YIELDED:
|
|
self._greenlet.task = None
|
|
self._greenlet = None
|
|
else:
|
|
self.__class__._current_tasks.pop(self._loop)
|
|
|
|
|
|
class _GreenLoopMixin(object):
|
|
def _green_run(self, method, args, kwargs):
|
|
return _LoopGreenlet(method).switch(*args, **kwargs)
|
|
|
|
def run_until_complete(self, *args, **kwargs):
|
|
ovr = super(_GreenLoopMixin, self).run_until_complete
|
|
return self._green_run(ovr, args, kwargs)
|
|
|
|
def run_forever(self, *args, **kwargs):
|
|
ovr = super(_GreenLoopMixin, self).run_forever
|
|
return self._green_run(ovr, args, kwargs)
|
|
|
|
|
|
class GreenTask(_GreenTaskMixin, asyncio.Task):
|
|
pass
|
|
|
|
|
|
class GreenUnixSelectorLoop(_GreenLoopMixin, asyncio.SelectorEventLoop):
|
|
def create_task(self, coro):
|
|
return GreenTask(coro, loop=self)
|
|
|
|
|
|
class GreenEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
|
|
def new_event_loop(self):
|
|
return GreenUnixSelectorLoop()
|
|
|
|
|
|
if trollius is not None:
|
|
if trollius is not asyncio:
|
|
class GreenTrolliusTask(_GreenTaskMixin, trollius.Task):
|
|
pass
|
|
|
|
class GreenTrolliusUnixSelectorLoop(_GreenLoopMixin,
|
|
trollius.SelectorEventLoop):
|
|
def create_task(self, coro):
|
|
return GreenTrolliusTask(coro)
|
|
|
|
class GreenTrolliusEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
|
|
|
|
def new_event_loop(self):
|
|
return GreenTrolliusUnixSelectorLoop()
|
|
else:
|
|
GreenTrolliusTask = GreenTask
|
|
GreenTrolliusUnixSelectorLoop = GreenUnixSelectorLoop
|
|
GreenTrolliusEventLoopPolicy = GreenEventLoopPolicy
|
|
|
|
|
|
def yield_from(future, loop=None):
|
|
"""A function to use instead of ``yield from`` statement."""
|
|
|
|
future = _async(future, loop)
|
|
|
|
gl = greenlet.getcurrent()
|
|
|
|
if __debug__:
|
|
if not isinstance(gl.parent, _LoopGreenlet):
|
|
raise RuntimeError(
|
|
'"greenio.yield_from" requires GreenEventLoopPolicy '
|
|
'or compatible')
|
|
# or something went horribly wrong...
|
|
|
|
if not isinstance(gl, _TaskGreenlet):
|
|
raise RuntimeError(
|
|
'"greenio.yield_from" was supposed to be called from a '
|
|
'"greenio.task" or a subsequent coroutine')
|
|
# ...ditto
|
|
|
|
task = gl.task
|
|
|
|
if not isinstance(future, _FUTURE_CLASSES):
|
|
raise RuntimeError(
|
|
'greenlet.yield_from was supposed to receive only Futures, '
|
|
'got {!r} in task {!r}'.format(future, task))
|
|
|
|
# "_wakeup" will call the "_step" method (which we overloaded in
|
|
# GreenTask, and therefore wakeup the awaiting greenlet)
|
|
future.add_done_callback(task._wakeup)
|
|
task._fut_waiter = future
|
|
|
|
# task cancellation has been delayed.
|
|
if task._must_cancel:
|
|
if task._fut_waiter.cancel():
|
|
task._must_cancel = False
|
|
|
|
# Jump out of the current task greenlet (we'll return to GreenTask._step)
|
|
return gl.parent.switch(_YIELDED)
|
|
|
|
|
|
def task(func, loop=None):
|
|
"""A decorator, allows use of ``yield_from`` in the decorated or
|
|
subsequent coroutines."""
|
|
|
|
coro_func = asyncio.coroutine(func)
|
|
|
|
def task_wrapper(*args, **kwds):
|
|
coro_obj = coro_func(*args, **kwds)
|
|
return _create_task(coro_obj, loop)
|
|
|
|
return task_wrapper
|
|
|
|
|
|
class _YIELDED(object):
|
|
"""Marker, don't use it"""
|