Clear out RPC connection pool before exit.

Fixes bug 767984.

This patch ensures that pooled connections to a messaging system get
cleaned up before a process that has used the RPC API exits.

Change-Id: I56eca54334075378534a7a5d3434c420319672b4
This commit is contained in:
Russell Bryant 2012-01-30 18:29:04 -05:00
parent 59c0a723cc
commit bd32abf9bc
9 changed files with 43 additions and 0 deletions

View File

@ -125,5 +125,8 @@ def main():
print init_leases(network_id) print init_leases(network_id)
rpc.cleanup()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -2354,6 +2354,7 @@ def main():
# call the action with the remaining arguments # call the action with the remaining arguments
try: try:
fn(*fn_args, **fn_kwargs) fn(*fn_args, **fn_kwargs)
rpc.cleanup()
sys.exit(0) sys.exit(0)
except TypeError: except TypeError:
print _("Possible wrong number of arguments supplied") print _("Possible wrong number of arguments supplied")

View File

@ -139,6 +139,19 @@ def notify(context, topic, msg):
return _get_impl().notify(context, topic, msg) return _get_impl().notify(context, topic, msg)
def cleanup():
"""Clean up resoruces in use by implementation.
Clean up any resources that have been allocated by the RPC implementation.
This is typically open connections to a messaging service. This function
would get called before an application using this API exits to allow
connections to get torn down cleanly.
:returns: None
"""
return _get_impl().cleanup()
_RPCIMPL = None _RPCIMPL = None

View File

@ -127,6 +127,11 @@ class ConnectionContext(rpc_common.Connection):
else: else:
raise exception.InvalidRPCConnectionReuse() raise exception.InvalidRPCConnectionReuse()
@classmethod
def empty_pool(cls):
while cls._connection_pool.free_items:
cls._connection_pool.get().close()
def msg_reply(msg_id, reply=None, failure=None, ending=False): def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id. """Sends a reply or an error on the channel signified by msg_id.
@ -353,3 +358,7 @@ def notify(context, topic, msg):
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext() as conn: with ConnectionContext() as conn:
conn.notify_send(topic, msg) conn.notify_send(topic, msg)
def cleanup():
ConnectionContext.empty_pool()

View File

@ -635,6 +635,10 @@ def notify(context, topic, msg):
publisher.close() publisher.close()
def cleanup():
pass
def generic_response(message_data, message): def generic_response(message_data, message):
"""Logs a result and exits.""" """Logs a result and exits."""
LOG.debug(_('response %s'), message_data) LOG.debug(_('response %s'), message_data)

View File

@ -136,6 +136,10 @@ def notify(context, topic, msg):
pass pass
def cleanup():
pass
def fanout_cast(context, topic, msg): def fanout_cast(context, topic, msg):
"""Cast to all consumers of a topic""" """Cast to all consumers of a topic"""
method = msg.get('method') method = msg.get('method')

View File

@ -618,3 +618,7 @@ def fanout_cast(context, topic, msg):
def notify(context, topic, msg): def notify(context, topic, msg):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg) return rpc_amqp.notify(context, topic, msg)
def cleanup():
return rpc_amqp.cleanup()

View File

@ -506,3 +506,7 @@ def fanout_cast(context, topic, msg):
def notify(context, topic, msg): def notify(context, topic, msg):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg) return rpc_amqp.notify(context, topic, msg)
def cleanup():
return rpc_amqp.cleanup()

View File

@ -414,3 +414,4 @@ def wait():
_launcher.wait() _launcher.wait()
except KeyboardInterrupt: except KeyboardInterrupt:
_launcher.stop() _launcher.stop()
rpc.cleanup()