Merge Tulip into Trollius

This commit is contained in:
Victor Stinner
2014-05-20 11:19:32 +02:00
22 changed files with 409 additions and 41 deletions

View File

@@ -16,3 +16,6 @@ e6ad14ff01d4f8246bc42e5f58f413af831b5da0 trollius-0.1.2
17bd8436e91c8118d5c7bcb70e1bdeb96619b55d trollius-0.1.5
258c3dfb8c641a90ab8a94ae779082dbd8e8e9bd trollius-0.1.6
0255df07d690d1caa7cae8bdf3999a18b6386788 trollius-0.2
e6084a6ff3bbfe6bf4dc40e2c9782c5706c77e79 3.4.1
e6084a6ff3bbfe6bf4dc40e2c9782c5706c77e79 3.4.1
7c85dd9f8f6e24207ddc5480501fd7614bfe4dce 3.4.1

View File

@@ -44,9 +44,19 @@ clean:
rm -rf .tox
# For distribution builders only!
# Push a source distribution for Python 3.3 to PyPI.
# You must update the version in setup.py first.
# The corresponding action on Windows is pypi.bat.
# A PyPI user configuration in ~/.pypirc is required.
# A PyPI user configuration in ~/.pypirc is required;
# you can create a suitable confifuration using
# python setup.py register
pypi: clean
python3.3 setup.py sdist upload
# The corresponding action on Windows is pypi.bat. For that to work,
# you need to install wheel and setuptools. The easiest way is to get
# pip using the get-pip.py script found here:
# https://pip.pypa.io/en/latest/installing.html#install-pip
# That will install setuptools and pip; then you can just do
# \Python33\python.exe -m pip install wheel
# after which the pypi.bat script should work.

View File

@@ -262,6 +262,8 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Like call_later(), but uses an absolute time."""
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_at()")
if self._debug:
self._assert_is_current_event_loop()
timer = events.TimerHandle(when, callback, args, self)
heapq.heappush(self._scheduled, timer)
return timer
@@ -276,15 +278,34 @@ class BaseEventLoop(events.AbstractEventLoop):
Any positional arguments after the callback will be passed to
the callback when it is called.
"""
return self._call_soon(callback, args, check_loop=True)
def _call_soon(self, callback, args, check_loop):
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_soon()")
if self._debug and check_loop:
self._assert_is_current_event_loop()
handle = events.Handle(callback, args, self)
self._ready.append(handle)
return handle
def _assert_is_current_event_loop(self):
"""Asserts that this event loop is the current event loop.
Non-threadsafe methods of this class make this assumption and will
likely behave incorrectly when the assumption is violated.
Should only be called when (self._debug == True). The caller is
responsible for checking this condition for performance reasons.
"""
if events.get_event_loop() is not self:
raise RuntimeError(
"non-threadsafe operation invoked on an event loop other "
"than the current one")
def call_soon_threadsafe(self, callback, *args):
"""XXX"""
handle = self.call_soon(callback, *args)
handle = self._call_soon(callback, args, check_loop=False)
self._write_to_self()
return handle
@@ -768,11 +789,7 @@ class BaseEventLoop(events.AbstractEventLoop):
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
deadline = max(0, when - self.time())
if timeout is None:
timeout = deadline
else:
timeout = min(timeout, deadline)
timeout = max(0, when - self.time())
# TODO: Instrumentation only in debug mode?
if logger.isEnabledFor(logging.INFO):

View File

