diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index 1fbd9aea..a6067432 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -48,7 +48,7 @@ def create_connection(new=True): return _get_impl().create_connection(new=new) -def call(context, topic, msg): +def call(context, topic, msg, timeout=None): """Invoke a remote method that returns something. :param context: Information that identifies the user that has made this @@ -59,10 +59,15 @@ def call(context, topic, msg): when the consumer was created with fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } + :param timeout: int, number of seconds to use for a response timeout. + If set, this overrides the rpc_response_timeout option. :returns: A dict from the remote method. + + :raises: nova.rpc.common.Timeout if a complete response is not received + before the timeout is reached. """ - return _get_impl().call(context, topic, msg) + return _get_impl().call(context, topic, msg, timeout) def cast(context, topic, msg): @@ -102,7 +107,7 @@ def fanout_cast(context, topic, msg): return _get_impl().fanout_cast(context, topic, msg) -def multicall(context, topic, msg): +def multicall(context, topic, msg, timeout=None): """Invoke a remote method and get back an iterator. In this case, the remote method will be returning multiple values in @@ -117,13 +122,18 @@ def multicall(context, topic, msg): when the consumer was created with fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } + :param timeout: int, number of seconds to use for a response timeout. + If set, this overrides the rpc_response_timeout option. :returns: An iterator. The iterator will yield a tuple (N, X) where N is an index that starts at 0 and increases by one for each value returned and X is the Nth value that was returned by the remote method. + + :raises: nova.rpc.common.Timeout if a complete response is not received + before the timeout is reached. """ - return _get_impl().multicall(context, topic, msg) + return _get_impl().multicall(context, topic, msg, timeout) def notify(context, topic, msg): diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index 48310080..0995d9ab 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -262,9 +262,10 @@ class ProxyCallback(object): class MulticallWaiter(object): - def __init__(self, connection): + def __init__(self, connection, timeout): self._connection = connection - self._iterator = connection.iterconsume() + self._iterator = connection.iterconsume( + timeout=timeout or FLAGS.rpc_response_timeout) self._result = None self._done = False self._got_ending = False @@ -307,7 +308,7 @@ def create_connection(new=True): return ConnectionContext(pooled=not new) -def multicall(context, topic, msg): +def multicall(context, topic, msg, timeout): """Make a call that returns multiple times.""" # Can't use 'with' for multicall, as it returns an iterator # that will continue to use the connection. When it's done, @@ -320,15 +321,15 @@ def multicall(context, topic, msg): pack_context(msg, context) conn = ConnectionContext() - wait_msg = MulticallWaiter(conn) + wait_msg = MulticallWaiter(conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) conn.topic_send(topic, msg) return wait_msg -def call(context, topic, msg): +def call(context, topic, msg, timeout): """Sends a message on a topic and wait for a response.""" - rv = multicall(context, topic, msg) + rv = multicall(context, topic, msg, timeout) # NOTE(vish): return the last result from the multicall rv = list(rv) if not rv: diff --git a/nova/rpc/common.py b/nova/rpc/common.py index ff057701..70d5d07b 100644 --- a/nova/rpc/common.py +++ b/nova/rpc/common.py @@ -34,6 +34,9 @@ rpc_opts = [ cfg.IntOpt('rpc_conn_pool_size', default=30, help='Size of RPC connection pool'), + cfg.IntOpt('rpc_response_timeout', + default=3600, + help='Seconds to wait for a response from call or multicall'), ] flags.FLAGS.add_options(rpc_opts) @@ -59,6 +62,15 @@ class RemoteError(exception.NovaException): traceback=traceback) +class Timeout(exception.NovaException): + """Signifies that a timeout has occurred. + + This exception is raised if the rpc_response_timeout is reached while + waiting for a response from the remote side. + """ + message = _("Timeout while waiting on RPC response.") + + class Connection(object): """A connection, returned by rpc.create_connection(). diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py index 5750e598..7ce37794 100644 --- a/nova/rpc/impl_carrot.py +++ b/nova/rpc/impl_carrot.py @@ -522,8 +522,9 @@ class RpcContext(context.RequestContext): self.msg_id = None -def multicall(context, topic, msg): +def multicall(context, topic, msg, timeout=None): """Make a call that returns multiple times.""" + # NOTE(russellb): carrot doesn't support timeouts LOG.debug(_('Making asynchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) @@ -594,9 +595,9 @@ def create_connection(new=True): return Connection.instance(new=new) -def call(context, topic, msg): +def call(context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - rv = multicall(context, topic, msg) + rv = multicall(context, topic, msg, timeout) # NOTE(vish): return the last result from the multicall rv = list(rv) if not rv: diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py index dc30522b..6e4d2f6e 100644 --- a/nova/rpc/impl_fake.py +++ b/nova/rpc/impl_fake.py @@ -18,14 +18,21 @@ queues. Casts will block, but this is very useful for tests. """ import inspect +import signal import sys +import time import traceback +import eventlet + from nova import context +from nova import flags from nova.rpc import common as rpc_common CONSUMERS = {} +FLAGS = flags.FLAGS + class RpcContext(context.RequestContext): def __init__(self, *args, **kwargs): @@ -45,31 +52,49 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, method, args): + def call(self, context, method, args, timeout): node_func = getattr(self.proxy, method) node_args = dict((str(k), v) for k, v in args.iteritems()) + done = eventlet.event.Event() - ctxt = RpcContext.from_dict(context.to_dict()) - try: - rval = node_func(context=ctxt, **node_args) - # Caller might have called ctxt.reply() manually - for (reply, failure) in ctxt._response: - if failure: - raise failure[0], failure[1], failure[2] - yield reply - # if ending not 'sent'...we might have more data to - # return from the function itself - if not ctxt._done: - if inspect.isgenerator(rval): - for val in rval: - yield val - else: - yield rval - except Exception: - exc_info = sys.exc_info() - raise rpc_common.RemoteError(exc_info[0].__name__, - str(exc_info[1]), - ''.join(traceback.format_exception(*exc_info))) + def _inner(): + ctxt = RpcContext.from_dict(context.to_dict()) + try: + rval = node_func(context=ctxt, **node_args) + res = [] + # Caller might have called ctxt.reply() manually + for (reply, failure) in ctxt._response: + if failure: + raise failure[0], failure[1], failure[2] + res.append(reply) + # if ending not 'sent'...we might have more data to + # return from the function itself + if not ctxt._done: + if inspect.isgenerator(rval): + for val in rval: + res.append(val) + else: + res.append(rval) + done.send(res) + except Exception: + exc_info = sys.exc_info() + done.send_exception( + rpc_common.RemoteError(exc_info[0].__name__, + str(exc_info[1]), + ''.join(traceback.format_exception(*exc_info)))) + + thread = eventlet.greenthread.spawn(_inner) + + if timeout: + start_time = time.time() + while not done.ready(): + eventlet.greenthread.sleep(1) + cur_time = time.time() + if (cur_time - start_time) > timeout: + thread.kill() + raise rpc_common.Timeout() + + return done.wait() class Connection(object): @@ -99,7 +124,7 @@ def create_connection(new=True): return Connection() -def multicall(context, topic, msg): +def multicall(context, topic, msg, timeout=None): """Make a call that returns multiple times.""" method = msg.get('method') @@ -112,12 +137,12 @@ def multicall(context, topic, msg): except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, method, args) + return consumer.call(context, method, args, timeout) -def call(context, topic, msg): +def call(context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - rv = multicall(context, topic, msg) + rv = multicall(context, topic, msg, timeout) # NOTE(vish): return the last result from the multicall rv = list(rv) if not rv: diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index e2c0b903..50459e5a 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -15,6 +15,7 @@ # under the License. import itertools +import socket import sys import time import uuid @@ -425,7 +426,7 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except self.connection_errors, e: + except (self.connection_errors, socket.timeout), e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -478,15 +479,20 @@ class Connection(object): return self.ensure(_connect_error, _declare_consumer) - def iterconsume(self, limit=None): + def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers""" info = {'do_consume': True} def _error_callback(exc): - LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) - info['do_consume'] = True + if isinstance(exc, socket.timeout): + LOG.exception(_('Timed out waiting for RPC response: %s') % + str(exc)) + raise rpc_common.Timeout() + else: + LOG.exception(_('Failed to consume message from queue: %s') % + str(exc)) + info['do_consume'] = True def _consume(): if info['do_consume']: @@ -496,7 +502,7 @@ class Connection(object): queue.consume(nowait=True) queues_tail.consume(nowait=False) info['do_consume'] = False - return self.connection.drain_events() + return self.connection.drain_events(timeout=timeout) for iteration in itertools.count(0): if limit and iteration >= limit: @@ -595,14 +601,14 @@ def create_connection(new=True): return rpc_amqp.create_connection(new) -def multicall(context, topic, msg): +def multicall(context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(context, topic, msg) + return rpc_amqp.multicall(context, topic, msg, timeout) -def call(context, topic, msg): +def call(context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(context, topic, msg) + return rpc_amqp.call(context, topic, msg, timeout) def cast(context, topic, msg): diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index f4b6b9ff..353c7e50 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -28,6 +28,7 @@ import qpid.messaging.exceptions from nova.common import cfg from nova import flags from nova.rpc import amqp as rpc_amqp +from nova.rpc import common as rpc_common from nova.rpc.common import LOG @@ -338,7 +339,8 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except qpid.messaging.exceptions.ConnectionError, e: + except (qpid.messaging.exceptions.Empty, + qpid.messaging.exceptions.ConnectionError), e: if error_callback: error_callback(e) self.reconnect() @@ -372,15 +374,20 @@ class Connection(object): return self.ensure(_connect_error, _declare_consumer) - def iterconsume(self, limit=None): + def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers""" def _error_callback(exc): - LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + if isinstance(exc, qpid.messaging.exceptions.Empty): + LOG.exception(_('Timed out waiting for RPC response: %s') % + str(exc)) + raise rpc_common.Timeout() + else: + LOG.exception(_('Failed to consume message from queue: %s') % + str(exc)) def _consume(): - nxt_receiver = self.session.next_receiver() + nxt_receiver = self.session.next_receiver(timeout=timeout) self._lookup_consumer(nxt_receiver).consume() for iteration in itertools.count(0): @@ -483,14 +490,14 @@ def create_connection(new=True): return rpc_amqp.create_connection(new) -def multicall(context, topic, msg): +def multicall(context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(context, topic, msg) + return rpc_amqp.multicall(context, topic, msg, timeout) -def call(context, topic, msg): +def call(context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(context, topic, msg) + return rpc_amqp.call(context, topic, msg, timeout) def cast(context, topic, msg): diff --git a/nova/tests/rpc/common.py b/nova/tests/rpc/common.py index dc8aafcf..c41375ac 100644 --- a/nova/tests/rpc/common.py +++ b/nova/tests/rpc/common.py @@ -19,9 +19,13 @@ Unit Tests for remote procedure calls shared between all implementations """ +import time + +import nose + from nova import context from nova import log as logging -from nova.rpc.common import RemoteError +from nova.rpc.common import RemoteError, Timeout from nova import test @@ -29,13 +33,14 @@ LOG = logging.getLogger('nova.tests.rpc') class _BaseRpcTestCase(test.TestCase): - def setUp(self): + def setUp(self, supports_timeouts=True): super(_BaseRpcTestCase, self).setUp() self.conn = self.rpc.create_connection(True) self.receiver = TestReceiver() self.conn.create_consumer('test', self.receiver, False) self.conn.consume_in_thread() self.context = context.get_admin_context() + self.supports_timeouts = supports_timeouts def tearDown(self): self.conn.close() @@ -162,6 +167,28 @@ class _BaseRpcTestCase(test.TestCase): conn.close() self.assertEqual(value, result) + def test_call_timeout(self): + """Make sure rpc.call will time out""" + if not self.supports_timeouts: + raise nose.SkipTest(_("RPC backend does not support timeouts")) + + value = 42 + self.assertRaises(Timeout, + self.rpc.call, + self.context, + 'test', + {"method": "block", + "args": {"value": value}}, timeout=1) + try: + self.rpc.call(self.context, + 'test', + {"method": "block", + "args": {"value": value}}, + timeout=1) + self.fail("should have thrown Timeout") + except Timeout as exc: + pass + class TestReceiver(object): """Simple Proxy class so the consumer has methods to call. @@ -205,3 +232,7 @@ class TestReceiver(object): def fail(context, value): """Raises an exception with the value sent in.""" raise Exception(value) + + @staticmethod + def block(context, value): + time.sleep(2) diff --git a/nova/tests/rpc/test_carrot.py b/nova/tests/rpc/test_carrot.py index 2523810d..153747da 100644 --- a/nova/tests/rpc/test_carrot.py +++ b/nova/tests/rpc/test_carrot.py @@ -30,7 +30,7 @@ LOG = logging.getLogger('nova.tests.rpc') class RpcCarrotTestCase(common._BaseRpcTestCase): def setUp(self): self.rpc = impl_carrot - super(RpcCarrotTestCase, self).setUp() + super(RpcCarrotTestCase, self).setUp(supports_timeouts=False) def tearDown(self): super(RpcCarrotTestCase, self).tearDown() diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py index 0417674b..9e318fbf 100644 --- a/nova/tests/rpc/test_qpid.py +++ b/nova/tests/rpc/test_qpid.py @@ -221,21 +221,25 @@ class RpcQpidTestCase(test.TestCase): self.mock_session.sender(send_addr).AndReturn(self.mock_sender) self.mock_sender.send(mox.IgnoreArg()) - self.mock_session.next_receiver().AndReturn(self.mock_receiver) + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( {"result": "foo", "failure": False, "ending": False})) if multi: - self.mock_session.next_receiver().AndReturn(self.mock_receiver) + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_receiver) self.mock_receiver.fetch().AndReturn( qpid.messaging.Message( {"result": "bar", "failure": False, "ending": False})) - self.mock_session.next_receiver().AndReturn(self.mock_receiver) + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_receiver) self.mock_receiver.fetch().AndReturn( qpid.messaging.Message( {"result": "baz", "failure": False, "ending": False})) - self.mock_session.next_receiver().AndReturn(self.mock_receiver) + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( {"failure": False, "ending": True})) self.mock_session.close()