Don't maintain all producer args / kwargs in subclass __init__ and docstrings -- just refer to the super class (Producer)

This commit is contained in:
Dana Powers
2015-06-08 17:23:56 -07:00
parent a9ddf155e7
commit c0fb0de7c4
2 changed files with 22 additions and 89 deletions

View File

@@ -3,14 +3,10 @@ from __future__ import absolute_import
import logging
import warnings
from kafka.partitioner import HashedPartitioner
from kafka.util import kafka_bytestring
from .base import Producer
from ..partitioner import HashedPartitioner
from ..util import kafka_bytestring
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
log = logging.getLogger(__name__)
@@ -19,46 +15,17 @@ class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key
Arguments:
client: The kafka client instance
See Producer class for Arguments
Keyword Arguments:
Additional Arguments:
partitioner: A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
batch_send: If True, messages are send in batches
batch_send_every_n: If set, messages are send in batches of this size
batch_send_every_t: If set, messages are send after this timeout
to send the message to. Must be derived from Partitioner.
Defaults to HashedPartitioner.
"""
def __init__(self, client, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
def __init__(self, *args, **kwargs):
self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
self.partitioners = {}
super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
codec, async, batch_send,
batch_send_every_n,
batch_send_every_t,
async_retry_limit,
async_retry_backoff_ms,
async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)
super(KeyedProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic, key):
if topic not in self.partitioners:

View File

@@ -1,68 +1,34 @@
from __future__ import absolute_import
from itertools import cycle
import logging
import random
import six
from itertools import cycle
from six.moves import xrange
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)
from .base import Producer
log = logging.getLogger(__name__)
class SimpleProducer(Producer):
"""
A simple, round-robin producer. Each message goes to exactly one partition
"""A simple, round-robin producer.
Arguments:
client: The Kafka client instance to use
See Producer class for Base Arguments
Keyword Arguments:
async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks: A value indicating the acknowledgements that the server must
receive before responding to the request
ack_timeout: Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
batch_send: If True, messages are send in batches
batch_send_every_n: If set, messages are send in batches of this size
batch_send_every_t: If set, messages are send after this timeout
random_start: If true, randomize the initial partition which the
Additional Arguments:
random_start (bool, optional): randomize the initial partition which
the first message block will be published to, otherwise
if false, the first message block will always publish
to partition 0 before cycling through each partition
to partition 0 before cycling through each partition,
defaults to True.
"""
def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
random_start=True,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
def __init__(self, *args, **kwargs):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
codec, async, batch_send,
batch_send_every_n,
batch_send_every_t,
async_retry_limit,
async_retry_backoff_ms,
async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)
self.random_start = kwargs.pop('random_start', True)
super(SimpleProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic):
if topic not in self.partition_cycles: