flake8 pass (pep8 and pyflakes)

mrtheb
2013-10-03 22:52:04 -04:00
parent b0cacc9485
commit a03f0c86b8
6 changed files with 97 additions and 80 deletions
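The diff below is mechanical style cleanup rather than a behaviour change: unused and star imports are dropped, over-long calls and format strings are wrapped, and comparisons against None become identity checks. A rough sketch of the idioms being applied (check_response is a made-up helper for illustration, not code from this repository):

    from kafka.common import ErrorMapping, TopicAndPartition

    def check_response(resp, sent=None):
        # pep8 E711: compare against None with `is` / `is not`, never == / !=
        if sent is not None:
            raise RuntimeError("Kafka went away")

        # pep8 E501 / pyflakes: wrap long statements and import names
        # explicitly instead of relying on a star import
        if resp.error != ErrorMapping.NO_ERROR:
            raise Exception(
                "Request for %s failed with errorcode=%d" %
                (TopicAndPartition(resp.topic, resp.partition), resp.error))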

kafka/client.py

@@ -1,14 +1,9 @@
-import base64
 from collections import defaultdict
 from functools import partial
-from itertools import count, cycle
 import logging
-from operator import attrgetter
-import struct
 import time
-import zlib
 
-from kafka.common import *
+from kafka.common import count, ErrorMapping, TopicAndPartition
 from kafka.conn import KafkaConnection
 from kafka.protocol import KafkaProtocol
@@ -212,8 +207,10 @@ class KafkaClient(object):
         order of input payloads
         """
-        encoder = partial(KafkaProtocol.encode_produce_request,
-                          acks=acks, timeout=timeout)
+        encoder = partial(
+            KafkaProtocol.encode_produce_request,
+            acks=acks,
+            timeout=timeout)
 
         if acks == 0:
             decoder = None
@@ -226,10 +223,10 @@ class KafkaClient(object):
         for resp in resps:
             # Check for errors
             if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("ProduceRequest for %s failed with "
-                                "errorcode=%d" % (
-                                TopicAndPartition(resp.topic, resp.partition),
-                                resp.error))
+                raise Exception(
+                    "ProduceRequest for %s failed with errorcode=%d" %
+                    (TopicAndPartition(resp.topic, resp.partition),
+                     resp.error))
 
             # Run the callback
             if callback is not None:
@@ -251,17 +248,18 @@ class KafkaClient(object):
             max_wait_time=max_wait_time,
             min_bytes=min_bytes)
 
-        resps = self._send_broker_aware_request(payloads, encoder,
-                                                KafkaProtocol.decode_fetch_response)
+        resps = self._send_broker_aware_request(
+            payloads, encoder,
+            KafkaProtocol.decode_fetch_response)
 
         out = []
         for resp in resps:
             # Check for errors
             if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("FetchRequest for %s failed with "
-                                "errorcode=%d" % (
-                                TopicAndPartition(resp.topic, resp.partition),
-                                resp.error))
+                raise Exception(
+                    "FetchRequest for %s failed with errorcode=%d" %
+                    (TopicAndPartition(resp.topic, resp.partition),
+                     resp.error))
 
             # Run the callback
             if callback is not None:
@@ -272,9 +270,10 @@ class KafkaClient(object):
     def send_offset_request(self, payloads=[], fail_on_error=True,
                             callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                                                KafkaProtocol.encode_offset_request,
-                                                KafkaProtocol.decode_offset_response)
+        resps = self._send_broker_aware_request(
+            payloads,
+            KafkaProtocol.encode_offset_request,
+            KafkaProtocol.decode_offset_response)
 
         out = []
         for resp in resps:

kafka/conn.py

@@ -3,6 +3,8 @@ import socket
 import struct
 from threading import local
 
+from kafka.common import BufferUnderflowError
+
 log = logging.getLogger("kafka")
@@ -12,7 +14,7 @@ class KafkaConnection(local):
     A socket connection to a single Kafka broker
 
     This class is _not_ thread safe. Each call to `send` must be followed
     by a call to `recv` in order to get the correct response. Eventually,
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
@@ -43,7 +45,7 @@ class KafkaConnection(local):
     def _consume_response_iter(self):
         """
         This method handles the response header and error messages. It
         then returns an iterator for the chunks of the response
         """
         log.debug("Handling response from Kafka")
@@ -57,13 +59,15 @@ class KafkaConnection(local):
         messagesize = size - 4
         log.debug("About to read %d bytes from Kafka", messagesize)
 
         # Read the remainder of the response
         total = 0
         while total < messagesize:
             resp = self._sock.recv(self.bufsize)
             log.debug("Read %d bytes from Kafka", len(resp))
             if resp == "":
-                raise BufferUnderflowError("Not enough data to read this response")
+                raise BufferUnderflowError(
+                    "Not enough data to read this response")
 
             total += len(resp)
             yield resp
@@ -75,9 +79,13 @@ class KafkaConnection(local):
     def send(self, request_id, payload):
         "Send a request to Kafka"
-        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+        log.debug(
+            "About to send %d bytes to Kafka, request %d" %
+            (len(payload), request_id))
         sent = self._sock.sendall(payload)
-        if sent != None:
+        if sent is not None:
             raise RuntimeError("Kafka went away")
 
     def recv(self, request_id):
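The KafkaConnection docstring above spells out the usage contract this class assumes: every `send` must be followed by a matching `recv` before the next request goes out. A minimal usage sketch, assuming the constructor takes a host and port as in this era of the codebase (host, port, and the request bytes are placeholders):

    conn = KafkaConnection("localhost", 9092)    # assumed signature: (host, port)
    request_id = 1
    encoded_request = b"<encoded request bytes>"  # placeholder payload
    conn.send(request_id, encoded_request)       # one request out...
    raw_response = conn.recv(request_id)         # ...then its response, before any further send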

kafka/consumer.py

@@ -8,7 +8,7 @@ from Queue import Empty
 from kafka.common import (
     ErrorMapping, FetchRequest,
-    OffsetRequest, OffsetFetchRequest, OffsetCommitRequest,
+    OffsetRequest, OffsetCommitRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
 )
@@ -223,11 +223,12 @@ class SimpleConsumer(Consumer):
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
 
-        super(SimpleConsumer, self).__init__(client, group, topic,
-                                             partitions=partitions,
-                                             auto_commit=auto_commit,
-                                             auto_commit_every_n=auto_commit_every_n,
-                                             auto_commit_every_t=auto_commit_every_t)
+        super(SimpleConsumer, self).__init__(
+            client, group, topic,
+            partitions=partitions,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
     def provide_partition_info(self):
         """
@@ -275,8 +276,8 @@ class SimpleConsumer(Consumer):
             resps = self.client.send_offset_request(reqs)
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + \
-                    deltas[resp.partition]
+                self.offsets[resp.partition] = \
+                    resp.offsets[0] + deltas[resp.partition]
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
@@ -364,9 +365,10 @@ class SimpleConsumer(Consumer):
                 req = FetchRequest(
                     self.topic, partition, offset, self.client.bufsize)
 
-                (resp,) = self.client.send_fetch_request([req],
-                                                          max_wait_time=self.fetch_max_wait_time,
-                                                          min_bytes=fetch_size)
+                (resp,) = self.client.send_fetch_request(
+                    [req],
+                    max_wait_time=self.fetch_max_wait_time,
+                    min_bytes=fetch_size)
 
                 assert resp.topic == self.topic
                 assert resp.partition == partition
@@ -376,18 +378,22 @@ class SimpleConsumer(Consumer):
                 for message in resp.messages:
                     next_offset = message.offset
 
-                    # update the offset before the message is yielded. This is
-                    # so that the consumer state is not lost in certain cases.
-                    # For eg: the message is yielded and consumed by the caller,
-                    # but the caller does not come back into the generator again.
-                    # The message will be consumed but the status will not be
-                    # updated in the consumer
+                    # update the offset before the message is yielded. This
+                    # is so that the consumer state is not lost in certain
+                    # cases.
+                    #
+                    # For eg: the message is yielded and consumed by the
+                    # caller, but the caller does not come back into the
+                    # generator again. The message will be consumed but the
+                    # status will not be updated in the consumer
                     self.fetch_started[partition] = True
                     self.offsets[partition] = message.offset
                     yield message
             except ConsumerFetchSizeTooSmall, e:
-                log.warn("Fetch size is too small, increasing by 1.5x and retrying")
                 fetch_size *= 1.5
+                log.warn(
+                    "Fetch size too small, increasing to %d (1.5x) and retry",
+                    fetch_size)
                 continue
             except ConsumerNoMoreData, e:
                 log.debug("Iteration was ended by %r", e)
@@ -429,11 +435,12 @@ class MultiProcessConsumer(Consumer):
                  num_procs=1, partitions_per_proc=0):
 
         # Initiate the base consumer class
-        super(MultiProcessConsumer, self).__init__(client, group, topic,
-                                                   partitions=None,
-                                                   auto_commit=auto_commit,
-                                                   auto_commit_every_n=auto_commit_every_n,
-                                                   auto_commit_every_t=auto_commit_every_t)
+        super(MultiProcessConsumer, self).__init__(
+            client, group, topic,
+            partitions=None,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
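The ConsumerFetchSizeTooSmall handler above grows the fetch size by 1.5x and retries whenever the buffer was too small to hold even one whole message. The same back-off pattern in isolation, with a stub in place of a real fetch (a sketch, not the consumer's actual code):

    from kafka.common import ConsumerFetchSizeTooSmall

    def fetch_messages(size):
        # Stand-in for a real fetch: pretend anything under 4 KB returns
        # only a partial message.
        if size < 4096:
            raise ConsumerFetchSizeTooSmall()
        return ["message"]

    fetch_size = 1024
    while True:
        try:
            messages = fetch_messages(fetch_size)
            break
        except ConsumerFetchSizeTooSmall:
            # Buffer could not hold a single whole message: enlarge it and
            # retry from the same offset.
            fetch_size = int(fetch_size * 1.5)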

kafka/protocol.py

@@ -25,12 +25,12 @@ class KafkaProtocol(object):
     This class does not have any state associated with it, it is purely
     for organization.
     """
 
     PRODUCE_KEY = 0
     FETCH_KEY = 1
     OFFSET_KEY = 2
     METADATA_KEY = 3
     OFFSET_COMMIT_KEY = 6
     OFFSET_FETCH_KEY = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
     CODEC_NONE = 0x00
@@ -120,8 +120,8 @@ class KafkaProtocol(object):
                 yield OffsetAndMessage(offset, message)
             except BufferUnderflowError:
                 if read_message is False:
-                    # If we get a partial read of a message, but haven't yielded anyhting
-                    # there's a problem
+                    # If we get a partial read of a message, but haven't
+                    # yielded anyhting there's a problem
                     raise ConsumerFetchSizeTooSmall()
                 else:
                     raise StopIteration()
@@ -274,14 +274,14 @@ class KafkaProtocol(object):
         for i in range(num_partitions):
             ((partition, error, highwater_mark_offset), cur) = \
                 relative_unpack('>ihq', data, cur)
 
             (message_set, cur) = read_int_string(data, cur)
 
             yield FetchResponse(
                 topic, partition, error,
                 highwater_mark_offset,
                 KafkaProtocol._decode_message_set_iter(message_set))
 
     @classmethod
     def encode_offset_request(cls, client_id, correlation_id, payloads=None):
@@ -321,7 +321,7 @@ class KafkaProtocol(object):
         for i in range(num_partitions):
             ((partition, error, num_offsets,), cur) = \
                 relative_unpack('>ihi', data, cur)
 
             offsets = []
             for j in range(num_offsets):
@@ -383,17 +383,17 @@ class KafkaProtocol(object):
             for j in range(num_partitions):
                 ((partition_error_code, partition, leader, numReplicas), cur) = \
                     relative_unpack('>hiii', data, cur)
 
-                (replicas, cur) = relative_unpack('>%di' % numReplicas,
-                                                  data, cur)
+                (replicas, cur) = relative_unpack(
+                    '>%di' % numReplicas, data, cur)
 
                 ((num_isr,), cur) = relative_unpack('>i', data, cur)
                 (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
 
                 partition_metadata[partition] = \
-                    PartitionMetadata(topic_name, partition, leader,
-                                      replicas, isr)
+                    PartitionMetadata(
+                        topic_name, partition, leader, replicas, isr)
 
             topic_metadata[topic_name] = partition_metadata
@@ -531,7 +531,7 @@ def create_gzip_message(payloads, key=None):
         key: bytes, a key used for partition routing (optional)
     """
     message_set = KafkaProtocol._encode_message_set(
         [create_message(payload) for payload in payloads])
 
     gzipped = gzip_encode(message_set)
     codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
@@ -552,7 +552,7 @@ def create_snappy_message(payloads, key=None):
         key: bytes, a key used for partition routing (optional)
     """
     message_set = KafkaProtocol._encode_message_set(
         [create_message(payload) for payload in payloads])
 
     snapped = snappy_encode(message_set)
     codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY

kafka/queue.py

@@ -25,8 +25,9 @@ class KafkaConsumerProcess(Process):
         Process.__init__(self)
 
     def __str__(self):
-        return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \
-            (self.topic, self.partition, self.consumer_sleep)
+        return "[KafkaConsumerProcess: topic=%s, \
+            partition=%s, sleep=%s]" % \
+            (self.topic, self.partition, self.consumer_sleep)
 
     def run(self):
         self.barrier.wait()
@@ -70,10 +71,12 @@ class KafkaProducerProcess(Process):
         Process.__init__(self)
 
     def __str__(self):
-        return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \
-            flush_timeout=%s, timeout=%s]" % (
-                self.topic, self.producer_flush_buffer,
-                self.producer_flush_timeout, self.producer_timeout)
+        return "[KafkaProducerProcess: topic=%s, \
+            flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
+            (self.topic,
+             self.producer_flush_buffer,
+             self.producer_flush_timeout,
+             self.producer_timeout)
 
     def run(self):
         self.barrier.wait()
@@ -104,8 +107,8 @@ class KafkaProducerProcess(Process):
             last_produce = time.time()
 
         try:
-            msg = KafkaClient.create_message(self.in_queue.get(True,
-                                                               self.producer_timeout))
+            msg = KafkaClient.create_message(
+                self.in_queue.get(True, self.producer_timeout))
             messages.append(msg)
         except Empty:

kafka/util.py

@@ -1,9 +1,8 @@
 from collections import defaultdict
-from itertools import groupby
 import struct
 from threading import Thread, Event
 
-from common import *
+from kafka.common import BufferUnderflowError
 
 
 def write_int_string(s):
@@ -39,7 +38,8 @@ def read_short_string(data, cur):
 def read_int_string(data, cur):
     if len(data) < cur + 4:
         raise BufferUnderflowError(
-            "Not enough data left to read string len (%d < %d)" % (len(data), cur + 4))
+            "Not enough data left to read string len (%d < %d)" %
+            (len(data), cur + 4))
 
     (strlen,) = struct.unpack('>i', data[cur:cur + 4])
     if strlen == -1: