Fixed couple of "leaks" when gc is disabled (#979)
This commit is contained in:
committed by
Dana Powers
parent
82d50f443e
commit
5a0e9715f4
@@ -133,21 +133,26 @@ class KafkaProtocol(object):
|
||||
if acks not in (1, 0, -1):
|
||||
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
|
||||
|
||||
topics = []
|
||||
for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
|
||||
topic_msgs = []
|
||||
for partition, payload in topic_payloads.items():
|
||||
partition_msgs = []
|
||||
for msg in payload.messages:
|
||||
m = kafka.protocol.message.Message(
|
||||
msg.value, key=msg.key,
|
||||
magic=msg.magic, attributes=msg.attributes
|
||||
)
|
||||
partition_msgs.append((0, m.encode()))
|
||||
topic_msgs.append((partition, partition_msgs))
|
||||
topics.append((topic, topic_msgs))
|
||||
|
||||
|
||||
return kafka.protocol.produce.ProduceRequest[0](
|
||||
required_acks=acks,
|
||||
timeout=timeout,
|
||||
topics=[(
|
||||
topic,
|
||||
[(
|
||||
partition,
|
||||
[(0,
|
||||
kafka.protocol.message.Message(
|
||||
msg.value, key=msg.key,
|
||||
magic=msg.magic, attributes=msg.attributes
|
||||
).encode())
|
||||
for msg in payload.messages])
|
||||
for partition, payload in topic_payloads.items()])
|
||||
for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
|
||||
topics=topics
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def decode_produce_response(cls, response):
|
||||
|
||||
@@ -10,7 +10,7 @@ from .struct import Struct
|
||||
from .types import (
|
||||
Int8, Int32, Int64, Bytes, Schema, AbstractType
|
||||
)
|
||||
from ..util import crc32
|
||||
from ..util import crc32, WeakMethod
|
||||
|
||||
|
||||
class Message(Struct):
|
||||
@@ -52,7 +52,7 @@ class Message(Struct):
|
||||
self.attributes = attributes
|
||||
self.key = key
|
||||
self.value = value
|
||||
self.encode = self._encode_self
|
||||
self.encode = WeakMethod(self._encode_self)
|
||||
|
||||
@property
|
||||
def timestamp_type(self):
|
||||
|
||||
@@ -5,6 +5,8 @@ from io import BytesIO
|
||||
from .abstract import AbstractType
|
||||
from .types import Schema
|
||||
|
||||
from ..util import WeakMethod
|
||||
|
||||
|
||||
class Struct(AbstractType):
|
||||
SCHEMA = Schema()
|
||||
@@ -19,7 +21,9 @@ class Struct(AbstractType):
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
# overloading encode() to support both class and instance
|
||||
self.encode = self._encode_self
|
||||
# Without WeakMethod() this creates circular ref, which
|
||||
# causes instances to "leak" to garbage
|
||||
self.encode = WeakMethod(self._encode_self)
|
||||
|
||||
@classmethod
|
||||
def encode(cls, item): # pylint: disable=E0202
|
||||
|
||||
4
kafka/vendor/six.py
vendored
4
kafka/vendor/six.py
vendored
@@ -70,7 +70,9 @@ else:
|
||||
else:
|
||||
# 64-bit
|
||||
MAXSIZE = int((1 << 63) - 1)
|
||||
del X
|
||||
|
||||
# Don't del it here, cause with gc disabled this "leaks" to garbage
|
||||
# del X
|
||||
|
||||
|
||||
def _add_doc(func, doc):
|
||||
|
||||
Reference in New Issue
Block a user