Refactor KafkaClient to use BrokerConnections and new Request/Response structs
This commit is contained in:
273
kafka/client.py
273
kafka/client.py
@@ -2,17 +2,20 @@ import collections
|
||||
import copy
|
||||
import functools
|
||||
import logging
|
||||
import random
|
||||
import select
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
import kafka.common
|
||||
from kafka.common import (TopicAndPartition, BrokerMetadata,
|
||||
from kafka.common import (TopicAndPartition, BrokerMetadata, UnknownError,
|
||||
ConnectionError, FailedPayloadsError,
|
||||
KafkaTimeoutError, KafkaUnavailableError,
|
||||
LeaderNotAvailableError, UnknownTopicOrPartitionError,
|
||||
NotLeaderForPartitionError, ReplicaNotAvailableError)
|
||||
|
||||
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
|
||||
from kafka.conn import collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
|
||||
from kafka.protocol import KafkaProtocol
|
||||
from kafka.util import kafka_bytestring
|
||||
|
||||
@@ -31,13 +34,12 @@ class KafkaClient(object):
|
||||
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
|
||||
correlation_id=0):
|
||||
# We need one connection to bootstrap
|
||||
self.client_id = kafka_bytestring(client_id)
|
||||
self.client_id = client_id
|
||||
self.timeout = timeout
|
||||
self.hosts = collect_hosts(hosts)
|
||||
self.correlation_id = correlation_id
|
||||
|
||||
# create connections only when we need them
|
||||
self.conns = {}
|
||||
self._conns = {}
|
||||
self.brokers = {} # broker_id -> BrokerMetadata
|
||||
self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata
|
||||
self.topic_partitions = {} # topic -> partition -> PartitionMetadata
|
||||
@@ -52,14 +54,14 @@ class KafkaClient(object):
|
||||
def _get_conn(self, host, port):
|
||||
"""Get or create a connection to a broker using host and port"""
|
||||
host_key = (host, port)
|
||||
if host_key not in self.conns:
|
||||
self.conns[host_key] = KafkaConnection(
|
||||
host,
|
||||
port,
|
||||
timeout=self.timeout
|
||||
if host_key not in self._conns:
|
||||
self._conns[host_key] = BrokerConnection(
|
||||
host, port,
|
||||
timeout=self.timeout,
|
||||
client_id=self.client_id
|
||||
)
|
||||
|
||||
return self.conns[host_key]
|
||||
return self._conns[host_key]
|
||||
|
||||
def _get_leader_for_partition(self, topic, partition):
|
||||
"""
|
||||
@@ -91,12 +93,12 @@ class KafkaClient(object):
|
||||
raise UnknownTopicOrPartitionError(key)
|
||||
|
||||
# If there's no leader for the partition, raise
|
||||
meta = self.topic_partitions[topic][partition]
|
||||
if meta.leader == -1:
|
||||
raise LeaderNotAvailableError(meta)
|
||||
leader = self.topic_partitions[topic][partition]
|
||||
if leader == -1:
|
||||
raise LeaderNotAvailableError((topic, partition))
|
||||
|
||||
# Otherwise return the BrokerMetadata
|
||||
return self.brokers[meta.leader]
|
||||
return self.brokers[leader]
|
||||
|
||||
def _get_coordinator_for_group(self, group):
|
||||
"""
|
||||
@@ -129,27 +131,35 @@ class KafkaClient(object):
|
||||
Attempt to send a broker-agnostic request to one of the available
|
||||
brokers. Keep trying until you succeed.
|
||||
"""
|
||||
for (host, port) in self.hosts:
|
||||
requestId = self._next_id()
|
||||
log.debug('Request %s: %s', requestId, payloads)
|
||||
try:
|
||||
conn = self._get_conn(host, port)
|
||||
request = encoder_fn(client_id=self.client_id,
|
||||
correlation_id=requestId,
|
||||
payloads=payloads)
|
||||
hosts = set([(broker.host, broker.port) for broker in self.brokers.values()])
|
||||
hosts.update(self.hosts)
|
||||
hosts = list(hosts)
|
||||
random.shuffle(hosts)
|
||||
|
||||
conn.send(requestId, request)
|
||||
response = conn.recv(requestId)
|
||||
for (host, port) in hosts:
|
||||
conn = self._get_conn(host, port)
|
||||
request = encoder_fn(payloads=payloads)
|
||||
correlation_id = conn.send(request)
|
||||
if correlation_id is None:
|
||||
continue
|
||||
response = conn.recv()
|
||||
if response is not None:
|
||||
decoded = decoder_fn(response)
|
||||
log.debug('Response %s: %s', requestId, decoded)
|
||||
log.debug('Response %s: %s', correlation_id, decoded)
|
||||
return decoded
|
||||
|
||||
except Exception:
|
||||
log.exception('Error sending request [%s] to server %s:%s, '
|
||||
'trying next server', requestId, host, port)
|
||||
|
||||
raise KafkaUnavailableError('All servers failed to process request')
|
||||
|
||||
def _payloads_by_broker(self, payloads):
|
||||
payloads_by_broker = collections.defaultdict(list)
|
||||
for payload in payloads:
|
||||
try:
|
||||
leader = self._get_leader_for_partition(payload.topic, payload.partition)
|
||||
except KafkaUnavailableError:
|
||||
leader = None
|
||||
payloads_by_broker[leader].append(payload)
|
||||
return dict(payloads_by_broker)
|
||||
|
||||
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
|
||||
"""
|
||||
Group a list of request payloads by topic+partition and send them to
|
||||
@@ -178,97 +188,76 @@ class KafkaClient(object):
|
||||
# so we need to keep this so we can rebuild order before returning
|
||||
original_ordering = [(p.topic, p.partition) for p in payloads]
|
||||
|
||||
# Group the requests by topic+partition
|
||||
brokers_for_payloads = []
|
||||
payloads_by_broker = collections.defaultdict(list)
|
||||
|
||||
responses = {}
|
||||
for payload in payloads:
|
||||
try:
|
||||
leader = self._get_leader_for_partition(payload.topic,
|
||||
payload.partition)
|
||||
payloads_by_broker[leader].append(payload)
|
||||
brokers_for_payloads.append(leader)
|
||||
except KafkaUnavailableError as e:
|
||||
log.warning('KafkaUnavailableError attempting to send request '
|
||||
'on topic %s partition %d', payload.topic, payload.partition)
|
||||
topic_partition = (payload.topic, payload.partition)
|
||||
responses[topic_partition] = FailedPayloadsError(payload)
|
||||
# Connection errors generally mean stale metadata
|
||||
# although sometimes it means incorrect api request
|
||||
# Unfortunately there is no good way to tell the difference
|
||||
# so we'll just reset metadata on all errors to be safe
|
||||
refresh_metadata = False
|
||||
|
||||
# For each broker, send the list of request payloads
|
||||
# and collect the responses and errors
|
||||
broker_failures = []
|
||||
payloads_by_broker = self._payloads_by_broker(payloads)
|
||||
responses = {}
|
||||
|
||||
# For each KafkaConnection keep the real socket so that we can use
|
||||
def failed_payloads(payloads):
|
||||
for payload in payloads:
|
||||
topic_partition = (str(payload.topic), payload.partition)
|
||||
responses[(topic_partition)] = FailedPayloadsError(payload)
|
||||
|
||||
# For each BrokerConnection keep the real socket so that we can use
|
||||
# a select to perform unblocking I/O
|
||||
connections_by_socket = {}
|
||||
for broker, payloads in payloads_by_broker.items():
|
||||
requestId = self._next_id()
|
||||
log.debug('Request %s to %s: %s', requestId, broker, payloads)
|
||||
request = encoder_fn(client_id=self.client_id,
|
||||
correlation_id=requestId, payloads=payloads)
|
||||
for broker, broker_payloads in six.iteritems(payloads_by_broker):
|
||||
if broker is None:
|
||||
failed_payloads(broker_payloads)
|
||||
continue
|
||||
|
||||
# Send the request, recv the response
|
||||
try:
|
||||
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
|
||||
conn.send(requestId, request)
|
||||
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
|
||||
request = encoder_fn(payloads=broker_payloads)
|
||||
# decoder_fn=None signal that the server is expected to not
|
||||
# send a response. This probably only applies to
|
||||
# ProduceRequest w/ acks = 0
|
||||
expect_response = (decoder_fn is not None)
|
||||
correlation_id = conn.send(request, expect_response=expect_response)
|
||||
|
||||
except ConnectionError as e:
|
||||
broker_failures.append(broker)
|
||||
log.warning('ConnectionError attempting to send request %s '
|
||||
'to server %s: %s', requestId, broker, e)
|
||||
if correlation_id is None:
|
||||
refresh_metadata = True
|
||||
failed_payloads(broker_payloads)
|
||||
log.warning('Error attempting to send request %s '
|
||||
'to server %s', correlation_id, broker)
|
||||
continue
|
||||
|
||||
for payload in payloads:
|
||||
topic_partition = (payload.topic, payload.partition)
|
||||
responses[topic_partition] = FailedPayloadsError(payload)
|
||||
if not expect_response:
|
||||
log.debug('Request %s does not expect a response '
|
||||
'(skipping conn.recv)', correlation_id)
|
||||
for payload in broker_payloads:
|
||||
topic_partition = (str(payload.topic), payload.partition)
|
||||
responses[topic_partition] = None
|
||||
continue
|
||||
|
||||
# No exception, try to get response
|
||||
else:
|
||||
|
||||
# decoder_fn=None signal that the server is expected to not
|
||||
# send a response. This probably only applies to
|
||||
# ProduceRequest w/ acks = 0
|
||||
if decoder_fn is None:
|
||||
log.debug('Request %s does not expect a response '
|
||||
'(skipping conn.recv)', requestId)
|
||||
for payload in payloads:
|
||||
topic_partition = (payload.topic, payload.partition)
|
||||
responses[topic_partition] = None
|
||||
continue
|
||||
else:
|
||||
connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId)
|
||||
connections_by_socket[conn._read_fd] = (conn, broker)
|
||||
|
||||
conn = None
|
||||
while connections_by_socket:
|
||||
sockets = connections_by_socket.keys()
|
||||
rlist, _, _ = select.select(sockets, [], [], None)
|
||||
conn, broker, requestId = connections_by_socket.pop(rlist[0])
|
||||
try:
|
||||
response = conn.recv(requestId)
|
||||
except ConnectionError as e:
|
||||
broker_failures.append(broker)
|
||||
log.warning('ConnectionError attempting to receive a '
|
||||
'response to request %s from server %s: %s',
|
||||
requestId, broker, e)
|
||||
conn, broker = connections_by_socket.pop(rlist[0])
|
||||
correlation_id = conn.next_correlation_id_recv()
|
||||
response = conn.recv()
|
||||
if response is None:
|
||||
refresh_metadata = True
|
||||
failed_payloads(payloads_by_broker[broker])
|
||||
log.warning('Error receiving response to request %s '
|
||||
'from server %s', correlation_id, broker)
|
||||
continue
|
||||
|
||||
for payload in payloads_by_broker[broker]:
|
||||
topic_partition = (payload.topic, payload.partition)
|
||||
responses[topic_partition] = FailedPayloadsError(payload)
|
||||
log.debug('Response %s: %s', correlation_id, response)
|
||||
for payload_response in decoder_fn(response):
|
||||
topic_partition = (str(payload_response.topic),
|
||||
payload_response.partition)
|
||||
responses[topic_partition] = payload_response
|
||||
|
||||
else:
|
||||
_resps = []
|
||||
for payload_response in decoder_fn(response):
|
||||
topic_partition = (payload_response.topic,
|
||||
payload_response.partition)
|
||||
responses[topic_partition] = payload_response
|
||||
_resps.append(payload_response)
|
||||
log.debug('Response %s: %s', requestId, _resps)
|
||||
|
||||
# Connection errors generally mean stale metadata
|
||||
# although sometimes it means incorrect api request
|
||||
# Unfortunately there is no good way to tell the difference
|
||||
# so we'll just reset metadata on all errors to be safe
|
||||
if broker_failures:
|
||||
if refresh_metadata:
|
||||
self.reset_all_metadata()
|
||||
|
||||
# Return responses in the same order as provided
|
||||
@@ -387,7 +376,7 @@ class KafkaClient(object):
|
||||
# Public API #
|
||||
#################
|
||||
def close(self):
|
||||
for conn in self.conns.values():
|
||||
for conn in self._conns.values():
|
||||
conn.close()
|
||||
|
||||
def copy(self):
|
||||
@@ -398,13 +387,14 @@ class KafkaClient(object):
|
||||
Note that the copied connections are not initialized, so reinit() must
|
||||
be called on the returned copy.
|
||||
"""
|
||||
_conns = self._conns
|
||||
self._conns = {}
|
||||
c = copy.deepcopy(self)
|
||||
for key in c.conns:
|
||||
c.conns[key] = self.conns[key].copy()
|
||||
self._conns = _conns
|
||||
return c
|
||||
|
||||
def reinit(self):
|
||||
for conn in self.conns.values():
|
||||
for conn in self._conns.values():
|
||||
conn.reinit()
|
||||
|
||||
def reset_topic_metadata(self, *topics):
|
||||
@@ -480,11 +470,8 @@ class KafkaClient(object):
|
||||
Partition-level errors will also not be raised here
|
||||
(a single partition w/o a leader, for example)
|
||||
"""
|
||||
topics = [kafka_bytestring(t) for t in topics]
|
||||
|
||||
if topics:
|
||||
for topic in topics:
|
||||
self.reset_topic_metadata(topic)
|
||||
self.reset_topic_metadata(*topics)
|
||||
else:
|
||||
self.reset_all_metadata()
|
||||
|
||||
@@ -493,50 +480,46 @@ class KafkaClient(object):
|
||||
log.debug('Updating broker metadata: %s', resp.brokers)
|
||||
log.debug('Updating topic metadata: %s', resp.topics)
|
||||
|
||||
self.brokers = dict([(broker.nodeId, broker)
|
||||
for broker in resp.brokers])
|
||||
|
||||
for topic_metadata in resp.topics:
|
||||
topic = topic_metadata.topic
|
||||
partitions = topic_metadata.partitions
|
||||
self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port))
|
||||
for nodeId, host, port in resp.brokers])
|
||||
|
||||
for error, topic, partitions in resp.topics:
|
||||
# Errors expected for new topics
|
||||
try:
|
||||
kafka.common.check_error(topic_metadata)
|
||||
except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
|
||||
|
||||
# Raise if the topic was passed in explicitly
|
||||
if topic in topics:
|
||||
raise
|
||||
|
||||
# Otherwise, just log a warning
|
||||
log.error('Error loading topic metadata for %s: %s', topic, type(e))
|
||||
continue
|
||||
if error:
|
||||
error_type = kafka.common.kafka_errors.get(error, UnknownError)
|
||||
if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
|
||||
log.error('Error loading topic metadata for %s: %s (%s)',
|
||||
topic, error_type, error)
|
||||
if topic not in topics:
|
||||
continue
|
||||
raise error_type(topic)
|
||||
|
||||
self.topic_partitions[topic] = {}
|
||||
for partition_metadata in partitions:
|
||||
partition = partition_metadata.partition
|
||||
leader = partition_metadata.leader
|
||||
for error, partition, leader, _, _ in partitions:
|
||||
|
||||
self.topic_partitions[topic][partition] = partition_metadata
|
||||
self.topic_partitions[topic][partition] = leader
|
||||
|
||||
# Populate topics_to_brokers dict
|
||||
topic_part = TopicAndPartition(topic, partition)
|
||||
|
||||
# Check for partition errors
|
||||
try:
|
||||
kafka.common.check_error(partition_metadata)
|
||||
if error:
|
||||
error_type = kafka.common.kafka_errors.get(error, UnknownError)
|
||||
|
||||
# If No Leader, topics_to_brokers topic_partition -> None
|
||||
except LeaderNotAvailableError:
|
||||
log.error('No leader for topic %s partition %d', topic, partition)
|
||||
self.topics_to_brokers[topic_part] = None
|
||||
continue
|
||||
# If one of the replicas is unavailable -- ignore
|
||||
# this error code is provided for admin purposes only
|
||||
# we never talk to replicas, only the leader
|
||||
except ReplicaNotAvailableError:
|
||||
log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
|
||||
# If No Leader, topics_to_brokers topic_partition -> None
|
||||
if error_type is LeaderNotAvailableError:
|
||||
log.error('No leader for topic %s partition %d', topic, partition)
|
||||
self.topics_to_brokers[topic_part] = None
|
||||
continue
|
||||
|
||||
# If one of the replicas is unavailable -- ignore
|
||||
# this error code is provided for admin purposes only
|
||||
# we never talk to replicas, only the leader
|
||||
elif error_type is ReplicaNotAvailableError:
|
||||
log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
|
||||
|
||||
else:
|
||||
raise error_type(topic_part)
|
||||
|
||||
# If Known Broker, topic_partition -> BrokerMetadata
|
||||
if leader in self.brokers:
|
||||
|
||||
Reference in New Issue
Block a user