From 87648d74f49dafb6146bb61c40d8d2d44146ff8b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 17 Jul 2016 10:17:10 -0700 Subject: [PATCH] Simplify RecordMetadata; short circuit callbacks (#768) Simplify RecordMetadata to unaltered namedtuple -- minor speed optimization Minor optimization: inline check for no callbacks --- kafka/future.py | 3 ++- kafka/producer/future.py | 26 +++++++------------------- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/kafka/future.py b/kafka/future.py index a4b7deb..4a3af47 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -30,7 +30,8 @@ class Future(object): assert not self.is_done, 'Future is already complete' self.value = value self.is_done = True - self._call_backs('callback', self._callbacks, self.value) + if self._callbacks: + self._call_backs('callback', self._callbacks, self.value) return self def failure(self, e): diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 27cf33b..041e3a2 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -38,12 +38,14 @@ class FutureRecordMetadata(Future): produce_future.add_errback(self.failure) def _produce_success(self, offset_and_timestamp): - base_offset, timestamp_ms = offset_and_timestamp + offset, timestamp_ms = offset_and_timestamp if timestamp_ms is None: timestamp_ms = self.timestamp_ms - self.success(RecordMetadata(self._produce_future.topic_partition, - base_offset, timestamp_ms, - self.relative_offset)) + if offset != -1 and self.relative_offset is not None: + offset += self.relative_offset + tp = self._produce_future.topic_partition + metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms) + self.success(metadata) def get(self, timeout=None): if not self.is_done and not self._produce_future.wait(timeout): @@ -55,18 +57,4 @@ class FutureRecordMetadata(Future): return self.value -class RecordMetadata(collections.namedtuple( - 'RecordMetadata', 'topic partition topic_partition offset timestamp')): - def __new__(cls, tp, base_offset, timestamp, relative_offset=None): - offset = base_offset - if relative_offset is not None and base_offset != -1: - offset += relative_offset - return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, - tp, offset, timestamp) - - def __str__(self): - return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % ( - self.topic, self.partition, self.offset) - - def __repr__(self): - return str(self) +RecordMetadata = collections.namedtuple('RecordMetadata', 'topic partition topic_partition offset timestamp')