Merge "Don't block forever for rpc.(multi)call response."
This commit is contained in:
@@ -48,7 +48,7 @@ def create_connection(new=True):
|
||||
return _get_impl().create_connection(new=new)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
def call(context, topic, msg, timeout=None):
|
||||
"""Invoke a remote method that returns something.
|
||||
|
||||
:param context: Information that identifies the user that has made this
|
||||
@@ -59,10 +59,15 @@ def call(context, topic, msg):
|
||||
when the consumer was created with fanout=False.
|
||||
:param msg: This is a dict in the form { "method" : "method_to_invoke",
|
||||
"args" : dict_of_kwargs }
|
||||
:param timeout: int, number of seconds to use for a response timeout.
|
||||
If set, this overrides the rpc_response_timeout option.
|
||||
|
||||
:returns: A dict from the remote method.
|
||||
|
||||
:raises: nova.rpc.common.Timeout if a complete response is not received
|
||||
before the timeout is reached.
|
||||
"""
|
||||
return _get_impl().call(context, topic, msg)
|
||||
return _get_impl().call(context, topic, msg, timeout)
|
||||
|
||||
|
||||
def cast(context, topic, msg):
|
||||
@@ -102,7 +107,7 @@ def fanout_cast(context, topic, msg):
|
||||
return _get_impl().fanout_cast(context, topic, msg)
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
def multicall(context, topic, msg, timeout=None):
|
||||
"""Invoke a remote method and get back an iterator.
|
||||
|
||||
In this case, the remote method will be returning multiple values in
|
||||
@@ -117,13 +122,18 @@ def multicall(context, topic, msg):
|
||||
when the consumer was created with fanout=False.
|
||||
:param msg: This is a dict in the form { "method" : "method_to_invoke",
|
||||
"args" : dict_of_kwargs }
|
||||
:param timeout: int, number of seconds to use for a response timeout.
|
||||
If set, this overrides the rpc_response_timeout option.
|
||||
|
||||
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
|
||||
an index that starts at 0 and increases by one for each value
|
||||
returned and X is the Nth value that was returned by the remote
|
||||
method.
|
||||
|
||||
:raises: nova.rpc.common.Timeout if a complete response is not received
|
||||
before the timeout is reached.
|
||||
"""
|
||||
return _get_impl().multicall(context, topic, msg)
|
||||
return _get_impl().multicall(context, topic, msg, timeout)
|
||||
|
||||
|
||||
def notify(context, topic, msg):
|
||||
|
||||
@@ -262,9 +262,10 @@ class ProxyCallback(object):
|
||||
|
||||
|
||||
class MulticallWaiter(object):
|
||||
def __init__(self, connection):
|
||||
def __init__(self, connection, timeout):
|
||||
self._connection = connection
|
||||
self._iterator = connection.iterconsume()
|
||||
self._iterator = connection.iterconsume(
|
||||
timeout=timeout or FLAGS.rpc_response_timeout)
|
||||
self._result = None
|
||||
self._done = False
|
||||
self._got_ending = False
|
||||
@@ -307,7 +308,7 @@ def create_connection(new=True):
|
||||
return ConnectionContext(pooled=not new)
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
def multicall(context, topic, msg, timeout):
|
||||
"""Make a call that returns multiple times."""
|
||||
# Can't use 'with' for multicall, as it returns an iterator
|
||||
# that will continue to use the connection. When it's done,
|
||||
@@ -320,15 +321,15 @@ def multicall(context, topic, msg):
|
||||
pack_context(msg, context)
|
||||
|
||||
conn = ConnectionContext()
|
||||
wait_msg = MulticallWaiter(conn)
|
||||
wait_msg = MulticallWaiter(conn, timeout)
|
||||
conn.declare_direct_consumer(msg_id, wait_msg)
|
||||
conn.topic_send(topic, msg)
|
||||
return wait_msg
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
def call(context, topic, msg, timeout):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
rv = multicall(context, topic, msg)
|
||||
rv = multicall(context, topic, msg, timeout)
|
||||
# NOTE(vish): return the last result from the multicall
|
||||
rv = list(rv)
|
||||
if not rv:
|
||||
|
||||
@@ -34,6 +34,9 @@ rpc_opts = [
|
||||
cfg.IntOpt('rpc_conn_pool_size',
|
||||
default=30,
|
||||
help='Size of RPC connection pool'),
|
||||
cfg.IntOpt('rpc_response_timeout',
|
||||
default=3600,
|
||||
help='Seconds to wait for a response from call or multicall'),
|
||||
]
|
||||
|
||||
flags.FLAGS.add_options(rpc_opts)
|
||||
@@ -59,6 +62,15 @@ class RemoteError(exception.NovaException):
|
||||
traceback=traceback)
|
||||
|
||||
|
||||
class Timeout(exception.NovaException):
|
||||
"""Signifies that a timeout has occurred.
|
||||
|
||||
This exception is raised if the rpc_response_timeout is reached while
|
||||
waiting for a response from the remote side.
|
||||
"""
|
||||
message = _("Timeout while waiting on RPC response.")
|
||||
|
||||
|
||||
class Connection(object):
|
||||
"""A connection, returned by rpc.create_connection().
|
||||
|
||||
|
||||
@@ -522,8 +522,9 @@ class RpcContext(context.RequestContext):
|
||||
self.msg_id = None
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
def multicall(context, topic, msg, timeout=None):
|
||||
"""Make a call that returns multiple times."""
|
||||
# NOTE(russellb): carrot doesn't support timeouts
|
||||
LOG.debug(_('Making asynchronous call on %s ...'), topic)
|
||||
msg_id = uuid.uuid4().hex
|
||||
msg.update({'_msg_id': msg_id})
|
||||
@@ -594,9 +595,9 @@ def create_connection(new=True):
|
||||
return Connection.instance(new=new)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
def call(context, topic, msg, timeout=None):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
rv = multicall(context, topic, msg)
|
||||
rv = multicall(context, topic, msg, timeout)
|
||||
# NOTE(vish): return the last result from the multicall
|
||||
rv = list(rv)
|
||||
if not rv:
|
||||
|
||||
@@ -18,14 +18,21 @@ queues. Casts will block, but this is very useful for tests.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
|
||||
import eventlet
|
||||
|
||||
from nova import context
|
||||
from nova import flags
|
||||
from nova.rpc import common as rpc_common
|
||||
|
||||
CONSUMERS = {}
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
|
||||
|
||||
class RpcContext(context.RequestContext):
|
||||
def __init__(self, *args, **kwargs):
|
||||
@@ -45,31 +52,49 @@ class Consumer(object):
|
||||
self.topic = topic
|
||||
self.proxy = proxy
|
||||
|
||||
def call(self, context, method, args):
|
||||
def call(self, context, method, args, timeout):
|
||||
node_func = getattr(self.proxy, method)
|
||||
node_args = dict((str(k), v) for k, v in args.iteritems())
|
||||
done = eventlet.event.Event()
|
||||
|
||||
ctxt = RpcContext.from_dict(context.to_dict())
|
||||
try:
|
||||
rval = node_func(context=ctxt, **node_args)
|
||||
# Caller might have called ctxt.reply() manually
|
||||
for (reply, failure) in ctxt._response:
|
||||
if failure:
|
||||
raise failure[0], failure[1], failure[2]
|
||||
yield reply
|
||||
# if ending not 'sent'...we might have more data to
|
||||
# return from the function itself
|
||||
if not ctxt._done:
|
||||
if inspect.isgenerator(rval):
|
||||
for val in rval:
|
||||
yield val
|
||||
else:
|
||||
yield rval
|
||||
except Exception:
|
||||
exc_info = sys.exc_info()
|
||||
raise rpc_common.RemoteError(exc_info[0].__name__,
|
||||
str(exc_info[1]),
|
||||
''.join(traceback.format_exception(*exc_info)))
|
||||
def _inner():
|
||||
ctxt = RpcContext.from_dict(context.to_dict())
|
||||
try:
|
||||
rval = node_func(context=ctxt, **node_args)
|
||||
res = []
|
||||
# Caller might have called ctxt.reply() manually
|
||||
for (reply, failure) in ctxt._response:
|
||||
if failure:
|
||||
raise failure[0], failure[1], failure[2]
|
||||
res.append(reply)
|
||||
# if ending not 'sent'...we might have more data to
|
||||
# return from the function itself
|
||||
if not ctxt._done:
|
||||
if inspect.isgenerator(rval):
|
||||
for val in rval:
|
||||
res.append(val)
|
||||
else:
|
||||
res.append(rval)
|
||||
done.send(res)
|
||||
except Exception:
|
||||
exc_info = sys.exc_info()
|
||||
done.send_exception(
|
||||
rpc_common.RemoteError(exc_info[0].__name__,
|
||||
str(exc_info[1]),
|
||||
''.join(traceback.format_exception(*exc_info))))
|
||||
|
||||
thread = eventlet.greenthread.spawn(_inner)
|
||||
|
||||
if timeout:
|
||||
start_time = time.time()
|
||||
while not done.ready():
|
||||
eventlet.greenthread.sleep(1)
|
||||
cur_time = time.time()
|
||||
if (cur_time - start_time) > timeout:
|
||||
thread.kill()
|
||||
raise rpc_common.Timeout()
|
||||
|
||||
return done.wait()
|
||||
|
||||
|
||||
class Connection(object):
|
||||
@@ -99,7 +124,7 @@ def create_connection(new=True):
|
||||
return Connection()
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
def multicall(context, topic, msg, timeout=None):
|
||||
"""Make a call that returns multiple times."""
|
||||
|
||||
method = msg.get('method')
|
||||
@@ -112,12 +137,12 @@ def multicall(context, topic, msg):
|
||||
except (KeyError, IndexError):
|
||||
return iter([None])
|
||||
else:
|
||||
return consumer.call(context, method, args)
|
||||
return consumer.call(context, method, args, timeout)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
def call(context, topic, msg, timeout=None):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
rv = multicall(context, topic, msg)
|
||||
rv = multicall(context, topic, msg, timeout)
|
||||
# NOTE(vish): return the last result from the multicall
|
||||
rv = list(rv)
|
||||
if not rv:
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
@@ -425,7 +426,7 @@ class Connection(object):
|
||||
while True:
|
||||
try:
|
||||
return method(*args, **kwargs)
|
||||
except self.connection_errors, e:
|
||||
except (self.connection_errors, socket.timeout), e:
|
||||
pass
|
||||
except Exception, e:
|
||||
# NOTE(comstud): Unfortunately it's possible for amqplib
|
||||
@@ -478,15 +479,20 @@ class Connection(object):
|
||||
|
||||
return self.ensure(_connect_error, _declare_consumer)
|
||||
|
||||
def iterconsume(self, limit=None):
|
||||
def iterconsume(self, limit=None, timeout=None):
|
||||
"""Return an iterator that will consume from all queues/consumers"""
|
||||
|
||||
info = {'do_consume': True}
|
||||
|
||||
def _error_callback(exc):
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
str(exc))
|
||||
info['do_consume'] = True
|
||||
if isinstance(exc, socket.timeout):
|
||||
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
raise rpc_common.Timeout()
|
||||
else:
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
str(exc))
|
||||
info['do_consume'] = True
|
||||
|
||||
def _consume():
|
||||
if info['do_consume']:
|
||||
@@ -496,7 +502,7 @@ class Connection(object):
|
||||
queue.consume(nowait=True)
|
||||
queues_tail.consume(nowait=False)
|
||||
info['do_consume'] = False
|
||||
return self.connection.drain_events()
|
||||
return self.connection.drain_events(timeout=timeout)
|
||||
|
||||
for iteration in itertools.count(0):
|
||||
if limit and iteration >= limit:
|
||||
@@ -595,14 +601,14 @@ def create_connection(new=True):
|
||||
return rpc_amqp.create_connection(new)
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
def multicall(context, topic, msg, timeout=None):
|
||||
"""Make a call that returns multiple times."""
|
||||
return rpc_amqp.multicall(context, topic, msg)
|
||||
return rpc_amqp.multicall(context, topic, msg, timeout)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
def call(context, topic, msg, timeout=None):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
return rpc_amqp.call(context, topic, msg)
|
||||
return rpc_amqp.call(context, topic, msg, timeout)
|
||||
|
||||
|
||||
def cast(context, topic, msg):
|
||||
|
||||
@@ -28,6 +28,7 @@ import qpid.messaging.exceptions
|
||||
from nova.common import cfg
|
||||
from nova import flags
|
||||
from nova.rpc import amqp as rpc_amqp
|
||||
from nova.rpc import common as rpc_common
|
||||
from nova.rpc.common import LOG
|
||||
|
||||
|
||||
@@ -338,7 +339,8 @@ class Connection(object):
|
||||
while True:
|
||||
try:
|
||||
return method(*args, **kwargs)
|
||||
except qpid.messaging.exceptions.ConnectionError, e:
|
||||
except (qpid.messaging.exceptions.Empty,
|
||||
qpid.messaging.exceptions.ConnectionError), e:
|
||||
if error_callback:
|
||||
error_callback(e)
|
||||
self.reconnect()
|
||||
@@ -372,15 +374,20 @@ class Connection(object):
|
||||
|
||||
return self.ensure(_connect_error, _declare_consumer)
|
||||
|
||||
def iterconsume(self, limit=None):
|
||||
def iterconsume(self, limit=None, timeout=None):
|
||||
"""Return an iterator that will consume from all queues/consumers"""
|
||||
|
||||
def _error_callback(exc):
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
str(exc))
|
||||
if isinstance(exc, qpid.messaging.exceptions.Empty):
|
||||
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
raise rpc_common.Timeout()
|
||||
else:
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
str(exc))
|
||||
|
||||
def _consume():
|
||||
nxt_receiver = self.session.next_receiver()
|
||||
nxt_receiver = self.session.next_receiver(timeout=timeout)
|
||||
self._lookup_consumer(nxt_receiver).consume()
|
||||
|
||||
for iteration in itertools.count(0):
|
||||
@@ -483,14 +490,14 @@ def create_connection(new=True):
|
||||
return rpc_amqp.create_connection(new)
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
def multicall(context, topic, msg, timeout=None):
|
||||
"""Make a call that returns multiple times."""
|
||||
return rpc_amqp.multicall(context, topic, msg)
|
||||
return rpc_amqp.multicall(context, topic, msg, timeout)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
def call(context, topic, msg, timeout=None):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
return rpc_amqp.call(context, topic, msg)
|
||||
return rpc_amqp.call(context, topic, msg, timeout)
|
||||
|
||||
|
||||
def cast(context, topic, msg):
|
||||
|
||||
@@ -19,9 +19,13 @@
|
||||
Unit Tests for remote procedure calls shared between all implementations
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
import nose
|
||||
|
||||
from nova import context
|
||||
from nova import log as logging
|
||||
from nova.rpc.common import RemoteError
|
||||
from nova.rpc.common import RemoteError, Timeout
|
||||
from nova import test
|
||||
|
||||
|
||||
@@ -29,13 +33,14 @@ LOG = logging.getLogger('nova.tests.rpc')
|
||||
|
||||
|
||||
class _BaseRpcTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
def setUp(self, supports_timeouts=True):
|
||||
super(_BaseRpcTestCase, self).setUp()
|
||||
self.conn = self.rpc.create_connection(True)
|
||||
self.receiver = TestReceiver()
|
||||
self.conn.create_consumer('test', self.receiver, False)
|
||||
self.conn.consume_in_thread()
|
||||
self.context = context.get_admin_context()
|
||||
self.supports_timeouts = supports_timeouts
|
||||
|
||||
def tearDown(self):
|
||||
self.conn.close()
|
||||
@@ -162,6 +167,28 @@ class _BaseRpcTestCase(test.TestCase):
|
||||
conn.close()
|
||||
self.assertEqual(value, result)
|
||||
|
||||
def test_call_timeout(self):
|
||||
"""Make sure rpc.call will time out"""
|
||||
if not self.supports_timeouts:
|
||||
raise nose.SkipTest(_("RPC backend does not support timeouts"))
|
||||
|
||||
value = 42
|
||||
self.assertRaises(Timeout,
|
||||
self.rpc.call,
|
||||
self.context,
|
||||
'test',
|
||||
{"method": "block",
|
||||
"args": {"value": value}}, timeout=1)
|
||||
try:
|
||||
self.rpc.call(self.context,
|
||||
'test',
|
||||
{"method": "block",
|
||||
"args": {"value": value}},
|
||||
timeout=1)
|
||||
self.fail("should have thrown Timeout")
|
||||
except Timeout as exc:
|
||||
pass
|
||||
|
||||
|
||||
class TestReceiver(object):
|
||||
"""Simple Proxy class so the consumer has methods to call.
|
||||
@@ -205,3 +232,7 @@ class TestReceiver(object):
|
||||
def fail(context, value):
|
||||
"""Raises an exception with the value sent in."""
|
||||
raise Exception(value)
|
||||
|
||||
@staticmethod
|
||||
def block(context, value):
|
||||
time.sleep(2)
|
||||
|
||||
@@ -30,7 +30,7 @@ LOG = logging.getLogger('nova.tests.rpc')
|
||||
class RpcCarrotTestCase(common._BaseRpcTestCase):
|
||||
def setUp(self):
|
||||
self.rpc = impl_carrot
|
||||
super(RpcCarrotTestCase, self).setUp()
|
||||
super(RpcCarrotTestCase, self).setUp(supports_timeouts=False)
|
||||
|
||||
def tearDown(self):
|
||||
super(RpcCarrotTestCase, self).tearDown()
|
||||
|
||||
@@ -221,21 +221,25 @@ class RpcQpidTestCase(test.TestCase):
|
||||
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
|
||||
self.mock_sender.send(mox.IgnoreArg())
|
||||
|
||||
self.mock_session.next_receiver().AndReturn(self.mock_receiver)
|
||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||
self.mock_receiver)
|
||||
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
||||
{"result": "foo", "failure": False, "ending": False}))
|
||||
if multi:
|
||||
self.mock_session.next_receiver().AndReturn(self.mock_receiver)
|
||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||
self.mock_receiver)
|
||||
self.mock_receiver.fetch().AndReturn(
|
||||
qpid.messaging.Message(
|
||||
{"result": "bar", "failure": False,
|
||||
"ending": False}))
|
||||
self.mock_session.next_receiver().AndReturn(self.mock_receiver)
|
||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||
self.mock_receiver)
|
||||
self.mock_receiver.fetch().AndReturn(
|
||||
qpid.messaging.Message(
|
||||
{"result": "baz", "failure": False,
|
||||
"ending": False}))
|
||||
self.mock_session.next_receiver().AndReturn(self.mock_receiver)
|
||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||
self.mock_receiver)
|
||||
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
||||
{"failure": False, "ending": True}))
|
||||
self.mock_session.close()
|
||||
|
||||
Reference in New Issue
Block a user