New timeout error API: .is_timeout=True on exception object

Please use eventlet.is_timeout(ex) or getattr(ex, 'is_timeout', False)
instead of searching substrings in exception message.

https://github.com/eventlet/eventlet/pull/346
This commit is contained in:
Sergey Shepelev
2016-12-22 04:35:14 +03:00
parent 7595a5a4b7
commit 54be7593ab
11 changed files with 158 additions and 58 deletions

View File

@@ -14,6 +14,7 @@ if os.environ.get('EVENTLET_IMPORT_VERSION_ONLY') != '1':
from eventlet import patcher from eventlet import patcher
from eventlet import queue from eventlet import queue
from eventlet import semaphore from eventlet import semaphore
from eventlet import support
from eventlet import timeout from eventlet import timeout
import greenlet import greenlet
@@ -45,10 +46,15 @@ if os.environ.get('EVENTLET_IMPORT_VERSION_ONLY') != '1':
Timeout = timeout.Timeout Timeout = timeout.Timeout
with_timeout = timeout.with_timeout with_timeout = timeout.with_timeout
wrap_is_timeout = timeout.wrap_is_timeout
is_timeout = timeout.is_timeout
getcurrent = greenlet.greenlet.getcurrent getcurrent = greenlet.greenlet.getcurrent
# deprecated # deprecated
TimeoutError = timeout.Timeout TimeoutError, exc_after, call_after_global = (
exc_after = greenthread.exc_after support.wrap_deprecated(old, new)(fun) for old, new, fun in (
call_after_global = greenthread.call_after_global ('TimeoutError', 'Timeout', Timeout),
('exc_after', 'greenthread.exc_after', greenthread.exc_after),
('call_after_global', 'greenthread.call_after_global', greenthread.call_after_global),
))

View File

@@ -1,17 +1,19 @@
__socket = __import__('socket') __socket = __import__('socket')
__all__ = __socket.__all__ __all__ = __socket.__all__
__patched__ = ['fromfd', 'socketpair', 'ssl', 'socket'] __patched__ = ['fromfd', 'socketpair', 'ssl', 'socket', 'timeout']
from eventlet.patcher import slurp_properties import eventlet.patcher
slurp_properties(__socket, globals(), eventlet.patcher.slurp_properties(__socket, globals(), ignore=__patched__, srckeys=dir(__socket))
ignore=__patched__, srckeys=dir(__socket))
os = __import__('os') os = __import__('os')
import sys import sys
from eventlet.hubs import get_hub from eventlet import greenio
from eventlet.greenio import GreenSocket as socket
from eventlet.greenio import _GLOBAL_DEFAULT_TIMEOUT
socket = greenio.GreenSocket
_GLOBAL_DEFAULT_TIMEOUT = greenio._GLOBAL_DEFAULT_TIMEOUT
timeout = greenio.socket_timeout
try: try:
__original_fromfd__ = __socket.fromfd __original_fromfd__ = __socket.fromfd

View File

