Update BrokerConnection for use with async client
- use descriptive names for ConnectionStates enum values - Change default send_buffer_bytes config to 131072 - add can_send_more() and max_in_flight_requests_per_connection config - add blacked_out() and reconnect_backoff_ms config - last_attempt and last_failure are now public attributes - raise TooManyInFlightRequests in conn.send() if cant send more
This commit is contained in:
@@ -109,6 +109,10 @@ class CorrelationIdError(KafkaError):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TooManyInFlightRequests(KafkaError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class BrokerResponseError(KafkaError):
|
class BrokerResponseError(KafkaError):
|
||||||
errno = None
|
errno = None
|
||||||
message = None
|
message = None
|
||||||
|
113
kafka/conn.py
113
kafka/conn.py
@@ -26,9 +26,9 @@ DEFAULT_KAFKA_PORT = 9092
|
|||||||
|
|
||||||
|
|
||||||
class ConnectionStates(object):
|
class ConnectionStates(object):
|
||||||
DISCONNECTED = 1
|
DISCONNECTED = '<disconnected>'
|
||||||
CONNECTING = 2
|
CONNECTING = '<connecting>'
|
||||||
CONNECTED = 3
|
CONNECTED = '<connected>'
|
||||||
|
|
||||||
|
|
||||||
InFlightRequest = collections.namedtuple('InFlightRequest',
|
InFlightRequest = collections.namedtuple('InFlightRequest',
|
||||||
@@ -37,10 +37,12 @@ InFlightRequest = collections.namedtuple('InFlightRequest',
|
|||||||
|
|
||||||
class BrokerConnection(object):
|
class BrokerConnection(object):
|
||||||
_receive_buffer_bytes = 32768
|
_receive_buffer_bytes = 32768
|
||||||
_send_buffer_bytes = 32768
|
_send_buffer_bytes = 131072
|
||||||
_client_id = 'kafka-python-0.10.0'
|
_client_id = 'kafka-python-0.10.0'
|
||||||
_correlation_id = 0
|
_correlation_id = 0
|
||||||
_request_timeout_ms = 40000
|
_request_timeout_ms = 40000
|
||||||
|
_max_in_flight_requests_per_connection = 5
|
||||||
|
_reconnect_backoff_ms = 50
|
||||||
|
|
||||||
def __init__(self, host, port, **kwargs):
|
def __init__(self, host, port, **kwargs):
|
||||||
self.host = host
|
self.host = host
|
||||||
@@ -48,7 +50,9 @@ class BrokerConnection(object):
|
|||||||
self.in_flight_requests = collections.deque()
|
self.in_flight_requests = collections.deque()
|
||||||
|
|
||||||
for config in ('receive_buffer_bytes', 'send_buffer_bytes',
|
for config in ('receive_buffer_bytes', 'send_buffer_bytes',
|
||||||
'client_id', 'correlation_id', 'request_timeout_ms'):
|
'client_id', 'correlation_id', 'request_timeout_ms',
|
||||||
|
'max_in_flight_requests_per_connection',
|
||||||
|
'reconnect_backoff_ms'):
|
||||||
if config in kwargs:
|
if config in kwargs:
|
||||||
setattr(self, '_' + config, kwargs.pop(config))
|
setattr(self, '_' + config, kwargs.pop(config))
|
||||||
|
|
||||||
@@ -57,8 +61,9 @@ class BrokerConnection(object):
|
|||||||
self._rbuffer = io.BytesIO()
|
self._rbuffer = io.BytesIO()
|
||||||
self._receiving = False
|
self._receiving = False
|
||||||
self._next_payload_bytes = 0
|
self._next_payload_bytes = 0
|
||||||
self._last_connection_attempt = None
|
self.last_attempt = 0
|
||||||
self._last_connection_failure = None
|
self.last_failure = 0
|
||||||
|
self._processing = False
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
"""Attempt to connect and return ConnectionState"""
|
"""Attempt to connect and return ConnectionState"""
|
||||||
@@ -69,34 +74,47 @@ class BrokerConnection(object):
|
|||||||
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes)
|
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes)
|
||||||
self._sock.setblocking(False)
|
self._sock.setblocking(False)
|
||||||
ret = self._sock.connect_ex((self.host, self.port))
|
ret = self._sock.connect_ex((self.host, self.port))
|
||||||
self._last_connection_attempt = time.time()
|
self.last_attempt = time.time()
|
||||||
|
|
||||||
if not ret or ret is errno.EISCONN:
|
if not ret or ret is errno.EISCONN:
|
||||||
self.state = ConnectionStates.CONNECTED
|
self.state = ConnectionStates.CONNECTED
|
||||||
elif ret in (errno.EINPROGRESS, errno.EALREADY):
|
elif ret in (errno.EINPROGRESS, errno.EALREADY):
|
||||||
self.state = ConnectionStates.CONNECTING
|
self.state = ConnectionStates.CONNECTING
|
||||||
else:
|
else:
|
||||||
log.error('Connect attempt returned error %s. Disconnecting.', ret)
|
log.error('Connect attempt to %s returned error %s.'
|
||||||
|
' Disconnecting.', self, ret)
|
||||||
self.close()
|
self.close()
|
||||||
self._last_connection_failure = time.time()
|
self.last_failure = time.time()
|
||||||
|
|
||||||
if self.state is ConnectionStates.CONNECTING:
|
if self.state is ConnectionStates.CONNECTING:
|
||||||
# in non-blocking mode, use repeated calls to socket.connect_ex
|
# in non-blocking mode, use repeated calls to socket.connect_ex
|
||||||
# to check connection status
|
# to check connection status
|
||||||
if time.time() > (self._request_timeout_ms / 1000.0) + self._last_connection_attempt:
|
if time.time() > (self._request_timeout_ms / 1000.0) + self.last_attempt:
|
||||||
log.error('Connection attempt timed out')
|
log.error('Connection attempt to %s timed out', self)
|
||||||
self.close() # error=TimeoutError ?
|
self.close() # error=TimeoutError ?
|
||||||
self._last_connection_failure = time.time()
|
self.last_failure = time.time()
|
||||||
|
|
||||||
ret = self._sock.connect_ex((self.host, self.port))
|
ret = self._sock.connect_ex((self.host, self.port))
|
||||||
if not ret or ret is errno.EISCONN:
|
if not ret or ret is errno.EISCONN:
|
||||||
self.state = ConnectionStates.CONNECTED
|
self.state = ConnectionStates.CONNECTED
|
||||||
elif ret is not errno.EALREADY:
|
elif ret is not errno.EALREADY:
|
||||||
log.error('Connect attempt returned error %s. Disconnecting.', ret)
|
log.error('Connect attempt to %s returned error %s.'
|
||||||
|
' Disconnecting.', self, ret)
|
||||||
self.close()
|
self.close()
|
||||||
self._last_connection_failure = time.time()
|
self.last_failure = time.time()
|
||||||
return self.state
|
return self.state
|
||||||
|
|
||||||
|
def blacked_out(self):
|
||||||
|
"""
|
||||||
|
Return true if we are disconnected from the given node and can't
|
||||||
|
re-establish a connection yet
|
||||||
|
"""
|
||||||
|
if self.state is ConnectionStates.DISCONNECTED:
|
||||||
|
now = time.time()
|
||||||
|
if now - self.last_attempt < self._reconnect_backoff_ms / 1000.0:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
def connected(self):
|
def connected(self):
|
||||||
return self.state is ConnectionStates.CONNECTED
|
return self.state is ConnectionStates.CONNECTED
|
||||||
|
|
||||||
@@ -105,17 +123,15 @@ class BrokerConnection(object):
|
|||||||
self._sock.close()
|
self._sock.close()
|
||||||
self._sock = None
|
self._sock = None
|
||||||
self.state = ConnectionStates.DISCONNECTED
|
self.state = ConnectionStates.DISCONNECTED
|
||||||
|
self._receiving = False
|
||||||
|
self._next_payload_bytes = 0
|
||||||
|
self._rbuffer.seek(0)
|
||||||
|
self._rbuffer.truncate()
|
||||||
if error is None:
|
if error is None:
|
||||||
error = Errors.DisconnectError()
|
error = Errors.DisconnectError()
|
||||||
while self.in_flight_requests:
|
while self.in_flight_requests:
|
||||||
ifr = self.in_flight_requests.popleft()
|
ifr = self.in_flight_requests.popleft()
|
||||||
ifr.future.failure(error)
|
ifr.future.failure(error)
|
||||||
self.in_flight_requests.clear()
|
|
||||||
self._receiving = False
|
|
||||||
self._next_payload_bytes = 0
|
|
||||||
self._rbuffer.seek(0)
|
|
||||||
self._rbuffer.truncate()
|
|
||||||
|
|
||||||
def send(self, request, expect_response=True):
|
def send(self, request, expect_response=True):
|
||||||
"""send request, return Future()
|
"""send request, return Future()
|
||||||
@@ -125,6 +141,8 @@ class BrokerConnection(object):
|
|||||||
future = Future()
|
future = Future()
|
||||||
if not self.connected():
|
if not self.connected():
|
||||||
return future.failure(Errors.DisconnectError())
|
return future.failure(Errors.DisconnectError())
|
||||||
|
if not self.can_send_more():
|
||||||
|
return future.failure(Errors.TooManyInFlightRequests())
|
||||||
self._correlation_id += 1
|
self._correlation_id += 1
|
||||||
header = RequestHeader(request,
|
header = RequestHeader(request,
|
||||||
correlation_id=self._correlation_id,
|
correlation_id=self._correlation_id,
|
||||||
@@ -142,10 +160,10 @@ class BrokerConnection(object):
|
|||||||
assert sent_bytes == len(message)
|
assert sent_bytes == len(message)
|
||||||
self._sock.setblocking(False)
|
self._sock.setblocking(False)
|
||||||
except (AssertionError, socket.error) as e:
|
except (AssertionError, socket.error) as e:
|
||||||
log.debug("Error in BrokerConnection.send(): %s", request)
|
log.exception("Error sending %s to %s", request, self)
|
||||||
self.close(error=e)
|
self.close(error=e)
|
||||||
return future.failure(e)
|
return future.failure(e)
|
||||||
log.debug('Request %d: %s', self._correlation_id, request)
|
log.debug('%s Request %d: %s', self, self._correlation_id, request)
|
||||||
|
|
||||||
if expect_response:
|
if expect_response:
|
||||||
ifr = InFlightRequest(request=request,
|
ifr = InFlightRequest(request=request,
|
||||||
@@ -159,24 +177,35 @@ class BrokerConnection(object):
|
|||||||
|
|
||||||
return future
|
return future
|
||||||
|
|
||||||
|
def can_send_more(self):
|
||||||
|
return len(self.in_flight_requests) < self._max_in_flight_requests_per_connection
|
||||||
|
|
||||||
def recv(self, timeout=0):
|
def recv(self, timeout=0):
|
||||||
"""Non-blocking network receive
|
"""Non-blocking network receive
|
||||||
|
|
||||||
Return response if available
|
Return response if available
|
||||||
"""
|
"""
|
||||||
|
if self._processing:
|
||||||
|
raise Errors.IllegalStateError('Recursive connection processing'
|
||||||
|
' not supported')
|
||||||
if not self.connected():
|
if not self.connected():
|
||||||
log.warning('Cannot recv: socket not connected')
|
log.warning('%s cannot recv: socket not connected', self)
|
||||||
# If requests are pending, we should close the socket and
|
# If requests are pending, we should close the socket and
|
||||||
# fail all the pending request futures
|
# fail all the pending request futures
|
||||||
if self.in_flight_requests:
|
if self.in_flight_requests:
|
||||||
self.close()
|
self.close()
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if not self.in_flight_requests:
|
elif not self.in_flight_requests:
|
||||||
log.warning('No in-flight-requests to recv')
|
log.warning('%s: No in-flight-requests to recv', self)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
self._fail_timed_out_requests()
|
elif self._requests_timed_out():
|
||||||
|
log.warning('%s timed out after %s ms. Closing connection.',
|
||||||
|
self, self._request_timeout_ms)
|
||||||
|
self.close(error=Errors.RequestTimedOutError(
|
||||||
|
'Request timed out after %s ms' % self._request_timeout_ms))
|
||||||
|
return None
|
||||||
|
|
||||||
readable, _, _ = select([self._sock], [], [], timeout)
|
readable, _, _ = select([self._sock], [], [], timeout)
|
||||||
if not readable:
|
if not readable:
|
||||||
@@ -193,7 +222,8 @@ class BrokerConnection(object):
|
|||||||
# This shouldn't happen after selecting above
|
# This shouldn't happen after selecting above
|
||||||
# but just in case
|
# but just in case
|
||||||
return None
|
return None
|
||||||
log.exception("Error receiving 4-byte payload header - closing socket")
|
log.exception('%s: Error receiving 4-byte payload header -'
|
||||||
|
' closing socket', self)
|
||||||
self.close(error=e)
|
self.close(error=e)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -216,7 +246,7 @@ class BrokerConnection(object):
|
|||||||
# header, but nothing to read in the body yet
|
# header, but nothing to read in the body yet
|
||||||
if e.errno == errno.EWOULDBLOCK:
|
if e.errno == errno.EWOULDBLOCK:
|
||||||
return None
|
return None
|
||||||
log.exception()
|
log.exception('%s: Error in recv', self)
|
||||||
self.close(error=e)
|
self.close(error=e)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -236,6 +266,11 @@ class BrokerConnection(object):
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
def _process_response(self, read_buffer):
|
def _process_response(self, read_buffer):
|
||||||
|
if self._processing:
|
||||||
|
raise Errors.IllegalStateError('Recursive connection processing'
|
||||||
|
' not supported')
|
||||||
|
else:
|
||||||
|
self._processing = True
|
||||||
ifr = self.in_flight_requests.popleft()
|
ifr = self.in_flight_requests.popleft()
|
||||||
|
|
||||||
# verify send/recv correlation ids match
|
# verify send/recv correlation ids match
|
||||||
@@ -246,23 +281,23 @@ class BrokerConnection(object):
|
|||||||
% (ifr.correlation_id, recv_correlation_id))
|
% (ifr.correlation_id, recv_correlation_id))
|
||||||
ifr.future.fail(error)
|
ifr.future.fail(error)
|
||||||
self.close()
|
self.close()
|
||||||
|
self._processing = False
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# decode response
|
# decode response
|
||||||
response = ifr.response_type.decode(read_buffer)
|
response = ifr.response_type.decode(read_buffer)
|
||||||
|
log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
|
||||||
ifr.future.success(response)
|
ifr.future.success(response)
|
||||||
log.debug('Response %d: %s', ifr.correlation_id, response)
|
self._processing = False
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def _fail_timed_out_requests(self):
|
def _requests_timed_out(self):
|
||||||
now = time.time()
|
if self.in_flight_requests:
|
||||||
while self.in_flight_requests:
|
oldest_at = self.in_flight_requests[0].timestamp
|
||||||
next_timeout = self.in_flight_requests[0].timestamp + (self._request_timeout_ms / 1000.0)
|
timeout = self._request_timeout_ms / 1000.0
|
||||||
if now < next_timeout:
|
if time.time() >= oldest_at + timeout:
|
||||||
break
|
return True
|
||||||
timed_out = self.in_flight_requests.popleft()
|
return False
|
||||||
error = Errors.RequestTimedOutError('Request timed out after %s ms' % self._request_timeout_ms)
|
|
||||||
timed_out.future.failure(error)
|
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
|
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
|
||||||
|
Reference in New Issue
Block a user