Add a connection pool for rpc cast/call

Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each.
This commit is contained in:
Chris Behrens
2011-05-25 15:42:24 -07:00
committed by termie
parent 7622e854ef
commit b44c1fe956
2 changed files with 84 additions and 33 deletions

View File

@@ -35,9 +35,9 @@ from carrot import messaging
import eventlet
from eventlet import greenpool
from eventlet import greenthread
from eventlet import pools
from eventlet import queue
from nova import context
from nova import exception
from nova import fakerabbit
@@ -92,6 +92,11 @@ class Connection(carrot_connection.BrokerConnection):
pass
return cls.instance()
class Pool(pools.Pool):
def create(self):
return Connection.instance(new=True)
ConnectionPool = Pool(max_size=20)
class Consumer(messaging.Consumer):
"""Consumer base class.
@@ -163,21 +168,9 @@ class AdapterConsumer(Consumer):
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
super(AdapterConsumer, self).__init__(connection=connection,
topic=topic)
self.register_callback(self.process_data)
def receive(self, *args, **kwargs):
self.pool.spawn_n(self._receive, *args, **kwargs)
@exception.wrap_exception
def _receive(self, message_data, message):
"""Magically looks for a method on the proxy object and calls it.
Message data should be a dictionary with two keys:
method: string representing the method to call
args: dictionary of arg: value
Example: {'method': 'echo', 'args': {'value': 42}}
"""
def process_data(self, message_data, message):
LOG.debug(_('received %s') % message_data)
msg_id = message_data.pop('_msg_id', None)
@@ -194,6 +187,19 @@ class AdapterConsumer(Consumer):
LOG.warn(_('no method for message: %s') % message_data)
msg_reply(msg_id, _('No method for message: %s') % message_data)
return
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
@exception.wrap_exception
def _process_data(self, msg_id, ctxt, method, args):
"""Magically looks for a method on the proxy object and calls it.
Message data should be a dictionary with two keys:
method: string representing the method to call
args: dictionary of arg: value
Example: {'method': 'echo', 'args': {'value': 42}}
"""
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
@@ -214,11 +220,6 @@ class AdapterConsumer(Consumer):
return
class Publisher(messaging.Publisher):
"""Publisher base class."""
pass
class TopicAdapterConsumer(AdapterConsumer):
"""Consumes messages on a specific topic."""
@@ -251,6 +252,50 @@ class FanoutAdapterConsumer(AdapterConsumer):
topic=topic, proxy=proxy)
class ConsumerSet(object):
"""Groups consumers to listen on together on a single connection"""
def __init__(self, conn, consumer_list):
self.consumer_list = set(consumer_list)
self.consumer_set = None
self.init(conn)
def init(self, conn):
if not conn:
conn = Connection.instance(new=True)
if self.consumer_set:
self.consumer_set.close()
self.consumer_set = messaging.ConsumerSet(conn)
for consumer in self.consumer_list:
consumer.connection = conn
# consumer.backend is set for us
self.consumer_set.add_consumer(consumer)
def reconnect(self):
self.init(None)
def wait(self, limit=None):
while True:
it = self.consumer_set.iterconsume(limit=limit)
while True:
try:
it.next()
except StopIteration:
return
except Exception as e:
LOG.error(_("Received exception %s " % str(e) + \
"while processing consumer"))
fuck
self.reconnect()
# Break to outer loop
break
class Publisher(messaging.Publisher):
"""Publisher base class."""
pass
class TopicPublisher(Publisher):
"""Publishes messages on a specific topic."""
@@ -315,7 +360,7 @@ def msg_reply(msg_id, reply=None, failure=None):
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
conn = ConnectionPool.get()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
@@ -324,7 +369,9 @@ def msg_reply(msg_id, reply=None, failure=None):
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
publisher.close()
ConnectionPool.put(conn)
class RemoteError(exception.Error):
@@ -393,12 +440,11 @@ def multicall(context, topic, msg):
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
conn = Connection.instance()
conn = ConnectionPool.get()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
wait_msg = MulticallWaiter(consumer)
consumer.register_callback(wait_msg)
conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
@@ -462,10 +508,11 @@ def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_pack_context(msg, context)
conn = Connection.instance()
conn = ConnectionPool.get()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
ConnectionPool.put(conn)
def fanout_cast(context, topic, msg):
@@ -511,6 +558,7 @@ def send_message(topic, message, wait=True):
if wait:
consumer.wait()
consumer.close()
if __name__ == '__main__':

View File

@@ -91,26 +91,29 @@ class Service(object):
if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt)
conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
conn3 = rpc.Connection.instance(new=True)
if self.report_interval:
conn = rpc.Connection.instance(new=True)
# Share this same connection for these Consumers
consumer_all = rpc.TopicAdapterConsumer(
connection=conn1,
connection=conn,
topic=self.topic,
proxy=self)
consumer_node = rpc.TopicAdapterConsumer(
connection=conn2,
connection=conn,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
fanout = rpc.FanoutAdapterConsumer(
connection=conn3,
connection=conn,
topic=self.topic,
proxy=self)
self.timers.append(consumer_all.attach_to_eventlet())
self.timers.append(consumer_node.attach_to_eventlet())
self.timers.append(fanout.attach_to_eventlet())
cset = rpc.ConsumerSet(conn, [consumer_all,
consumer_node,
fanout])
# Wait forever, processing these consumers
greenthread.spawn_n(cset.wait)
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)