Spelling and grammar changes (#923)
committed by Dana Powers
parent cb06a6b125
commit 83081befc1

@@ -110,7 +110,7 @@ class Consumer(object):
         for resp in responses:
             try:
                 check_error(resp)
-                # API spec says server wont set an error here
+                # API spec says server won't set an error here
                 # but 0.8.1.1 does actually...
             except UnknownTopicOrPartitionError:
                 pass

@@ -42,12 +42,12 @@ class KafkaConsumer(six.Iterator):
             It just needs to have at least one broker that will respond to a
             Metadata API Request. Default port is 9092. If no servers are
             specified, will default to localhost:9092.
-        client_id (str): a name for this client. This string is passed in
+        client_id (str): A name for this client. This string is passed in
             each request to servers and can be used to identify specific
             server-side log entries that correspond to this client. Also
             submitted to GroupCoordinator for logging with respect to
             consumer group administration. Default: 'kafka-python-{version}'
-        group_id (str or None): name of the consumer group to join for dynamic
+        group_id (str or None): The name of the consumer group to join for dynamic
             partition assignment (if enabled), and to use for fetching and
             committing offsets. If None, auto-partition assignment (via
             group coordinator) and offset commits are disabled.

@@ -85,20 +85,20 @@ class KafkaConsumer(six.Iterator):
             OffsetOutOfRange errors: 'earliest' will move to the oldest
             available message, 'latest' will move to the most recent. Any
             other value will raise the exception. Default: 'latest'.
-        enable_auto_commit (bool): If true the consumer's offset will be
+        enable_auto_commit (bool): If True , the consumer's offset will be
             periodically committed in the background. Default: True.
-        auto_commit_interval_ms (int): milliseconds between automatic
+        auto_commit_interval_ms (int): Number of milliseconds between automatic
             offset commits, if enable_auto_commit is True. Default: 5000.
-        default_offset_commit_callback (callable): called as
+        default_offset_commit_callback (callable): Called as
             callback(offsets, response) response will be either an Exception
-            or a OffsetCommitResponse struct. This callback can be used to
+            or an OffsetCommitResponse struct. This callback can be used to
             trigger custom actions when a commit request completes.
         check_crcs (bool): Automatically check the CRC32 of the records
             consumed. This ensures no on-the-wire or on-disk corruption to
             the messages occurred. This check adds some overhead, so it may
             be disabled in cases seeking extreme performance. Default: True
         metadata_max_age_ms (int): The period of time in milliseconds after
-            which we force a refresh of metadata even if we haven't seen any
+            which we force a refresh of metadata, even if we haven't seen any
             partition leadership changes to proactively discover any new
             brokers or partitions. Default: 300000
         partition_assignment_strategy (list): List of objects to use to

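For reference, the connection and auto-commit options documented above combine in the constructor roughly as follows (a minimal sketch; the host, topic, and group names are placeholders):

    from kafka import KafkaConsumer

    # Join group 'my-group' and commit offsets automatically every 5 seconds
    consumer = KafkaConsumer(
        'my-topic',                          # subscribe to one topic
        bootstrap_servers='localhost:9092',  # any one broker is enough for metadata
        client_id='example-client',          # appears in server-side log entries
        group_id='my-group',                 # enables coordinated assignment + commits
        enable_auto_commit=True,
        auto_commit_interval_ms=5000)
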
@@ -115,7 +115,7 @@ class KafkaConsumer(six.Iterator):
             adjusted even lower to control the expected time for normal
             rebalances. Default: 3000
         session_timeout_ms (int): The timeout used to detect failures when
-            using Kafka's group managementment facilities. Default: 30000
+            using Kafka's group management facilities. Default: 30000
         max_poll_records (int): The maximum number of records returned in a
             single call to poll().
         receive_buffer_bytes (int): The size of the TCP receive buffer

@@ -139,27 +139,27 @@ class KafkaConsumer(six.Iterator):
             set this option to True. Default: False.
         security_protocol (str): Protocol used to communicate with brokers.
             Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
-        ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+        ssl_context (ssl.SSLContext): Pre-configured SSLContext for wrapping
             socket connections. If provided, all other ssl_* configurations
             will be ignored. Default: None.
-        ssl_check_hostname (bool): flag to configure whether ssl handshake
+        ssl_check_hostname (bool): Flag to configure whether ssl handshake
             should verify that the certificate matches the brokers hostname.
-            default: true.
-        ssl_cafile (str): optional filename of ca file to use in certificate
-            verification. default: none.
-        ssl_certfile (str): optional filename of file in pem format containing
+            Default: True.
+        ssl_cafile (str): Optional filename of ca file to use in certificate
+            verification. Default: None.
+        ssl_certfile (str): Optional filename of file in pem format containing
             the client certificate, as well as any ca certificates needed to
-            establish the certificate's authenticity. default: none.
-        ssl_keyfile (str): optional filename containing the client private key.
-            default: none.
-        ssl_password (str): optional password to be used when loading the
-            certificate chain. default: None.
-        ssl_crlfile (str): optional filename containing the CRL to check for
+            establish the certificate's authenticity. Default: None.
+        ssl_keyfile (str): Optional filename containing the client private key.
+            Default: None.
+        ssl_password (str): Optional password to be used when loading the
+            certificate chain. Default: None.
+        ssl_crlfile (str): Optional filename containing the CRL to check for
             certificate expiration. By default, no CRL check is done. When
             providing a file, only the leaf certificate will be checked against
             this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
-            default: none.
-        api_version (tuple): specify which kafka API version to use.
+            Default: None.
+        api_version (tuple): Specify which kafka API version to use.
             If set to None, the client will attempt to infer the broker version
             by probing various APIs. Default: None
     Examples:

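The ssl_* options above fit together like this when connecting to a TLS listener (a hedged sketch; certificate paths and the broker address are placeholders):

    from kafka import KafkaConsumer

    # Encrypt traffic and verify the broker certificate against a private CA
    consumer = KafkaConsumer(
        bootstrap_servers='broker.example.com:9093',
        security_protocol='SSL',
        ssl_check_hostname=True,
        ssl_cafile='/path/to/ca.pem',        # CA bundle used to verify the broker
        ssl_certfile='/path/to/client.pem',  # client cert, if the brokers require one
        ssl_keyfile='/path/to/client.key')
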
@@ -189,12 +189,12 @@ class KafkaConsumer(six.Iterator):
             (such as offsets) should be exposed to the consumer. If set to True
             the only way to receive records from an internal topic is
             subscribing to it. Requires 0.10+ Default: True
-        sasl_mechanism (str): string picking sasl mechanism when security_protocol
+        sasl_mechanism (str): String picking sasl mechanism when security_protocol
             is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
             Default: None
-        sasl_plain_username (str): username for sasl PLAIN authentication.
+        sasl_plain_username (str): Username for sasl PLAIN authentication.
             Default: None
-        sasl_plain_password (str): password for sasl PLAIN authentication.
+        sasl_plain_password (str): Password for sasl PLAIN authentication.
             Default: None
 
     Note:

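Because PLAIN sends credentials in cleartext, the sasl_* options are normally paired with SASL_SSL (a sketch; the credentials and broker address are placeholders):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        bootstrap_servers='broker.example.com:9094',
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',       # the only mechanism supported at this point
        sasl_plain_username='alice',
        sasl_plain_password='secret')
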
@@ -239,7 +239,7 @@ class KafkaConsumer(six.Iterator):
         'ssl_password': None,
         'api_version': None,
         'api_version_auto_timeout_ms': 2000,
-        'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+        'connections_max_idle_ms': 9 * 60 * 1000, # Not implemented yet
         'metric_reporters': [],
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,

@@ -275,7 +275,7 @@ class KafkaConsumer(six.Iterator):
         self._metrics = Metrics(metric_config, reporters)
         # TODO _metrics likely needs to be passed to KafkaClient, etc.
 
-        # api_version was previously a str. accept old format for now
+        # api_version was previously a str. Accept old format for now
         if isinstance(self.config['api_version'], str):
             str_version = self.config['api_version']
             if str_version == 'auto':

@@ -310,10 +310,10 @@ class KafkaConsumer(six.Iterator):
         """Manually assign a list of TopicPartitions to this consumer.
 
         Arguments:
-            partitions (list of TopicPartition): assignment for this instance.
+            partitions (list of TopicPartition): Assignment for this instance.
 
         Raises:
-            IllegalStateError: if consumer has already called subscribe()
+            IllegalStateError: If consumer has already called subscribe()
 
         Warning:
             It is not possible to use both manual partition assignment with

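Manual assignment bypasses the group coordinator entirely, so no rebalance will ever occur (a minimal sketch; the topic name is a placeholder):

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    # Consume partitions 0 and 1 of 'my-topic' directly; no dynamic assignment
    consumer.assign([TopicPartition('my-topic', 0),
                     TopicPartition('my-topic', 1)])
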
@@ -339,7 +339,7 @@ class KafkaConsumer(six.Iterator):
         simply return the same partitions that were previously assigned.
         If topics were subscribed using subscribe(), then this will give the
         set of topic partitions currently assigned to the consumer (which may
-        be none if the assignment hasn't happened yet, or if the partitions are
+        be None if the assignment hasn't happened yet, or if the partitions are
         in the process of being reassigned).
 
         Returns:

@@ -367,7 +367,7 @@ class KafkaConsumer(six.Iterator):
             log.debug("The KafkaConsumer has closed.")
 
     def commit_async(self, offsets=None, callback=None):
-        """Commit offsets to kafka asynchronously, optionally firing callback
+        """Commit offsets to kafka asynchronously, optionally firing callback.
 
         This commits offsets only to Kafka. The offsets committed using this API
         will be used on the first fetch after every rebalance and also on

@@ -381,10 +381,10 @@ class KafkaConsumer(six.Iterator):
 
         Arguments:
             offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
-                to commit with the configured group_id. Defaults to current
+                to commit with the configured group_id. Defaults to currently
                 consumed offsets for all subscribed partitions.
-            callback (callable, optional): called as callback(offsets, response)
-                with response as either an Exception or a OffsetCommitResponse
+            callback (callable, optional): Called as callback(offsets, response)
+                with response as either an Exception or an OffsetCommitResponse
                 struct. This callback can be used to trigger custom actions when
                 a commit request completes.
 
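A commit_async() callback receives the offsets plus either an Exception or the commit response, as documented above (a sketch; the logging callback is illustrative):

    import logging
    from kafka import KafkaConsumer

    log = logging.getLogger(__name__)

    def on_commit(offsets, response):
        # response is an Exception on failure, a commit response struct on success
        if isinstance(response, Exception):
            log.error('commit failed for %s: %s', offsets, response)

    consumer = KafkaConsumer('my-topic', group_id='my-group',
                             enable_auto_commit=False)
    for message in consumer:
        # ... process message ..., then commit without blocking the fetch loop
        consumer.commit_async(callback=on_commit)
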
@@ -401,7 +401,7 @@ class KafkaConsumer(six.Iterator):
         return future
 
     def commit(self, offsets=None):
-        """Commit offsets to kafka, blocking until success or error
+        """Commit offsets to kafka, blocking until success or error.
 
         This commits offsets only to Kafka. The offsets committed using this API
         will be used on the first fetch after every rebalance and also on

@@ -413,11 +413,11 @@ class KafkaConsumer(six.Iterator):
         Blocks until either the commit succeeds or an unrecoverable error is
         encountered (in which case it is thrown to the caller).
 
-        Currently only supports kafka-topic offset storage (not zookeeper)
+        Currently only supports kafka-topic offset storage (not zookeeper).
 
         Arguments:
             offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
-                to commit with the configured group_id. Defaults to current
+                to commit with the configured group_id. Defaults to currently
                 consumed offsets for all subscribed partitions.
         """
         assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'

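The blocking variant is typically used for explicit, durable offset control (a sketch; the OffsetAndMetadata import path is an assumption and may vary by version):

    from kafka import KafkaConsumer, TopicPartition
    from kafka.structs import OffsetAndMetadata  # assumed location

    consumer = KafkaConsumer(group_id='my-group', enable_auto_commit=False)
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])
    # Blocks until offset 42 is committed for this group, or raises
    consumer.commit({tp: OffsetAndMetadata(42, '')})
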
@@ -427,7 +427,7 @@ class KafkaConsumer(six.Iterator):
             self._coordinator.commit_offsets_sync(offsets)
 
     def committed(self, partition):
-        """Get the last committed offset for the given partition
+        """Get the last committed offset for the given partition.
 
         This offset will be used as the position for the consumer
         in the event of a failure.

@@ -437,7 +437,7 @@ class KafkaConsumer(six.Iterator):
         initialized its cache of committed offsets.
 
         Arguments:
-            partition (TopicPartition): the partition to check
+            partition (TopicPartition): The partition to check.
 
         Returns:
             The last committed offset, or None if there was no prior commit.

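For example, checking where a group last committed (a sketch; names are placeholders):

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(group_id='my-group')
    offset = consumer.committed(TopicPartition('my-topic', 0))
    # None means the group has never committed this partition
    print(offset)
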
@@ -480,10 +480,10 @@ class KafkaConsumer(six.Iterator):
         """Get metadata about the partitions for a given topic.
 
         Arguments:
-            topic (str): topic to check
+            topic (str): Topic to check.
 
         Returns:
-            set: partition ids
+            set: Partition ids
         """
         return self._client.cluster.partitions_for_topic(topic)
 
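Usage is a single lookup against cached cluster metadata (a sketch):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    # e.g. {0, 1, 2} for a three-partition topic
    partitions = consumer.partitions_for_topic('my-topic')
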
@@ -499,7 +499,7 @@ class KafkaConsumer(six.Iterator):
         Incompatible with iterator interface -- use one or the other, not both.
 
         Arguments:
-            timeout_ms (int, optional): milliseconds spent waiting in poll if
+            timeout_ms (int, optional): Milliseconds spent waiting in poll if
                 data is not available in the buffer. If 0, returns immediately
                 with any records that are available currently in the buffer,
                 else returns empty. Must not be negative. Default: 0

@@ -508,14 +508,14 @@ class KafkaConsumer(six.Iterator):
             max_poll_records.
 
         Returns:
-            dict: topic to list of records since the last fetch for the
-                subscribed list of topics and partitions
+            dict: Topic to list of records since the last fetch for the
+                subscribed list of topics and partitions.
         """
         assert timeout_ms >= 0, 'Timeout must not be negative'
         if max_records is None:
             max_records = self.config['max_poll_records']
 
-        # poll for new data until the timeout expires
+        # Poll for new data until the timeout expires
         start = time.time()
         remaining = timeout_ms
         while True:

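A poll()-driven loop, the alternative to the iterator interface mentioned above, looks roughly like this (a sketch):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic', group_id='my-group')
    while True:
        # Block up to 1s; the dict is empty if nothing arrived in time
        records = consumer.poll(timeout_ms=1000, max_records=100)
        for tp, messages in records.items():
            for message in messages:
                print(tp, message.offset, message.value)
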
@@ -530,15 +530,14 @@ class KafkaConsumer(six.Iterator):
             return {}
 
     def _poll_once(self, timeout_ms, max_records):
-        """
-        Do one round of polling. In addition to checking for new data, this does
+        """Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
 
         Arguments:
-            timeout_ms (int): The maximum time in milliseconds to block
+            timeout_ms (int): The maximum time in milliseconds to block.
 
         Returns:
-            dict: map of topic to list of records (may be empty)
+            dict: Map of topic to list of records (may be empty).
         """
         if self._use_consumer_group():
             self._coordinator.ensure_coordinator_known()

@@ -548,16 +547,16 @@ class KafkaConsumer(six.Iterator):
         elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
             self._coordinator.ensure_coordinator_known()
 
-        # fetch positions if we have partitions we're subscribed to that we
+        # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
         if not self._subscription.has_all_fetch_positions():
             self._update_fetch_positions(self._subscription.missing_fetch_positions())
 
-        # if data is available already, e.g. from a previous network client
+        # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
         records, partial = self._fetcher.fetched_records(max_records)
         if records:
-            # before returning the fetched records, we can send off the
+            # Before returning the fetched records, we can send off the
             # next round of fetches and avoid block waiting for their
             # responses to enable pipelining while the user is handling the
             # fetched records.

@@ -565,7 +564,7 @@ class KafkaConsumer(six.Iterator):
                 self._fetcher.send_fetches()
             return records
 
-        # send any new fetches (won't resend pending fetches)
+        # Send any new fetches (won't resend pending fetches)
         self._fetcher.send_fetches()
 
         self._client.poll(timeout_ms=timeout_ms, sleep=True)

@@ -576,10 +575,10 @@ class KafkaConsumer(six.Iterator):
         """Get the offset of the next record that will be fetched
 
         Arguments:
-            partition (TopicPartition): partition to check
+            partition (TopicPartition): Partition to check
 
         Returns:
-            int: offset
+            int: Offset
         """
         if not isinstance(partition, TopicPartition):
             raise TypeError('partition must be a TopicPartition namedtuple')

@@ -591,7 +590,7 @@ class KafkaConsumer(six.Iterator):
         return offset
 
     def highwater(self, partition):
-        """Last known highwater offset for a partition
+        """Last known highwater offset for a partition.
 
         A highwater offset is the offset that will be assigned to the next
         message that is produced. It may be useful for calculating lag, by

@@ -604,10 +603,10 @@ class KafkaConsumer(six.Iterator):
         yet.
 
         Arguments:
-            partition (TopicPartition): partition to check
+            partition (TopicPartition): Partition to check
 
         Returns:
-            int or None: offset if available
+            int or None: Offset if available
         """
         if not isinstance(partition, TopicPartition):
             raise TypeError('partition must be a TopicPartition namedtuple')

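Since the highwater is the offset the next produced message will receive, lag for a partition is the highwater minus the consumer's position (a sketch; note highwater() can return None before the first fetch):

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(group_id='my-group')
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])
    consumer.poll(timeout_ms=1000)  # fetch once so a highwater is known
    hw = consumer.highwater(tp)
    if hw is not None:
        lag = hw - consumer.position(tp)  # produced but not yet fetched
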
@@ -623,7 +622,7 @@ class KafkaConsumer(six.Iterator):
         group rebalance when automatic assignment is used.
 
         Arguments:
-            *partitions (TopicPartition): partitions to pause
+            *partitions (TopicPartition): Partitions to pause.
         """
         if not all([isinstance(p, TopicPartition) for p in partitions]):
             raise TypeError('partitions must be TopicPartition namedtuples')

@@ -643,7 +642,7 @@ class KafkaConsumer(six.Iterator):
         """Resume fetching from the specified (paused) partitions.
 
         Arguments:
-            *partitions (TopicPartition): partitions to resume
+            *partitions (TopicPartition): Partitions to resume.
         """
         if not all([isinstance(p, TopicPartition) for p in partitions]):
             raise TypeError('partitions must be TopicPartition namedtuples')

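Both methods take TopicPartition namedtuples, matching the type checks above (a sketch):

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer('my-topic', group_id='my-group')
    tp = TopicPartition('my-topic', 0)
    consumer.pause(tp)   # keep the assignment, but stop fetching partition 0
    consumer.resume(tp)  # pick it back up later
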
@@ -661,11 +660,11 @@ class KafkaConsumer(six.Iterator):
         to reset the fetch offsets.
 
         Arguments:
-            partition (TopicPartition): partition for seek operation
-            offset (int): message offset in partition
+            partition (TopicPartition): Partition for seek operation
+            offset (int): Message offset in partition
 
         Raises:
-            AssertionError: if offset is not an int >= 0; or if partition is not
+            AssertionError: If offset is not an int >= 0; or if partition is not
                 currently assigned.
         """
         if not isinstance(partition, TopicPartition):

@@ -679,12 +678,12 @@ class KafkaConsumer(six.Iterator):
         """Seek to the oldest available offset for partitions.
 
         Arguments:
-            *partitions: optionally provide specific TopicPartitions, otherwise
-                default to all assigned partitions
+            *partitions: Optionally provide specific TopicPartitions, otherwise
+                default to all assigned partitions.
 
         Raises:
-            AssertionError: if any partition is not currently assigned, or if
-                no partitions are assigned
+            AssertionError: If any partition is not currently assigned, or if
+                no partitions are assigned.
         """
         if not all([isinstance(p, TopicPartition) for p in partitions]):
             raise TypeError('partitions must be TopicPartition namedtuples')

@@ -703,12 +702,12 @@ class KafkaConsumer(six.Iterator):
         """Seek to the most recent available offset for partitions.
 
         Arguments:
-            *partitions: optionally provide specific TopicPartitions, otherwise
-                default to all assigned partitions
+            *partitions: Optionally provide specific TopicPartitions, otherwise
+                default to all assigned partitions.
 
         Raises:
-            AssertionError: if any partition is not currently assigned, or if
-                no partitions are assigned
+            AssertionError: If any partition is not currently assigned, or if
+                no partitions are assigned.
         """
         if not all([isinstance(p, TopicPartition) for p in partitions]):
             raise TypeError('partitions must be TopicPartition namedtuples')

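The three seek variants reposition the fetch offset without committing anything (a sketch):

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])
    consumer.seek(tp, 100)          # next record fetched will be offset 100
    consumer.seek_to_beginning(tp)  # oldest available offset
    consumer.seek_to_end(tp)        # most recent available offset
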
@@ -724,13 +723,13 @@ class KafkaConsumer(six.Iterator):
             self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
 
     def subscribe(self, topics=(), pattern=None, listener=None):
-        """Subscribe to a list of topics, or a topic regex pattern
+        """Subscribe to a list of topics, or a topic regex pattern.
 
         Partitions will be dynamically assigned via a group coordinator.
         Topic subscriptions are not incremental: this list will replace the
         current assignment (if there is one).
 
-        This method is incompatible with assign()
+        This method is incompatible with assign().
 
         Arguments:
             topics (list): List of topics for subscription.

@@ -759,16 +758,16 @@ class KafkaConsumer(six.Iterator):
             through this interface are from topics subscribed in this call.
 
         Raises:
-            IllegalStateError: if called after previously calling assign()
-            AssertionError: if neither topics or pattern is provided
-            TypeError: if listener is not a ConsumerRebalanceListener
+            IllegalStateError: If called after previously calling assign().
+            AssertionError: If neither topics or pattern is provided.
+            TypeError: If listener is not a ConsumerRebalanceListener.
         """
         # SubscriptionState handles error checking
         self._subscription.subscribe(topics=topics,
                                      pattern=pattern,
                                      listener=listener)
 
-        # regex will need all topic metadata
+        # Regex will need all topic metadata
         if pattern is not None:
             self._client.cluster.need_all_topic_metadata = True
             self._client.set_topics([])

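subscribe() takes either an explicit topic list or a regex pattern, never both, and each call replaces the previous subscription (a sketch):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(group_id='my-group')
    consumer.subscribe(topics=['my-topic', 'other-topic'])
    # Or by pattern; matching topics are picked up as metadata refreshes
    consumer.subscribe(pattern='^my-.*')
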
@@ -821,25 +820,24 @@ class KafkaConsumer(six.Iterator):
             return True
 
     def _update_fetch_positions(self, partitions):
-        """
-        Set the fetch position to the committed position (if there is one)
+        """Set the fetch position to the committed position (if there is one)
         or reset it using the offset reset policy the user has configured.
 
         Arguments:
             partitions (List[TopicPartition]): The partitions that need
-                updating fetch positions
+                updating fetch positions.
 
         Raises:
             NoOffsetForPartitionError: If no offset is stored for a given
-                partition and no offset reset policy is defined
+                partition and no offset reset policy is defined.
         """
         if (self.config['api_version'] >= (0, 8, 1)
                 and self.config['group_id'] is not None):
 
-            # refresh commits for all assigned partitions
+            # Refresh commits for all assigned partitions
             self._coordinator.refresh_committed_offsets_if_needed()
 
-            # then do any offset lookups in case some positions are not known
+            # Then, do any offset lookups in case some positions are not known
             self._fetcher.update_fetch_positions(partitions)
 
     def _message_generator(self):

@@ -854,7 +852,7 @@ class KafkaConsumer(six.Iterator):
         elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
             self._coordinator.ensure_coordinator_known()
 
-        # fetch offsets for any subscribed partitions that we arent tracking yet
+        # Fetch offsets for any subscribed partitions that we arent tracking yet
         if not self._subscription.has_all_fetch_positions():
             partitions = self._subscription.missing_fetch_positions()
             self._update_fetch_positions(partitions)

@@ -889,9 +887,9 @@ class KafkaConsumer(six.Iterator):
                     log.debug("internal iterator timeout - breaking for poll")
                     break
 
-            # an else block on a for loop only executes if there was no break
+            # An else block on a for loop only executes if there was no break
             # so this should only be called on a StopIteration from the fetcher
-            # and we assume that it is safe to init_fetches when fetcher is done
+            # We assume that it is safe to init_fetches when fetcher is done
             # i.e., there are no more records stored internally
             else:
                 self._fetcher.send_fetches()

@@ -933,7 +931,7 @@ class KafkaConsumer(six.Iterator):
                 self._consumer_timeout = time.time() + (
                     self.config['consumer_timeout_ms'] / 1000.0)
 
-    # old KafkaConsumer methods are deprecated
+    # Old KafkaConsumer methods are deprecated
     def configure(self, **configs):
         raise NotImplementedError(
             'deprecated -- initialize a new consumer')

@@ -74,7 +74,7 @@ def test_bootstrap_failure(conn):
 
 
 def test_can_connect(cli, conn):
-    # Node is not in broker metadata - cant connect
+    # Node is not in broker metadata - can't connect
    assert not cli._can_connect(2)
 
     # Node is in broker metadata but not in _conns

@@ -114,7 +114,7 @@ class KafkaIntegrationTestCase(unittest.TestCase):
         try:
             offsets, = self.client.send_offset_request([OffsetRequestPayload(topic, partition, -1, 1)])
         except:
-            # XXX: We've seen some UnknownErrors here and cant debug w/o server logs
+            # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
             self.zk.child.dump_logs()
             self.server.child.dump_logs()
             raise