changes per review

This commit is contained in:
termie 2011-05-26 15:08:53 -07:00
parent feb04f0117
commit d7e0b45a9b
4 changed files with 25 additions and 27 deletions

View File

@ -28,6 +28,7 @@ import json
import sys import sys
import time import time
import traceback import traceback
import types
import uuid import uuid
from carrot import connection as carrot_connection from carrot import connection as carrot_connection
@ -228,7 +229,7 @@ class AdapterConsumer(Consumer):
rval = node_func(context=ctxt, **node_args) rval = node_func(context=ctxt, **node_args)
if msg_id: if msg_id:
# Check if the result was a generator # Check if the result was a generator
if hasattr(rval, 'send'): if isinstance(rval, types.GeneratorType):
for x in rval: for x in rval:
msg_reply(msg_id, x, None) msg_reply(msg_id, x, None)
else: else:
@ -236,7 +237,7 @@ class AdapterConsumer(Consumer):
# This final None tells multicall that it is done. # This final None tells multicall that it is done.
msg_reply(msg_id, None, None) msg_reply(msg_id, None, None)
elif hasattr(rval, 'send'): elif isinstance(rval, types.GeneratorType):
# NOTE(vish): this iterates through the generator # NOTE(vish): this iterates through the generator
list(rval) list(rval)
except Exception as e: except Exception as e:
@ -281,11 +282,11 @@ class FanoutAdapterConsumer(AdapterConsumer):
class ConsumerSet(object): class ConsumerSet(object):
"""Groups consumers to listen on together on a single connection.""" """Groups consumers to listen on together on a single connection."""
def __init__(self, conn, consumer_list): def __init__(self, connection, consumer_list):
self.consumer_list = set(consumer_list) self.consumer_list = set(consumer_list)
self.consumer_set = None self.consumer_set = None
self.enabled = True self.enabled = True
self.init(conn) self.init(connection)
def init(self, conn): def init(self, conn):
if not conn: if not conn:
@ -316,8 +317,7 @@ class ConsumerSet(object):
running = False running = False
break break
except Exception as e: except Exception as e:
LOG.error(_("Received exception %s " % type(e) + \ LOG.exception(_("Exception while processing consumer"))
"while processing consumer"))
self.reconnect() self.reconnect()
# Break to outer loop # Break to outer loop
break break
@ -534,7 +534,10 @@ def call(context, topic, msg):
"""Sends a message on a topic and wait for a response.""" """Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg) rv = multicall(context, topic, msg)
# NOTE(vish): return the last result from the multicall # NOTE(vish): return the last result from the multicall
return list(rv)[-1] rv = list(rv)
if not rv:
return
return rv[-1]
def cast(context, topic, msg): def cast(context, topic, msg):

View File

@ -105,19 +105,18 @@ class Service(object):
connection=self.conn, connection=self.conn,
topic=self.topic, topic=self.topic,
proxy=self) proxy=self)
consumer_set = rpc.ConsumerSet(
cset = rpc.ConsumerSet(self.conn, [consumer_all, connection=self.conn,
consumer_node, consumer_list=[consumer_all, consumer_node, fanout])
fanout])
# Wait forever, processing these consumers # Wait forever, processing these consumers
def _wait(): def _wait():
try: try:
cset.wait() consumer_set.wait()
finally: finally:
cset.close() consumer_set.close()
self.csetthread = greenthread.spawn(_wait) self.consumer_set_thread = greenthread.spawn(_wait)
if self.report_interval: if self.report_interval:
pulse = utils.LoopingCall(self.report_state) pulse = utils.LoopingCall(self.report_state)
@ -182,9 +181,9 @@ class Service(object):
logging.warn(_('Service killed that has no database entry')) logging.warn(_('Service killed that has no database entry'))
def stop(self): def stop(self):
self.csetthread.kill() self.consumer_set_thread.kill()
try: try:
self.csetthread.wait() self.consumer_set_thread.wait()
except greenlet.GreenletExit: except greenlet.GreenletExit:
pass pass
for x in self.timers: for x in self.timers:

View File

@ -66,12 +66,10 @@ class RpcTestCase(test.TestCase):
'test', 'test',
{"method": "echo", {"method": "echo",
"args": {"value": value}}) "args": {"value": value}})
i = 0 for i, x in enumerate(result):
for x in result:
if i > 0: if i > 0:
self.fail('should only receive one response') self.fail('should only receive one response')
self.assertEqual(value + i, x) self.assertEqual(value + i, x)
i += 1
def test_multicall_succeed_three_times(self): def test_multicall_succeed_three_times(self):
value = 42 value = 42
@ -79,10 +77,8 @@ class RpcTestCase(test.TestCase):
'test', 'test',
{"method": "echo_three_times", {"method": "echo_three_times",
"args": {"value": value}}) "args": {"value": value}})
i = 0 for i, x in enumerate(result):
for x in result:
self.assertEqual(value + i, x) self.assertEqual(value + i, x)
i += 1
def test_multicall_succeed_three_times_yield(self): def test_multicall_succeed_three_times_yield(self):
value = 42 value = 42
@ -90,10 +86,8 @@ class RpcTestCase(test.TestCase):
'test', 'test',
{"method": "echo_three_times_yield", {"method": "echo_three_times_yield",
"args": {"value": value}}) "args": {"value": value}})
i = 0 for i, x in enumerate(result):
for x in result:
self.assertEqual(value + i, x) self.assertEqual(value + i, x)
i += 1
def test_context_passed(self): def test_context_passed(self):
"""Makes sure a context is passed through rpc call.""" """Makes sure a context is passed through rpc call."""

View File

@ -142,7 +142,8 @@ class ServiceTestCase(test.TestCase):
mock_cset = self.mox.CreateMock(rpc.ConsumerSet, mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
{'wait': wait_func}) {'wait': wait_func})
rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset) rpc.ConsumerSet(connection=mox.IgnoreArg(),
consumer_list=mox.IsA(list)).AndReturn(mock_cset)
wait_func(mox.IgnoreArg()) wait_func(mox.IgnoreArg())
service_create = {'host': host, service_create = {'host': host,
@ -331,7 +332,8 @@ class ServiceTestCase(test.TestCase):
mock_cset = self.mox.CreateMock(rpc.ConsumerSet, mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
{'wait': wait_func}) {'wait': wait_func})
rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset) rpc.ConsumerSet(connection=mox.IgnoreArg(),
consumer_list=mox.IsA(list)).AndReturn(mock_cset)
wait_func(mox.IgnoreArg()) wait_func(mox.IgnoreArg())
self.mox.StubOutWithMock(serv.manager.driver, self.mox.StubOutWithMock(serv.manager.driver,