style: fix camelCase variable names
Conflicts: kafka/util.py
This commit is contained in:

committed by
David Arthur

parent
e392e0c201
commit
8b05e6240b
@@ -61,11 +61,11 @@ class KafkaClient(object):
|
||||
Discover brokers and metadata for a set of topics. This method will
|
||||
recurse in the event of a retry.
|
||||
"""
|
||||
requestId = self._next_id()
|
||||
request_id = self._next_id()
|
||||
request = KafkaProtocol.encode_metadata_request(self.client_id,
|
||||
requestId, topics)
|
||||
request_id, topics)
|
||||
|
||||
response = self._send_broker_unaware_request(requestId, request)
|
||||
response = self._send_broker_unaware_request(request_id, request)
|
||||
if response is None:
|
||||
raise Exception("All servers failed to process request")
|
||||
|
||||
|
@@ -79,11 +79,11 @@ class KafkaConnection(local):
|
||||
if sent != None:
|
||||
raise RuntimeError("Kafka went away")
|
||||
|
||||
def recv(self, requestId):
|
||||
def recv(self, request_id):
|
||||
"""
|
||||
Get a response from Kafka
|
||||
"""
|
||||
log.debug("Reading response %d from Kafka" % requestId)
|
||||
log.debug("Reading response %d from Kafka" % request_id)
|
||||
self.data = self._consume_response()
|
||||
return self.data
|
||||
|
||||
|
@@ -361,11 +361,11 @@ class KafkaProtocol(object):
|
||||
======
|
||||
data: bytes to decode
|
||||
"""
|
||||
((correlation_id, numBrokers), cur) = relative_unpack('>ii', data, 0)
|
||||
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
|
||||
|
||||
# Broker info
|
||||
brokers = {}
|
||||
for i in range(numBrokers):
|
||||
for i in range(numbrokers):
|
||||
((nodeId, ), cur) = relative_unpack('>i', data, cur)
|
||||
(host, cur) = read_short_string(data, cur)
|
||||
((port,), cur) = relative_unpack('>i', data, cur)
|
||||
@@ -373,31 +373,31 @@ class KafkaProtocol(object):
|
||||
|
||||
# Topic info
|
||||
((num_topics,), cur) = relative_unpack('>i', data, cur)
|
||||
topicMetadata = {}
|
||||
topic_metadata = {}
|
||||
|
||||
for i in range(num_topics):
|
||||
((topicError,), cur) = relative_unpack('>h', data, cur)
|
||||
(topicName, cur) = read_short_string(data, cur)
|
||||
((topic_error,), cur) = relative_unpack('>h', data, cur)
|
||||
(topic_name, cur) = read_short_string(data, cur)
|
||||
((num_partitions,), cur) = relative_unpack('>i', data, cur)
|
||||
partitionMetadata = {}
|
||||
partition_metadata = {}
|
||||
|
||||
for j in range(num_partitions):
|
||||
((partitionErrorCode, partition, leader, numReplicas), cur) = \
|
||||
((partition_error_code, partition, leader, numReplicas), cur) = \
|
||||
relative_unpack('>hiii', data, cur)
|
||||
|
||||
(replicas, cur) = relative_unpack('>%di' % numReplicas,
|
||||
data, cur)
|
||||
|
||||
((numIsr,), cur) = relative_unpack('>i', data, cur)
|
||||
(isr, cur) = relative_unpack('>%di' % numIsr, data, cur)
|
||||
((num_isr,), cur) = relative_unpack('>i', data, cur)
|
||||
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
|
||||
|
||||
partitionMetadata[partition] = \
|
||||
PartitionMetadata(topicName, partition, leader,
|
||||
partition_metadata[partition] = \
|
||||
PartitionMetadata(topic_name, partition, leader,
|
||||
replicas, isr)
|
||||
|
||||
topicMetadata[topicName] = partitionMetadata
|
||||
topic_metadata[topic_name] = partition_metadata
|
||||
|
||||
return (brokers, topicMetadata)
|
||||
return (brokers, topic_metadata)
|
||||
|
||||
@classmethod
|
||||
def encode_offset_commit_request(cls, client_id, correlation_id,
|
||||
|
@@ -24,16 +24,16 @@ def read_short_string(data, cur):
|
||||
if len(data) < cur + 2:
|
||||
raise BufferUnderflowError("Not enough data left")
|
||||
|
||||
(strLen,) = struct.unpack('>h', data[cur:cur + 2])
|
||||
if strLen == -1:
|
||||
(strlen,) = struct.unpack('>h', data[cur:cur + 2])
|
||||
if strlen == -1:
|
||||
return (None, cur + 2)
|
||||
|
||||
cur += 2
|
||||
if len(data) < cur + strLen:
|
||||
if len(data) < cur + strlen:
|
||||
raise BufferUnderflowError("Not enough data left")
|
||||
|
||||
out = data[cur:cur + strLen]
|
||||
return (out, cur + strLen)
|
||||
out = data[cur:cur + strlen]
|
||||
return (out, cur + strlen)
|
||||
|
||||
|
||||
def read_int_string(data, cur):
|
||||
@@ -41,17 +41,16 @@ def read_int_string(data, cur):
|
||||
raise BufferUnderflowError(
|
||||
"Not enough data left to read string len (%d < %d)" % (len(data), cur + 4))
|
||||
|
||||
(strLen,) = struct.unpack('>i', data[cur:cur + 4])
|
||||
if strLen == -1:
|
||||
(strlen,) = struct.unpack('>i', data[cur:cur + 4])
|
||||
if strlen == -1:
|
||||
return (None, cur + 4)
|
||||
|
||||
cur += 4
|
||||
if len(data) < cur + strLen:
|
||||
raise BufferUnderflowError(
|
||||
"Not enough data left to read string (%d < %d)" % (len(data), cur + strLen))
|
||||
if len(data) < cur + strlen:
|
||||
raise BufferUnderflowError("Not enough data left")
|
||||
|
||||
out = data[cur:cur + strLen]
|
||||
return (out, cur + strLen)
|
||||
out = data[cur:cur + strlen]
|
||||
return (out, cur + strlen)
|
||||
|
||||
|
||||
def relative_unpack(fmt, data, cur):
|
||||
|
Reference in New Issue
Block a user