Don't block forever for rpc.(multi)call response.
Fix bug 843200. This patch adds a timeout for waiting for a response from rpc.call or rpc.multicall instead of blocking for forever. Change-Id: I5675597c7e9f3d55170837859ec516cb1c806ca3
This commit is contained in:
		@@ -48,7 +48,7 @@ def create_connection(new=True):
 | 
				
			|||||||
    return _get_impl().create_connection(new=new)
 | 
					    return _get_impl().create_connection(new=new)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def call(context, topic, msg):
 | 
					def call(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Invoke a remote method that returns something.
 | 
					    """Invoke a remote method that returns something.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    :param context: Information that identifies the user that has made this
 | 
					    :param context: Information that identifies the user that has made this
 | 
				
			||||||
@@ -59,10 +59,15 @@ def call(context, topic, msg):
 | 
				
			|||||||
                  when the consumer was created with fanout=False.
 | 
					                  when the consumer was created with fanout=False.
 | 
				
			||||||
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
 | 
					    :param msg: This is a dict in the form { "method" : "method_to_invoke",
 | 
				
			||||||
                                             "args" : dict_of_kwargs }
 | 
					                                             "args" : dict_of_kwargs }
 | 
				
			||||||
 | 
					    :param timeout: int, number of seconds to use for a response timeout.
 | 
				
			||||||
 | 
					                    If set, this overrides the rpc_response_timeout option.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    :returns: A dict from the remote method.
 | 
					    :returns: A dict from the remote method.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    :raises: nova.rpc.common.Timeout if a complete response is not received
 | 
				
			||||||
 | 
					             before the timeout is reached.
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    return _get_impl().call(context, topic, msg)
 | 
					    return _get_impl().call(context, topic, msg, timeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def cast(context, topic, msg):
 | 
					def cast(context, topic, msg):
 | 
				
			||||||
@@ -102,7 +107,7 @@ def fanout_cast(context, topic, msg):
 | 
				
			|||||||
    return _get_impl().fanout_cast(context, topic, msg)
 | 
					    return _get_impl().fanout_cast(context, topic, msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def multicall(context, topic, msg):
 | 
					def multicall(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Invoke a remote method and get back an iterator.
 | 
					    """Invoke a remote method and get back an iterator.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    In this case, the remote method will be returning multiple values in
 | 
					    In this case, the remote method will be returning multiple values in
 | 
				
			||||||
@@ -117,13 +122,18 @@ def multicall(context, topic, msg):
 | 
				
			|||||||
                  when the consumer was created with fanout=False.
 | 
					                  when the consumer was created with fanout=False.
 | 
				
			||||||
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
 | 
					    :param msg: This is a dict in the form { "method" : "method_to_invoke",
 | 
				
			||||||
                                             "args" : dict_of_kwargs }
 | 
					                                             "args" : dict_of_kwargs }
 | 
				
			||||||
 | 
					    :param timeout: int, number of seconds to use for a response timeout.
 | 
				
			||||||
 | 
					                    If set, this overrides the rpc_response_timeout option.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    :returns: An iterator.  The iterator will yield a tuple (N, X) where N is
 | 
					    :returns: An iterator.  The iterator will yield a tuple (N, X) where N is
 | 
				
			||||||
              an index that starts at 0 and increases by one for each value
 | 
					              an index that starts at 0 and increases by one for each value
 | 
				
			||||||
              returned and X is the Nth value that was returned by the remote
 | 
					              returned and X is the Nth value that was returned by the remote
 | 
				
			||||||
              method.
 | 
					              method.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    :raises: nova.rpc.common.Timeout if a complete response is not received
 | 
				
			||||||
 | 
					             before the timeout is reached.
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    return _get_impl().multicall(context, topic, msg)
 | 
					    return _get_impl().multicall(context, topic, msg, timeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def notify(context, topic, msg):
 | 
					def notify(context, topic, msg):
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -262,9 +262,10 @@ class ProxyCallback(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class MulticallWaiter(object):
 | 
					class MulticallWaiter(object):
 | 
				
			||||||
    def __init__(self, connection):
 | 
					    def __init__(self, connection, timeout):
 | 
				
			||||||
        self._connection = connection
 | 
					        self._connection = connection
 | 
				
			||||||
        self._iterator = connection.iterconsume()
 | 
					        self._iterator = connection.iterconsume(
 | 
				
			||||||
 | 
					                                timeout=timeout or FLAGS.rpc_response_timeout)
 | 
				
			||||||
        self._result = None
 | 
					        self._result = None
 | 
				
			||||||
        self._done = False
 | 
					        self._done = False
 | 
				
			||||||
        self._got_ending = False
 | 
					        self._got_ending = False
 | 
				
			||||||
@@ -307,7 +308,7 @@ def create_connection(new=True):
 | 
				
			|||||||
    return ConnectionContext(pooled=not new)
 | 
					    return ConnectionContext(pooled=not new)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def multicall(context, topic, msg):
 | 
					def multicall(context, topic, msg, timeout):
 | 
				
			||||||
    """Make a call that returns multiple times."""
 | 
					    """Make a call that returns multiple times."""
 | 
				
			||||||
    # Can't use 'with' for multicall, as it returns an iterator
 | 
					    # Can't use 'with' for multicall, as it returns an iterator
 | 
				
			||||||
    # that will continue to use the connection.  When it's done,
 | 
					    # that will continue to use the connection.  When it's done,
 | 
				
			||||||
@@ -320,15 +321,15 @@ def multicall(context, topic, msg):
 | 
				
			|||||||
    pack_context(msg, context)
 | 
					    pack_context(msg, context)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    conn = ConnectionContext()
 | 
					    conn = ConnectionContext()
 | 
				
			||||||
    wait_msg = MulticallWaiter(conn)
 | 
					    wait_msg = MulticallWaiter(conn, timeout)
 | 
				
			||||||
    conn.declare_direct_consumer(msg_id, wait_msg)
 | 
					    conn.declare_direct_consumer(msg_id, wait_msg)
 | 
				
			||||||
    conn.topic_send(topic, msg)
 | 
					    conn.topic_send(topic, msg)
 | 
				
			||||||
    return wait_msg
 | 
					    return wait_msg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def call(context, topic, msg):
 | 
					def call(context, topic, msg, timeout):
 | 
				
			||||||
    """Sends a message on a topic and wait for a response."""
 | 
					    """Sends a message on a topic and wait for a response."""
 | 
				
			||||||
    rv = multicall(context, topic, msg)
 | 
					    rv = multicall(context, topic, msg, timeout)
 | 
				
			||||||
    # NOTE(vish): return the last result from the multicall
 | 
					    # NOTE(vish): return the last result from the multicall
 | 
				
			||||||
    rv = list(rv)
 | 
					    rv = list(rv)
 | 
				
			||||||
    if not rv:
 | 
					    if not rv:
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,6 +34,9 @@ rpc_opts = [
 | 
				
			|||||||
    cfg.IntOpt('rpc_conn_pool_size',
 | 
					    cfg.IntOpt('rpc_conn_pool_size',
 | 
				
			||||||
               default=30,
 | 
					               default=30,
 | 
				
			||||||
               help='Size of RPC connection pool'),
 | 
					               help='Size of RPC connection pool'),
 | 
				
			||||||
 | 
					    cfg.IntOpt('rpc_response_timeout',
 | 
				
			||||||
 | 
					               default=3600,
 | 
				
			||||||
 | 
					               help='Seconds to wait for a response from call or multicall'),
 | 
				
			||||||
    ]
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
flags.FLAGS.add_options(rpc_opts)
 | 
					flags.FLAGS.add_options(rpc_opts)
 | 
				
			||||||
@@ -59,6 +62,15 @@ class RemoteError(exception.NovaException):
 | 
				
			|||||||
                                          traceback=traceback)
 | 
					                                          traceback=traceback)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Timeout(exception.NovaException):
 | 
				
			||||||
 | 
					    """Signifies that a timeout has occurred.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    This exception is raised if the rpc_response_timeout is reached while
 | 
				
			||||||
 | 
					    waiting for a response from the remote side.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    message = _("Timeout while waiting on RPC response.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Connection(object):
 | 
					class Connection(object):
 | 
				
			||||||
    """A connection, returned by rpc.create_connection().
 | 
					    """A connection, returned by rpc.create_connection().
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -522,8 +522,9 @@ class RpcContext(context.RequestContext):
 | 
				
			|||||||
                self.msg_id = None
 | 
					                self.msg_id = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def multicall(context, topic, msg):
 | 
					def multicall(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Make a call that returns multiple times."""
 | 
					    """Make a call that returns multiple times."""
 | 
				
			||||||
 | 
					    # NOTE(russellb): carrot doesn't support timeouts
 | 
				
			||||||
    LOG.debug(_('Making asynchronous call on %s ...'), topic)
 | 
					    LOG.debug(_('Making asynchronous call on %s ...'), topic)
 | 
				
			||||||
    msg_id = uuid.uuid4().hex
 | 
					    msg_id = uuid.uuid4().hex
 | 
				
			||||||
    msg.update({'_msg_id': msg_id})
 | 
					    msg.update({'_msg_id': msg_id})
 | 
				
			||||||
