diff --git a/nova/rpc.py b/nova/rpc.py index 493978e57e0a..1ec495bc890e 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -28,6 +28,7 @@ import json import sys import time import traceback +import types import uuid from carrot import connection as carrot_connection @@ -228,7 +229,7 @@ class AdapterConsumer(Consumer): rval = node_func(context=ctxt, **node_args) if msg_id: # Check if the result was a generator - if hasattr(rval, 'send'): + if isinstance(rval, types.GeneratorType): for x in rval: msg_reply(msg_id, x, None) else: @@ -236,7 +237,7 @@ class AdapterConsumer(Consumer): # This final None tells multicall that it is done. msg_reply(msg_id, None, None) - elif hasattr(rval, 'send'): + elif isinstance(rval, types.GeneratorType): # NOTE(vish): this iterates through the generator list(rval) except Exception as e: @@ -281,11 +282,11 @@ class FanoutAdapterConsumer(AdapterConsumer): class ConsumerSet(object): """Groups consumers to listen on together on a single connection.""" - def __init__(self, conn, consumer_list): + def __init__(self, connection, consumer_list): self.consumer_list = set(consumer_list) self.consumer_set = None self.enabled = True - self.init(conn) + self.init(connection) def init(self, conn): if not conn: @@ -316,8 +317,7 @@ class ConsumerSet(object): running = False break except Exception as e: - LOG.error(_("Received exception %s " % type(e) + \ - "while processing consumer")) + LOG.exception(_("Exception while processing consumer")) self.reconnect() # Break to outer loop break @@ -534,7 +534,10 @@ def call(context, topic, msg): """Sends a message on a topic and wait for a response.""" rv = multicall(context, topic, msg) # NOTE(vish): return the last result from the multicall - return list(rv)[-1] + rv = list(rv) + if not rv: + return + return rv[-1] def cast(context, topic, msg): diff --git a/nova/service.py b/nova/service.py index 7821833227c0..74f9f04d85bf 100644 --- a/nova/service.py +++ b/nova/service.py @@ -105,19 +105,18 @@ class Service(object): connection=self.conn, topic=self.topic, proxy=self) - - cset = rpc.ConsumerSet(self.conn, [consumer_all, - consumer_node, - fanout]) + consumer_set = rpc.ConsumerSet( + connection=self.conn, + consumer_list=[consumer_all, consumer_node, fanout]) # Wait forever, processing these consumers def _wait(): try: - cset.wait() + consumer_set.wait() finally: - cset.close() + consumer_set.close() - self.csetthread = greenthread.spawn(_wait) + self.consumer_set_thread = greenthread.spawn(_wait) if self.report_interval: pulse = utils.LoopingCall(self.report_state) @@ -182,9 +181,9 @@ class Service(object): logging.warn(_('Service killed that has no database entry')) def stop(self): - self.csetthread.kill() + self.consumer_set_thread.kill() try: - self.csetthread.wait() + self.consumer_set_thread.wait() except greenlet.GreenletExit: pass for x in self.timers: diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 35f4a64d97e9..ffd748efe540 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -66,12 +66,10 @@ class RpcTestCase(test.TestCase): 'test', {"method": "echo", "args": {"value": value}}) - i = 0 - for x in result: + for i, x in enumerate(result): if i > 0: self.fail('should only receive one response') self.assertEqual(value + i, x) - i += 1 def test_multicall_succeed_three_times(self): value = 42 @@ -79,10 +77,8 @@ class RpcTestCase(test.TestCase): 'test', {"method": "echo_three_times", "args": {"value": value}}) - i = 0 - for x in result: + for i, x in enumerate(result): self.assertEqual(value + i, x) - i += 1 def test_multicall_succeed_three_times_yield(self): value = 42 @@ -90,10 +86,8 @@ class RpcTestCase(test.TestCase): 'test', {"method": "echo_three_times_yield", "args": {"value": value}}) - i = 0 - for x in result: + for i, x in enumerate(result): self.assertEqual(value + i, x) - i += 1 def test_context_passed(self): """Makes sure a context is passed through rpc call.""" diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py index 0bba01d921ef..d1cc8bd61a1a 100644 --- a/nova/tests/test_service.py +++ b/nova/tests/test_service.py @@ -142,7 +142,8 @@ class ServiceTestCase(test.TestCase): mock_cset = self.mox.CreateMock(rpc.ConsumerSet, {'wait': wait_func}) - rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset) + rpc.ConsumerSet(connection=mox.IgnoreArg(), + consumer_list=mox.IsA(list)).AndReturn(mock_cset) wait_func(mox.IgnoreArg()) service_create = {'host': host, @@ -331,7 +332,8 @@ class ServiceTestCase(test.TestCase): mock_cset = self.mox.CreateMock(rpc.ConsumerSet, {'wait': wait_func}) - rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset) + rpc.ConsumerSet(connection=mox.IgnoreArg(), + consumer_list=mox.IsA(list)).AndReturn(mock_cset) wait_func(mox.IgnoreArg()) self.mox.StubOutWithMock(serv.manager.driver,