Catch py3 ConnectionErrors
This commit is contained in:
@@ -17,6 +17,10 @@ from .protocol.metadata import MetadataRequest
|
|||||||
from .protocol.produce import ProduceRequest
|
from .protocol.produce import ProduceRequest
|
||||||
from .version import __version__
|
from .version import __version__
|
||||||
|
|
||||||
|
if six.PY2:
|
||||||
|
ConnectionError = None
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -503,7 +507,6 @@ class KafkaClient(object):
|
|||||||
('0.8.0', MetadataRequest([])),
|
('0.8.0', MetadataRequest([])),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
for version, request in test_cases:
|
for version, request in test_cases:
|
||||||
connect()
|
connect()
|
||||||
f = self.send(node_id, request)
|
f = self.send(node_id, request)
|
||||||
@@ -517,8 +520,11 @@ class KafkaClient(object):
|
|||||||
log.info('Broker version identifed as %s', version)
|
log.info('Broker version identifed as %s', version)
|
||||||
return version
|
return version
|
||||||
|
|
||||||
assert isinstance(f.exception.message, socket.error)
|
if six.PY2:
|
||||||
assert f.exception.message.errno in (32, 54)
|
assert isinstance(f.exception.args[0], socket.error)
|
||||||
|
assert f.exception.args[0].errno in (32, 54)
|
||||||
|
else:
|
||||||
|
assert isinstance(f.exception.args[0], ConnectionError)
|
||||||
log.info("Broker is not v%s -- it did not recognize %s",
|
log.info("Broker is not v%s -- it did not recognize %s",
|
||||||
version, request.__class__.__name__)
|
version, request.__class__.__name__)
|
||||||
continue
|
continue
|
||||||
|
@@ -13,7 +13,6 @@ import time
|
|||||||
import six
|
import six
|
||||||
|
|
||||||
import kafka.common as Errors
|
import kafka.common as Errors
|
||||||
from kafka.common import ConnectionError
|
|
||||||
from kafka.future import Future
|
from kafka.future import Future
|
||||||
from kafka.protocol.api import RequestHeader
|
from kafka.protocol.api import RequestHeader
|
||||||
from kafka.protocol.commit import GroupCoordinatorResponse
|
from kafka.protocol.commit import GroupCoordinatorResponse
|
||||||
@@ -21,6 +20,10 @@ from kafka.protocol.types import Int32
|
|||||||
from kafka.version import __version__
|
from kafka.version import __version__
|
||||||
|
|
||||||
|
|
||||||
|
if six.PY2:
|
||||||
|
ConnectionError = socket.error
|
||||||
|
BlockingIOError = Exception
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
|
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
|
||||||
@@ -166,7 +169,7 @@ class BrokerConnection(object):
|
|||||||
sent_bytes = self._sock.send(message)
|
sent_bytes = self._sock.send(message)
|
||||||
assert sent_bytes == len(message)
|
assert sent_bytes == len(message)
|
||||||
self._sock.setblocking(False)
|
self._sock.setblocking(False)
|
||||||
except (AssertionError, socket.error) as e:
|
except (AssertionError, ConnectionError) as e:
|
||||||
log.exception("Error sending %s to %s", request, self)
|
log.exception("Error sending %s to %s", request, self)
|
||||||
error = Errors.ConnectionError(e)
|
error = Errors.ConnectionError(e)
|
||||||
self.close(error=error)
|
self.close(error=error)
|
||||||
@@ -225,8 +228,8 @@ class BrokerConnection(object):
|
|||||||
# An extremely small, but non-zero, probability that there are
|
# An extremely small, but non-zero, probability that there are
|
||||||
# more than 0 but not yet 4 bytes available to read
|
# more than 0 but not yet 4 bytes available to read
|
||||||
self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
|
self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
|
||||||
except socket.error as e:
|
except ConnectionError as e:
|
||||||
if e.errno == errno.EWOULDBLOCK:
|
if six.PY2 and e.errno == errno.EWOULDBLOCK:
|
||||||
# This shouldn't happen after selecting above
|
# This shouldn't happen after selecting above
|
||||||
# but just in case
|
# but just in case
|
||||||
return None
|
return None
|
||||||
@@ -234,6 +237,10 @@ class BrokerConnection(object):
|
|||||||
' closing socket', self)
|
' closing socket', self)
|
||||||
self.close(error=Errors.ConnectionError(e))
|
self.close(error=Errors.ConnectionError(e))
|
||||||
return None
|
return None
|
||||||
|
except BlockingIOError:
|
||||||
|
if six.PY3:
|
||||||
|
return None
|
||||||
|
raise
|
||||||
|
|
||||||
if self._rbuffer.tell() == 4:
|
if self._rbuffer.tell() == 4:
|
||||||
self._rbuffer.seek(0)
|
self._rbuffer.seek(0)
|
||||||
@@ -249,14 +256,18 @@ class BrokerConnection(object):
|
|||||||
staged_bytes = self._rbuffer.tell()
|
staged_bytes = self._rbuffer.tell()
|
||||||
try:
|
try:
|
||||||
self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
|
self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
|
||||||
except socket.error as e:
|
except ConnectionError as e:
|
||||||
# Extremely small chance that we have exactly 4 bytes for a
|
# Extremely small chance that we have exactly 4 bytes for a
|
||||||
# header, but nothing to read in the body yet
|
# header, but nothing to read in the body yet
|
||||||
if e.errno == errno.EWOULDBLOCK:
|
if six.PY2 and e.errno == errno.EWOULDBLOCK:
|
||||||
return None
|
return None
|
||||||
log.exception('%s: Error in recv', self)
|
log.exception('%s: Error in recv', self)
|
||||||
self.close(error=Errors.ConnectionError(e))
|
self.close(error=Errors.ConnectionError(e))
|
||||||
return None
|
return None
|
||||||
|
except BlockingIOError:
|
||||||
|
if six.PY3:
|
||||||
|
return None
|
||||||
|
raise
|
||||||
|
|
||||||
staged_bytes = self._rbuffer.tell()
|
staged_bytes = self._rbuffer.tell()
|
||||||
if staged_bytes > self._next_payload_bytes:
|
if staged_bytes > self._next_payload_bytes:
|
||||||
@@ -379,7 +390,7 @@ class KafkaConnection(local):
|
|||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
# And then raise
|
# And then raise
|
||||||
raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
|
raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
|
||||||
|
|
||||||
def _read_bytes(self, num_bytes):
|
def _read_bytes(self, num_bytes):
|
||||||
bytes_left = num_bytes
|
bytes_left = num_bytes
|
||||||
|
Reference in New Issue
Block a user