add support to rpc for multicall

This commit is contained in:
termie
2011-05-25 15:42:24 -07:00
parent 8b896f5925
commit 1f23c30868
2 changed files with 90 additions and 26 deletions

View File

@@ -32,8 +32,11 @@ import uuid
from carrot import connection as carrot_connection from carrot import connection as carrot_connection
from carrot import messaging from carrot import messaging
import eventlet
from eventlet import greenpool from eventlet import greenpool
from eventlet import greenthread from eventlet import greenthread
from eventlet import queue
from nova import context from nova import context
from nova import exception from nova import exception
@@ -131,7 +134,8 @@ class Consumer(messaging.Consumer):
self.connection = Connection.recreate() self.connection = Connection.recreate()
self.backend = self.connection.create_backend() self.backend = self.connection.create_backend()
self.declare() self.declare()
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) return super(Consumer, self).fetch(
no_ack, auto_ack, enable_callbacks)
if self.failed_connection: if self.failed_connection:
LOG.error(_('Reconnected to queue')) LOG.error(_('Reconnected to queue'))
self.failed_connection = False self.failed_connection = False
@@ -347,8 +351,9 @@ def _unpack_context(msg):
if key.startswith('_context_'): if key.startswith('_context_'):
value = msg.pop(key) value = msg.pop(key)
context_dict[key[9:]] = value context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
LOG.debug(_('unpacked context: %s'), context_dict) LOG.debug(_('unpacked context: %s'), context_dict)
return context.RequestContext.from_dict(context_dict) return RpcContext.from_dict(context_dict)
def _pack_context(msg, context): def _pack_context(msg, context):
@@ -365,26 +370,27 @@ def _pack_context(msg, context):
msg.update(context) msg.update(context)
def call(context, topic, msg): class RpcContext(context.RequestContext):
"""Sends a message on a topic and wait for a response.""" def __init__(self, *args, **kwargs):
msg_id = kwargs.pop('msg_id', None)
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)
def reply(self, *args, **kwargs):
msg_reply(self.msg_id, *args, **kwargs)
def multicall(context, topic, msg):
"""Make a call that returns multiple times."""
LOG.debug(_('Making asynchronous call on %s ...'), topic) LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id}) msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id)) LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context) _pack_context(msg, context)
class WaitMessage(object):
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
if data['failure']:
self.result = RemoteError(*data['failure'])
else:
self.result = data['result']
wait_msg = WaitMessage()
conn = Connection.instance() conn = Connection.instance()
consumer = DirectConsumer(connection=conn, msg_id=msg_id) consumer = DirectConsumer(connection=conn, msg_id=msg_id)
wait_msg = MulticallWaiter(consumer)
consumer.register_callback(wait_msg) consumer.register_callback(wait_msg)
conn = Connection.instance() conn = Connection.instance()
@@ -392,18 +398,59 @@ def call(context, topic, msg):
publisher.send(msg) publisher.send(msg)
publisher.close() publisher.close()
return wait_msg
class MulticallWaiter(object):
def __init__(self, consumer):
self._consumer = consumer
self._results = queue.Queue()
self._closed = False
def close(self):
self._closed = True
self._consumer.close()
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
if data['failure']:
self._results.put(RemoteError(*data['failure']))
else:
self._results.put(data['result'])
def __iter__(self):
return self.wait()
def wait(self):
# TODO(termie): This is probably really a much simpler issue but am
# trying to solve the problem quickly. This works but
# I'd prefer to dig in and do it the best way later on.
def _waiter():
while not self._closed:
try: try:
consumer.wait(limit=1) self._consumer.wait(limit=1)
except StopIteration: except StopIteration:
pass pass
consumer.close() eventlet.spawn(_waiter)
# NOTE(termie): this is a little bit of a change from the original
# non-eventlet code where returning a Failure while True:
# instance from a deferred call is very similar to result = self._results.get()
# raising an exception if isinstance(result, Exception):
if isinstance(wait_msg.result, Exception): raise result
raise wait_msg.result if result == None:
return wait_msg.result self.close()
raise StopIteration
yield result
def call(context, topic, msg):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
for x in rv:
rv.close()
return x
def cast(context, topic, msg): def cast(context, topic, msg):

View File

@@ -49,6 +49,17 @@ class RpcTestCase(test.TestCase):
"args": {"value": value}}) "args": {"value": value}})
self.assertEqual(value, result) self.assertEqual(value, result)
def test_multicall_succeed_three_times(self):
"""Get a value through rpc call"""
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times",
"args": {"value": value}})
for x in result:
self.assertEqual(value, x)
def test_context_passed(self): def test_context_passed(self):
"""Makes sure a context is passed through rpc call""" """Makes sure a context is passed through rpc call"""
value = 42 value = 42
@@ -126,6 +137,12 @@ class TestReceiver(object):
LOG.debug(_("Received %s"), context) LOG.debug(_("Received %s"), context)
return context.to_dict() return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value)
context.reply(value)
@staticmethod @staticmethod
def fail(context, value): def fail(context, value):
"""Raises an exception with the value sent in""" """Raises an exception with the value sent in"""