Rename KafkaTransaction to OffsetCommitContext for clarity.
This commit is contained in:
@@ -1,14 +1,14 @@
|
|||||||
"""
|
"""
|
||||||
Transactional commit and rollback semantics for consumer.
|
Context manager to commit/rollback consumer offsets.
|
||||||
"""
|
"""
|
||||||
from logging import getLogger
|
from logging import getLogger
|
||||||
|
|
||||||
from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError
|
from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError
|
||||||
|
|
||||||
|
|
||||||
class KafkaTransaction(object):
|
class OffsetCommitContext(object):
|
||||||
"""
|
"""
|
||||||
Provides transactional commit/rollback semantics around a `SimpleConsumer`.
|
Provides commit/rollback semantics around a `SimpleConsumer`.
|
||||||
|
|
||||||
Usage assumes that `auto_commit` is disabled, that messages are consumed in
|
Usage assumes that `auto_commit` is disabled, that messages are consumed in
|
||||||
batches, and that the consuming process will record its own successful
|
batches, and that the consuming process will record its own successful
|
||||||
@@ -23,16 +23,16 @@ class KafkaTransaction(object):
|
|||||||
consumer.fetch_last_known_offsets()
|
consumer.fetch_last_known_offsets()
|
||||||
|
|
||||||
while some_condition:
|
while some_condition:
|
||||||
with KafkaTransaction(consumer) as transaction:
|
with OffsetCommitContext(consumer) as context:
|
||||||
messages = consumer.get_messages(count, block=False)
|
messages = consumer.get_messages(count, block=False)
|
||||||
|
|
||||||
for partition, message in messages:
|
for partition, message in messages:
|
||||||
if can_process(message):
|
if can_process(message):
|
||||||
transaction.mark(partition, message.offset)
|
context.mark(partition, message.offset)
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
|
||||||
if not transaction:
|
if not context:
|
||||||
sleep(delay)
|
sleep(delay)
|
||||||
|
|
||||||
|
|
||||||
@@ -48,11 +48,11 @@ class KafkaTransaction(object):
|
|||||||
self.consumer = consumer
|
self.consumer = consumer
|
||||||
self.initial_offsets = None
|
self.initial_offsets = None
|
||||||
self.high_water_mark = None
|
self.high_water_mark = None
|
||||||
self.logger = getLogger("kafka.transaction")
|
self.logger = getLogger("kafka.context")
|
||||||
|
|
||||||
def mark(self, partition, offset):
|
def mark(self, partition, offset):
|
||||||
"""
|
"""
|
||||||
Set the high-water mark in the current transaction.
|
Set the high-water mark in the current context.
|
||||||
|
|
||||||
In order to know the current partition, it is helpful to initialize
|
In order to know the current partition, it is helpful to initialize
|
||||||
the consumer to provide partition info via:
|
the consumer to provide partition info via:
|
||||||
@@ -68,13 +68,13 @@ class KafkaTransaction(object):
|
|||||||
|
|
||||||
def __nonzero__(self):
|
def __nonzero__(self):
|
||||||
"""
|
"""
|
||||||
Return whether any operations were marked in the transaction.
|
Return whether any operations were marked in the context.
|
||||||
"""
|
"""
|
||||||
return bool(self.high_water_mark)
|
return bool(self.high_water_mark)
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
"""
|
"""
|
||||||
Start a new transaction:
|
Start a new context:
|
||||||
|
|
||||||
- Record the initial offsets for rollback
|
- Record the initial offsets for rollback
|
||||||
- Reset the high-water mark
|
- Reset the high-water mark
|
||||||
@@ -82,13 +82,13 @@ class KafkaTransaction(object):
|
|||||||
self.initial_offsets = dict(self.consumer.offsets)
|
self.initial_offsets = dict(self.consumer.offsets)
|
||||||
self.high_water_mark = dict()
|
self.high_water_mark = dict()
|
||||||
|
|
||||||
self.logger.debug("Starting transaction at: %s", self.initial_offsets)
|
self.logger.debug("Starting context at: %s", self.initial_offsets)
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
"""
|
"""
|
||||||
End a transaction.
|
End a context.
|
||||||
|
|
||||||
- If there was no exception, commit up to the current high-water mark.
|
- If there was no exception, commit up to the current high-water mark.
|
||||||
- If there was an offset of range error, attempt to find the correct
|
- If there was an offset of range error, attempt to find the correct
|
||||||
@@ -105,14 +105,14 @@ class KafkaTransaction(object):
|
|||||||
|
|
||||||
def commit(self):
|
def commit(self):
|
||||||
"""
|
"""
|
||||||
Commit this transaction:
|
Commit this context's offsets:
|
||||||
|
|
||||||
- If the high-water mark has moved, commit up to and position the
|
- If the high-water mark has moved, commit up to and position the
|
||||||
consumer at the high-water mark.
|
consumer at the high-water mark.
|
||||||
- Otherwise, reset to the consumer to the initial offsets.
|
- Otherwise, reset to the consumer to the initial offsets.
|
||||||
"""
|
"""
|
||||||
if self.high_water_mark:
|
if self.high_water_mark:
|
||||||
self.logger.info("Committing transaction: %s", self.high_water_mark)
|
self.logger.info("Committing offsets: %s", self.high_water_mark)
|
||||||
self.commit_partition_offsets(self.high_water_mark)
|
self.commit_partition_offsets(self.high_water_mark)
|
||||||
self.update_consumer_offsets(self.high_water_mark)
|
self.update_consumer_offsets(self.high_water_mark)
|
||||||
else:
|
else:
|
||||||
@@ -120,11 +120,11 @@ class KafkaTransaction(object):
|
|||||||
|
|
||||||
def rollback(self):
|
def rollback(self):
|
||||||
"""
|
"""
|
||||||
Rollback this transaction:
|
Rollback this context:
|
||||||
|
|
||||||
- Position the consumer at the initial offsets.
|
- Position the consumer at the initial offsets.
|
||||||
"""
|
"""
|
||||||
self.logger.info("Rolling back transaction: %s", self.initial_offsets)
|
self.logger.info("Rolling back context: %s", self.initial_offsets)
|
||||||
self.update_consumer_offsets(self.initial_offsets)
|
self.update_consumer_offsets(self.initial_offsets)
|
||||||
|
|
||||||
def commit_partition_offsets(self, partition_offsets):
|
def commit_partition_offsets(self, partition_offsets):
|
@@ -1,17 +1,17 @@
|
|||||||
"""
|
"""
|
||||||
KafkaTransaction tests.
|
OffsetCommitContext tests.
|
||||||
"""
|
"""
|
||||||
from unittest2 import TestCase
|
from unittest2 import TestCase
|
||||||
|
|
||||||
from mock import MagicMock, patch
|
from mock import MagicMock, patch
|
||||||
|
|
||||||
from kafka.common import OffsetOutOfRangeError
|
from kafka.common import OffsetOutOfRangeError
|
||||||
from kafka.transaction import KafkaTransaction
|
from kafka.context import OffsetCommitContext
|
||||||
|
|
||||||
|
|
||||||
class TestKafkaTransaction(TestCase):
|
class TestOffsetCommitContext(TestCase):
|
||||||
"""
|
"""
|
||||||
KafkaTransaction tests.
|
OffsetCommitContext tests.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@@ -24,13 +24,13 @@ class TestKafkaTransaction(TestCase):
|
|||||||
self.consumer.group = self.group
|
self.consumer.group = self.group
|
||||||
self.consumer.client = self.client
|
self.consumer.client = self.client
|
||||||
self.consumer.offsets = {self.partition: 0}
|
self.consumer.offsets = {self.partition: 0}
|
||||||
self.transaction = KafkaTransaction(self.consumer)
|
self.context = OffsetCommitContext(self.consumer)
|
||||||
|
|
||||||
def test_noop(self):
|
def test_noop(self):
|
||||||
"""
|
"""
|
||||||
Should revert consumer after transaction with no mark() call.
|
Should revert consumer after context exit with no mark() call.
|
||||||
"""
|
"""
|
||||||
with self.transaction:
|
with self.context:
|
||||||
# advance offset
|
# advance offset
|
||||||
self.consumer.offsets = {self.partition: 1}
|
self.consumer.offsets = {self.partition: 1}
|
||||||
|
|
||||||
@@ -42,10 +42,10 @@ class TestKafkaTransaction(TestCase):
|
|||||||
|
|
||||||
def test_mark(self):
|
def test_mark(self):
|
||||||
"""
|
"""
|
||||||
Should remain at marked location.
|
Should remain at marked location ater context exit.
|
||||||
"""
|
"""
|
||||||
with self.transaction as transaction:
|
with self.context as context:
|
||||||
transaction.mark(self.partition, 0)
|
context.mark(self.partition, 0)
|
||||||
# advance offset
|
# advance offset
|
||||||
self.consumer.offsets = {self.partition: 1}
|
self.consumer.offsets = {self.partition: 1}
|
||||||
|
|
||||||
@@ -61,12 +61,12 @@ class TestKafkaTransaction(TestCase):
|
|||||||
|
|
||||||
def test_mark_multiple(self):
|
def test_mark_multiple(self):
|
||||||
"""
|
"""
|
||||||
Should remain at highest marked location.
|
Should remain at highest marked location after context exit.
|
||||||
"""
|
"""
|
||||||
with self.transaction as transaction:
|
with self.context as context:
|
||||||
transaction.mark(self.partition, 0)
|
context.mark(self.partition, 0)
|
||||||
transaction.mark(self.partition, 1)
|
context.mark(self.partition, 1)
|
||||||
transaction.mark(self.partition, 2)
|
context.mark(self.partition, 2)
|
||||||
# advance offset
|
# advance offset
|
||||||
self.consumer.offsets = {self.partition: 3}
|
self.consumer.offsets = {self.partition: 3}
|
||||||
|
|
||||||
@@ -82,11 +82,11 @@ class TestKafkaTransaction(TestCase):
|
|||||||
|
|
||||||
def test_rollback(self):
|
def test_rollback(self):
|
||||||
"""
|
"""
|
||||||
Should rollback to beginning of transaction.
|
Should rollback to initial offsets on context exit with exception.
|
||||||
"""
|
"""
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception):
|
||||||
with self.transaction as transaction:
|
with self.context as context:
|
||||||
transaction.mark(self.partition, 0)
|
context.mark(self.partition, 0)
|
||||||
# advance offset
|
# advance offset
|
||||||
self.consumer.offsets = {self.partition: 1}
|
self.consumer.offsets = {self.partition: 1}
|
||||||
|
|
||||||
@@ -101,7 +101,7 @@ class TestKafkaTransaction(TestCase):
|
|||||||
|
|
||||||
def test_out_of_range(self):
|
def test_out_of_range(self):
|
||||||
"""
|
"""
|
||||||
Should remain at beginning of range.
|
Should reset to beginning of valid offsets on `OffsetOutOfRangeError`
|
||||||
"""
|
"""
|
||||||
def _seek(offset, whence):
|
def _seek(offset, whence):
|
||||||
# seek must be called with 0, 0 to find the beginning of the range
|
# seek must be called with 0, 0 to find the beginning of the range
|
||||||
@@ -111,7 +111,7 @@ class TestKafkaTransaction(TestCase):
|
|||||||
self.consumer.offsets = {self.partition: 100}
|
self.consumer.offsets = {self.partition: 100}
|
||||||
|
|
||||||
with patch.object(self.consumer, "seek", _seek):
|
with patch.object(self.consumer, "seek", _seek):
|
||||||
with self.transaction:
|
with self.context:
|
||||||
raise OffsetOutOfRangeError()
|
raise OffsetOutOfRangeError()
|
||||||
|
|
||||||
self.assertEqual(self.consumer.offsets, {self.partition: 100})
|
self.assertEqual(self.consumer.offsets, {self.partition: 100})
|
Reference in New Issue
Block a user