ditched rpc.create_consumer(conn) interface... instead you now do conn.create_consumer(..
This commit is contained in:
parent
c0775bfd7d
commit
ee15f2a582
@ -24,7 +24,7 @@ from nova import flags
|
|||||||
FLAGS = flags.FLAGS
|
FLAGS = flags.FLAGS
|
||||||
flags.DEFINE_string('rpc_backend',
|
flags.DEFINE_string('rpc_backend',
|
||||||
'kombu',
|
'kombu',
|
||||||
"The messaging module to use, defaults to carrot.")
|
"The messaging module to use, defaults to kombu.")
|
||||||
|
|
||||||
impl_table = {'kombu': 'nova.rpc.impl_kombu',
|
impl_table = {'kombu': 'nova.rpc.impl_kombu',
|
||||||
'amqp': 'nova.rpc.impl_kombu',
|
'amqp': 'nova.rpc.impl_kombu',
|
||||||
@ -41,10 +41,6 @@ def create_connection(new=True):
|
|||||||
return RPCIMPL.create_connection(new=new)
|
return RPCIMPL.create_connection(new=new)
|
||||||
|
|
||||||
|
|
||||||
def create_consumer(conn, topic, proxy, fanout=False):
|
|
||||||
RPCIMPL.create_consumer(conn, topic, proxy, fanout)
|
|
||||||
|
|
||||||
|
|
||||||
def call(context, topic, msg):
|
def call(context, topic, msg):
|
||||||
return RPCIMPL.call(context, topic, msg)
|
return RPCIMPL.call(context, topic, msg)
|
||||||
|
|
||||||
|
@ -132,6 +132,20 @@ class Connection(carrot_connection.BrokerConnection):
|
|||||||
pass
|
pass
|
||||||
self._rpc_consumer_thread = None
|
self._rpc_consumer_thread = None
|
||||||
|
|
||||||
|
def create_consumer(self, topic, proxy, fanout=False):
|
||||||
|
"""Create a consumer that calls methods in the proxy"""
|
||||||
|
if fanout:
|
||||||
|
consumer = FanoutAdapterConsumer(
|
||||||
|
connection=self,
|
||||||
|
topic=topic,
|
||||||
|
proxy=proxy)
|
||||||
|
else:
|
||||||
|
consumer = TopicAdapterConsumer(
|
||||||
|
connection=self,
|
||||||
|
topic=topic,
|
||||||
|
proxy=proxy)
|
||||||
|
self._rpc_consumers.append(consumer)
|
||||||
|
|
||||||
|
|
||||||
class Pool(pools.Pool):
|
class Pool(pools.Pool):
|
||||||
"""Class that implements a Pool of Connections."""
|
"""Class that implements a Pool of Connections."""
|
||||||
@ -187,7 +201,6 @@ class Consumer(messaging.Consumer):
|
|||||||
LOG.error(_('Unable to connect to AMQP server '
|
LOG.error(_('Unable to connect to AMQP server '
|
||||||
'after %(tries)d tries. Shutting down.') % locals())
|
'after %(tries)d tries. Shutting down.') % locals())
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
connection._rpc_consumers.append(self)
|
|
||||||
|
|
||||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||||
"""Wraps the parent fetch with some logic for failed connection."""
|
"""Wraps the parent fetch with some logic for failed connection."""
|
||||||
@ -568,20 +581,6 @@ def create_connection(new=True):
|
|||||||
return Connection.instance(new=new)
|
return Connection.instance(new=new)
|
||||||
|
|
||||||
|
|
||||||
def create_consumer(conn, topic, proxy, fanout=False):
|
|
||||||
"""Create a consumer that calls methods in the proxy"""
|
|
||||||
if fanout:
|
|
||||||
return FanoutAdapterConsumer(
|
|
||||||
connection=conn,
|
|
||||||
topic=topic,
|
|
||||||
proxy=proxy)
|
|
||||||
else:
|
|
||||||
return TopicAdapterConsumer(
|
|
||||||
connection=conn,
|
|
||||||
topic=topic,
|
|
||||||
proxy=proxy)
|
|
||||||
|
|
||||||
|
|
||||||
def call(context, topic, msg):
|
def call(context, topic, msg):
|
||||||
"""Sends a message on a topic and wait for a response."""
|
"""Sends a message on a topic and wait for a response."""
|
||||||
rv = multicall(context, topic, msg)
|
rv = multicall(context, topic, msg)
|
||||||
|
@ -404,26 +404,6 @@ class Connection(object):
|
|||||||
'%s' % str(e)))
|
'%s' % str(e)))
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
|
|
||||||
def consume(self, limit=None):
|
|
||||||
"""Consume from all queues/consumers"""
|
|
||||||
it = self.iterconsume(limit=limit)
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
it.next()
|
|
||||||
except StopIteration:
|
|
||||||
return
|
|
||||||
|
|
||||||
def consume_in_thread(self):
|
|
||||||
"""Consumer from all queues/consumers in a greenthread"""
|
|
||||||
def _consumer_thread():
|
|
||||||
try:
|
|
||||||
self.consume()
|
|
||||||
except greenlet.GreenletExit:
|
|
||||||
return
|
|
||||||
if self.consumer_thread is None:
|
|
||||||
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
|
||||||
return self.consumer_thread
|
|
||||||
|
|
||||||
def cancel_consumer_thread(self):
|
def cancel_consumer_thread(self):
|
||||||
"""Cancel a consumer thread"""
|
"""Cancel a consumer thread"""
|
||||||
if self.consumer_thread is not None:
|
if self.consumer_thread is not None:
|
||||||
@ -478,6 +458,33 @@ class Connection(object):
|
|||||||
"""Send a 'fanout' message"""
|
"""Send a 'fanout' message"""
|
||||||
self.publisher_send(FanoutPublisher, topic, msg)
|
self.publisher_send(FanoutPublisher, topic, msg)
|
||||||
|
|
||||||
|
def consume(self, limit=None):
|
||||||
|
"""Consume from all queues/consumers"""
|
||||||
|
it = self.iterconsume(limit=limit)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
it.next()
|
||||||
|
except StopIteration:
|
||||||
|
return
|
||||||
|
|
||||||
|
def consume_in_thread(self):
|
||||||
|
"""Consumer from all queues/consumers in a greenthread"""
|
||||||
|
def _consumer_thread():
|
||||||
|
try:
|
||||||
|
self.consume()
|
||||||
|
except greenlet.GreenletExit:
|
||||||
|
return
|
||||||
|
if self.consumer_thread is None:
|
||||||
|
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
||||||
|
return self.consumer_thread
|
||||||
|
|
||||||
|
def create_consumer(self, topic, proxy, fanout=False):
|
||||||
|
"""Create a consumer that calls a method in a proxy object"""
|
||||||
|
if fanout:
|
||||||
|
self.declare_fanout_consumer(topic, ProxyCallback(proxy))
|
||||||
|
else:
|
||||||
|
self.declare_topic_consumer(topic, ProxyCallback(proxy))
|
||||||
|
|
||||||
|
|
||||||
class Pool(pools.Pool):
|
class Pool(pools.Pool):
|
||||||
"""Class that implements a Pool of Connections."""
|
"""Class that implements a Pool of Connections."""
|
||||||
@ -678,14 +685,6 @@ def create_connection(new=True):
|
|||||||
return ConnectionContext(pooled=not new)
|
return ConnectionContext(pooled=not new)
|
||||||
|
|
||||||
|
|
||||||
def create_consumer(conn, topic, proxy, fanout=False):
|
|
||||||
"""Create a consumer that calls a method in a proxy object"""
|
|
||||||
if fanout:
|
|
||||||
conn.declare_fanout_consumer(topic, ProxyCallback(proxy))
|
|
||||||
else:
|
|
||||||
conn.declare_topic_consumer(topic, ProxyCallback(proxy))
|
|
||||||
|
|
||||||
|
|
||||||
def multicall(context, topic, msg):
|
def multicall(context, topic, msg):
|
||||||
"""Make a call that returns multiple times."""
|
"""Make a call that returns multiple times."""
|
||||||
# Can't use 'with' for multicall, as it returns an iterator
|
# Can't use 'with' for multicall, as it returns an iterator
|
||||||
|
@ -153,14 +153,12 @@ class Service(object):
|
|||||||
self.topic)
|
self.topic)
|
||||||
|
|
||||||
# Share this same connection for these Consumers
|
# Share this same connection for these Consumers
|
||||||
rpc.create_consumer(self.conn, self.topic, self,
|
self.conn.create_consumer(self.topic, self, fanout=False)
|
||||||
fanout=False)
|
|
||||||
|
|
||||||
node_topic = '%s.%s' % (self.topic, self.host)
|
node_topic = '%s.%s' % (self.topic, self.host)
|
||||||
rpc.create_consumer(self.conn, node_topic, self,
|
self.conn.create_consumer(node_topic, self, fanout=False)
|
||||||
fanout=False)
|
|
||||||
|
|
||||||
rpc.create_consumer(self.conn, self.topic, self, fanout=True)
|
self.conn.create_consumer(self.topic, self, fanout=True)
|
||||||
|
|
||||||
# Consume from all consumers in a thread
|
# Consume from all consumers in a thread
|
||||||
self.conn.consume_in_thread()
|
self.conn.consume_in_thread()
|
||||||
|
@ -33,10 +33,7 @@ class RpcTestCase(test.TestCase):
|
|||||||
super(RpcTestCase, self).setUp()
|
super(RpcTestCase, self).setUp()
|
||||||
self.conn = rpc.create_connection(True)
|
self.conn = rpc.create_connection(True)
|
||||||
self.receiver = TestReceiver()
|
self.receiver = TestReceiver()
|
||||||
rpc.create_consumer(self.conn,
|
self.conn.create_consumer('test', self.receiver, False)
|
||||||
'test',
|
|
||||||
self.receiver,
|
|
||||||
False)
|
|
||||||
self.conn.consume_in_thread()
|
self.conn.consume_in_thread()
|
||||||
self.context = context.get_admin_context()
|
self.context = context.get_admin_context()
|
||||||
|
|
||||||
@ -143,10 +140,7 @@ class RpcTestCase(test.TestCase):
|
|||||||
|
|
||||||
nested = Nested()
|
nested = Nested()
|
||||||
conn = rpc.create_connection(True)
|
conn = rpc.create_connection(True)
|
||||||
rpc.create_consumer(conn,
|
conn.create_consumer('nested', nested, False)
|
||||||
'nested',
|
|
||||||
nested,
|
|
||||||
False)
|
|
||||||
conn.consume_in_thread()
|
conn.consume_in_thread()
|
||||||
value = 42
|
value = 42
|
||||||
result = rpc.call(self.context,
|
result = rpc.call(self.context,
|
||||||
|
@ -33,10 +33,7 @@ class RpcCarrotTestCase(test.TestCase):
|
|||||||
super(RpcCarrotTestCase, self).setUp()
|
super(RpcCarrotTestCase, self).setUp()
|
||||||
self.conn = rpc.create_connection(True)
|
self.conn = rpc.create_connection(True)
|
||||||
self.receiver = TestReceiver()
|
self.receiver = TestReceiver()
|
||||||
rpc.create_consumer(self.conn,
|
self.conn.create_consumer('test', self.receiver, False)
|
||||||
'test',
|
|
||||||
self.receiver,
|
|
||||||
False)
|
|
||||||
self.conn.consume_in_thread()
|
self.conn.consume_in_thread()
|
||||||
self.context = context.get_admin_context()
|
self.context = context.get_admin_context()
|
||||||
|
|
||||||
@ -151,10 +148,7 @@ class RpcCarrotTestCase(test.TestCase):
|
|||||||
|
|
||||||
nested = Nested()
|
nested = Nested()
|
||||||
conn = rpc.create_connection(True)
|
conn = rpc.create_connection(True)
|
||||||
rpc.create_consumer(conn,
|
conn.create_consumer('nested', nested, False)
|
||||||
'nested',
|
|
||||||
nested,
|
|
||||||
False)
|
|
||||||
conn.consume_in_thread()
|
conn.consume_in_thread()
|
||||||
value = 42
|
value = 42
|
||||||
result = rpc.call(self.context,
|
result = rpc.call(self.context,
|
||||||
|
@ -33,10 +33,7 @@ class RpcKombuTestCase(test.TestCase):
|
|||||||
super(RpcKombuTestCase, self).setUp()
|
super(RpcKombuTestCase, self).setUp()
|
||||||
self.conn = rpc.create_connection()
|
self.conn = rpc.create_connection()
|
||||||
self.receiver = TestReceiver()
|
self.receiver = TestReceiver()
|
||||||
rpc.create_consumer(self.conn,
|
self.conn.create_consumer('test', self.receiver, False)
|
||||||
'test',
|
|
||||||
self.receiver,
|
|
||||||
False)
|
|
||||||
self.conn.consume_in_thread()
|
self.conn.consume_in_thread()
|
||||||
self.context = context.get_admin_context()
|
self.context = context.get_admin_context()
|
||||||
|
|
||||||
@ -215,10 +212,7 @@ class RpcKombuTestCase(test.TestCase):
|
|||||||
|
|
||||||
nested = Nested()
|
nested = Nested()
|
||||||
conn = rpc.create_connection(True)
|
conn = rpc.create_connection(True)
|
||||||
rpc.create_consumer(conn,
|
conn.create_consumer('nested', nested, False)
|
||||||
'nested',
|
|
||||||
nested,
|
|
||||||
False)
|
|
||||||
conn.consume_in_thread()
|
conn.consume_in_thread()
|
||||||
value = 42
|
value = 42
|
||||||
result = rpc.call(self.context,
|
result = rpc.call(self.context,
|
||||||
|
@ -40,6 +40,5 @@ class IsolationTestCase(test.TestCase):
|
|||||||
|
|
||||||
connection = rpc.create_connection(new=True)
|
connection = rpc.create_connection(new=True)
|
||||||
proxy = NeverCalled()
|
proxy = NeverCalled()
|
||||||
rpc.create_consumer(connection, 'compute',
|
connection.create_consumer('compute', proxy, fanout=False)
|
||||||
proxy, fanout=False)
|
|
||||||
connection.consume_in_thread()
|
connection.consume_in_thread()
|
||||||
|
Loading…
Reference in New Issue
Block a user