Move partition offset tracking from zookeeper to kafka

With kafka 0.8.1 it is now possible for non-Java clients to track offsets in kafka itself. Previously the code built this tracking on zookeeper. This change relies on kafka-python > 0.9.1.

Change-Id: Ia42e713cc5d9ca61d8f8df2adc454f1e2579a229
commit 3d1deca139 (parent 224e654f4c)
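As background for the change, here is a minimal sketch of the offset API this commit moves to (kafka-python >= 0.9.1). The calls mirror those used in the diff below; the broker address, topic, group, and partition are placeholders.

```python
import kafka.client
import kafka.common

# Placeholder connection details for illustration only.
client = kafka.client.KafkaClient('192.168.10.4:9092')
group, topic, partition = 'monasca-notification', 'alarm-state-transitions', 0

# Fetch the last offset committed for this consumer group and partition.
fetch_req = kafka.common.OffsetFetchRequest(topic, partition)
(resp,) = client.send_offset_fetch_request(group, [fetch_req], fail_on_error=False)
last_committed = resp.offset  # 0 if nothing has been committed yet

# Commit a new offset for the same group and partition.
commit_req = kafka.common.OffsetCommitRequest(topic, partition, last_committed + 1, None)
client.send_offset_commit_request(group, [commit_req])
```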
README.md
@@ -10,11 +10,10 @@ There are four processing steps separated by queues implemented with python mult
 3. Send Notification. - NotificationProcessor class
 4. Add sent notifications to Kafka on the notification topic. - SentNotificationProcessor class
 
-There is also a special processing step, the ZookeeperStateTracker, that runs in the main thread and keeps track of the
+There is also a special processing step, the KafkaStateTracker, that runs in the main thread and keeps track of the
 last committed message and ones available for commit, it then periodically commits all progress. This handles the
 situation where alarms that are not acted on are quickly ready for commit but others which are prior to them in the
-kafka order are still in progress. Locking is also handled by this class, so all zookeeper functionality is tracked in
-this class.
+kafka order are still in progress. Locking is also handled by this class using zookeeper.
 
 There are 4 internal queues:
 
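The README paragraph above compresses the core idea of the state tracker: the committed offset for a partition only advances across a contiguous run of finished messages, and completions that arrive out of order are parked until the gap closes. Below is an illustrative-only sketch of that bookkeeping; it is not the project's code, which also commits to kafka and handles max_offset_lag.

```python
import collections

committed = {0: 10}                         # partition -> last committed offset
uncommitted = collections.defaultdict(set)  # partition -> finished but not yet committable offsets

def mark_finished(partition, offset):
    if committed.get(partition, -1) == offset - 1:
        # Contiguous with the committed offset: advance as far as the parked offsets allow.
        new_offset = offset
        while new_offset + 1 in uncommitted[partition]:
            new_offset += 1
            uncommitted[partition].discard(new_offset)
        committed[partition] = new_offset   # the real tracker commits this to kafka
    else:
        uncommitted[partition].add(offset)  # a gap exists, park the offset for later

mark_finished(0, 12)   # out of order: parked, nothing committed
mark_finished(0, 11)   # closes the gap: committed offset advances to 12
assert committed[0] == 12
```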
@@ -30,7 +29,7 @@ over and continue from where it left. A zookeeper lock file is used to ensure on
 the code can be modified to use kafka partitions to have multiple active engines working on different alarms.
 
 ## Fault Tolerance
-When reading from the alarm topic no committing is done. The committing is done in sent_notification processor. This allows
+When reading from the alarm topic no committing is done. The committing is done only after processing. This allows
 the processing to continue even though some notifications can be slow. In the event of a catastrophic failure some
 notifications could be sent but the alarms not yet acknowledged. This is an acceptable failure mode, better to send a
 notification twice than not at all.
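The fault-tolerance argument above is the usual at-least-once trade-off. A schematic sketch of the ordering it relies on, with send_notification and commit_offset as hypothetical placeholders:

```python
def handle(message, send_notification, commit_offset):
    # Illustrative only: the notification is sent before the offset is committed,
    # so a crash between the two calls re-delivers the message on restart and the
    # notification may be sent twice, but it is never silently lost.
    send_notification(message)
    commit_offset(message)
```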
@@ -76,12 +75,12 @@ are gathered per thread, the thread number is indicated by a -# at the end of th
 # Future Considerations
 - Currently I lock the topic rather than the partitions. This effectively means there is only one active notification
   engine at any given time. In the future to share the load among multiple daemons we could lock by partition.
-- The ZookeeperStateTracker is a likely place to end up as a bottleneck on high throughput. Detailed investigation of
-  its speed should be done.
-- How fast is the mysql db? How much load do we put on it. Initially I think it makes most sense to read notification
-  details for each alarm but eventually I may want to cache that info.
-- I am starting with a single KafkaConsumer and a single SentNotificationProcessor depending on load this may need
-  to scale.
 - More extensive load testing is needed
+- How fast is the mysql db? How much load do we put on it. Initially I think it makes most sense to read notification
+  details for each alarm but eventually I may want to cache that info.
+- I am starting with a single KafkaConsumer and a single SentNotificationProcessor depending on load this may need
+  to scale.
+- How fast is the state tracker? Do I need to scale or speed that up at all?
 
 # License
 
@@ -23,7 +23,7 @@ import logging.config
 import multiprocessing
 import os
 import signal
-from state_tracker import ZookeeperStateTracker
+from state_tracker import KafkaStateTracker
 import sys
 import threading
 import time
@@ -108,8 +108,9 @@ def main(argv=None):
 
     # State Tracker - Used for tracking the progress of fully processed alarms and the zookeeper lock
     global tracker  # Set to global for use in the cleanup function
-    tracker = ZookeeperStateTracker(
-        config['zookeeper']['url'], config['kafka']['alarm_topic'], finished, config['zookeeper']['max_offset_lag'])
+    tracker = KafkaStateTracker(finished, config['kafka']['url'], config['kafka']['group'],
+                                config['kafka']['alarm_topic'], config['kafka']['max_offset_lag'],
+                                config['zookeeper']['url'])
     tracker.lock(clean_exit)  # Only begin if we have the processing lock
     tracker_thread = threading.Thread(target=tracker.run)
 
@@ -120,8 +121,7 @@ def main(argv=None):
             alarms,
             config['kafka']['url'],
             config['kafka']['group'],
-            config['kafka']['alarm_topic'],
-            tracker.offsets
+            config['kafka']['alarm_topic']
         ).run
     )
     processors.append(kafka)
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import kafka.client
+import kafka.common
 import kafka.consumer
 import logging
 import statsd
@@ -31,7 +32,7 @@ class KafkaConsumer(BaseProcessor):
     Unfortunately at this point the python-kafka client does not handle multiple consumers seamlessly.
     For more information see, https://github.com/mumrah/kafka-python/issues/112
     """
-    def __init__(self, queue, kafka_url, group, topic, initial_offsets=None):
+    def __init__(self, queue, kafka_url, group, topic):
         """Init
             kafka_url, group, topic - kafka connection details
             sent_queue - a sent_queue to publish log entries to
@@ -39,21 +40,56 @@
         self.queue = queue
 
         self.kafka = kafka.client.KafkaClient(kafka_url)
-        # No autocommit, it does not work with kafka 0.8.0 - see https://github.com/mumrah/kafka-python/issues/118
+        # No auto-commit so that commits only happen after the alarm is processed.
         self.consumer = kafka.consumer.SimpleConsumer(self.kafka, group, topic, auto_commit=False)
         self.consumer.provide_partition_info()  # Without this the partition is not provided in the response
-        if initial_offsets is not None:
-            # Set initial offsets directly in the consumer, there is no method for this so I have to do it here
-            self.consumer.offsets.update(initial_offsets)
-            # fetch offsets are +1 of standard offsets
-            for partition in initial_offsets:
-                self.consumer.fetch_offsets[partition] = initial_offsets[partition] + 1
+
+        self._initialize_offsets(group, topic)
+        # After my pull request is merged I can remove _initialize_offsets and use
+        # self.consumer.offsets = self.consumer.get_offsets()
+        # self.consumer.fetch_offsets = self.consumer.offsets.copy()
+        offsets = self.consumer.offsets.copy()
+        self.consumer.seek(0, 0)
+        if offsets != self.consumer.offsets:
+            log.error('Some messages not yet processed are no longer available in kafka, skipping to first available')
+        log.debug('Intialized offsets %s\nStarting offsets %s' % (offsets, self.consumer.offsets))
+
+    def _initialize_offsets(self, group, topic):
+        """Fetch initial offsets from kafka
+            This is largely taken from what the kafka consumer itself does when auto_commit is used
+        """
+        def get_or_init_offset_callback(resp):
+            try:
+                kafka.common.check_error(resp)
+                return resp.offset
+            except kafka.common.UnknownTopicOrPartitionError:
+                return 0
+
+        for partition in self.kafka.topic_partitions[topic]:
+            req = kafka.common.OffsetFetchRequest(topic, partition)
+            (offset,) = self.consumer.client.send_offset_fetch_request(group, [req],
+                                                                       callback=get_or_init_offset_callback,
+                                                                       fail_on_error=False)
+
+            # The recorded offset is the last successfully processed, start processing at the next
+            # if no processing has been done the offset is 0
+            if offset == 0:
+                self.consumer.offsets[partition] = offset
+            else:
+                self.consumer.offsets[partition] = offset + 1
+
+        # fetch_offsets are used by the SimpleConsumer
+        self.consumer.fetch_offsets = self.consumer.offsets.copy()
 
     def run(self):
         """Consume from kafka and place alarm objects on the sent_queue
         """
         counter = statsd.Counter('ConsumedFromKafka')
-        for message in self.consumer:
-            counter += 1
-            log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
-            self._add_to_queue(self.queue, 'alarms', message)
+        try:
+            for message in self.consumer:
+                counter += 1
+                log.debug("Consuming message from kafka, partition %d, offset %d" % (message[0], message[1].offset))
+                self._add_to_queue(self.queue, 'alarms', message)
+        except Exception:
+            log.exception('Error running Kafka Consumer')
+            raise
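For context on the run() loop above: with provide_partition_info() enabled, the SimpleConsumer yields (partition, OffsetAndMessage) pairs, which is why the diff indexes message[0] and message[1].offset. Here is a minimal consuming sketch under that assumption, with placeholder connection details and no committing (the state tracker commits separately).

```python
import kafka.client
import kafka.consumer

client = kafka.client.KafkaClient('192.168.10.4:9092')  # placeholder broker address
consumer = kafka.consumer.SimpleConsumer(client, 'monasca-notification',
                                         'alarm-state-transitions', auto_commit=False)
consumer.provide_partition_info()  # messages now arrive as (partition, OffsetAndMessage)

for partition, raw in consumer:
    # raw.offset is the kafka offset, raw.message.value is the alarm payload.
    print(partition, raw.offset, raw.message.value)
    break
```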
@@ -14,6 +14,8 @@
 # limitations under the License.
 
 import collections
+import kafka.client
+import kafka.common
 import kazoo.client
 import kazoo.exceptions
 import logging
@@ -26,17 +28,18 @@ from monasca_notification import notification_exceptions
 log = logging.getLogger(__name__)
 
 
-class ZookeeperStateTracker(object):
+class KafkaStateTracker(object):
     """Tracks message offsets for a kafka topic and partitions.
-        Uses zookeeper to keep track of the last committed offset.
         As messages are finished with processing the committed offset is updated periodically.
         The messages are not necessarily finished in order, but the committed offset includes
         all previous messages so this object tracks any gaps updating as needed.
+        Uses zookeeper to keep track of the last committed offset.
     """
-    def __init__(self, url, topic, finished_queue, max_lag):
+    def __init__(self, finished_queue, kafka_url, group, topic, max_lag, zookeeper_url):
         """Setup the finished_queue
-            url is the zookeeper hostname:port
-            topic is the kafka topic to track
+            finished_queue - queue containing all processed alarms
+            kafka_url, group, topic - kafka connection details
+            zookeeper_url is the zookeeper hostname:port
         """
         self.finished_queue = finished_queue
         self.max_lag = max_lag
@@ -44,14 +47,20 @@ class ZookeeperStateTracker(object):
         self.has_lock = False
         self.stop = False
 
-        self.zookeeper = kazoo.client.KazooClient(url)
+        # The kafka client is used solely for committing offsets, the consuming is done in the kafka_consumer
+        self.kafka_group = group
+        self.kafka = kafka.client.KafkaClient(kafka_url)
+        # The consumer is not needed but after my pull request is merged using it for get_offsets simplifies this code
+        # import kafka.consumer
+        # self.consumer = kafka.consumer.SimpleConsumer(self.kafka, group, topic, auto_commit=False)
+
+        self.zookeeper = kazoo.client.KazooClient(zookeeper_url)
         self.zookeeper.start()
         self.topic_path = '/consumers/monasca-notification/%s' % topic
 
         self.lock_retry_time = 15  # number of seconds to wait for retrying for the lock
         self.lock_path = '/locks/monasca-notification/%s' % topic
 
-        self._offsets = None
+        self._offsets = None  # Initialized in the beginning of the run
         # This is a dictionary of sets used for tracking finished offsets when there is a gap and the committed offset
         # can not yet be advanced
         self._uncommitted_offsets = collections.defaultdict(set)
@@ -68,53 +77,6 @@ class ZookeeperStateTracker(object):
         self.zookeeper.delete(self.lock_path)
         self.has_lock = False
 
-    def _get_offsets(self):
-        """Read the initial offsets from zookeeper or set defaults
-            The return is a dictionary with key name being partition # and value the offset
-        """
-        if not self.has_lock:
-            log.warn('Reading offsets before the tracker has the lock, they could change')
-        try:
-            if self.zookeeper.exists(self.topic_path):
-                offsets = collections.defaultdict(int)
-                for child in self.zookeeper.get_children(self.topic_path):
-                    offsets[int(child)] = int(self.zookeeper.get('/'.join((self.topic_path, child)))[0])
-                log.info('Setting initial offsets to %s' % str(offsets))
-                return offsets
-            else:
-                self.zookeeper.ensure_path(self.topic_path)
-                return {}
-        except kazoo.exceptions.KazooException:
-            log.exception('Error retrieving the committed offset in zookeeper')
-            raise
-
-    def _update_offset(self, partition, value):
-        """Update the object and zookeepers stored offset number for a partition to value
-        """
-        self.offset_update_count += value - self._offsets[partition]
-        self._offsets[partition] = value
-        partition_path = '/'.join((self.topic_path, str(partition)))
-        try:
-            with self.zk_timer.time():
-                self.zookeeper.ensure_path(partition_path)
-                self.zookeeper.set(partition_path, str(value))
-            log.debug('Updated committed offset at path %s, offsets %s' % (partition_path, value))
-        except kazoo.exceptions.KazooException:
-            log.exception('Error updating the committed offset in zookeeper, path %s, value %s'
-                          % (partition_path, value))
-            raise
-
-        self._last_commit_time[partition] = time.time()
-
-    @property
-    def offsets(self):
-        """Generally only initialize the offsets after the lock has been pulled
-        """
-        if self._offsets is None:
-            self._offsets = self._get_offsets()
-
-        return self._offsets
-
     def lock(self, exit_method):
         """Grab a lock within zookeeper, if not available retry.
         """
@@ -141,6 +103,35 @@ class ZookeeperStateTracker(object):
         # suspended, the process should be supervised so it starts right back up again.
         self.zookeeper.add_listener(exit_method)
 
+    @property
+    def offsets(self):
+        """Return self._offsets, this is a property because generally only initialize the offsets after the lock has
+            been pulled
+        """
+        if not self.has_lock:
+            log.warn('Reading offsets before the tracker has the lock, they could change')
+
+        if self._offsets is None:
+            # After my pull request is merged I can setup self.consumer as is done in kafka_consumer
+            # then simply use the get_offsets command as below
+            # self._offsets = self.consumer.get_offsets()
+            self._offsets = {}
+
+            def get_or_init_offset_callback(resp):
+                try:
+                    kafka.common.check_error(resp)
+                    return resp.offset
+                except kafka.common.UnknownTopicOrPartitionError:
+                    return 0
+            for partition in self.kafka.topic_partitions[self.topic]:
+                req = kafka.common.OffsetFetchRequest(self.topic, partition)
+                (offset,) = self.kafka.send_offset_fetch_request(self.kafka_group, [req],
+                                                                 callback=get_or_init_offset_callback,
+                                                                 fail_on_error=False)
+                self._offsets[partition] = offset
+
+        return self._offsets
+
     def run(self):
         """Mark a message as finished and where possible commit the new offset to zookeeper.
             There is no mechanism here to deal with the situation where a single alarm is extremely slow to finish
@@ -149,9 +140,6 @@ class ZookeeperStateTracker(object):
         if not self.has_lock:
             raise notification_exceptions.NotificationException('Attempt to begin run without Zookeeper Lock')
 
-        if self._offsets is None:  # Verify the offsets have been initialized
-            self._offsets = self._get_offsets()
-
         finished_count = statsd.Counter('AlarmsFinished')
         while True:
             # If self.stop is True run the queue until it is empty, do final commits then exit
@@ -165,7 +153,7 @@ class ZookeeperStateTracker(object):
                 if ((time.time() - self._last_commit_time[partition]) > self.max_lag) and\
                         (len(self._uncommitted_offsets[partition]) > 0):
                     log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
-                    self._update_offset(partition, max(self._uncommitted_offsets[partition]))
+                    self.update_offset(partition, max(self._uncommitted_offsets[partition]))
                     break
 
             try:
@@ -178,11 +166,8 @@ class ZookeeperStateTracker(object):
             offset = int(msg[1])
 
             log.debug('Received commit finish for partition %d, offset %d' % (partition, offset))
-            # Update immediately if the partition is not yet tracked or the offset is 1 above current offset
-            if partition not in self._offsets:
-                log.debug('Updating offset for partition %d, offset %d' % (partition, offset))
-                self._update_offset(partition, offset)
-            elif self._offsets[partition] == offset - 1:
+            # Update immediately if the offset is 1 above current offset
+            if self.offsets[partition] == offset - 1:
 
                 new_offset = offset
                 for x in range(offset + 1, offset + 1 + len(self._uncommitted_offsets[partition])):
@@ -192,13 +177,13 @@ class ZookeeperStateTracker(object):
                     else:
                         break
 
-                self._update_offset(partition, new_offset)
+                self.update_offset(partition, new_offset)
                 if offset == new_offset:
                     log.debug('Updating offset for partition %d, offset %d' % (partition, new_offset))
                 else:
                     log.debug('Updating offset for partition %d, offset %d covering this update and older offsets'
                               % (partition, new_offset))
-            elif self._offsets[partition] > offset:
+            elif self.offsets[partition] > offset:
                 log.warn('An offset was received that was lower than the committed offset.' +
                          'Possibly a result of skipping lagging notifications')
             else:  # This is skipping offsets so add to the uncommitted set unless max_lag has been hit
@@ -206,7 +191,27 @@ class ZookeeperStateTracker(object):
                 log.debug('Added partition %d, offset %d to uncommited set' % (partition, offset))
                 if (self.max_lag is not None) and ((time.time() - self._last_commit_time[partition]) > self.max_lag):
                     log.error('Max Lag has been reached! Skipping offsets for partition %s' % partition)
-                    self._update_offset(partition, max(self._uncommitted_offsets[partition]))
+                    self.update_offset(partition, max(self._uncommitted_offsets[partition]))
                     self._uncommitted_offsets[partition].clear()
 
         self._drop_lock()
+
+    def update_offset(self, partition, value):
+        """Update the object and kafka offset number for a partition to value
+        """
+        if self._offsets is None:  # Initialize offsets if needed
+            self.offsets
+
+        self.offset_update_count += value - self._offsets[partition]
+        self._offsets[partition] = value
+
+        req = kafka.common.OffsetCommitRequest(self.topic, partition, value, None)
+        try:
+            responses = self.kafka.send_offset_commit_request(self.kafka_group, [req])
+            kafka.common.check_error(responses[0])
+            log.debug('Updated committed offset for partition %s, offset %s' % (partition, value))
+        except kafka.common.KafkaError:
+            log.exception('Error updating the committed offset in kafka, partition %s, value %s' % (partition, value))
+            raise
+
+        self._last_commit_time[partition] = time.time()
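One detail worth keeping in mind when reading update_offset() together with the consumer's _initialize_offsets(): the tracker commits the offset of the last message that finished processing, and the consumer resumes one past it (unless nothing was ever committed, in which case the stored value is 0). A tiny worked example of that convention:

```python
# Illustrative only: a committed offset of N means "message N is done".
last_processed = 41
committed = last_processed    # what update_offset() sends in the OffsetCommitRequest
resume_at = committed + 1     # what _initialize_offsets() seeds the consumer with (when committed != 0)
assert resume_at == 42
```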
@@ -1,17 +1,18 @@
 kafka:
-    url: 192.168.10.10:9092 # or comma seperated list of multiple hosts
+    url: 192.168.10.4:9092 # or comma seperated list of multiple hosts
     group: monasca-notification
     alarm_topic: alarm-state-transitions
     notification_topic: alarm-notifications
+    max_offset_lag: 600 # In seconds, undefined for none
 
 mysql:
-    host: 192.168.10.6
+    host: 192.168.10.4
     user: notification
     passwd: password
     db: mon
 
 email:
-    server: smtp3.hp.com
+    server: 192.168.10.4
     port: 25
     user:
     password:
@@ -32,8 +33,7 @@ queues:
     sent_notifications_size: 50 # limiting this size reduces potential # of re-sent notifications after a failure
 
 zookeeper:
-    url: 192.168.10.10:2181 # or comma seperated list of multiple hosts
-    max_offset_lag: 600 # In seconds, undefined for none
+    url: 192.168.10.4:2181 # or comma seperated list of multiple hosts
 
 logging: # Used in logging.dictConfig
     version: 1
@@ -61,4 +61,4 @@ logging: # Used in logging.dictConfig
     root:
         handlers:
             - console
-        level: INFO
+        level: DEBUG
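The config changes above move max_offset_lag under the kafka section and rely on the existing group key. A sketch of how those keys feed the new KafkaStateTracker constructor, following the engine and tools diffs (the config filename is a placeholder):

```python
import yaml

from monasca_notification import state_tracker

config = yaml.load(open('notification.yaml', 'r'))  # placeholder path

# Constructor order per the engine diff: finished_queue, kafka_url, group, topic, max_lag, zookeeper_url
tracker = state_tracker.KafkaStateTracker(None,
                                          config['kafka']['url'],
                                          config['kafka']['group'],
                                          config['kafka']['alarm_topic'],
                                          config['kafka']['max_offset_lag'],
                                          config['zookeeper']['url'])
print(tracker.offsets)  # last committed offset per partition, fetched from kafka
```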
@@ -1,4 +1,4 @@
-kafka-python>=0.9.0
+kafka-python>=0.9.1
 kazoo>=1.3
 MySQL-python
 pbr>=0.6,<1.0
@@ -15,6 +15,7 @@
 
 """Tests the StateTracker"""
 
+import kafka.common
 import mock
 import multiprocessing
 import threading
@@ -29,7 +30,11 @@ class TestStateTracker(unittest.TestCase):
         self.finished_queue = multiprocessing.Queue(10)
         with mock.patch('kazoo.client.KazooClient') as self.mock_zk:
             self.mock_zk.return_value = self.mock_zk
-            self.tracker = state_tracker.ZookeeperStateTracker('url', 'topic', self.finished_queue, 1)
+            with mock.patch('kafka.client.KafkaClient') as self.mock_kafka:
+                self.tracker = state_tracker.KafkaStateTracker(self.finished_queue, 'kafka_url', 'group',
+                                                               'topic', 1, 'zookeeper_url')
+                self.mock_kafka.return_value = self.mock_kafka
+
         self.tracker.has_lock = True
         self.tracker_thread = threading.Thread(target=self.tracker.run)
         self.tracker_thread.daemon = True  # needed for the thread to properly exit
@@ -50,9 +55,12 @@ class TestStateTracker(unittest.TestCase):
         test_list = [(0, 1), (0, 2), (1, 1), (1, 2)]
         self._feed_queue(test_list)
 
-        expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
+        expected_calls = [mock.call().send_offset_commit_request('group',
+                                                                 [kafka.common.OffsetCommitRequest('topic', partition,
+                                                                                                   value, None)])
                           for partition, value in test_list]
-        set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
+        set_calls = [call for call in self.mock_kafka.mock_calls
+                     if call.__str__().startswith("call().send_offset_commit_request('")]
 
         self.assertTrue(expected_calls == set_calls)
 
@@ -63,9 +71,12 @@ class TestStateTracker(unittest.TestCase):
         self._feed_queue(unordered_test_list)
 
         commit_list = [(0, 2), (1, 2)]
-        expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
+        expected_calls = [mock.call().send_offset_commit_request('group',
+                                                                 [kafka.common.OffsetCommitRequest('topic', partition,
+                                                                                                   value, None)])
                           for partition, value in commit_list]
-        set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
+        set_calls = [call for call in self.mock_kafka.mock_calls
+                     if call.__str__().startswith("call().send_offset_commit_request('")]
 
         self.assertTrue(expected_calls == set_calls)
 
@@ -83,8 +94,11 @@ class TestStateTracker(unittest.TestCase):
         time.sleep(1)  # wait for processing
 
         commit_list = [(0, 3), (1, 3)]
-        expected_calls = [mock.call.set('/'.join([self.tracker.topic_path, str(partition)]), str(value))
+        expected_calls = [mock.call().send_offset_commit_request('group',
+                                                                 [kafka.common.OffsetCommitRequest('topic', partition,
+                                                                                                   value, None)])
                           for partition, value in commit_list]
-        set_calls = [call for call in self.mock_zk.mock_calls if call.__str__().startswith('call.set')]
+        set_calls = [call for call in self.mock_kafka.mock_calls
+                     if call.__str__().startswith("call().send_offset_commit_request('")]
 
         self.assertTrue(expected_calls == set_calls)
@@ -29,7 +29,7 @@ from monasca_notification import state_tracker
 
 
 def listener():
-    """Simple listener for ZookeeperStateTracker
+    """Simple listener for KafkaStateTracker
     """
     sys.exit(1)
 
@@ -53,12 +53,12 @@ def main():
 
     # Parse config and setup state tracker
     config = yaml.load(open(args.config, 'r'))
-    tracker = state_tracker.ZookeeperStateTracker(
-        config['zookeeper']['url'], config['kafka']['alarm_topic'], None, config['zookeeper']['max_offset_lag'])
+    tracker = state_tracker.KafkaStateTracker(None, config['kafka']['url'], config['kafka']['group'],
+                                              config['kafka']['alarm_topic'], config['kafka']['max_offset_lag'],
+                                              config['zookeeper']['url'])
 
-    current_offsests = tracker.offsets
     if args.list:
-        print(json.dumps(current_offsests))
+        print(json.dumps(tracker.offsets))
     else:
         offsets = json.loads(args.set_offsets)
         raw_input("Warning setting offset will affect the behavior of the next notification engine to run.\n" +
@@ -70,7 +70,7 @@ def main():
 
     tracker.lock(listener)
     for partition in offsets.iterkeys():
-        tracker._update_offset(int(partition), int(offsets[partition]))
+        tracker.update_offset(int(partition), int(offsets[partition]))
 
 if __name__ == "__main__":
     sys.exit(main())