From 271711bf680eab3ee5a862dfb5c937c475f8ba2b Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Tue, 7 Feb 2012 22:31:27 -0500 Subject: [PATCH] Move connection pool back into impl_kombu/qpid. Fix bug 928996. This patch moves the creation of the connection pool from nova.rpc.amqp back into nova.rpc.impl_kombu and nova.rpc.impl_qpid. The pool now gets passed into nova.rpc.amqp using arguments as needed. The previous method worked fine unless both rpc implementations got loaded into the same Python instance. In that case, whichever one got loaded 2nd had control over what type of connections nova.rpc.amqp would create. With these changes in place, this conflict between impl_kombu and impl_qpid is resolved. Change-Id: I72bc0c95bfc04ccdfb89d3456332f622ca5ffa42 --- nova/rpc/amqp.py | 76 ++++++++++++++++++------------------- nova/rpc/impl_kombu.py | 21 +++++----- nova/rpc/impl_qpid.py | 20 +++++----- nova/tests/rpc/test_qpid.py | 9 ++--- 4 files changed, 63 insertions(+), 63 deletions(-) diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index 0995d9ab..01e12776 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -44,18 +44,22 @@ from nova.rpc.common import LOG FLAGS = flags.FLAGS -ConnectionClass = None - - class Pool(pools.Pool): """Class that implements a Pool of Connections.""" def __init__(self, *args, **kwargs): + self.connection_cls = kwargs.pop("connection_cls", None) + kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size) + kwargs.setdefault("order_as_stack", True) super(Pool, self).__init__(*args, **kwargs) # TODO(comstud): Timeout connections not used in a while def create(self): LOG.debug('Pool creating new connection') - return ConnectionClass() + return self.connection_cls() + + def empty(self): + while self.free_items: + self.get().close() class ConnectionContext(rpc_common.Connection): @@ -69,16 +73,14 @@ class ConnectionContext(rpc_common.Connection): the pool. """ - _connection_pool = Pool(max_size=FLAGS.rpc_conn_pool_size, - order_as_stack=True) - - def __init__(self, pooled=True): + def __init__(self, connection_pool, pooled=True): """Create a new connection, or get one from the pool""" self.connection = None + self.connection_pool = connection_pool if pooled: - self.connection = self._connection_pool.get() + self.connection = connection_pool.get() else: - self.connection = ConnectionClass() + self.connection = connection_pool.connection_cls() self.pooled = pooled def __enter__(self): @@ -94,7 +96,7 @@ class ConnectionContext(rpc_common.Connection): # Reset the connection so it's ready for the next caller # to grab from the pool self.connection.reset() - self._connection_pool.put(self.connection) + self.connection_pool.put(self.connection) else: try: self.connection.close() @@ -127,19 +129,14 @@ class ConnectionContext(rpc_common.Connection): else: raise exception.InvalidRPCConnectionReuse() - @classmethod - def empty_pool(cls): - while cls._connection_pool.free_items: - cls._connection_pool.get().close() - -def msg_reply(msg_id, reply=None, failure=None, ending=False): +def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. """ - with ConnectionContext() as conn: + with ConnectionContext(connection_pool) as conn: if failure: message = str(failure[1]) tb = traceback.format_exception(*failure) @@ -161,18 +158,19 @@ def msg_reply(msg_id, reply=None, failure=None, ending=False): class RpcContext(context.RequestContext): """Context that supports replying to a rpc.call""" def __init__(self, *args, **kwargs): - msg_id = kwargs.pop('msg_id', None) - self.msg_id = msg_id + self.msg_id = kwargs.pop('msg_id', None) + self.connection_pool = kwargs.pop('connection_pool', None) super(RpcContext, self).__init__(*args, **kwargs) def reply(self, reply=None, failure=None, ending=False): if self.msg_id: - msg_reply(self.msg_id, reply, failure, ending) + msg_reply(self.msg_id, self.connection_pool, reply, failure, + ending) if ending: self.msg_id = None -def unpack_context(msg): +def unpack_context(msg, connection_pool): """Unpack context from msg.""" context_dict = {} for key in list(msg.keys()): @@ -183,6 +181,7 @@ def unpack_context(msg): value = msg.pop(key) context_dict[key[9:]] = value context_dict['msg_id'] = msg.pop('_msg_id', None) + context_dict['connection_pool'] = connection_pool ctx = RpcContext.from_dict(context_dict) LOG.debug(_('unpacked context: %s'), ctx.to_dict()) return ctx @@ -205,9 +204,10 @@ def pack_context(msg, context): class ProxyCallback(object): """Calls methods on a proxy object based on method and args.""" - def __init__(self, proxy): + def __init__(self, proxy, connection_pool): self.proxy = proxy self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) + self.connection_pool = connection_pool def __call__(self, message_data): """Consumer callback to call a method on a proxy object. @@ -227,7 +227,7 @@ class ProxyCallback(object): if hasattr(local.store, 'context'): del local.store.context rpc_common._safe_log(LOG.debug, _('received %s'), message_data) - ctxt = unpack_context(message_data) + ctxt = unpack_context(message_data, self.connection_pool) method = message_data.get('method') args = message_data.get('args', {}) if not method: @@ -303,12 +303,12 @@ class MulticallWaiter(object): yield result -def create_connection(new=True): +def create_connection(new, connection_pool): """Create a connection""" - return ConnectionContext(pooled=not new) + return ConnectionContext(connection_pool, pooled=not new) -def multicall(context, topic, msg, timeout): +def multicall(context, topic, msg, timeout, connection_pool): """Make a call that returns multiple times.""" # Can't use 'with' for multicall, as it returns an iterator # that will continue to use the connection. When it's done, @@ -320,16 +320,16 @@ def multicall(context, topic, msg, timeout): LOG.debug(_('MSG_ID is %s') % (msg_id)) pack_context(msg, context) - conn = ConnectionContext() + conn = ConnectionContext(connection_pool) wait_msg = MulticallWaiter(conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) conn.topic_send(topic, msg) return wait_msg -def call(context, topic, msg, timeout): +def call(context, topic, msg, timeout, connection_pool): """Sends a message on a topic and wait for a response.""" - rv = multicall(context, topic, msg, timeout) + rv = multicall(context, topic, msg, timeout, connection_pool) # NOTE(vish): return the last result from the multicall rv = list(rv) if not rv: @@ -337,29 +337,29 @@ def call(context, topic, msg, timeout): return rv[-1] -def cast(context, topic, msg): +def cast(context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) - with ConnectionContext() as conn: + with ConnectionContext(connection_pool) as conn: conn.topic_send(topic, msg) -def fanout_cast(context, topic, msg): +def fanout_cast(context, topic, msg, connection_pool): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) - with ConnectionContext() as conn: + with ConnectionContext(connection_pool) as conn: conn.fanout_send(topic, msg) -def notify(context, topic, msg): +def notify(context, topic, msg, connection_pool): """Sends a notification event on a topic.""" LOG.debug(_('Sending notification on %s...'), topic) pack_context(msg, context) - with ConnectionContext() as conn: + with ConnectionContext(connection_pool) as conn: conn.notify_send(topic, msg) -def cleanup(): - ConnectionContext.empty_pool() +def cleanup(connection_pool): + connection_pool.empty() diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 50459e5a..a90d06a7 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -588,43 +588,44 @@ class Connection(object): """Create a consumer that calls a method in a proxy object""" if fanout: self.declare_fanout_consumer(topic, - rpc_amqp.ProxyCallback(proxy)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) else: - self.declare_topic_consumer(topic, rpc_amqp.ProxyCallback(proxy)) + self.declare_topic_consumer(topic, + rpc_amqp.ProxyCallback(proxy, Connection.pool)) -rpc_amqp.ConnectionClass = Connection +Connection.pool = rpc_amqp.Pool(connection_cls=Connection) def create_connection(new=True): """Create a connection""" - return rpc_amqp.create_connection(new) + return rpc_amqp.create_connection(new, Connection.pool) def multicall(context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(context, topic, msg, timeout) + return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) def call(context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(context, topic, msg, timeout) + return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(context, topic, msg) + return rpc_amqp.cast(context, topic, msg, Connection.pool) def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(context, topic, msg) + return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) def notify(context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(context, topic, msg) + return rpc_amqp.notify(context, topic, msg, Connection.pool) def cleanup(): - return rpc_amqp.cleanup() + return rpc_amqp.cleanup(Connection.pool) diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index 1ed20ba8..e0168737 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -474,46 +474,46 @@ class Connection(object): """Create a consumer that calls a method in a proxy object""" if fanout: consumer = FanoutConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) else: consumer = TopicConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) self._register_consumer(consumer) return consumer -rpc_amqp.ConnectionClass = Connection +Connection.pool = rpc_amqp.Pool(connection_cls=Connection) def create_connection(new=True): """Create a connection""" - return rpc_amqp.create_connection(new) + return rpc_amqp.create_connection(new, Connection.pool) def multicall(context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(context, topic, msg, timeout) + return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) def call(context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(context, topic, msg, timeout) + return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(context, topic, msg) + return rpc_amqp.cast(context, topic, msg, Connection.pool) def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(context, topic, msg) + return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) def notify(context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(context, topic, msg) + return rpc_amqp.notify(context, topic, msg, Connection.pool) def cleanup(): - return rpc_amqp.cleanup() + return rpc_amqp.cleanup(Connection.pool) diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py index 9e318fbf..f96b714b 100644 --- a/nova/tests/rpc/test_qpid.py +++ b/nova/tests/rpc/test_qpid.py @@ -25,7 +25,6 @@ import mox from nova import context from nova import log as logging from nova import test -import nova.rpc.amqp as rpc_amqp try: import qpid @@ -186,10 +185,10 @@ class RpcQpidTestCase(test.TestCase): self.mocker.VerifyAll() finally: - while rpc_amqp.ConnectionContext._connection_pool.free_items: + while impl_qpid.Connection.pool.free_items: # Pull the mock connection object out of the connection pool so # that it doesn't mess up other test cases. - rpc_amqp.ConnectionContext._connection_pool.get() + impl_qpid.Connection.pool.get() @test.skip_if(qpid is None, "Test requires qpid") def test_cast(self): @@ -265,10 +264,10 @@ class RpcQpidTestCase(test.TestCase): self.mocker.VerifyAll() finally: - while rpc_amqp.ConnectionContext._connection_pool.free_items: + while impl_qpid.Connection.pool.free_items: # Pull the mock connection object out of the connection pool so # that it doesn't mess up other test cases. - rpc_amqp.ConnectionContext._connection_pool.get() + impl_qpid.Connection.pool.get() @test.skip_if(qpid is None, "Test requires qpid") def test_call(self):