break up some circular references and close client wake pipe on __del__
This commit is contained in:
@@ -97,6 +97,10 @@ class KafkaClient(object):
|
|||||||
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
|
||||||
self._wake_r, self._wake_w = os.pipe()
|
self._wake_r, self._wake_w = os.pipe()
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
os.close(self._wake_r)
|
||||||
|
os.close(self._wake_w)
|
||||||
|
|
||||||
def _bootstrap(self, hosts):
|
def _bootstrap(self, hosts):
|
||||||
# Exponential backoff if bootstrap fails
|
# Exponential backoff if bootstrap fails
|
||||||
backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
|
backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
|
||||||
|
@@ -2,6 +2,7 @@ import abc
|
|||||||
import copy
|
import copy
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
import weakref
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@@ -85,9 +86,12 @@ class BaseCoordinator(object):
|
|||||||
self.rejoin_needed = True
|
self.rejoin_needed = True
|
||||||
self.needs_join_prepare = True
|
self.needs_join_prepare = True
|
||||||
self.heartbeat = Heartbeat(**self.config)
|
self.heartbeat = Heartbeat(**self.config)
|
||||||
self.heartbeat_task = HeartbeatTask(self)
|
self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
|
||||||
#self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
|
#self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.heartbeat_task.disable()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def protocol_type(self):
|
def protocol_type(self):
|
||||||
"""
|
"""
|
||||||
@@ -572,6 +576,12 @@ class HeartbeatTask(object):
|
|||||||
self._client = coordinator._client
|
self._client = coordinator._client
|
||||||
self._request_in_flight = False
|
self._request_in_flight = False
|
||||||
|
|
||||||
|
def disable(self):
|
||||||
|
try:
|
||||||
|
self._client.unschedule(self)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
# start or restart the heartbeat task to be executed at the next chance
|
# start or restart the heartbeat task to be executed at the next chance
|
||||||
self._heartbeat.reset_session_timeout()
|
self._heartbeat.reset_session_timeout()
|
||||||
|
@@ -4,20 +4,20 @@ import copy
|
|||||||
import collections
|
import collections
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
import weakref
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from .base import BaseCoordinator
|
from .base import BaseCoordinator
|
||||||
from .assignors.range import RangePartitionAssignor
|
from .assignors.range import RangePartitionAssignor
|
||||||
from .assignors.roundrobin import RoundRobinPartitionAssignor
|
from .assignors.roundrobin import RoundRobinPartitionAssignor
|
||||||
from .protocol import (
|
from .protocol import ConsumerProtocol
|
||||||
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
|
|
||||||
ConsumerProtocol)
|
|
||||||
from ..common import OffsetAndMetadata, TopicPartition
|
from ..common import OffsetAndMetadata, TopicPartition
|
||||||
from ..future import Future
|
from ..future import Future
|
||||||
from ..protocol.commit import (
|
from ..protocol.commit import (
|
||||||
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
|
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
|
||||||
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
|
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
|
||||||
|
from ..util import WeakMethod
|
||||||
|
|
||||||
import kafka.common as Errors
|
import kafka.common as Errors
|
||||||
|
|
||||||
@@ -83,7 +83,7 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
self._partitions_per_topic = {}
|
self._partitions_per_topic = {}
|
||||||
self._cluster = client.cluster
|
self._cluster = client.cluster
|
||||||
self._cluster.request_update()
|
self._cluster.request_update()
|
||||||
self._cluster.add_listener(self._handle_metadata_update)
|
self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
|
||||||
|
|
||||||
self._auto_commit_task = None
|
self._auto_commit_task = None
|
||||||
if self.config['enable_auto_commit']:
|
if self.config['enable_auto_commit']:
|
||||||
@@ -95,13 +95,18 @@ class ConsumerCoordinator(BaseCoordinator):
|
|||||||
log.warning('group_id is None: disabling auto-commit.')
|
log.warning('group_id is None: disabling auto-commit.')
|
||||||
else:
|
else:
|
||||||
interval = self.config['auto_commit_interval_ms'] / 1000.0
|
interval = self.config['auto_commit_interval_ms'] / 1000.0
|
||||||
self._auto_commit_task = AutoCommitTask(self, interval)
|
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
|
||||||
|
|
||||||
# metrics=None,
|
# metrics=None,
|
||||||
# metric_group_prefix=None,
|
# metric_group_prefix=None,
|
||||||
# metric_tags=None,
|
# metric_tags=None,
|
||||||
# self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
|
# self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
if self._auto_commit_task:
|
||||||
|
self._auto_commit_task.disable()
|
||||||
|
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
|
||||||
|
|
||||||
def protocol_type(self):
|
def protocol_type(self):
|
||||||
return ConsumerProtocol.PROTOCOL_TYPE
|
return ConsumerProtocol.PROTOCOL_TYPE
|
||||||
|
|
||||||
|
@@ -3,6 +3,7 @@ import collections
|
|||||||
import struct
|
import struct
|
||||||
import sys
|
import sys
|
||||||
from threading import Thread, Event
|
from threading import Thread, Event
|
||||||
|
import weakref
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@@ -151,3 +152,39 @@ class ReentrantTimer(object):
|
|||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
|
|
||||||
|
class WeakMethod(object):
|
||||||
|
"""
|
||||||
|
Callable that weakly references a method and the object it is bound to. It
|
||||||
|
is based on http://stackoverflow.com/a/24287465.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
|
||||||
|
object_dot_method: A bound instance method (i.e. 'object.method').
|
||||||
|
"""
|
||||||
|
def __init__(self, object_dot_method):
|
||||||
|
try:
|
||||||
|
self.target = weakref.ref(object_dot_method.__self__)
|
||||||
|
except AttributeError:
|
||||||
|
self.target = weakref.ref(object_dot_method.im_self)
|
||||||
|
self._target_id = id(self.target())
|
||||||
|
try:
|
||||||
|
self.method = weakref.ref(object_dot_method.__func__)
|
||||||
|
except AttributeError:
|
||||||
|
self.method = weakref.ref(object_dot_method.im_func)
|
||||||
|
self._method_id = id(self.method())
|
||||||
|
|
||||||
|
def __call__(self, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Calls the method on target with args and kwargs.
|
||||||
|
"""
|
||||||
|
return self.method()(self.target(), *args, **kwargs)
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
return hash(self.target) ^ hash(self.method)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if not isinstance(other, WeakMethod):
|
||||||
|
return False
|
||||||
|
return self._target_id == other._target_id and self._method_id == other._method_id
|
||||||
|
@@ -19,6 +19,7 @@ from kafka.protocol.commit import (
|
|||||||
OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
|
OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
|
||||||
OffsetFetchResponse)
|
OffsetFetchResponse)
|
||||||
from kafka.protocol.metadata import MetadataResponse
|
from kafka.protocol.metadata import MetadataResponse
|
||||||
|
from kafka.util import WeakMethod
|
||||||
|
|
||||||
import kafka.common as Errors
|
import kafka.common as Errors
|
||||||
|
|
||||||
@@ -46,7 +47,7 @@ def test_init(conn):
|
|||||||
|
|
||||||
# metadata update on init
|
# metadata update on init
|
||||||
assert cli.cluster._need_update is True
|
assert cli.cluster._need_update is True
|
||||||
assert coordinator._handle_metadata_update in cli.cluster._listeners
|
assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
|
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
|
||||||
|
Reference in New Issue
Block a user