From 85665e60c8f8d48cc1f9cbf7676c60e479a9519d Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 25 Nov 2014 17:17:13 +0100 Subject: [PATCH 1/7] Python issue #22685: Set the transport of stdout and stderr StreamReader objects in the SubprocessStreamProtocol. It allows to pause the transport to not buffer too much stdout or stderr data. --- asyncio/subprocess.py | 17 ++++++++++++----- tests/test_subprocess.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/asyncio/subprocess.py b/asyncio/subprocess.py index e4c1499..f6d6a14 100644 --- a/asyncio/subprocess.py +++ b/asyncio/subprocess.py @@ -41,15 +41,22 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, def connection_made(self, transport): self._transport = transport - if transport.get_pipe_transport(1): + + stdout_transport = transport.get_pipe_transport(1) + if stdout_transport is not None: self.stdout = streams.StreamReader(limit=self._limit, loop=self._loop) - if transport.get_pipe_transport(2): + self.stdout.set_transport(stdout_transport) + + stderr_transport = transport.get_pipe_transport(2) + if stderr_transport is not None: self.stderr = streams.StreamReader(limit=self._limit, loop=self._loop) - stdin = transport.get_pipe_transport(0) - if stdin is not None: - self.stdin = streams.StreamWriter(stdin, + self.stderr.set_transport(stderr_transport) + + stdin_transport = transport.get_pipe_transport(0) + if stdin_transport is not None: + self.stdin = streams.StreamWriter(stdin_transport, protocol=self, reader=None, loop=self._loop) diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index 0e9e1ce..d0ab230 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -4,6 +4,7 @@ import asyncio import signal import sys import unittest +from unittest import mock from test import support if sys.platform != 'win32': from asyncio import unix_events @@ -161,6 +162,37 @@ class SubprocessMixin: self.loop.run_until_complete(proc.communicate(large_data)) self.loop.run_until_complete(proc.wait()) + def test_pause_reading(self): + @asyncio.coroutine + def test_pause_reading(): + limit = 100 + + code = '\n'.join(( + 'import sys', + 'sys.stdout.write("x" * %s)' % (limit * 2 + 1), + 'sys.stdout.flush()', + )) + proc = yield from asyncio.create_subprocess_exec( + sys.executable, '-c', code, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + limit=limit, + loop=self.loop) + stdout_transport = proc._transport.get_pipe_transport(1) + stdout_transport.pause_reading = mock.Mock() + + yield from proc.wait() + + # The child process produced more than limit bytes of output, + # the stream reader transport should pause the protocol to not + # allocate too much memory. + return stdout_transport.pause_reading.called + + # Issue #22685: Ensure that the stream reader pauses the protocol + # when the child process produces too much data + called = self.loop.run_until_complete(test_pause_reading()) + self.assertTrue(called) + if sys.platform != 'win32': # Unix From 7a26bc899eb986c34b798454ca26a51103214fdc Mon Sep 17 00:00:00 2001 From: Benjamin Peterson Date: Tue, 25 Nov 2014 17:23:22 +0100 Subject: [PATCH 2/7] Python issue #22921: Don't require OpenSSL SNI to pass hostname to ssl functions. Patch by Donald Stufft. --- asyncio/selector_events.py | 2 +- tests/test_events.py | 8 -------- tests/test_selector_events.py | 2 +- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py index f0c94c4..7df8b86 100644 --- a/asyncio/selector_events.py +++ b/asyncio/selector_events.py @@ -708,7 +708,7 @@ class _SelectorSslTransport(_SelectorTransport): 'server_side': server_side, 'do_handshake_on_connect': False, } - if server_hostname and not server_side and ssl.HAS_SNI: + if server_hostname and not server_side: wrap_kwargs['server_hostname'] = server_hostname sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs) diff --git a/tests/test_events.py b/tests/test_events.py index fab3259..ea657fd 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -12,9 +12,6 @@ try: import ssl except ImportError: ssl = None - HAS_SNI = False -else: - from ssl import HAS_SNI import subprocess import sys import threading @@ -857,7 +854,6 @@ class EventLoopTestsMixin: server.close() @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') def test_create_server_ssl_verify_failed(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( @@ -882,7 +878,6 @@ class EventLoopTestsMixin: server.close() @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') def test_create_unix_server_ssl_verify_failed(self): proto = MyProto(loop=self.loop) @@ -909,7 +904,6 @@ class EventLoopTestsMixin: server.close() @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') def test_create_server_ssl_match_failed(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( @@ -937,7 +931,6 @@ class EventLoopTestsMixin: server.close() @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') def test_create_unix_server_ssl_verified(self): proto = MyProto(loop=self.loop) @@ -963,7 +956,6 @@ class EventLoopTestsMixin: server.close() @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') def test_create_server_ssl_verified(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py index 528da39..8eba56c 100644 --- a/tests/test_selector_events.py +++ b/tests/test_selector_events.py @@ -1408,7 +1408,7 @@ class SelectorSslTransportTests(test_utils.TestCase): self.assertEqual(tr._conn_lost, 1) self.assertEqual(1, self.loop.remove_reader_count[1]) - @unittest.skipIf(ssl is None or not ssl.HAS_SNI, 'No SNI support') + @unittest.skipIf(ssl is None, 'No SSL support') def test_server_hostname(self): _SelectorSslTransport( self.loop, self.sock, self.protocol, self.sslcontext, From c3492ca38fb8a80dbaa3e77c651cdf14eb85abea Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 4 Dec 2014 22:24:08 +0100 Subject: [PATCH 3/7] Initialize more Future and Task attributes in the class definition to avoid attribute errors in destructors. --- asyncio/futures.py | 3 +-- asyncio/tasks.py | 7 ++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/asyncio/futures.py b/asyncio/futures.py index 40662a3..03a4bf0 100644 --- a/asyncio/futures.py +++ b/asyncio/futures.py @@ -135,6 +135,7 @@ class Future: _result = None _exception = None _loop = None + _source_traceback = None _blocking = False # proper use of future (yield vs yield from) @@ -155,8 +156,6 @@ class Future: self._callbacks = [] if self._loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) - else: - self._source_traceback = None def _format_callbacks(self): cb = self._callbacks diff --git a/asyncio/tasks.py b/asyncio/tasks.py index e073802..698ec6a 100644 --- a/asyncio/tasks.py +++ b/asyncio/tasks.py @@ -41,6 +41,10 @@ class Task(futures.Future): # all running event loops. {EventLoop: Task} _current_tasks = {} + # If False, don't log a message if the task is destroyed whereas its + # status is still pending + _log_destroy_pending = True + @classmethod def current_task(cls, loop=None): """Return the currently running task in an event loop or None. @@ -73,9 +77,6 @@ class Task(futures.Future): self._must_cancel = False self._loop.call_soon(self._step) self.__class__._all_tasks.add(self) - # If False, don't log a message if the task is destroyed whereas its - # status is still pending - self._log_destroy_pending = True # On Python 3.3 or older, objects with a destructor that are part of a # reference cycle are never destroyed. That's not the case any more on From 074ec980ae23b0988194dc7d8ea65a3e96c1501e Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 4 Dec 2014 22:45:05 +0100 Subject: [PATCH 4/7] Python issue #22922: More EventLoop methods fail if the loop is closed. Initial patch written by Torsten Landschoff. create_task(), call_at(), call_soon(), call_soon_threadsafe() and run_in_executor() now raise an error if the event loop is closed. --- asyncio/base_events.py | 4 ++++ asyncio/unix_events.py | 1 + tests/test_events.py | 35 ++++++++++++++++++++++++++++++++++- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/asyncio/base_events.py b/asyncio/base_events.py index 40dd668..7c38b09 100644 --- a/asyncio/base_events.py +++ b/asyncio/base_events.py @@ -177,6 +177,7 @@ class BaseEventLoop(events.AbstractEventLoop): Return a task object. """ + self._check_closed() task = tasks.Task(coro, loop=self) if task._source_traceback: del task._source_traceback[-1] @@ -360,6 +361,7 @@ class BaseEventLoop(events.AbstractEventLoop): if (coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with call_at()") + self._check_closed() if self._debug: self._assert_is_current_event_loop() timer = events.TimerHandle(when, callback, args, self) @@ -390,6 +392,7 @@ class BaseEventLoop(events.AbstractEventLoop): raise TypeError("coroutines cannot be used with call_soon()") if self._debug and check_loop: self._assert_is_current_event_loop() + self._check_closed() handle = events.Handle(callback, args, self) if handle._source_traceback: del handle._source_traceback[-1] @@ -426,6 +429,7 @@ class BaseEventLoop(events.AbstractEventLoop): if (coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with run_in_executor()") + self._check_closed() if isinstance(callback, events.Handle): assert not args assert not isinstance(callback, events.TimerHandle) diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index efe06d4..d5db4d5 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -71,6 +71,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with add_signal_handler()") self._check_signal(sig) + self._check_closed() try: # set_wakeup_fd() raises ValueError if this is not the # main thread. By calling it early we ensure that an diff --git a/tests/test_events.py b/tests/test_events.py index ea657fd..6644fbe 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -226,7 +226,8 @@ class EventLoopTestsMixin: def tearDown(self): # just in case if we have transport close callbacks - test_utils.run_briefly(self.loop) + if not self.loop.is_closed(): + test_utils.run_briefly(self.loop) self.loop.close() gc.collect() @@ -1434,6 +1435,38 @@ class EventLoopTestsMixin: with self.assertRaises(RuntimeError): self.loop.run_until_complete(coro) + def test_close(self): + self.loop.close() + + @asyncio.coroutine + def test(): + pass + + func = lambda: False + coro = test() + self.addCleanup(coro.close) + + # operation blocked when the loop is closed + with self.assertRaises(RuntimeError): + self.loop.run_forever() + with self.assertRaises(RuntimeError): + fut = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(fut) + with self.assertRaises(RuntimeError): + self.loop.call_soon(func) + with self.assertRaises(RuntimeError): + self.loop.call_soon_threadsafe(func) + with self.assertRaises(RuntimeError): + self.loop.call_later(1.0, func) + with self.assertRaises(RuntimeError): + self.loop.call_at(self.loop.time() + .0, func) + with self.assertRaises(RuntimeError): + self.loop.run_in_executor(None, func) + with self.assertRaises(RuntimeError): + self.loop.create_task(coro) + with self.assertRaises(RuntimeError): + self.loop.add_signal_handler(signal.SIGTERM, func) + class SubprocessTestsMixin: From cdf72d261dc60b193d0e71f104010649621529ae Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 4 Dec 2014 22:56:40 +0100 Subject: [PATCH 5/7] Python issue #22475: fix Task.get_stack() doc --- asyncio/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asyncio/tasks.py b/asyncio/tasks.py index 698ec6a..9aebffd 100644 --- a/asyncio/tasks.py +++ b/asyncio/tasks.py @@ -110,7 +110,7 @@ class Task(futures.Future): def get_stack(self, *, limit=None): """Return the list of stack frames for this task's coroutine. - If the coroutine is active, this returns the stack where it is + If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback From a8efdfa2eedb68b5c2fa4d3af99610f7cc21d530 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 4 Dec 2014 22:59:12 +0100 Subject: [PATCH 6/7] Removed duplicated words in in comments and docs. Patch written by Serhiy Storchaka. --- asyncio/futures.py | 2 +- tests/test_windows_events.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/asyncio/futures.py b/asyncio/futures.py index 03a4bf0..f46d008 100644 --- a/asyncio/futures.py +++ b/asyncio/futures.py @@ -61,7 +61,7 @@ class _TracebackLogger: the Future is collected, and the helper is present, the helper object is also collected, and its __del__() method will log the traceback. When the Future's result() or exception() method is - called (and a helper object is present), it removes the the helper + called (and a helper object is present), it removes the helper object, after calling its clear() method to prevent it from logging. diff --git a/tests/test_windows_events.py b/tests/test_windows_events.py index 85d9669..b4d9398 100644 --- a/tests/test_windows_events.py +++ b/tests/test_windows_events.py @@ -105,7 +105,7 @@ class ProactorTests(test_utils.TestCase): _overlapped.SetEvent(event) - # Wait for for set event; + # Wait for set event; # result should be True immediately fut = self.loop._proactor.wait_for_handle(event, 10) start = self.loop.time() From f412cadbae5cea6613912cc90fc2a55b5ccc8376 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 4 Dec 2014 23:04:26 +0100 Subject: [PATCH 7/7] Python issue #22685: Fix test_pause_reading() of test_subprocess * mock also resume_reading() * ensure that resume_reading() is called --- tests/test_subprocess.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index d0ab230..9060b9d 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -163,13 +163,14 @@ class SubprocessMixin: self.loop.run_until_complete(proc.wait()) def test_pause_reading(self): + limit = 10 + size = (limit * 2 + 1) + @asyncio.coroutine def test_pause_reading(): - limit = 100 - code = '\n'.join(( 'import sys', - 'sys.stdout.write("x" * %s)' % (limit * 2 + 1), + 'sys.stdout.write("x" * %s)' % size, 'sys.stdout.flush()', )) proc = yield from asyncio.create_subprocess_exec( @@ -180,18 +181,22 @@ class SubprocessMixin: loop=self.loop) stdout_transport = proc._transport.get_pipe_transport(1) stdout_transport.pause_reading = mock.Mock() + stdout_transport.resume_reading = mock.Mock() - yield from proc.wait() + stdout, stderr = yield from proc.communicate() # The child process produced more than limit bytes of output, # the stream reader transport should pause the protocol to not # allocate too much memory. - return stdout_transport.pause_reading.called + return (stdout, stdout_transport) # Issue #22685: Ensure that the stream reader pauses the protocol # when the child process produces too much data - called = self.loop.run_until_complete(test_pause_reading()) - self.assertTrue(called) + stdout, transport = self.loop.run_until_complete(test_pause_reading()) + + self.assertEqual(stdout, b'x' * size) + self.assertTrue(transport.pause_reading.called) + self.assertTrue(transport.resume_reading.called) if sys.platform != 'win32':