changes per review
This commit is contained in:
17
nova/rpc.py
17
nova/rpc.py
@@ -28,6 +28,7 @@ import json
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import types
|
||||
import uuid
|
||||
|
||||
from carrot import connection as carrot_connection
|
||||
@@ -228,7 +229,7 @@ class AdapterConsumer(Consumer):
|
||||
rval = node_func(context=ctxt, **node_args)
|
||||
if msg_id:
|
||||
# Check if the result was a generator
|
||||
if hasattr(rval, 'send'):
|
||||
if isinstance(rval, types.GeneratorType):
|
||||
for x in rval:
|
||||
msg_reply(msg_id, x, None)
|
||||
else:
|
||||
@@ -236,7 +237,7 @@ class AdapterConsumer(Consumer):
|
||||
|
||||
# This final None tells multicall that it is done.
|
||||
msg_reply(msg_id, None, None)
|
||||
elif hasattr(rval, 'send'):
|
||||
elif isinstance(rval, types.GeneratorType):
|
||||
# NOTE(vish): this iterates through the generator
|
||||
list(rval)
|
||||
except Exception as e:
|
||||
@@ -281,11 +282,11 @@ class FanoutAdapterConsumer(AdapterConsumer):
|
||||
class ConsumerSet(object):
|
||||
"""Groups consumers to listen on together on a single connection."""
|
||||
|
||||
def __init__(self, conn, consumer_list):
|
||||
def __init__(self, connection, consumer_list):
|
||||
self.consumer_list = set(consumer_list)
|
||||
self.consumer_set = None
|
||||
self.enabled = True
|
||||
self.init(conn)
|
||||
self.init(connection)
|
||||
|
||||
def init(self, conn):
|
||||
if not conn:
|
||||
@@ -316,8 +317,7 @@ class ConsumerSet(object):
|
||||
running = False
|
||||
break
|
||||
except Exception as e:
|
||||
LOG.error(_("Received exception %s " % type(e) + \
|
||||
"while processing consumer"))
|
||||
LOG.exception(_("Exception while processing consumer"))
|
||||
self.reconnect()
|
||||
# Break to outer loop
|
||||
break
|
||||
@@ -534,7 +534,10 @@ def call(context, topic, msg):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
rv = multicall(context, topic, msg)
|
||||
# NOTE(vish): return the last result from the multicall
|
||||
return list(rv)[-1]
|
||||
rv = list(rv)
|
||||
if not rv:
|
||||
return
|
||||
return rv[-1]
|
||||
|
||||
|
||||
def cast(context, topic, msg):
|
||||
|
@@ -66,12 +66,10 @@ class RpcTestCase(test.TestCase):
|
||||
'test',
|
||||
{"method": "echo",
|
||||
"args": {"value": value}})
|
||||
i = 0
|
||||
for x in result:
|
||||
for i, x in enumerate(result):
|
||||
if i > 0:
|
||||
self.fail('should only receive one response')
|
||||
self.assertEqual(value + i, x)
|
||||
i += 1
|
||||
|
||||
def test_multicall_succeed_three_times(self):
|
||||
value = 42
|
||||
@@ -79,10 +77,8 @@ class RpcTestCase(test.TestCase):
|
||||
'test',
|
||||
{"method": "echo_three_times",
|
||||
"args": {"value": value}})
|
||||
i = 0
|
||||
for x in result:
|
||||
for i, x in enumerate(result):
|
||||
self.assertEqual(value + i, x)
|
||||
i += 1
|
||||
|
||||
def test_multicall_succeed_three_times_yield(self):
|
||||
value = 42
|
||||
@@ -90,10 +86,8 @@ class RpcTestCase(test.TestCase):
|
||||
'test',
|
||||
{"method": "echo_three_times_yield",
|
||||
"args": {"value": value}})
|
||||
i = 0
|
||||
for x in result:
|
||||
for i, x in enumerate(result):
|
||||
self.assertEqual(value + i, x)
|
||||
i += 1
|
||||
|
||||
def test_context_passed(self):
|
||||
"""Makes sure a context is passed through rpc call."""
|
||||
|
Reference in New Issue
Block a user