Merge "Move connection pool back into impl_kombu/qpid."

This commit is contained in:
Jenkins
2012-02-15 21:41:35 +00:00
committed by Gerrit Code Review
4 changed files with 63 additions and 63 deletions

View File

@@ -44,18 +44,22 @@ from nova.rpc.common import LOG
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
ConnectionClass = None
class Pool(pools.Pool): class Pool(pools.Pool):
"""Class that implements a Pool of Connections.""" """Class that implements a Pool of Connections."""
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection_cls = kwargs.pop("connection_cls", None)
kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs) super(Pool, self).__init__(*args, **kwargs)
# TODO(comstud): Timeout connections not used in a while # TODO(comstud): Timeout connections not used in a while
def create(self): def create(self):
LOG.debug('Pool creating new connection') LOG.debug('Pool creating new connection')
return ConnectionClass() return self.connection_cls()
def empty(self):
while self.free_items:
self.get().close()
class ConnectionContext(rpc_common.Connection): class ConnectionContext(rpc_common.Connection):
@@ -69,16 +73,14 @@ class ConnectionContext(rpc_common.Connection):
the pool. the pool.
""" """
_connection_pool = Pool(max_size=FLAGS.rpc_conn_pool_size, def __init__(self, connection_pool, pooled=True):
order_as_stack=True)
def __init__(self, pooled=True):
"""Create a new connection, or get one from the pool""" """Create a new connection, or get one from the pool"""
self.connection = None self.connection = None
self.connection_pool = connection_pool
if pooled: if pooled:
self.connection = self._connection_pool.get() self.connection = connection_pool.get()
else: else:
self.connection = ConnectionClass() self.connection = connection_pool.connection_cls()
self.pooled = pooled self.pooled = pooled
def __enter__(self): def __enter__(self):
@@ -94,7 +96,7 @@ class ConnectionContext(rpc_common.Connection):
# Reset the connection so it's ready for the next caller # Reset the connection so it's ready for the next caller
# to grab from the pool # to grab from the pool
self.connection.reset() self.connection.reset()
self._connection_pool.put(self.connection) self.connection_pool.put(self.connection)
else: else:
try: try:
self.connection.close() self.connection.close()
@@ -127,19 +129,14 @@ class ConnectionContext(rpc_common.Connection):
else: else:
raise exception.InvalidRPCConnectionReuse() raise exception.InvalidRPCConnectionReuse()
@classmethod
def empty_pool(cls):
while cls._connection_pool.free_items:
cls._connection_pool.get().close()
def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False):
def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id. """Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple. Failure should be a sys.exc_info() tuple.
""" """
with ConnectionContext() as conn: with ConnectionContext(connection_pool) as conn:
if failure: if failure:
message = str(failure[1]) message = str(failure[1])
tb = traceback.format_exception(*failure) tb = traceback.format_exception(*failure)
@@ -161,18 +158,19 @@ def msg_reply(msg_id, reply=None, failure=None, ending=False):
class RpcContext(context.RequestContext): class RpcContext(context.RequestContext):
"""Context that supports replying to a rpc.call""" """Context that supports replying to a rpc.call"""
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
msg_id = kwargs.pop('msg_id', None) self.msg_id = kwargs.pop('msg_id', None)
self.msg_id = msg_id self.connection_pool = kwargs.pop('connection_pool', None)
super(RpcContext, self).__init__(*args, **kwargs) super(RpcContext, self).__init__(*args, **kwargs)
def reply(self, reply=None, failure=None, ending=False): def reply(self, reply=None, failure=None, ending=False):
if self.msg_id: if self.msg_id:
msg_reply(self.msg_id, reply, failure, ending) msg_reply(self.msg_id, self.connection_pool, reply, failure,
ending)
if ending: if ending:
self.msg_id = None self.msg_id = None
def unpack_context(msg): def unpack_context(msg, connection_pool):
"""Unpack context from msg.""" """Unpack context from msg."""
context_dict = {} context_dict = {}
for key in list(msg.keys()): for key in list(msg.keys()):
@@ -183,6 +181,7 @@ def unpack_context(msg):
value = msg.pop(key) value = msg.pop(key)
context_dict[key[9:]] = value context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None) context_dict['msg_id'] = msg.pop('_msg_id', None)
context_dict['connection_pool'] = connection_pool
ctx = RpcContext.from_dict(context_dict) ctx = RpcContext.from_dict(context_dict)
LOG.debug(_('unpacked context: %s'), ctx.to_dict()) LOG.debug(_('unpacked context: %s'), ctx.to_dict())
return ctx return ctx
@@ -205,9 +204,10 @@ def pack_context(msg, context):
class ProxyCallback(object): class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args.""" """Calls methods on a proxy object based on method and args."""
def __init__(self, proxy): def __init__(self, proxy, connection_pool):
self.proxy = proxy self.proxy = proxy
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
self.connection_pool = connection_pool
def __call__(self, message_data): def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object. """Consumer callback to call a method on a proxy object.
@@ -227,7 +227,7 @@ class ProxyCallback(object):
if hasattr(local.store, 'context'): if hasattr(local.store, 'context'):
del local.store.context del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data) rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
ctxt = unpack_context(message_data) ctxt = unpack_context(message_data, self.connection_pool)
method = message_data.get('method') method = message_data.get('method')
args = message_data.get('args', {}) args = message_data.get('args', {})
if not method: if not method:
@@ -303,12 +303,12 @@ class MulticallWaiter(object):
yield result yield result
def create_connection(new=True): def create_connection(new, connection_pool):
"""Create a connection""" """Create a connection"""
return ConnectionContext(pooled=not new) return ConnectionContext(connection_pool, pooled=not new)
def multicall(context, topic, msg, timeout): def multicall(context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times.""" """Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator # Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done, # that will continue to use the connection. When it's done,
@@ -320,16 +320,16 @@ def multicall(context, topic, msg, timeout):
LOG.debug(_('MSG_ID is %s') % (msg_id)) LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context) pack_context(msg, context)
conn = ConnectionContext() conn = ConnectionContext(connection_pool)
wait_msg = MulticallWaiter(conn, timeout) wait_msg = MulticallWaiter(conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg) conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg) conn.topic_send(topic, msg)
return wait_msg return wait_msg
def call(context, topic, msg, timeout): def call(context, topic, msg, timeout, connection_pool):
"""Sends a message on a topic and wait for a response.""" """Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg, timeout) rv = multicall(context, topic, msg, timeout, connection_pool)
# NOTE(vish): return the last result from the multicall # NOTE(vish): return the last result from the multicall
rv = list(rv) rv = list(rv)
if not rv: if not rv:
@@ -337,29 +337,29 @@ def call(context, topic, msg, timeout):
return rv[-1] return rv[-1]
def cast(context, topic, msg): def cast(context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response.""" """Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic) LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext() as conn: with ConnectionContext(connection_pool) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, msg)
def fanout_cast(context, topic, msg): def fanout_cast(context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response.""" """Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...')) LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext() as conn: with ConnectionContext(connection_pool) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, msg)
def notify(context, topic, msg): def notify(context, topic, msg, connection_pool):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
LOG.debug(_('Sending notification on %s...'), topic) LOG.debug(_('Sending notification on %s...'), topic)
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext() as conn: with ConnectionContext(connection_pool) as conn:
conn.notify_send(topic, msg) conn.notify_send(topic, msg)
def cleanup(): def cleanup(connection_pool):
ConnectionContext.empty_pool() connection_pool.empty()

View File

@@ -588,43 +588,44 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object""" """Create a consumer that calls a method in a proxy object"""
if fanout: if fanout:
self.declare_fanout_consumer(topic, self.declare_fanout_consumer(topic,
rpc_amqp.ProxyCallback(proxy)) rpc_amqp.ProxyCallback(proxy, Connection.pool))
else: else:
self.declare_topic_consumer(topic, rpc_amqp.ProxyCallback(proxy)) self.declare_topic_consumer(topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
rpc_amqp.ConnectionClass = Connection Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
def create_connection(new=True): def create_connection(new=True):
"""Create a connection""" """Create a connection"""
return rpc_amqp.create_connection(new) return rpc_amqp.create_connection(new, Connection.pool)
def multicall(context, topic, msg, timeout=None): def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times.""" """Make a call that returns multiple times."""
return rpc_amqp.multicall(context, topic, msg, timeout) return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
def call(context, topic, msg, timeout=None): def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response.""" """Sends a message on a topic and wait for a response."""
return rpc_amqp.call(context, topic, msg, timeout) return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
def cast(context, topic, msg): def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response.""" """Sends a message on a topic without waiting for a response."""
return rpc_amqp.cast(context, topic, msg) return rpc_amqp.cast(context, topic, msg, Connection.pool)
def fanout_cast(context, topic, msg): def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response.""" """Sends a message on a fanout exchange without waiting for a response."""
return rpc_amqp.fanout_cast(context, topic, msg) return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def notify(context, topic, msg): def notify(context, topic, msg):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg) return rpc_amqp.notify(context, topic, msg, Connection.pool)
def cleanup(): def cleanup():
return rpc_amqp.cleanup() return rpc_amqp.cleanup(Connection.pool)

View File

@@ -474,46 +474,46 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object""" """Create a consumer that calls a method in a proxy object"""
if fanout: if fanout:
consumer = FanoutConsumer(self.session, topic, consumer = FanoutConsumer(self.session, topic,
rpc_amqp.ProxyCallback(proxy)) rpc_amqp.ProxyCallback(proxy, Connection.pool))
else: else:
consumer = TopicConsumer(self.session, topic, consumer = TopicConsumer(self.session, topic,
rpc_amqp.ProxyCallback(proxy)) rpc_amqp.ProxyCallback(proxy, Connection.pool))
self._register_consumer(consumer) self._register_consumer(consumer)
return consumer return consumer
rpc_amqp.ConnectionClass = Connection Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
def create_connection(new=True): def create_connection(new=True):
"""Create a connection""" """Create a connection"""
return rpc_amqp.create_connection(new) return rpc_amqp.create_connection(new, Connection.pool)
def multicall(context, topic, msg, timeout=None): def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times.""" """Make a call that returns multiple times."""
return rpc_amqp.multicall(context, topic, msg, timeout) return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
def call(context, topic, msg, timeout=None): def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response.""" """Sends a message on a topic and wait for a response."""
return rpc_amqp.call(context, topic, msg, timeout) return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
def cast(context, topic, msg): def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response.""" """Sends a message on a topic without waiting for a response."""
return rpc_amqp.cast(context, topic, msg) return rpc_amqp.cast(context, topic, msg, Connection.pool)
def fanout_cast(context, topic, msg): def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response.""" """Sends a message on a fanout exchange without waiting for a response."""
return rpc_amqp.fanout_cast(context, topic, msg) return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def notify(context, topic, msg): def notify(context, topic, msg):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg) return rpc_amqp.notify(context, topic, msg, Connection.pool)
def cleanup(): def cleanup():
return rpc_amqp.cleanup() return rpc_amqp.cleanup(Connection.pool)

View File

@@ -25,7 +25,6 @@ import mox
from nova import context from nova import context
from nova import log as logging from nova import log as logging
from nova import test from nova import test
import nova.rpc.amqp as rpc_amqp
try: try:
import qpid import qpid
@@ -186,10 +185,10 @@ class RpcQpidTestCase(test.TestCase):
self.mocker.VerifyAll() self.mocker.VerifyAll()
finally: finally:
while rpc_amqp.ConnectionContext._connection_pool.free_items: while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so # Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases. # that it doesn't mess up other test cases.
rpc_amqp.ConnectionContext._connection_pool.get() impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid") @test.skip_if(qpid is None, "Test requires qpid")
def test_cast(self): def test_cast(self):
@@ -265,10 +264,10 @@ class RpcQpidTestCase(test.TestCase):
self.mocker.VerifyAll() self.mocker.VerifyAll()
finally: finally:
while rpc_amqp.ConnectionContext._connection_pool.free_items: while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so # Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases. # that it doesn't mess up other test cases.
rpc_amqp.ConnectionContext._connection_pool.get() impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid") @test.skip_if(qpid is None, "Test requires qpid")
def test_call(self): def test_call(self):