@@ -13,6 +13,7 @@ __all__ = [
'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking', 'GreenSocket', '_GLOBAL_DEFAULT_TIMEOUT', 'set_nonblocking',
'SOCKET_BLOCKING', 'SOCKET_CLOSED', 'CONNECT_ERR', 'CONNECT_SUCCESS', 'SOCKET_BLOCKING', 'SOCKET_CLOSED', 'CONNECT_ERR', 'CONNECT_SUCCESS',
'shutdown_safe', 'SSL', 'shutdown_safe', 'SSL',
'socket_timeout',
] ]
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
@@ -27,6 +28,9 @@ if six.PY2:
_original_socket = eventlet.patcher.original('socket').socket _original_socket = eventlet.patcher.original('socket').socket
socket_timeout = eventlet.timeout.wrap_is_timeout(socket.timeout)
def socket_connect(descriptor, address): def socket_connect(descriptor, address):
""" """
Attempts to connect to the address, returns the descriptor if it succeeds, Attempts to connect to the address, returns the descriptor if it succeeds,
@@ -210,14 +214,14 @@ class GreenSocket(object):
if self.act_non_blocking: if self.act_non_blocking:
return self.fd.accept() return self.fd.accept()
fd = self.fd fd = self.fd
_timeout_exc = socket_timeout('timed out')
while True: while True:
res = socket_accept(fd) res = socket_accept(fd)
if res is not None: if res is not None:
client, addr = res client, addr = res
set_nonblocking(client) set_nonblocking(client)
return type(self)(client), addr return type(self)(client), addr
self._trampoline(fd, read=True, timeout=self.gettimeout(), self._trampoline(fd, read=True, timeout=self.gettimeout(), timeout_exc=_timeout_exc)
timeout_exc=socket.timeout("timed out"))
def _mark_as_closed(self): def _mark_as_closed(self):
""" Mark this socket as being closed """ """ Mark this socket as being closed """
@@ -233,6 +237,7 @@ class GreenSocket(object):
if self.act_non_blocking: if self.act_non_blocking:
return self.fd.connect(address) return self.fd.connect(address)
fd = self.fd fd = self.fd
_timeout_exc = socket_timeout('timed out')
if self.gettimeout() is None: if self.gettimeout() is None:
while not socket_connect(fd, address): while not socket_connect(fd, address):
try: try:
@@ -246,10 +251,10 @@ class GreenSocket(object):
if socket_connect(fd, address): if socket_connect(fd, address):
return return
if time.time() >= end: if time.time() >= end:
raise socket.timeout("timed out") raise _timeout_exc
timeout = end - time.time()
try: try:
self._trampoline(fd, write=True, timeout=end - time.time(), self._trampoline(fd, write=True, timeout=timeout, timeout_exc=_timeout_exc)
timeout_exc=socket.timeout("timed out"))
except IOClosed: except IOClosed:
# ... we need some workable errno here. # ... we need some workable errno here.
raise socket.error(errno.EBADFD) raise socket.error(errno.EBADFD)
@@ -270,14 +275,15 @@ class GreenSocket(object):
return errno.EBADFD return errno.EBADFD
else: else:
end = time.time() + self.gettimeout() end = time.time() + self.gettimeout()
timeout_exc = socket.timeout(errno.EAGAIN)
while True: while True:
try: try:
if socket_connect(fd, address): if socket_connect(fd, address):
return 0 return 0
if time.time() >= end: if time.time() >= end:
raise socket.timeout(errno.EAGAIN) raise timeout_exc
self._trampoline(fd, write=True, timeout=end - time.time(), self._trampoline(fd, write=True, timeout=end - time.time(),
timeout_exc=socket.timeout(errno.EAGAIN)) timeout_exc=timeout_exc)
socket_checkerr(fd) socket_checkerr(fd)
except socket.error as ex: except socket.error as ex:
return get_errno(ex) return get_errno(ex)
@@ -316,7 +322,7 @@ class GreenSocket(object):
self.fd, self.fd,
read=True, read=True,
timeout=self.gettimeout(), timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out")) timeout_exc=socket_timeout('timed out'))
def _recv_loop(self, recv_meth, empty_val, *args): def _recv_loop(self, recv_meth, empty_val, *args):
fd = self.fd fd = self.fd
@@ -366,7 +372,8 @@ class GreenSocket(object):
if self.act_non_blocking: if self.act_non_blocking:
return send_method(data, *args) return send_method(data, *args)
while 1: _timeout_exc = socket_timeout('timed out')
while True:
try: try:
return send_method(data, *args) return send_method(data, *args)
except socket.error as e: except socket.error as e:
@@ -376,7 +383,7 @@ class GreenSocket(object):
try: try:
self._trampoline(self.fd, write=True, timeout=self.gettimeout(), self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out")) timeout_exc=_timeout_exc)
except IOClosed: except IOClosed:
raise socket.error(errno.ECONNRESET, 'Connection closed by another thread') raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')

View File

@@ -3,6 +3,7 @@ import sys
from eventlet import event from eventlet import event
from eventlet import hubs from eventlet import hubs
from eventlet import support
from eventlet import timeout from eventlet import timeout
from eventlet.hubs import timer from eventlet.hubs import timer
from eventlet.support import greenlets as greenlet, six from eventlet.support import greenlets as greenlet, six
@@ -144,8 +145,11 @@ def exc_after(seconds, *throw_args):
return hub.schedule_call_local(seconds, getcurrent().throw, *throw_args) return hub.schedule_call_local(seconds, getcurrent().throw, *throw_args)
# deprecate, remove # deprecate, remove
TimeoutError = timeout.Timeout TimeoutError, with_timeout = (
with_timeout = timeout.with_timeout support.wrap_deprecated(old, new)(fun) for old, new, fun in (
('greenthread.TimeoutError', 'Timeout', timeout.Timeout),
('greenthread.with_timeout', 'with_timeout', timeout.with_timeout),
))
def _spawn_n(seconds, func, args, kwargs): def _spawn_n(seconds, func, args, kwargs):

View File

@@ -118,6 +118,7 @@ def get_hub():
return hub return hub
# Lame middle file import because complex dependencies in import graph
from eventlet import timeout from eventlet import timeout

View File

@@ -1,9 +1,15 @@
import inspect
import functools
import sys import sys
from contextlib import contextmanager import warnings
import contextlib
from eventlet.support import greenlets, six from eventlet.support import greenlets, six
_MISSING = object()
def get_errno(exc): def get_errno(exc):
""" Get the error code out of socket.error objects. """ Get the error code out of socket.error objects.
socket.error in <2.5 does not have errno attribute socket.error in <2.5 does not have errno attribute
@@ -43,7 +49,8 @@ else:
PY33 = sys.version_info[:2] == (3, 3) PY33 = sys.version_info[:2] == (3, 3)
@contextmanager
@contextlib.contextmanager
def capture_stderr(): def capture_stderr():
stream = six.StringIO() stream = six.StringIO()
original = sys.stderr original = sys.stderr
@@ -53,3 +60,30 @@ def capture_stderr():
finally: finally:
sys.stderr = original sys.stderr = original
stream.seek(0) stream.seek(0)
def wrap_deprecated(old, new):
def _resolve(s):
return 'eventlet.'+s if '.' not in s else s
msg = '''\
{old} is deprecated and will be removed in next version. Use {new} instead.
Autoupgrade: fgrep -rl '{old}' . |xargs -t sed --in-place='' -e 's/{old}/{new}/'
'''.format(old=_resolve(old), new=_resolve(new))
def wrapper(base):
klass = None
if inspect.isclass(base):
klass = base
base = klass.__init__
@functools.wraps(base)
def wrapped(*a, **kw):
warnings.warn(msg, DeprecationWarning, stacklevel=5)
return base(*a, **kw)
if klass is not None:
klass.__init__ = wrapped
return klass
return wrapped
return wrapper

View File

@@ -20,13 +20,16 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.from eventlet.support import greenlets as greenlet # THE SOFTWARE.from eventlet.support import greenlets as greenlet
import functools
import inspect
import eventlet
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet
from eventlet.hubs import get_hub from eventlet.hubs import get_hub
__all__ = ['Timeout', __all__ = ['Timeout', 'with_timeout', 'wrap_is_timeout', 'is_timeout']
'with_timeout']
_NONE = object() _MISSING = object()
# deriving from BaseException so that "except Exception as e" doesn't catch # deriving from BaseException so that "except Exception as e" doesn't catch
# Timeout exceptions. # Timeout exceptions.
@@ -128,20 +131,49 @@ class Timeout(BaseException):
if value is self and self.exception is False: if value is self and self.exception is False:
return True return True
@property
def is_timeout(self):
return True
def with_timeout(seconds, function, *args, **kwds): def with_timeout(seconds, function, *args, **kwds):
"""Wrap a call to some (yielding) function with a timeout; if the called """Wrap a call to some (yielding) function with a timeout; if the called
function fails to return before the timeout, cancel it and return a flag function fails to return before the timeout, cancel it and return a flag
value. value.
""" """
timeout_value = kwds.pop("timeout_value", _NONE) timeout_value = kwds.pop("timeout_value", _MISSING)
timeout = Timeout(seconds) timeout = Timeout(seconds)
try: try:
try: try:
return function(*args, **kwds) return function(*args, **kwds)
except Timeout as ex: except Timeout as ex:
if ex is timeout and timeout_value is not _NONE: if ex is timeout and timeout_value is not _MISSING:
return timeout_value return timeout_value
raise raise
finally: finally:
timeout.cancel() timeout.cancel()
def wrap_is_timeout(base):
'''Adds `.is_timeout=True` attribute to objects returned by `base()`.
When `base` is class, attribute is added as read-only property. Returns `base`.
Otherwise, it returns a function that sets attribute on result of `base()` call.
Wrappers make best effort to be transparent.
'''
if inspect.isclass(base):
base.is_timeout = property(lambda _: True)
return base
@functools.wraps(base)
def fun(*args, **kwargs):
ex = base(*args, **kwargs)
ex.is_timeout = True
return ex
return fun
def is_timeout(obj):
py3err = getattr(__builtins__, 'TimeoutError', Timeout)
return bool(getattr(obj, 'is_timeout', False)) or isinstance(obj, py3err)

View File

@@ -304,6 +304,8 @@ def get_database_auth():
def run_python(path, env=None, args=None, timeout=None, pythonpath_extend=None, expect_pass=False): def run_python(path, env=None, args=None, timeout=None, pythonpath_extend=None, expect_pass=False):
new_argv = [sys.executable] new_argv = [sys.executable]
if sys.version_info[:2] <= (2, 6):
new_argv += ['-W', 'ignore::DeprecationWarning']
new_env = os.environ.copy() new_env = os.environ.copy()
new_env.setdefault('eventlet_test_in_progress', 'yes') new_env.setdefault('eventlet_test_in_progress', 'yes')
src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -357,6 +359,11 @@ def run_isolated(path, prefix='tests/isolated/', **kwargs):
run_python(prefix + path, **kwargs) run_python(prefix + path, **kwargs)
def check_is_timeout(obj):
value_text = getattr(obj, 'is_timeout', '(missing)')
assert obj.is_timeout, 'type={0} str={1} .is_timeout={2}'.format(type(obj), str(obj), value_text)
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

View File

@@ -1,12 +1,7 @@
import os
from unittest import TestCase, main
from nose.tools import eq_
import eventlet import eventlet
from eventlet import greenio, hubs, greenthread, spawn from eventlet import greenio, hubs, greenthread
from eventlet.green import ssl from eventlet.green import ssl
from tests import skip_if_no_ssl import tests
def check_hub(): def check_hub():
@@ -21,10 +16,7 @@ def check_hub():
assert not hub.running assert not hub.running
class TestApi(TestCase): class TestApi(tests.LimitedTestCase):
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
def test_tcp_listener(self): def test_tcp_listener(self):
socket = eventlet.listen(('0.0.0.0', 0)) socket = eventlet.listen(('0.0.0.0', 0))
@@ -50,13 +42,13 @@ class TestApi(TestCase):
client = eventlet.connect(('127.0.0.1', server.getsockname()[1])) client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile('rb') fd = client.makefile('rb')
client.close() client.close()
eq_(fd.readline(), b'hello\n') assert fd.readline() == b'hello\n'
eq_(fd.read(), b'') assert fd.read() == b''
fd.close() fd.close()
check_hub() check_hub()
@skip_if_no_ssl @tests.skip_if_no_ssl
def test_connect_ssl(self): def test_connect_ssl(self):
def accept_once(listenfd): def accept_once(listenfd):
try: try:
@@ -70,8 +62,8 @@ class TestApi(TestCase):
server = eventlet.wrap_ssl( server = eventlet.wrap_ssl(
eventlet.listen(('0.0.0.0', 0)), eventlet.listen(('0.0.0.0', 0)),
self.private_key_file, tests.private_key_file,
self.certificate_file, tests.certificate_file,
server_side=True server_side=True
) )
eventlet.spawn_n(accept_once, server) eventlet.spawn_n(accept_once, server)
@@ -98,12 +90,12 @@ class TestApi(TestCase):
def server(sock): def server(sock):
client, addr = sock.accept() client, addr = sock.accept()
eventlet.sleep(0.1) eventlet.sleep(0.1)
server_evt = spawn(server, server_sock) server_evt = eventlet.spawn(server, server_sock)
eventlet.sleep(0) eventlet.sleep(0)
try: try:
desc = eventlet.connect(('127.0.0.1', bound_port)) desc = eventlet.connect(('127.0.0.1', bound_port))
hubs.trampoline(desc, read=True, write=False, timeout=0.001) hubs.trampoline(desc, read=True, write=False, timeout=0.001)
except eventlet.TimeoutError: except eventlet.Timeout:
pass # test passed pass # test passed
else: else:
assert False, "Didn't timeout" assert False, "Didn't timeout"
@@ -126,7 +118,7 @@ class TestApi(TestCase):
desc = eventlet.connect(('127.0.0.1', bound_port)) desc = eventlet.connect(('127.0.0.1', bound_port))
try: try:
hubs.trampoline(desc, read=True, timeout=0.1) hubs.trampoline(desc, read=True, timeout=0.1)
except eventlet.TimeoutError: except eventlet.Timeout:
assert False, "Timed out" assert False, "Timed out"
server.close() server.close()
@@ -174,14 +166,14 @@ class TestApi(TestCase):
try: try:
eventlet.with_timeout(0.1, func) eventlet.with_timeout(0.1, func)
self.fail(u'Expected TimeoutError') self.fail(u'Expected Timeout')
except eventlet.TimeoutError: except eventlet.Timeout:
pass pass
class Foo(object): def test_wrap_is_timeout():
pass class A(object):
pass
obj = eventlet.wrap_is_timeout(A)()
if __name__ == '__main__': tests.check_is_timeout(obj)
main()

View File

@@ -88,3 +88,14 @@ def test_getaddrinfo_ipv6_scope():
if not socket.has_ipv6: if not socket.has_ipv6:
return return
socket.getaddrinfo('::1%2', 80, socket.AF_INET6) socket.getaddrinfo('::1%2', 80, socket.AF_INET6)
def test_error_is_timeout():
s1, _ = socket.socketpair()
s1.settimeout(0.01)
try:
s1.recv(1)
except socket.error as e:
tests.check_is_timeout(e)
else:
assert False, 'No timeout, socket.error was not raised'

View File

@@ -1,12 +1,12 @@
import eventlet import eventlet
from tests import LimitedTestCase import tests
DELAY = 0.01 DELAY = 0.01
class TestDirectRaise(LimitedTestCase): class TestDirectRaise(tests.LimitedTestCase):
def test_direct_raise_class(self): def test_direct_raise_class(self):
try: try:
raise eventlet.Timeout raise eventlet.Timeout
@@ -36,7 +36,7 @@ class TestDirectRaise(LimitedTestCase):
str(tm) str(tm)
class TestWithTimeout(LimitedTestCase): class TestWithTimeout(tests.LimitedTestCase):
def test_with_timeout(self): def test_with_timeout(self):
self.assertRaises(eventlet.Timeout, eventlet.with_timeout, DELAY, eventlet.sleep, DELAY * 10) self.assertRaises(eventlet.Timeout, eventlet.with_timeout, DELAY, eventlet.sleep, DELAY * 10)
X = object() X = object()
@@ -53,3 +53,7 @@ class TestWithTimeout(LimitedTestCase):
eventlet.Timeout, eventlet.Timeout,
eventlet.with_timeout, eventlet.with_timeout,
DELAY, longer_timeout) DELAY, longer_timeout)
def test_is_timeout_attribute():
tests.check_is_timeout(eventlet.Timeout())