@@ -594,9 +595,9 @@ def create_connection(new=True):
 | 
				
			|||||||
    return Connection.instance(new=new)
 | 
					    return Connection.instance(new=new)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def call(context, topic, msg):
 | 
					def call(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Sends a message on a topic and wait for a response."""
 | 
					    """Sends a message on a topic and wait for a response."""
 | 
				
			||||||
    rv = multicall(context, topic, msg)
 | 
					    rv = multicall(context, topic, msg, timeout)
 | 
				
			||||||
    # NOTE(vish): return the last result from the multicall
 | 
					    # NOTE(vish): return the last result from the multicall
 | 
				
			||||||
    rv = list(rv)
 | 
					    rv = list(rv)
 | 
				
			||||||
    if not rv:
 | 
					    if not rv:
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,14 +18,21 @@ queues.  Casts will block, but this is very useful for tests.
 | 
				
			|||||||
"""
 | 
					"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import inspect
 | 
					import inspect
 | 
				
			||||||
 | 
					import signal
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
import traceback
 | 
					import traceback
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import eventlet
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from nova import context
 | 
					from nova import context
 | 
				
			||||||
 | 
					from nova import flags
 | 
				
			||||||
from nova.rpc import common as rpc_common
 | 
					from nova.rpc import common as rpc_common
 | 
				
			||||||
 | 
					
 | 
				
			||||||
CONSUMERS = {}
 | 
					CONSUMERS = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					FLAGS = flags.FLAGS
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class RpcContext(context.RequestContext):
 | 
					class RpcContext(context.RequestContext):
 | 
				
			||||||
    def __init__(self, *args, **kwargs):
 | 
					    def __init__(self, *args, **kwargs):
 | 
				
			||||||
@@ -45,31 +52,49 @@ class Consumer(object):
 | 
				
			|||||||
        self.topic = topic
 | 
					        self.topic = topic
 | 
				
			||||||
        self.proxy = proxy
 | 
					        self.proxy = proxy
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def call(self, context, method, args):
 | 
					    def call(self, context, method, args, timeout):
 | 
				
			||||||
        node_func = getattr(self.proxy, method)
 | 
					        node_func = getattr(self.proxy, method)
 | 
				
			||||||
        node_args = dict((str(k), v) for k, v in args.iteritems())
 | 
					        node_args = dict((str(k), v) for k, v in args.iteritems())
 | 
				
			||||||
 | 
					        done = eventlet.event.Event()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        def _inner():
 | 
				
			||||||
            ctxt = RpcContext.from_dict(context.to_dict())
 | 
					            ctxt = RpcContext.from_dict(context.to_dict())
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                rval = node_func(context=ctxt, **node_args)
 | 
					                rval = node_func(context=ctxt, **node_args)
 | 
				
			||||||
 | 
					                res = []
 | 
				
			||||||
                # Caller might have called ctxt.reply() manually
 | 
					                # Caller might have called ctxt.reply() manually
 | 
				
			||||||
                for (reply, failure) in ctxt._response:
 | 
					                for (reply, failure) in ctxt._response:
 | 
				
			||||||
                    if failure:
 | 
					                    if failure:
 | 
				
			||||||
                        raise failure[0], failure[1], failure[2]
 | 
					                        raise failure[0], failure[1], failure[2]
 | 
				
			||||||
                yield reply
 | 
					                    res.append(reply)
 | 
				
			||||||
                # if ending not 'sent'...we might have more data to
 | 
					                # if ending not 'sent'...we might have more data to
 | 
				
			||||||
                # return from the function itself
 | 
					                # return from the function itself
 | 
				
			||||||
                if not ctxt._done:
 | 
					                if not ctxt._done:
 | 
				
			||||||
                    if inspect.isgenerator(rval):
 | 
					                    if inspect.isgenerator(rval):
 | 
				
			||||||
                        for val in rval:
 | 
					                        for val in rval:
 | 
				
			||||||
                        yield val
 | 
					                            res.append(val)
 | 
				
			||||||
                    else:
 | 
					                    else:
 | 
				
			||||||
                    yield rval
 | 
					                        res.append(rval)
 | 
				
			||||||
 | 
					                done.send(res)
 | 
				
			||||||
            except Exception:
 | 
					            except Exception:
 | 
				
			||||||
                exc_info = sys.exc_info()
 | 
					                exc_info = sys.exc_info()
 | 
				
			||||||
            raise rpc_common.RemoteError(exc_info[0].__name__,
 | 
					                done.send_exception(
 | 
				
			||||||
 | 
					                        rpc_common.RemoteError(exc_info[0].__name__,
 | 
				
			||||||
                            str(exc_info[1]),
 | 
					                            str(exc_info[1]),
 | 
				
			||||||
                    ''.join(traceback.format_exception(*exc_info)))
 | 
					                            ''.join(traceback.format_exception(*exc_info))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        thread = eventlet.greenthread.spawn(_inner)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if timeout:
 | 
				
			||||||
 | 
					            start_time = time.time()
 | 
				
			||||||
 | 
					            while not done.ready():
 | 
				
			||||||
 | 
					                eventlet.greenthread.sleep(1)
 | 
				
			||||||
 | 
					                cur_time = time.time()
 | 
				
			||||||
 | 
					                if (cur_time - start_time) > timeout:
 | 
				
			||||||
 | 
					                    thread.kill()
 | 
				
			||||||
 | 
					                    raise rpc_common.Timeout()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return done.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Connection(object):
 | 
					class Connection(object):
 | 
				
			||||||
@@ -99,7 +124,7 @@ def create_connection(new=True):
 | 
				
			|||||||
    return Connection()
 | 
					    return Connection()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def multicall(context, topic, msg):
 | 
					def multicall(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Make a call that returns multiple times."""
 | 
					    """Make a call that returns multiple times."""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    method = msg.get('method')
 | 
					    method = msg.get('method')
 | 
				
			||||||
@@ -112,12 +137,12 @@ def multicall(context, topic, msg):
 | 
				
			|||||||
    except (KeyError, IndexError):
 | 
					    except (KeyError, IndexError):
 | 
				
			||||||
        return iter([None])
 | 
					        return iter([None])
 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
        return consumer.call(context, method, args)
 | 
					        return consumer.call(context, method, args, timeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def call(context, topic, msg):
 | 
					def call(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Sends a message on a topic and wait for a response."""
 | 
					    """Sends a message on a topic and wait for a response."""
 | 
				
			||||||
    rv = multicall(context, topic, msg)
 | 
					    rv = multicall(context, topic, msg, timeout)
 | 
				
			||||||
    # NOTE(vish): return the last result from the multicall
 | 
					    # NOTE(vish): return the last result from the multicall
 | 
				
			||||||
    rv = list(rv)
 | 
					    rv = list(rv)
 | 
				
			||||||
    if not rv:
 | 
					    if not rv:
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -15,6 +15,7 @@
 | 
				
			|||||||
#    under the License.
 | 
					#    under the License.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import itertools
 | 
					import itertools
 | 
				
			||||||
 | 
					import socket
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
import time
 | 
					import time
 | 
				
			||||||
import uuid
 | 
					import uuid
 | 
				
			||||||
@@ -425,7 +426,7 @@ class Connection(object):
 | 
				
			|||||||
        while True:
 | 
					        while True:
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                return method(*args, **kwargs)
 | 
					                return method(*args, **kwargs)
 | 
				
			||||||
            except self.connection_errors, e:
 | 
					            except (self.connection_errors, socket.timeout), e:
 | 
				
			||||||
                pass
 | 
					                pass
 | 
				
			||||||
            except Exception, e:
 | 
					            except Exception, e:
 | 
				
			||||||
                # NOTE(comstud): Unfortunately it's possible for amqplib
 | 
					                # NOTE(comstud): Unfortunately it's possible for amqplib
 | 
				
			||||||
@@ -478,12 +479,17 @@ class Connection(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        return self.ensure(_connect_error, _declare_consumer)
 | 
					        return self.ensure(_connect_error, _declare_consumer)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def iterconsume(self, limit=None):
 | 
					    def iterconsume(self, limit=None, timeout=None):
 | 
				
			||||||
        """Return an iterator that will consume from all queues/consumers"""
 | 
					        """Return an iterator that will consume from all queues/consumers"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        info = {'do_consume': True}
 | 
					        info = {'do_consume': True}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        def _error_callback(exc):
 | 
					        def _error_callback(exc):
 | 
				
			||||||
 | 
					            if isinstance(exc, socket.timeout):
 | 
				
			||||||
 | 
					                LOG.exception(_('Timed out waiting for RPC response: %s') %
 | 
				
			||||||
 | 
					                        str(exc))
 | 
				
			||||||
 | 
					                raise rpc_common.Timeout()
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
                LOG.exception(_('Failed to consume message from queue: %s') %
 | 
					                LOG.exception(_('Failed to consume message from queue: %s') %
 | 
				
			||||||
                        str(exc))
 | 
					                        str(exc))
 | 
				
			||||||
                info['do_consume'] = True
 | 
					                info['do_consume'] = True
 | 
				
			||||||
@@ -496,7 +502,7 @@ class Connection(object):
 | 
				
			|||||||
                    queue.consume(nowait=True)
 | 
					                    queue.consume(nowait=True)
 | 
				
			||||||
                queues_tail.consume(nowait=False)
 | 
					                queues_tail.consume(nowait=False)
 | 
				
			||||||
                info['do_consume'] = False
 | 
					                info['do_consume'] = False
 | 
				
			||||||
            return self.connection.drain_events()
 | 
					            return self.connection.drain_events(timeout=timeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for iteration in itertools.count(0):
 | 
					        for iteration in itertools.count(0):
 | 
				
			||||||
            if limit and iteration >= limit:
 | 
					            if limit and iteration >= limit:
 | 
				
			||||||
@@ -595,14 +601,14 @@ def create_connection(new=True):
 | 
				
			|||||||
    return rpc_amqp.create_connection(new)
 | 
					    return rpc_amqp.create_connection(new)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def multicall(context, topic, msg):
 | 
					def multicall(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Make a call that returns multiple times."""
 | 
					    """Make a call that returns multiple times."""
 | 
				
			||||||
    return rpc_amqp.multicall(context, topic, msg)
 | 
					    return rpc_amqp.multicall(context, topic, msg, timeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def call(context, topic, msg):
 | 
					def call(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Sends a message on a topic and wait for a response."""
 | 
					    """Sends a message on a topic and wait for a response."""
 | 
				
			||||||
    return rpc_amqp.call(context, topic, msg)
 | 
					    return rpc_amqp.call(context, topic, msg, timeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def cast(context, topic, msg):
 | 
					def cast(context, topic, msg):
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,6 +28,7 @@ import qpid.messaging.exceptions
 | 
				
			|||||||
from nova.common import cfg
 | 
					from nova.common import cfg
 | 
				
			||||||
from nova import flags
 | 
					from nova import flags
 | 
				
			||||||
from nova.rpc import amqp as rpc_amqp
 | 
					from nova.rpc import amqp as rpc_amqp
 | 
				
			||||||
 | 
					from nova.rpc import common as rpc_common
 | 
				
			||||||
from nova.rpc.common import LOG
 | 
					from nova.rpc.common import LOG
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -338,7 +339,8 @@ class Connection(object):
 | 
				
			|||||||
        while True:
 | 
					        while True:
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                return method(*args, **kwargs)
 | 
					                return method(*args, **kwargs)
 | 
				
			||||||
            except qpid.messaging.exceptions.ConnectionError, e:
 | 
					            except (qpid.messaging.exceptions.Empty,
 | 
				
			||||||
 | 
					                    qpid.messaging.exceptions.ConnectionError), e:
 | 
				
			||||||
                if error_callback:
 | 
					                if error_callback:
 | 
				
			||||||
                    error_callback(e)
 | 
					                    error_callback(e)
 | 
				
			||||||
                self.reconnect()
 | 
					                self.reconnect()
 | 
				
			||||||
@@ -372,15 +374,20 @@ class Connection(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        return self.ensure(_connect_error, _declare_consumer)
 | 
					        return self.ensure(_connect_error, _declare_consumer)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def iterconsume(self, limit=None):
 | 
					    def iterconsume(self, limit=None, timeout=None):
 | 
				
			||||||
        """Return an iterator that will consume from all queues/consumers"""
 | 
					        """Return an iterator that will consume from all queues/consumers"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        def _error_callback(exc):
 | 
					        def _error_callback(exc):
 | 
				
			||||||
 | 
					            if isinstance(exc, qpid.messaging.exceptions.Empty):
 | 
				
			||||||
 | 
					                LOG.exception(_('Timed out waiting for RPC response: %s') %
 | 
				
			||||||
 | 
					                        str(exc))
 | 
				
			||||||
 | 
					                raise rpc_common.Timeout()
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
                LOG.exception(_('Failed to consume message from queue: %s') %
 | 
					                LOG.exception(_('Failed to consume message from queue: %s') %
 | 
				
			||||||
                        str(exc))
 | 
					                        str(exc))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        def _consume():
 | 
					        def _consume():
 | 
				
			||||||
            nxt_receiver = self.session.next_receiver()
 | 
					            nxt_receiver = self.session.next_receiver(timeout=timeout)
 | 
				
			||||||
            self._lookup_consumer(nxt_receiver).consume()
 | 
					            self._lookup_consumer(nxt_receiver).consume()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for iteration in itertools.count(0):
 | 
					        for iteration in itertools.count(0):
 | 
				
			||||||
@@ -483,14 +490,14 @@ def create_connection(new=True):
 | 
				
			|||||||
    return rpc_amqp.create_connection(new)
 | 
					    return rpc_amqp.create_connection(new)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def multicall(context, topic, msg):
 | 
					def multicall(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Make a call that returns multiple times."""
 | 
					    """Make a call that returns multiple times."""
 | 
				
			||||||
    return rpc_amqp.multicall(context, topic, msg)
 | 
					    return rpc_amqp.multicall(context, topic, msg, timeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def call(context, topic, msg):
 | 
					def call(context, topic, msg, timeout=None):
 | 
				
			||||||
    """Sends a message on a topic and wait for a response."""
 | 
					    """Sends a message on a topic and wait for a response."""
 | 
				
			||||||
    return rpc_amqp.call(context, topic, msg)
 | 
					    return rpc_amqp.call(context, topic, msg, timeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def cast(context, topic, msg):
 | 
					def cast(context, topic, msg):
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -19,9 +19,13 @@
 | 
				
			|||||||
Unit Tests for remote procedure calls shared between all implementations
 | 
					Unit Tests for remote procedure calls shared between all implementations
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import nose
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from nova import context
 | 
					from nova import context
 | 
				
			||||||
from nova import log as logging
 | 
					from nova import log as logging
 | 
				
			||||||
from nova.rpc.common import RemoteError
 | 
					from nova.rpc.common import RemoteError, Timeout
 | 
				
			||||||
from nova import test
 | 
					from nova import test
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -29,13 +33,14 @@ LOG = logging.getLogger('nova.tests.rpc')
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class _BaseRpcTestCase(test.TestCase):
 | 
					class _BaseRpcTestCase(test.TestCase):
 | 
				
			||||||
    def setUp(self):
 | 
					    def setUp(self, supports_timeouts=True):
 | 
				
			||||||
        super(_BaseRpcTestCase, self).setUp()
 | 
					        super(_BaseRpcTestCase, self).setUp()
 | 
				
			||||||
        self.conn = self.rpc.create_connection(True)
 | 
					        self.conn = self.rpc.create_connection(True)
 | 
				
			||||||
        self.receiver = TestReceiver()
 | 
					        self.receiver = TestReceiver()
 | 
				
			||||||
        self.conn.create_consumer('test', self.receiver, False)
 | 
					        self.conn.create_consumer('test', self.receiver, False)
 | 
				
			||||||
        self.conn.consume_in_thread()
 | 
					        self.conn.consume_in_thread()
 | 
				
			||||||
        self.context = context.get_admin_context()
 | 
					        self.context = context.get_admin_context()
 | 
				
			||||||
 | 
					        self.supports_timeouts = supports_timeouts
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def tearDown(self):
 | 
					    def tearDown(self):
 | 
				
			||||||
        self.conn.close()
 | 
					        self.conn.close()
 | 
				
			||||||
@@ -162,6 +167,28 @@ class _BaseRpcTestCase(test.TestCase):
 | 
				
			|||||||
        conn.close()
 | 
					        conn.close()
 | 
				
			||||||
        self.assertEqual(value, result)
 | 
					        self.assertEqual(value, result)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_call_timeout(self):
 | 
				
			||||||
 | 
					        """Make sure rpc.call will time out"""
 | 
				
			||||||
 | 
					        if not self.supports_timeouts:
 | 
				
			||||||
 | 
					            raise nose.SkipTest(_("RPC backend does not support timeouts"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        value = 42
 | 
				
			||||||
 | 
					        self.assertRaises(Timeout,
 | 
				
			||||||
 | 
					                          self.rpc.call,
 | 
				
			||||||
 | 
					                          self.context,
 | 
				
			||||||
 | 
					                          'test',
 | 
				
			||||||
 | 
					                          {"method": "block",
 | 
				
			||||||
 | 
					                           "args": {"value": value}}, timeout=1)
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            self.rpc.call(self.context,
 | 
				
			||||||
 | 
					                     'test',
 | 
				
			||||||
 | 
					                     {"method": "block",
 | 
				
			||||||
 | 
					                      "args": {"value": value}},
 | 
				
			||||||
 | 
					                     timeout=1)
 | 
				
			||||||
 | 
					            self.fail("should have thrown Timeout")
 | 
				
			||||||
 | 
					        except Timeout as exc:
 | 
				
			||||||
 | 
					            pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class TestReceiver(object):
 | 
					class TestReceiver(object):
 | 
				
			||||||
    """Simple Proxy class so the consumer has methods to call.
 | 
					    """Simple Proxy class so the consumer has methods to call.
 | 
				
			||||||
@@ -205,3 +232,7 @@ class TestReceiver(object):
 | 
				
			|||||||
    def fail(context, value):
 | 
					    def fail(context, value):
 | 
				
			||||||
        """Raises an exception with the value sent in."""
 | 
					        """Raises an exception with the value sent in."""
 | 
				
			||||||
        raise Exception(value)
 | 
					        raise Exception(value)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @staticmethod
 | 
				
			||||||
 | 
					    def block(context, value):
 | 
				
			||||||
 | 
					        time.sleep(2)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -30,7 +30,7 @@ LOG = logging.getLogger('nova.tests.rpc')
 | 
				
			|||||||
class RpcCarrotTestCase(common._BaseRpcTestCase):
 | 
					class RpcCarrotTestCase(common._BaseRpcTestCase):
 | 
				
			||||||
    def setUp(self):
 | 
					    def setUp(self):
 | 
				
			||||||
        self.rpc = impl_carrot
 | 
					        self.rpc = impl_carrot
 | 
				
			||||||
        super(RpcCarrotTestCase, self).setUp()
 | 
					        super(RpcCarrotTestCase, self).setUp(supports_timeouts=False)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def tearDown(self):
 | 
					    def tearDown(self):
 | 
				
			||||||
        super(RpcCarrotTestCase, self).tearDown()
 | 
					        super(RpcCarrotTestCase, self).tearDown()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -221,21 +221,25 @@ class RpcQpidTestCase(test.TestCase):
 | 
				
			|||||||
        self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
 | 
					        self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
 | 
				
			||||||
        self.mock_sender.send(mox.IgnoreArg())
 | 
					        self.mock_sender.send(mox.IgnoreArg())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.mock_session.next_receiver().AndReturn(self.mock_receiver)
 | 
					        self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
 | 
				
			||||||
 | 
					                                                        self.mock_receiver)
 | 
				
			||||||
        self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
 | 
					        self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
 | 
				
			||||||
                        {"result": "foo", "failure": False, "ending": False}))
 | 
					                        {"result": "foo", "failure": False, "ending": False}))
 | 
				
			||||||
        if multi:
 | 
					        if multi:
 | 
				
			||||||
            self.mock_session.next_receiver().AndReturn(self.mock_receiver)
 | 
					            self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
 | 
				
			||||||
 | 
					                                                        self.mock_receiver)
 | 
				
			||||||
            self.mock_receiver.fetch().AndReturn(
 | 
					            self.mock_receiver.fetch().AndReturn(
 | 
				
			||||||
                            qpid.messaging.Message(
 | 
					                            qpid.messaging.Message(
 | 
				
			||||||
                                {"result": "bar", "failure": False,
 | 
					                                {"result": "bar", "failure": False,
 | 
				
			||||||
                                 "ending": False}))
 | 
					                                 "ending": False}))
 | 
				
			||||||
            self.mock_session.next_receiver().AndReturn(self.mock_receiver)
 | 
					            self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
 | 
				
			||||||
 | 
					                                                        self.mock_receiver)
 | 
				
			||||||
            self.mock_receiver.fetch().AndReturn(
 | 
					            self.mock_receiver.fetch().AndReturn(
 | 
				
			||||||
                            qpid.messaging.Message(
 | 
					                            qpid.messaging.Message(
 | 
				
			||||||
                                {"result": "baz", "failure": False,
 | 
					                                {"result": "baz", "failure": False,
 | 
				
			||||||
                                 "ending": False}))
 | 
					                                 "ending": False}))
 | 
				
			||||||
        self.mock_session.next_receiver().AndReturn(self.mock_receiver)
 | 
					        self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
 | 
				
			||||||
 | 
					                                                        self.mock_receiver)
 | 
				
			||||||
        self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
 | 
					        self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
 | 
				
			||||||
                        {"failure": False, "ending": True}))
 | 
					                        {"failure": False, "ending": True}))
 | 
				
			||||||
        self.mock_session.close()
 | 
					        self.mock_session.close()
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user