@@ -38,7 +38,7 @@ class Return(StopIteration):
class CoroWrapper(object):
# Wrapper for coroutine in _DEBUG mode.
__slots__ = ['gen', 'func', '__name__', '__doc__']
__slots__ = ['gen', 'func', '__name__', '__doc__', '__weakref__']
def __init__(self, gen, func):
assert inspect.isgenerator(gen), gen
@@ -52,7 +52,12 @@ class CoroWrapper(object):
return next(self.gen)
next = __next__
def send(self, value):
def send(self, *value):
# We use `*value` because of a bug in CPythons prior
# to 3.4.1. See issue #21209 and test_yield_from_corowrapper
# for details. This workaround should be removed in 3.5.0.
if len(value) == 1:
value = value[0]
return self.gen.send(value)
def throw(self, exc):
@@ -61,8 +66,22 @@ class CoroWrapper(object):
def close(self):
return self.gen.close()
@property
def gi_frame(self):
return self.gen.gi_frame
@property
def gi_running(self):
return self.gen.gi_running
@property
def gi_code(self):
return self.gen.gi_code
def __del__(self):
frame = self.gen.gi_frame
# Be careful accessing self.gen.frame -- self.gen might not exist.
gen = getattr(self, 'gen', None)
frame = getattr(gen, 'gi_frame', None)
if frame is not None and frame.f_lasti == -1:
func = self.func
code = func.__code__

View File

@@ -17,7 +17,7 @@ import socket
class Handle(object):
"""Object returned by callback registration methods."""
__slots__ = ['_callback', '_args', '_cancelled', '_loop']
__slots__ = ['_callback', '_args', '_cancelled', '_loop', '__weakref__']
def __init__(self, callback, args, loop):
assert not isinstance(callback, Handle), 'A Handle is not a callback'

View File

@@ -109,10 +109,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
pass
def _write_to_self(self):
try:
wrap_error(self._csock.send, b'x')
except (BlockingIOError, InterruptedError):
pass
# This may be called from a different thread, possibly after
# _close_self_pipe() has been called or even while it is
# running. Guard for self._csock being None or closed. When
# a socket is closed, send() raises OSError (with errno set to
# EBADF, but let's not rely on the exact error code).
csock = self._csock
if csock is not None:
try:
wrap_error(csock.send, b'x')
except OSError:
pass
def _start_serving(self, protocol_factory, sock,
sslcontext=None, server=None):

View File

@@ -446,6 +446,64 @@ if hasattr(select, 'epoll'):
super(EpollSelector, self).close()
if hasattr(select, 'devpoll'):
class DevpollSelector(_BaseSelectorImpl):
"""Solaris /dev/poll selector."""
def __init__(self):
super().__init__()
self._devpoll = select.devpoll()
def fileno(self):
return self._devpoll.fileno()
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
poll_events = 0
if events & EVENT_READ:
poll_events |= select.POLLIN
if events & EVENT_WRITE:
poll_events |= select.POLLOUT
self._devpoll.register(key.fd, poll_events)
return key
def unregister(self, fileobj):
key = super().unregister(fileobj)
self._devpoll.unregister(key.fd)
return key
def select(self, timeout=None):
if timeout is None:
timeout = None
elif timeout <= 0:
timeout = 0
else:
# devpoll() has a resolution of 1 millisecond, round away from
# zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3)
ready = []
try:
fd_event_list = self._devpoll.poll(timeout)
except InterruptedError:
return ready
for fd, event in fd_event_list:
events = 0
if event & ~select.POLLIN:
events |= EVENT_WRITE
if event & ~select.POLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._devpoll.close()
super().close()
if hasattr(select, 'kqueue'):
class KqueueSelector(_BaseSelectorImpl):
@@ -519,12 +577,14 @@ if hasattr(select, 'kqueue'):
super(KqueueSelector, self).close()
# Choose the best implementation: roughly, epoll|kqueue > poll > select.
# Choose the best implementation: roughly, epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:

View File

@@ -423,12 +423,17 @@ class StreamReader(object):
raise Return(b'')
if n < 0:
while not self._eof:
self._waiter = self._create_waiter('read')
try:
yield From(self._waiter)
finally:
self._waiter = None
# This used to just loop creating a new waiter hoping to
# collect everything in self._buffer, but that would
# deadlock if the subprocess sends more than self.limit
# bytes. So just call self.read(self._limit) until EOF.
blocks = []
while True:
block = yield From(self.read(self._limit))
if not block:
break
blocks.append(block)
raise Return(b''.join(blocks))
else:
if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')

View File

