gh-274: Handle blocking I/O errors in GreenSocket
Fix recv_into(), recvfrom(), recvfrom_into() and sendto() methods of GreenSocket to handle blocking I/O errors (ex: BlockingIOError on Python 3). Even if the trampoline was called, the socket method can still fails with an I/O errors for various reasons (see manual pages of the C functions for examples). Ref: https://github.com/eventlet/eventlet/issues/274
This commit is contained in:

committed by
Jakub Stasiak

parent
0e1030e1ba
commit
bc4d1b5d36
@@ -304,13 +304,25 @@ class GreenSocket(object):
|
|||||||
"makefile instead", DeprecationWarning, stacklevel=2)
|
"makefile instead", DeprecationWarning, stacklevel=2)
|
||||||
return self.makefile(*args, **kw)
|
return self.makefile(*args, **kw)
|
||||||
|
|
||||||
def recv(self, buflen, flags=0):
|
def _read_trampoline(self):
|
||||||
|
self._trampoline(
|
||||||
|
self.fd,
|
||||||
|
read=True,
|
||||||
|
timeout=self.gettimeout(),
|
||||||
|
timeout_exc=socket.timeout("timed out"))
|
||||||
|
|
||||||
|
def _recv_loop(self, recv_meth, *args):
|
||||||
fd = self.fd
|
fd = self.fd
|
||||||
if self.act_non_blocking:
|
if self.act_non_blocking:
|
||||||
return fd.recv(buflen, flags)
|
return recv_meth(*args)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
return fd.recv(buflen, flags)
|
# recv: bufsize=0?
|
||||||
|
# recv_into: buffer is empty?
|
||||||
|
if not args[0]:
|
||||||
|
self._read_trampoline()
|
||||||
|
return recv_meth(*args)
|
||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
if get_errno(e) in SOCKET_BLOCKING:
|
if get_errno(e) in SOCKET_BLOCKING:
|
||||||
pass
|
pass
|
||||||
@@ -318,45 +330,35 @@ class GreenSocket(object):
|
|||||||
return b''
|
return b''
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._trampoline(
|
self._read_trampoline()
|
||||||
fd,
|
|
||||||
read=True,
|
|
||||||
timeout=self.gettimeout(),
|
|
||||||
timeout_exc=socket.timeout("timed out"))
|
|
||||||
except IOClosed as e:
|
except IOClosed as e:
|
||||||
# Perhaps we should return '' instead?
|
# Perhaps we should return '' instead?
|
||||||
raise EOFError()
|
raise EOFError()
|
||||||
|
|
||||||
def recvfrom(self, *args):
|
def recv(self, bufsize, flags=0):
|
||||||
if not self.act_non_blocking:
|
return self._recv_loop(self.fd.recv, bufsize, flags)
|
||||||
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
|
|
||||||
timeout_exc=socket.timeout("timed out"))
|
|
||||||
return self.fd.recvfrom(*args)
|
|
||||||
|
|
||||||
def recvfrom_into(self, *args):
|
def recvfrom(self, bufsize, flags=0):
|
||||||
if not self.act_non_blocking:
|
return self._recv_loop(self.fd.recvfrom, bufsize, flags)
|
||||||
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
|
|
||||||
timeout_exc=socket.timeout("timed out"))
|
|
||||||
return self.fd.recvfrom_into(*args)
|
|
||||||
|
|
||||||
def recv_into(self, *args):
|
def recv_into(self, buffer, nbytes=0, flags=0):
|
||||||
if not self.act_non_blocking:
|
return self._recv_loop(self.fd.recv_into, buffer, nbytes, flags)
|
||||||
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
|
|
||||||
timeout_exc=socket.timeout("timed out"))
|
|
||||||
return self.fd.recv_into(*args)
|
|
||||||
|
|
||||||
def send(self, data, flags=0):
|
def recvfrom_into(self, buffer, nbytes=0, flags=0):
|
||||||
fd = self.fd
|
return self._recv_loop(self.fd.recvfrom_into, buffer, nbytes, flags)
|
||||||
|
|
||||||
|
def _send_loop(self, send_method, data, *args):
|
||||||
if self.act_non_blocking:
|
if self.act_non_blocking:
|
||||||
return fd.send(data, flags)
|
return send_method(data, *args)
|
||||||
|
|
||||||
# blocking socket behavior - sends all, blocks if the buffer is full
|
# blocking socket behavior - sends all, blocks if the buffer is full
|
||||||
total_sent = 0
|
total_sent = 0
|
||||||
len_data = len(data)
|
len_data = len(data)
|
||||||
while 1:
|
while 1:
|
||||||
try:
|
try:
|
||||||
total_sent += fd.send(data[total_sent:], flags)
|
total_sent += send_method(data[total_sent:], *args)
|
||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
eno = get_errno(e)
|
eno = get_errno(e)
|
||||||
if eno == errno.ENOTCONN or eno not in SOCKET_BLOCKING:
|
if eno == errno.ENOTCONN or eno not in SOCKET_BLOCKING:
|
||||||
@@ -373,16 +375,18 @@ class GreenSocket(object):
|
|||||||
|
|
||||||
return total_sent
|
return total_sent
|
||||||
|
|
||||||
|
def send(self, data, flags=0):
|
||||||
|
return self._send_loop(self.fd.send, data, flags)
|
||||||
|
|
||||||
|
def sendto(self, data, address, flags=0):
|
||||||
|
return self._send_loop(self.fd.sendto, data, address, flags)
|
||||||
|
|
||||||
def sendall(self, data, flags=0):
|
def sendall(self, data, flags=0):
|
||||||
tail = self.send(data, flags)
|
tail = self.send(data, flags)
|
||||||
len_data = len(data)
|
len_data = len(data)
|
||||||
while tail < len_data:
|
while tail < len_data:
|
||||||
tail += self.send(data[tail:], flags)
|
tail += self.send(data[tail:], flags)
|
||||||
|
|
||||||
def sendto(self, *args):
|
|
||||||
self._trampoline(self.fd, write=True)
|
|
||||||
return self.fd.sendto(*args)
|
|
||||||
|
|
||||||
def setblocking(self, flag):
|
def setblocking(self, flag):
|
||||||
if flag:
|
if flag:
|
||||||
self.act_non_blocking = False
|
self.act_non_blocking = False
|
||||||
|
Reference in New Issue
Block a user