Merge "Fix a few leaks in the AMQP 1.0 driver."

This commit is contained in:
Jenkins 2015-09-17 10:47:00 +00:00 committed by Gerrit Code Review
commit 08e64b28bd
2 changed files with 49 additions and 5 deletions

View File

@ -72,6 +72,10 @@ class Replies(pyngus.ReceiverEventHandler):
self._credit = 0
self._receiver.open()
def destroy(self):
self._correlation = None
self._receiver = None
def ready(self):
return self._ready
@ -157,6 +161,7 @@ class Server(pyngus.ReceiverEventHandler):
self._incoming = incoming
self._addresses = addresses
self._capacity = 500 # credit per link
self._receivers = None
def attach(self, connection):
"""Create receiver links over the given connection for all the
@ -180,6 +185,9 @@ class Server(pyngus.ReceiverEventHandler):
r.open()
self._receivers.append(r)
def destroy(self):
self._receivers = None
# Pyngus ReceiverLink event callbacks:
def receiver_remote_closed(self, receiver, pn_condition):
@ -310,6 +318,15 @@ class Controller(pyngus.ConnectionEventHandler):
LOG.debug("Waiting for eventloop to exit")
self.processor.shutdown(wait, timeout)
self.processor = None
self._tasks = None
self._senders = None
for server in self._servers.values():
server.destroy()
self._servers.clear()
self._socket_connection = None
if self._replies:
self._replies.destroy()
self._replies = None
LOG.debug("Eventloop exited, driver shut down")
# The remaining methods are reserved to run from the eventloop thread only!

View File

@ -396,6 +396,7 @@ mech_list: ${mechs}
super(TestCyrusAuthentication, self).tearDown()
if self._broker:
self._broker.stop()
self._broker = None
if self._conf_dir:
shutil.rmtree(self._conf_dir, ignore_errors=True)
@ -546,12 +547,20 @@ class FakeBroker(threading.Thread):
self.connection.pn_sasl.server()
self.connection.open()
self.sender_links = set()
self.receiver_links = set()
self.closed = False
def destroy(self):
"""Destroy the test connection."""
while self.sender_links:
link = self.sender_links.pop()
# destroy modifies the set, so make a copy
tmp = self.sender_links.copy()
while tmp:
link = tmp.pop()
link.destroy()
# destroy modifies the set, so make a copy
tmp = self.receiver_links.copy()
while tmp:
link = tmp.pop()
link.destroy()
self.connection.destroy()
self.connection = None
@ -622,16 +631,21 @@ class FakeBroker(threading.Thread):
"""An AMQP sending link."""
def __init__(self, server, conn, handle, src_addr=None):
self.server = server
self.conn = conn
cnn = conn.connection
self.link = cnn.accept_sender(handle,
source_override=src_addr,
event_handler=self)
conn.sender_links.add(self)
self.link.open()
self.routed = False
def destroy(self):
"""Destroy the link."""
self._cleanup()
conn = self.conn
self.conn = None
conn.sender_links.remove(self)
if self.link:
self.link.destroy()
self.link = None
@ -663,21 +677,31 @@ class FakeBroker(threading.Thread):
"""An AMQP Receiving link."""
def __init__(self, server, conn, handle, addr=None):
self.server = server
self.conn = conn
cnn = conn.connection
self.link = cnn.accept_receiver(handle,
target_override=addr,
event_handler=self)
conn.receiver_links.add(self)
self.link.open()
self.link.add_capacity(10)
def destroy(self):
"""Destroy the link."""
conn = self.conn
self.conn = None
conn.receiver_links.remove(self)
if self.link:
self.link.destroy()
self.link = None
# ReceiverEventHandler callbacks:
def receiver_remote_closed(self, receiver_link, error):
self.link.close()
def receiver_closed(self, receiver_link):
self.link.destroy()
self.link = None
self.destroy()
def message_received(self, receiver_link, message, handle):
"""Forward this message out the proper sending link."""
@ -802,8 +826,11 @@ class FakeBroker(threading.Thread):
# Shutting down
self._my_socket.close()
for conn in self._connections.itervalues():
for conn in self._connections.values():
conn.destroy()
self._connections = None
self.container.destroy()
self.container = None
return 0
def add_route(self, address, link):