@@ -171,6 +171,25 @@ class Task(futures.Future):
print(line, file=file, end='')
def cancel(self):
"""Request that a task to cancel itself.
This arranges for a CancellationError to be thrown into the
wrapped coroutine on the next cycle through the event loop.
The coroutine then has a chance to clean up or even deny
the request using try/except/finally.
Contrary to Future.cancel(), this does not guarantee that the
task will be cancelled: the exception might be caught and
acted upon, delaying cancellation of the task or preventing it
completely. The task may also return a value or raise a
different exception.
Immediately after this method is called, Task.cancelled() will
not return True (unless the task was already cancelled). A
task will be marked as cancelled when the wrapped coroutine
terminates with a CancelledError exception (even if cancel()
was not called).
"""
if self.done():
return False
if self._fut_waiter is not None:

View File

@@ -218,6 +218,10 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
raise TypeError('ssl argument must be an SSLContext or None')
if path is not None:
if sock is not None:
raise ValueError(
'path and sock can not be specified at the same time')
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:

View File

@@ -0,0 +1,20 @@
import asyncio
from asyncio import From
END = b'Bye-bye!\n'
@asyncio.coroutine
def echo_client():
reader, writer = yield From(asyncio.open_connection('localhost', 8000))
writer.write(b'Hello, world\n')
writer.write(b'What a fine day it is.\n')
writer.write(END)
while True:
line = yield From(reader.readline())
print('received:', line)
if line == END or not line:
break
writer.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(echo_client())

View File

@@ -0,0 +1,18 @@
import asyncio
from asyncio import From
@asyncio.coroutine
def echo_server():
yield From(asyncio.start_server(handle_connection, 'localhost', 8000))
@asyncio.coroutine
def handle_connection(reader, writer):
while True:
data = yield From(reader.read(8192))
if not data:
break
writer.write(data)
loop = asyncio.get_event_loop()
loop.run_until_complete(echo_server())
loop.run_forever()

View File

@@ -7,7 +7,7 @@ from asyncio import From
code = """
import os, sys
fd = int(sys.argv[1])
data = os.write(fd, b'data')
os.write(fd, b'data')
os.close(fd)
"""

19
runtests.py Normal file → Executable file
View File

