Don't block forever for rpc.(multi)call response.

Fix bug 843200.

This patch adds a timeout for waiting for a response from rpc.call or
rpc.multicall instead of blocking for forever.

Change-Id: I5675597c7e9f3d55170837859ec516cb1c806ca3
This commit is contained in:
Russell Bryant
2012-02-01 11:32:35 -05:00
parent 7aa2b906e5
commit 29ea610dd0
10 changed files with 162 additions and 65 deletions

View File

@@ -48,7 +48,7 @@ def create_connection(new=True):
return _get_impl().create_connection(new=new)
def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
@@ -59,10 +59,15 @@ def call(context, topic, msg):
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:returns: A dict from the remote method.
:raises: nova.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
"""
return _get_impl().call(context, topic, msg)
return _get_impl().call(context, topic, msg, timeout)
def cast(context, topic, msg):
@@ -102,7 +107,7 @@ def fanout_cast(context, topic, msg):
return _get_impl().fanout_cast(context, topic, msg)
def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
@@ -117,13 +122,18 @@ def multicall(context, topic, msg):
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
returned and X is the Nth value that was returned by the remote
method.
:raises: nova.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
"""
return _get_impl().multicall(context, topic, msg)
return _get_impl().multicall(context, topic, msg, timeout)
def notify(context, topic, msg):

View File

