From e3d99b2127391af184a4c16a2c4812f87d85dbc9 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Wed, 16 Sep 2015 16:47:12 -0400 Subject: [PATCH] Fix a few leaks in the AMQP 1.0 driver. Break any references that can form a cyclic reference. Several of proton's python classes define __del__ methods, so a cyclic reference must be broken manually. Change-Id: Icabc7abd01a02cb80c7eb4a0de682e90308459ef Closes-Bug: #1496540 --- .../_drivers/protocols/amqp/controller.py | 17 +++++++++ oslo_messaging/tests/test_amqp_driver.py | 37 ++++++++++++++++--- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/oslo_messaging/_drivers/protocols/amqp/controller.py b/oslo_messaging/_drivers/protocols/amqp/controller.py index d259eb968..4f545e796 100644 --- a/oslo_messaging/_drivers/protocols/amqp/controller.py +++ b/oslo_messaging/_drivers/protocols/amqp/controller.py @@ -72,6 +72,10 @@ class Replies(pyngus.ReceiverEventHandler): self._credit = 0 self._receiver.open() + def destroy(self): + self._correlation = None + self._receiver = None + def ready(self): return self._ready @@ -157,6 +161,7 @@ class Server(pyngus.ReceiverEventHandler): self._incoming = incoming self._addresses = addresses self._capacity = 500 # credit per link + self._receivers = None def attach(self, connection): """Create receiver links over the given connection for all the @@ -180,6 +185,9 @@ class Server(pyngus.ReceiverEventHandler): r.open() self._receivers.append(r) + def destroy(self): + self._receivers = None + # Pyngus ReceiverLink event callbacks: def receiver_remote_closed(self, receiver, pn_condition): @@ -310,6 +318,15 @@ class Controller(pyngus.ConnectionEventHandler): LOG.debug("Waiting for eventloop to exit") self.processor.shutdown(wait, timeout) self.processor = None + self._tasks = None + self._senders = None + for server in self._servers.values(): + server.destroy() + self._servers.clear() + self._socket_connection = None + if self._replies: + self._replies.destroy() + self._replies = None LOG.debug("Eventloop exited, driver shut down") # The remaining methods are reserved to run from the eventloop thread only! diff --git a/oslo_messaging/tests/test_amqp_driver.py b/oslo_messaging/tests/test_amqp_driver.py index 6955658e9..18aa2879a 100644 --- a/oslo_messaging/tests/test_amqp_driver.py +++ b/oslo_messaging/tests/test_amqp_driver.py @@ -396,6 +396,7 @@ mech_list: ${mechs} super(TestCyrusAuthentication, self).tearDown() if self._broker: self._broker.stop() + self._broker = None if self._conf_dir: shutil.rmtree(self._conf_dir, ignore_errors=True) @@ -546,12 +547,20 @@ class FakeBroker(threading.Thread): self.connection.pn_sasl.server() self.connection.open() self.sender_links = set() + self.receiver_links = set() self.closed = False def destroy(self): """Destroy the test connection.""" - while self.sender_links: - link = self.sender_links.pop() + # destroy modifies the set, so make a copy + tmp = self.sender_links.copy() + while tmp: + link = tmp.pop() + link.destroy() + # destroy modifies the set, so make a copy + tmp = self.receiver_links.copy() + while tmp: + link = tmp.pop() link.destroy() self.connection.destroy() self.connection = None @@ -622,16 +631,21 @@ class FakeBroker(threading.Thread): """An AMQP sending link.""" def __init__(self, server, conn, handle, src_addr=None): self.server = server + self.conn = conn cnn = conn.connection self.link = cnn.accept_sender(handle, source_override=src_addr, event_handler=self) + conn.sender_links.add(self) self.link.open() self.routed = False def destroy(self): """Destroy the link.""" self._cleanup() + conn = self.conn + self.conn = None + conn.sender_links.remove(self) if self.link: self.link.destroy() self.link = None @@ -663,21 +677,31 @@ class FakeBroker(threading.Thread): """An AMQP Receiving link.""" def __init__(self, server, conn, handle, addr=None): self.server = server + self.conn = conn cnn = conn.connection self.link = cnn.accept_receiver(handle, target_override=addr, event_handler=self) + conn.receiver_links.add(self) self.link.open() self.link.add_capacity(10) + def destroy(self): + """Destroy the link.""" + conn = self.conn + self.conn = None + conn.receiver_links.remove(self) + if self.link: + self.link.destroy() + self.link = None + # ReceiverEventHandler callbacks: def receiver_remote_closed(self, receiver_link, error): self.link.close() def receiver_closed(self, receiver_link): - self.link.destroy() - self.link = None + self.destroy() def message_received(self, receiver_link, message, handle): """Forward this message out the proper sending link.""" @@ -802,8 +826,11 @@ class FakeBroker(threading.Thread): # Shutting down self._my_socket.close() - for conn in self._connections.itervalues(): + for conn in self._connections.values(): conn.destroy() + self._connections = None + self.container.destroy() + self.container = None return 0 def add_route(self, address, link):