ditched rpc.create_consumer(conn) interface... instead you now do conn.create_consumer(..
This commit is contained in:
@@ -24,7 +24,7 @@ from nova import flags
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_string('rpc_backend',
|
||||
'kombu',
|
||||
"The messaging module to use, defaults to carrot.")
|
||||
"The messaging module to use, defaults to kombu.")
|
||||
|
||||
impl_table = {'kombu': 'nova.rpc.impl_kombu',
|
||||
'amqp': 'nova.rpc.impl_kombu',
|
||||
@@ -41,10 +41,6 @@ def create_connection(new=True):
|
||||
return RPCIMPL.create_connection(new=new)
|
||||
|
||||
|
||||
def create_consumer(conn, topic, proxy, fanout=False):
|
||||
RPCIMPL.create_consumer(conn, topic, proxy, fanout)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
return RPCIMPL.call(context, topic, msg)
|
||||
|
||||
|
||||
@@ -132,6 +132,20 @@ class Connection(carrot_connection.BrokerConnection):
|
||||
pass
|
||||
self._rpc_consumer_thread = None
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
"""Create a consumer that calls methods in the proxy"""
|
||||
if fanout:
|
||||
consumer = FanoutAdapterConsumer(
|
||||
connection=self,
|
||||
topic=topic,
|
||||
proxy=proxy)
|
||||
else:
|
||||
consumer = TopicAdapterConsumer(
|
||||
connection=self,
|
||||
topic=topic,
|
||||
proxy=proxy)
|
||||
self._rpc_consumers.append(consumer)
|
||||
|
||||
|
||||
class Pool(pools.Pool):
|
||||
"""Class that implements a Pool of Connections."""
|
||||
@@ -187,7 +201,6 @@ class Consumer(messaging.Consumer):
|
||||
LOG.error(_('Unable to connect to AMQP server '
|
||||
'after %(tries)d tries. Shutting down.') % locals())
|
||||
sys.exit(1)
|
||||
connection._rpc_consumers.append(self)
|
||||
|
||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||
"""Wraps the parent fetch with some logic for failed connection."""
|
||||
@@ -568,20 +581,6 @@ def create_connection(new=True):
|
||||
return Connection.instance(new=new)
|
||||
|
||||
|
||||
def create_consumer(conn, topic, proxy, fanout=False):
|
||||
"""Create a consumer that calls methods in the proxy"""
|
||||
if fanout:
|
||||
return FanoutAdapterConsumer(
|
||||
connection=conn,
|
||||
topic=topic,
|
||||
proxy=proxy)
|
||||
else:
|
||||
return TopicAdapterConsumer(
|
||||
connection=conn,
|
||||
topic=topic,
|
||||
proxy=proxy)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
rv = multicall(context, topic, msg)
|
||||
|
||||
@@ -404,26 +404,6 @@ class Connection(object):
|
||||
'%s' % str(e)))
|
||||
self.reconnect()
|
||||
|
||||
def consume(self, limit=None):
|
||||
"""Consume from all queues/consumers"""
|
||||
it = self.iterconsume(limit=limit)
|
||||
while True:
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
def consume_in_thread(self):
|
||||
"""Consumer from all queues/consumers in a greenthread"""
|
||||
def _consumer_thread():
|
||||
try:
|
||||
self.consume()
|
||||
except greenlet.GreenletExit:
|
||||
return
|
||||
if self.consumer_thread is None:
|
||||
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
||||
return self.consumer_thread
|
||||
|
||||
def cancel_consumer_thread(self):
|
||||
"""Cancel a consumer thread"""
|
||||
if self.consumer_thread is not None:
|
||||
@@ -478,6 +458,33 @@ class Connection(object):
|
||||
"""Send a 'fanout' message"""
|
||||
self.publisher_send(FanoutPublisher, topic, msg)
|
||||
|
||||
def consume(self, limit=None):
|
||||
"""Consume from all queues/consumers"""
|
||||
it = self.iterconsume(limit=limit)
|
||||
while True:
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
def consume_in_thread(self):
|
||||
"""Consumer from all queues/consumers in a greenthread"""
|
||||
def _consumer_thread():
|
||||
try:
|
||||
self.consume()
|
||||
except greenlet.GreenletExit:
|
||||
return
|
||||
if self.consumer_thread is None:
|
||||
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
||||
return self.consumer_thread
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
"""Create a consumer that calls a method in a proxy object"""
|
||||
if fanout:
|
||||
self.declare_fanout_consumer(topic, ProxyCallback(proxy))
|
||||
else:
|
||||
self.declare_topic_consumer(topic, ProxyCallback(proxy))
|
||||
|
||||
|
||||
class Pool(pools.Pool):
|
||||
"""Class that implements a Pool of Connections."""
|
||||
@@ -678,14 +685,6 @@ def create_connection(new=True):
|
||||
return ConnectionContext(pooled=not new)
|
||||
|
||||
|
||||
def create_consumer(conn, topic, proxy, fanout=False):
|
||||
"""Create a consumer that calls a method in a proxy object"""
|
||||
if fanout:
|
||||
conn.declare_fanout_consumer(topic, ProxyCallback(proxy))
|
||||
else:
|
||||
conn.declare_topic_consumer(topic, ProxyCallback(proxy))
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
"""Make a call that returns multiple times."""
|
||||
# Can't use 'with' for multicall, as it returns an iterator
|
||||
|
||||
Reference in New Issue
Block a user