Rollover KafkaClient correlation ids at 2**31 to keep within int32 protocol encoding

This commit is contained in:
Dana Powers
2015-03-29 16:45:20 -07:00
parent fd204dca17
commit a4b439141b

View File

@@ -2,7 +2,6 @@ import binascii
import collections import collections
import copy import copy
import functools import functools
import itertools
import logging import logging
import time import time
import kafka.common import kafka.common
@@ -23,17 +22,18 @@ log = logging.getLogger("kafka")
class KafkaClient(object): class KafkaClient(object):
CLIENT_ID = b"kafka-python" CLIENT_ID = b"kafka-python"
ID_GEN = itertools.count()
# NOTE: The timeout given to the client should always be greater than the # NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a # one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout. # socket timeout.
def __init__(self, hosts, client_id=CLIENT_ID, def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
correlation_id=0):
# We need one connection to bootstrap # We need one connection to bootstrap
self.client_id = kafka_bytestring(client_id) self.client_id = kafka_bytestring(client_id)
self.timeout = timeout self.timeout = timeout
self.hosts = collect_hosts(hosts) self.hosts = collect_hosts(hosts)
self.correlation_id = correlation_id
# create connections only when we need them # create connections only when we need them
self.conns = {} self.conns = {}
@@ -98,10 +98,10 @@ class KafkaClient(object):
return self.brokers[meta.leader] return self.brokers[meta.leader]
def _next_id(self): def _next_id(self):
""" """Generate a new correlation id"""
Generate a new correlation id # modulo to keep w/i int32
""" self.correlation_id = (self.correlation_id + 1) % 2**31
return next(KafkaClient.ID_GEN) return self.correlation_id
def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
""" """