@@ -25,6 +25,7 @@ import optparse
import gc
import logging
import os
import random
import re
import sys
import textwrap
@@ -65,6 +66,12 @@ ARGS.add_option(
ARGS.add_option(
'--findleaks', action='store_true', dest='findleaks',
help='detect tests that leak memory')
ARGS.add_option(
'-r', '--randomize', action='store_true',
help='randomize test execution order.')
ARGS.add_option(
'--seed', type=int,
help='random seed to reproduce a previous random run')
ARGS.add_option(
'-q', action="store_true", dest='quiet', help='quiet')
ARGS.add_option(
@@ -126,6 +133,14 @@ def load_modules(basedir, suffix='.py'):
return mods
def randomize_tests(tests, seed):
if seed is None:
seed = random.randrange(10000000)
random.seed(seed)
print("Using random seed", seed)
random.shuffle(tests._tests)
class TestsFinder:
def __init__(self, testsdir, includes=(), excludes=()):
@@ -267,12 +282,16 @@ def runtests():
if args.forever:
while True:
tests = finder.load_tests()
if args.randomize:
randomize_tests(tests, args.seed)
result = runner_factory(verbosity=v,
failfast=failfast).run(tests)
if not result.wasSuccessful():
sys.exit(1)
else:
tests = finder.load_tests()
if args.randomize:
randomize_tests(tests, args.seed)
result = runner_factory(verbosity=v,
failfast=failfast).run(tests)
sys.exit(not result.wasSuccessful())

View File

@@ -1,2 +0,0 @@
[build_ext]
inplace = 1

View File

@@ -138,6 +138,29 @@ class BaseEventLoopTests(test_utils.TestCase):
# are really slow
self.assertLessEqual(dt, 0.9, dt)
def test_assert_is_current_event_loop(self):
def cb():
pass
other_loop = base_events.BaseEventLoop()
other_loop._selector = mock.Mock()
asyncio.set_event_loop(other_loop)
# raise RuntimeError if the event loop is different in debug mode
self.loop.set_debug(True)
with self.assertRaises(RuntimeError):
self.loop.call_soon(cb)
with self.assertRaises(RuntimeError):
self.loop.call_later(60, cb)
with self.assertRaises(RuntimeError):
self.loop.call_at(self.loop.time() + 60, cb)
# check disabled if debug mode is disabled
self.loop.set_debug(False)
self.loop.call_soon(cb)
self.loop.call_later(60, cb)
self.loop.call_at(self.loop.time() + 60, cb)
def test_run_once_in_executor_handle(self):
def cb():
pass
@@ -333,6 +356,7 @@ class BaseEventLoopTests(test_utils.TestCase):
def test_default_exc_handler_coro(self):
self.loop._process_events = mock.Mock()
self.loop.set_debug(True)
asyncio.set_event_loop(self.loop)
@asyncio.coroutine
def zero_error_coro():

View File

@@ -13,6 +13,7 @@ import sys
import threading
import errno
import unittest
import weakref
try:
import ssl
@@ -714,6 +715,19 @@ class EventLoopTestsMixin(object):
# close server
server.close()
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server_path_socket_error(self):
proto = MyProto(loop=self.loop)
sock = socket.socket()
try:
f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
with self.assertRaisesRegex(ValueError,
'path and sock can not be specified '
'at the same time'):
server = self.loop.run_until_complete(f)
finally:
sock.close()
def _create_ssl_context(self, certfile, keyfile=None):
sslcontext = asyncio.SSLContext(ssl.PROTOCOL_SSLv23)
if not asyncio.BACKPORT_SSL_CONTEXT:
@@ -1797,6 +1811,11 @@ class HandleTests(test_utils.TestCase):
'handle': h
})
def test_handle_weakref(self):
wd = weakref.WeakValueDictionary()
h = asyncio.Handle(lambda: None, (), object())
wd['h'] = h # Would fail without __weakref__ slot.
class TimerTests(test_utils.TestCase):

View File

@@ -170,6 +170,7 @@ class FutureTests(test_utils.TestCase):
@mock.patch('asyncio.base_events.logger')
def test_tb_logger_exception_unretrieved(self, m_log):
self.loop.set_debug(True)
asyncio.set_event_loop(self.loop)
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
del fut

View File

@@ -137,8 +137,9 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
self.assertIsNone(self.loop._write_to_self())
def test_write_to_self_exception(self):
self.loop._csock.send.side_effect = OSError()
self.assertRaises(OSError, self.loop._write_to_self)
# _write_to_self() swallows OSError
self.loop._csock.send.side_effect = RuntimeError()
self.assertRaises(RuntimeError, self.loop._write_to_self)
def test_sock_recv(self):
sock = mock.Mock()

View File

@@ -1,7 +1,10 @@
"""Tests for streams.py."""
import gc
import io
import os
import socket
import sys
import unittest
try:
import ssl
@@ -10,6 +13,7 @@ except ImportError:
import asyncio
from asyncio import Return, From
from asyncio import compat
from asyncio import test_utils
from asyncio.test_utils import mock
@@ -584,6 +588,46 @@ class StreamReaderTests(test_utils.TestCase):
server.stop()
self.assertEqual(msg, b"hello world!\n")
@unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
def test_read_all_from_pipe_reader(self):
# See Tulip issue 168. This test is derived from the example
# subprocess_attach_read_pipe.py, but we configure the
# StreamReader's limit so that twice it is less than the size
# of the data writter. Also we must explicitly attach a child
# watcher to the event loop.
code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
rfd, wfd = os.pipe()
args = [sys.executable, '-c', code, str(wfd)]
pipe = io.open(rfd, 'rb', 0)
reader = asyncio.StreamReader(loop=self.loop, limit=1)
protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
transport, _ = self.loop.run_until_complete(
self.loop.connect_read_pipe(lambda: protocol, pipe))
watcher = asyncio.SafeChildWatcher()
watcher.attach_loop(self.loop)
try:
asyncio.set_child_watcher(watcher)
kw = {'loop': self.loop}
if compat.PY3:
kw['pass_fds'] = set((wfd,))
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(*args, **kw))
self.loop.run_until_complete(proc.wait())
finally:
asyncio.set_child_watcher(None)
os.close(wfd)
data = self.loop.run_until_complete(reader.read(-1))
self.assertEqual(data, b'data')
if __name__ == '__main__':
unittest.main()

