Cleanup listener after stopping rpc server

If you don't close the AMQP connection, then the connection
remains open and the next time when the messages are sent on
the listening topic, then some of the messages will not be processed
as there is no dispatcher running to process the message.

Closes-Bug: #1335086
Change-Id: I1f39eedf1500b6b6209ae0222f32e08e304895e0
This commit is contained in:
Abhijeet Malawade 2014-06-27 07:29:57 -07:00
parent 2b6e24f81b
commit f37800943e
4 changed files with 36 additions and 0 deletions

View File

@ -127,6 +127,10 @@ class AMQPListener(base.Listener):
else:
self.conn.consume(limit=1)
def cleanup(self):
# Closes listener connection
self.conn.close()
class ReplyWaiters(object):

View File

@ -59,6 +59,15 @@ class Listener(object):
ending.
"""
def cleanup(self):
"""Cleanup listener.
Close connection used by listener if any. For some listeners like
zmq there is no connection so no need to close connection.
As this is listener specific method, overwrite it in to derived class
if cleanup of listener required.
"""
pass
@six.add_metaclass(abc.ABCMeta)
class BaseDriver(object):

View File

@ -140,4 +140,7 @@ class MessageHandlingServer(object):
"""
if self._executor is not None:
self._executor.wait()
# Close listener connection after processing all messages
self._executor.listener.cleanup()
self._executor = None

View File

@ -15,6 +15,7 @@
import threading
import mock
import testscenarios
from oslo.config import cfg
@ -110,6 +111,25 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertIs(server.dispatcher.serializer, serializer)
self.assertEqual('blocking', server.executor)
def test_server_wait_method(self):
transport = messaging.get_transport(self.conf, url='fake:')
target = messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
server = messaging.get_rpc_server(transport, target, endpoints,
serializer=serializer)
# Mocking executor
server._executor = mock.Mock()
# Here assigning executor's listener object to listener variable
# before calling wait method, beacuse in wait method we are
# setting executor to None.
listener = server._executor.listener
# call server wait method
server.wait()
self.assertIsNone(server._executor)
self.assertEqual(1, listener.cleanup.call_count)
def test_no_target_server(self):
transport = messaging.get_transport(self.conf, url='fake:')