@@ -262,9 +262,10 @@ class ProxyCallback(object):
class MulticallWaiter(object):
def __init__(self, connection):
def __init__(self, connection, timeout):
self._connection = connection
self._iterator = connection.iterconsume()
self._iterator = connection.iterconsume(
timeout=timeout or FLAGS.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
@@ -307,7 +308,7 @@ def create_connection(new=True):
return ConnectionContext(pooled=not new)
def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
@@ -320,15 +321,15 @@ def multicall(context, topic, msg):
pack_context(msg, context)
conn = ConnectionContext()
wait_msg = MulticallWaiter(conn)
wait_msg = MulticallWaiter(conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
def call(context, topic, msg):
def call(context, topic, msg, timeout):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:

View File

@@ -34,6 +34,9 @@ rpc_opts = [
cfg.IntOpt('rpc_conn_pool_size',
default=30,
help='Size of RPC connection pool'),
cfg.IntOpt('rpc_response_timeout',
default=3600,
help='Seconds to wait for a response from call or multicall'),
]
flags.FLAGS.add_options(rpc_opts)
@@ -59,6 +62,15 @@ class RemoteError(exception.NovaException):
traceback=traceback)
class Timeout(exception.NovaException):
"""Signifies that a timeout has occurred.
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
message = _("Timeout while waiting on RPC response.")
class Connection(object):
"""A connection, returned by rpc.create_connection().

View File

@@ -522,8 +522,9 @@ class RpcContext(context.RequestContext):
self.msg_id = None
def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
# NOTE(russellb): carrot doesn't support timeouts
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
@@ -594,9 +595,9 @@ def create_connection(new=True):
return Connection.instance(new=new)
def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:

View File

@@ -18,14 +18,21 @@ queues. Casts will block, but this is very useful for tests.
"""
import inspect
import signal
import sys
import time
import traceback
import eventlet
from nova import context
from nova import flags
from nova.rpc import common as rpc_common
CONSUMERS = {}
FLAGS = flags.FLAGS
class RpcContext(context.RequestContext):
def __init__(self, *args, **kwargs):
@@ -45,31 +52,49 @@ class Consumer(object):
self.topic = topic
self.proxy = proxy
def call(self, context, method, args):
def call(self, context, method, args, timeout):
node_func = getattr(self.proxy, method)
node_args = dict((str(k), v) for k, v in args.iteritems())
done = eventlet.event.Event()
ctxt = RpcContext.from_dict(context.to_dict())
try:
rval = node_func(context=ctxt, **node_args)
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
if failure:
raise failure[0], failure[1], failure[2]
yield reply
# if ending not 'sent'...we might have more data to
# return from the function itself
if not ctxt._done:
if inspect.isgenerator(rval):
for val in rval:
yield val
else:
yield rval
except Exception:
exc_info = sys.exc_info()
raise rpc_common.RemoteError(exc_info[0].__name__,
str(exc_info[1]),
''.join(traceback.format_exception(*exc_info)))
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
rval = node_func(context=ctxt, **node_args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
if failure:
raise failure[0], failure[1], failure[2]
res.append(reply)
# if ending not 'sent'...we might have more data to
# return from the function itself
if not ctxt._done:
if inspect.isgenerator(rval):
for val in rval:
res.append(val)
else:
res.append(rval)
done.send(res)
except Exception:
exc_info = sys.exc_info()
done.send_exception(
rpc_common.RemoteError(exc_info[0].__name__,
str(exc_info[1]),
''.join(traceback.format_exception(*exc_info))))
thread = eventlet.greenthread.spawn(_inner)
if timeout:
start_time = time.time()
while not done.ready():
eventlet.greenthread.sleep(1)
cur_time = time.time()
if (cur_time - start_time) > timeout:
thread.kill()
raise rpc_common.Timeout()
return done.wait()
class Connection(object):
@@ -99,7 +124,7 @@ def create_connection(new=True):
return Connection()
def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
method = msg.get('method')
@@ -112,12 +137,12 @@ def multicall(context, topic, msg):
except (KeyError, IndexError):
return iter([None])
else:
return consumer.call(context, method, args)
return consumer.call(context, method, args, timeout)
def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:

View File

@@ -15,6 +15,7 @@
# under the License.
import itertools
import socket
import sys
import time
import uuid
@@ -425,7 +426,7 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
except self.connection_errors, e:
except (self.connection_errors, socket.timeout), e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
@@ -478,15 +479,20 @@ class Connection(object):
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None):
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""
info = {'do_consume': True}
def _error_callback(exc):
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True
def _consume():
if info['do_consume']:
@@ -496,7 +502,7 @@ class Connection(object):
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
info['do_consume'] = False
return self.connection.drain_events()
return self.connection.drain_events(timeout=timeout)
for iteration in itertools.count(0):
if limit and iteration >= limit:
@@ -595,14 +601,14 @@ def create_connection(new=True):
return rpc_amqp.create_connection(new)
def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
return rpc_amqp.multicall(context, topic, msg)
return rpc_amqp.multicall(context, topic, msg, timeout)
def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
return rpc_amqp.call(context, topic, msg)
return rpc_amqp.call(context, topic, msg, timeout)
def cast(context, topic, msg):

View File

@@ -28,6 +28,7 @@ import qpid.messaging.exceptions
from nova.common import cfg
from nova import flags
from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common
from nova.rpc.common import LOG
@@ -338,7 +339,8 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
except qpid.messaging.exceptions.ConnectionError, e:
except (qpid.messaging.exceptions.Empty,
qpid.messaging.exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
@@ -372,15 +374,20 @@ class Connection(object):
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None):
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
if isinstance(exc, qpid.messaging.exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
def _consume():
nxt_receiver = self.session.next_receiver()
nxt_receiver = self.session.next_receiver(timeout=timeout)
self._lookup_consumer(nxt_receiver).consume()
for iteration in itertools.count(0):
@@ -483,14 +490,14 @@ def create_connection(new=True):
return rpc_amqp.create_connection(new)
def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
return rpc_amqp.multicall(context, topic, msg)
return rpc_amqp.multicall(context, topic, msg, timeout)
def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
return rpc_amqp.call(context, topic, msg)
return rpc_amqp.call(context, topic, msg, timeout)
def cast(context, topic, msg):

View File

@@ -19,9 +19,13 @@
Unit Tests for remote procedure calls shared between all implementations
"""
import time
import nose
from nova import context
from nova import log as logging
from nova.rpc.common import RemoteError
from nova.rpc.common import RemoteError, Timeout
from nova import test
@@ -29,13 +33,14 @@ LOG = logging.getLogger('nova.tests.rpc')
class _BaseRpcTestCase(test.TestCase):
def setUp(self):
def setUp(self, supports_timeouts=True):
super(_BaseRpcTestCase, self).setUp()
self.conn = self.rpc.create_connection(True)
self.receiver = TestReceiver()
self.conn.create_consumer('test', self.receiver, False)
self.conn.consume_in_thread()
self.context = context.get_admin_context()
self.supports_timeouts = supports_timeouts
def tearDown(self):
self.conn.close()
@@ -162,6 +167,28 @@ class _BaseRpcTestCase(test.TestCase):
conn.close()
self.assertEqual(value, result)
def test_call_timeout(self):
"""Make sure rpc.call will time out"""
if not self.supports_timeouts:
raise nose.SkipTest(_("RPC backend does not support timeouts"))
value = 42
self.assertRaises(Timeout,
self.rpc.call,
self.context,
'test',
{"method": "block",
"args": {"value": value}}, timeout=1)
try:
self.rpc.call(self.context,
'test',
{"method": "block",
"args": {"value": value}},
timeout=1)
self.fail("should have thrown Timeout")
except Timeout as exc:
pass
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
@@ -205,3 +232,7 @@ class TestReceiver(object):
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)
@staticmethod
def block(context, value):
time.sleep(2)

View File

@@ -30,7 +30,7 @@ LOG = logging.getLogger('nova.tests.rpc')
class RpcCarrotTestCase(common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_carrot
super(RpcCarrotTestCase, self).setUp()
super(RpcCarrotTestCase, self).setUp(supports_timeouts=False)
def tearDown(self):
super(RpcCarrotTestCase, self).tearDown()

View File

@@ -221,21 +221,25 @@ class RpcQpidTestCase(test.TestCase):
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
self.mock_session.next_receiver().AndReturn(self.mock_receiver)
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"result": "foo", "failure": False, "ending": False}))
if multi:
self.mock_session.next_receiver().AndReturn(self.mock_receiver)
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "bar", "failure": False,
"ending": False}))
self.mock_session.next_receiver().AndReturn(self.mock_receiver)
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "baz", "failure": False,
"ending": False}))
self.mock_session.next_receiver().AndReturn(self.mock_receiver)
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"failure": False, "ending": True}))
self.mock_session.close()