Fixed rpc consumer to use unique return connection to prevent overlap. This could be reworked to share a connection, but it should be a wait operation and not a fast poll like it was before. We could also keep a cache of opened connections to be used between requests.
This commit is contained in:
13
nova/rpc.py
13
nova/rpc.py
@@ -46,9 +46,9 @@ LOG.setLevel(logging.DEBUG)
|
||||
class Connection(carrot_connection.BrokerConnection):
|
||||
"""Connection instance object"""
|
||||
@classmethod
|
||||
def instance(cls):
|
||||
def instance(cls, new=False):
|
||||
"""Returns the instance"""
|
||||
if not hasattr(cls, '_instance'):
|
||||
if new or not hasattr(cls, '_instance'):
|
||||
params = dict(hostname=FLAGS.rabbit_host,
|
||||
port=FLAGS.rabbit_port,
|
||||
userid=FLAGS.rabbit_userid,
|
||||
@@ -60,7 +60,10 @@ class Connection(carrot_connection.BrokerConnection):
|
||||
|
||||
# NOTE(vish): magic is fun!
|
||||
# pylint: disable-msg=W0142
|
||||
cls._instance = cls(**params)
|
||||
if new:
|
||||
return cls(**params)
|
||||
else:
|
||||
cls._instance = cls(**params)
|
||||
return cls._instance
|
||||
|
||||
@classmethod
|
||||
@@ -263,8 +266,6 @@ def call(topic, msg):
|
||||
msg.update({'_msg_id': msg_id})
|
||||
LOG.debug("MSG_ID is %s" % (msg_id))
|
||||
|
||||
conn = Connection.instance()
|
||||
|
||||
class WaitMessage(object):
|
||||
|
||||
def __call__(self, data, message):
|
||||
@@ -276,9 +277,11 @@ def call(topic, msg):
|
||||
self.result = data['result']
|
||||
|
||||
wait_msg = WaitMessage()
|
||||
conn = Connection.instance(True)
|
||||
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
|
||||
consumer.register_callback(wait_msg)
|
||||
|
||||
conn = Connection.instance()
|
||||
publisher = TopicPublisher(connection=conn, topic=topic)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
|
Reference in New Issue
Block a user