View File

@@ -2,7 +2,9 @@
import gc
import os.path
import types
import unittest
import weakref
import asyncio
from asyncio import From, Return
@@ -1367,23 +1369,102 @@ class TaskTests(test_utils.TestCase):
self.assertRaises(ValueError, self.loop.run_until_complete,
asyncio.wait([], loop=self.loop))
def test_yield_without_from(self):
old_debug = coroutines._DEBUG
def test_corowrapper_mocks_generator(self):
def check():
# A function that asserts various things.
# Called twice, with different debug flag values.
@asyncio.coroutine
def coro():
# The actual coroutine.
self.assertTrue(gen.gi_running)
yield From(fut)
# A completed Future used to run the coroutine.
fut = asyncio.Future(loop=self.loop)
fut.set_result(None)
# Call the coroutine.
gen = coro()
# Check some properties.
self.assertTrue(asyncio.iscoroutine(gen))
self.assertIsInstance(gen.gi_frame, types.FrameType)
self.assertFalse(gen.gi_running)
self.assertIsInstance(gen.gi_code, types.CodeType)
# Run it.
self.loop.run_until_complete(gen)
# The frame should have changed.
self.assertIsNone(gen.gi_frame)
# Save debug flag.
old_debug = asyncio.coroutines._DEBUG
try:
# Test with debug flag cleared.
asyncio.coroutines._DEBUG = False
check()
# Test with debug flag set.
asyncio.coroutines._DEBUG = True
check()
finally:
# Restore original debug flag.
asyncio.coroutines._DEBUG = old_debug
def test_yield_from_corowrapper(self):
old_debug = asyncio.coroutines._DEBUG
asyncio.coroutines._DEBUG = True
try:
@asyncio.coroutine
def task():
yield None
raise Return("done")
def t1():
res = yield From(t2())
raise Return(res)
coroutines._DEBUG = False
value = self.loop.run_until_complete(task())
self.assertEqual(value, "done")
@asyncio.coroutine
def t2():
f = asyncio.Future(loop=self.loop)
asyncio.Task(t3(f), loop=self.loop)
res = yield From(f)
raise Return(res)
coroutines._DEBUG = True
self.assertRaises(RuntimeError,
self.loop.run_until_complete, task())
@asyncio.coroutine
def t3(f):
f.set_result((1, 2, 3))
task = asyncio.Task(t1(), loop=self.loop)
val = self.loop.run_until_complete(task)
self.assertEqual(val, (1, 2, 3))
finally:
coroutines._DEBUG = old_debug
asyncio.coroutines._DEBUG = old_debug
def test_yield_from_corowrapper_send(self):
def foo():
a = yield
raise Return(a)
def call(arg):
cw = asyncio.coroutines.CoroWrapper(foo(), foo)
cw.send(None)
try:
cw.send(arg)
except Return as ex:
return ex.value
else:
raise AssertionError('StopIteration was expected')
self.assertEqual(call((1, 2)), (1, 2))
self.assertEqual(call('spam'), 'spam')
def test_corowrapper_weakref(self):
wd = weakref.WeakValueDictionary()
def foo(): yield From([])
cw = asyncio.coroutines.CoroWrapper(foo(), foo)
wd['cw'] = cw # Would fail without __weakref__ slot.
cw.gen = None # Suppress warning from __del__.
class GatherTestsBase:

View File

@@ -1345,7 +1345,6 @@ class ChildWatcherTestsMixin:
with self.ignore_warnings:
self.watcher._sig_chld()
callback.assert_called(m.waitpid)
if isinstance(self.watcher, asyncio.FastChildWatcher):
# here the FastChildWatche enters a deadlock
# (there is no way to prevent it)