bring back commits lost in merge
This commit is contained in:
107
nova/rpc.py
107
nova/rpc.py
@@ -35,6 +35,7 @@ from carrot import connection as carrot_connection
|
||||
from carrot import messaging
|
||||
import eventlet
|
||||
from eventlet import greenpool
|
||||
from eventlet import greenthread
|
||||
from eventlet import pools
|
||||
from eventlet import queue
|
||||
|
||||
@@ -140,30 +141,30 @@ class Consumer(messaging.Consumer):
|
||||
FLAGS.rabbit_max_retries)
|
||||
sys.exit(1)
|
||||
|
||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||
"""Wraps the parent fetch with some logic for failed connection."""
|
||||
# TODO(vish): the logic for failed connections and logging should be
|
||||
# refactored into some sort of connection manager object
|
||||
try:
|
||||
if self.failed_connection:
|
||||
# NOTE(vish): connection is defined in the parent class, we can
|
||||
# recreate it as long as we create the backend too
|
||||
# pylint: disable=W0201
|
||||
self.connection = Connection.recreate()
|
||||
self.backend = self.connection.create_backend()
|
||||
self.declare()
|
||||
return super(Consumer, self).fetch(
|
||||
no_ack, auto_ack, enable_callbacks)
|
||||
if self.failed_connection:
|
||||
LOG.error(_('Reconnected to queue'))
|
||||
self.failed_connection = False
|
||||
# NOTE(vish): This is catching all errors because we really don't
|
||||
# want exceptions to be logged 10 times a second if some
|
||||
# persistent failure occurs.
|
||||
except Exception, e: # pylint: disable=W0703
|
||||
if not self.failed_connection:
|
||||
LOG.exception(_('Failed to fetch message from queue: %s' % e))
|
||||
self.failed_connection = True
|
||||
#def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||
# """Wraps the parent fetch with some logic for failed connection."""
|
||||
# # TODO(vish): the logic for failed connections and logging should be
|
||||
# # refactored into some sort of connection manager object
|
||||
# try:
|
||||
# if self.failed_connection:
|
||||
# # NOTE(vish): connection is defined in the parent class, we can
|
||||
# # recreate it as long as we create the backend too
|
||||
# # pylint: disable=W0201
|
||||
# self.connection = Connection.recreate()
|
||||
# self.backend = self.connection.create_backend()
|
||||
# self.declare()
|
||||
# return super(Consumer, self).fetch(
|
||||
# no_ack, auto_ack, enable_callbacks)
|
||||
# if self.failed_connection:
|
||||
# LOG.error(_('Reconnected to queue'))
|
||||
# self.failed_connection = False
|
||||
# # NOTE(vish): This is catching all errors because we really don't
|
||||
# # want exceptions to be logged 10 times a second if some
|
||||
# # persistent failure occurs.
|
||||
# except Exception, e: # pylint: disable=W0703
|
||||
# if not self.failed_connection:
|
||||
# LOG.exception(_('Failed to fetch message from queue: %s' % e))
|
||||
# self.failed_connection = True
|
||||
|
||||
def attach_to_eventlet(self):
|
||||
"""Only needed for unit tests!"""
|
||||
@@ -195,7 +196,7 @@ class AdapterConsumer(Consumer):
|
||||
|
||||
"""
|
||||
LOG.debug(_('received %s') % message_data)
|
||||
msg_id = message_data.pop('_msg_id', None)
|
||||
msg_id = message_data.get('_msg_id', None)
|
||||
|
||||
ctxt = _unpack_context(message_data)
|
||||
|
||||
@@ -225,11 +226,14 @@ class AdapterConsumer(Consumer):
|
||||
rval = node_func(context=ctxt, **node_args)
|
||||
if msg_id:
|
||||
# TODO(termie): re-enable when fix the yielding issue
|
||||
#if hasattr(rval, 'send'):
|
||||
# logging.error('rval! %s', rval)
|
||||
# for x in rval:
|
||||
# msg_reply(msg_id, x, None)
|
||||
msg_reply(msg_id, rval, None)
|
||||
if hasattr(rval, 'send'):
|
||||
logging.error('rval! %s', rval)
|
||||
for x in rval:
|
||||
msg_reply(msg_id, x, None)
|
||||
msg_reply(msg_id, None, None)
|
||||
else:
|
||||
msg_reply(msg_id, rval, None)
|
||||
#msg_reply(msg_id, rval, None)
|
||||
except Exception as e:
|
||||
logging.exception('Exception during message handling')
|
||||
if msg_id:
|
||||
@@ -355,7 +359,7 @@ class DirectConsumer(Consumer):
|
||||
self.routing_key = msg_id
|
||||
self.exchange = msg_id
|
||||
self.auto_delete = True
|
||||
self.exclusive = True
|
||||
self.exclusive = False
|
||||
super(DirectConsumer, self).__init__(connection=connection)
|
||||
|
||||
|
||||
@@ -387,7 +391,9 @@ def msg_reply(msg_id, reply=None, failure=None):
|
||||
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
|
||||
try:
|
||||
publisher.send({'result': reply, 'failure': failure})
|
||||
LOG.error('MSG REPLY SUCCESS')
|
||||
except TypeError:
|
||||
LOG.error('MSG REPLY FAILURE')
|
||||
publisher.send(
|
||||
{'result': dict((k, repr(v))
|
||||
for k, v in reply.__dict__.iteritems()),
|
||||
@@ -440,9 +446,9 @@ def _pack_context(msg, context):
|
||||
for args at some point.
|
||||
|
||||
"""
|
||||
context = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.to_dict().iteritems()])
|
||||
msg.update(context)
|
||||
context_d = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.to_dict().iteritems()])
|
||||
msg.update(context_d)
|
||||
|
||||
|
||||
class RpcContext(context.RequestContext):
|
||||
@@ -463,12 +469,13 @@ def multicall(context, topic, msg):
|
||||
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
||||
_pack_context(msg, context)
|
||||
|
||||
conn = ConnectionPool.get()
|
||||
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
|
||||
con_conn = ConnectionPool.get()
|
||||
consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
|
||||
wait_msg = MulticallWaiter(consumer)
|
||||
consumer.register_callback(wait_msg)
|
||||
|
||||
publisher = TopicPublisher(connection=conn, topic=topic)
|
||||
pub_conn = ConnectionPool.get()
|
||||
publisher = TopicPublisher(connection=pub_conn, topic=topic)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
|
||||
@@ -484,6 +491,7 @@ class MulticallWaiter(object):
|
||||
def close(self):
|
||||
self._closed = True
|
||||
self._consumer.close()
|
||||
ConnectionPool.put(self._consumer.connection)
|
||||
|
||||
def __call__(self, data, message):
|
||||
"""Acks message and sets result."""
|
||||
@@ -501,15 +509,26 @@ class MulticallWaiter(object):
|
||||
# trying to solve the problem quickly. This works but
|
||||
# I'd prefer to dig in and do it the best way later on.
|
||||
|
||||
def _waiter():
|
||||
while not self._closed:
|
||||
try:
|
||||
self._consumer.wait(limit=1)
|
||||
except StopIteration:
|
||||
pass
|
||||
eventlet.spawn(_waiter)
|
||||
#def _waiter():
|
||||
# i = 0
|
||||
# while not self._closed:
|
||||
# LOG.error('Iteration #%s (%s)', i, self._consumer.consumer_tag)
|
||||
# i += 1
|
||||
# try:
|
||||
# self._consumer.wait(limit=1)
|
||||
# except StopIteration:
|
||||
# pass
|
||||
# self._consumer.close()
|
||||
# ConnectionPool.put(self._consumer.connection)
|
||||
#eventlet.spawn(_waiter)
|
||||
|
||||
while True:
|
||||
rv = None
|
||||
while rv is None and not self._closed:
|
||||
rv = self._consumer.fetch(enable_callbacks=True)
|
||||
time.sleep(0.01)
|
||||
|
||||
LOG.error('RV %s', rv)
|
||||
result = self._results.get()
|
||||
if isinstance(result, Exception):
|
||||
raise result
|
||||
|
||||
@@ -61,6 +61,18 @@ class RpcTestCase(test.TestCase):
|
||||
self.assertEqual(value + i, x)
|
||||
i += 1
|
||||
|
||||
def test_multicall_succeed_three_times_yield(self):
|
||||
"""Get a value through rpc call"""
|
||||
value = 42
|
||||
result = rpc.multicall(self.context,
|
||||
'test',
|
||||
{"method": "echo_three_times_yield",
|
||||
"args": {"value": value}})
|
||||
i = 0
|
||||
for x in result:
|
||||
self.assertEqual(value + i, x)
|
||||
i += 1
|
||||
|
||||
def test_context_passed(self):
|
||||
"""Makes sure a context is passed through rpc call"""
|
||||
value = 42
|
||||
@@ -83,6 +95,7 @@ class RpcTestCase(test.TestCase):
|
||||
'test',
|
||||
{"method": "fail",
|
||||
"args": {"value": value}})
|
||||
LOG.error('INNNNNNN BETTTWWWWWWWWWWEEEEEEEEEEN')
|
||||
try:
|
||||
rpc.call(self.context,
|
||||
'test',
|
||||
@@ -186,6 +199,12 @@ class TestReceiver(object):
|
||||
context.reply(value + 1)
|
||||
context.reply(value + 2)
|
||||
|
||||
@staticmethod
|
||||
def echo_three_times_yield(context, value):
|
||||
yield value
|
||||
yield value + 1
|
||||
yield value + 2
|
||||
|
||||
@staticmethod
|
||||
def fail(context, value):
|
||||
"""Raises an exception with the value sent in"""
|
||||
|
||||
Reference in New Issue
Block a user