From ebeaa9e4dd3d2140bb7cd07bba455b126c508a97 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Mon, 23 May 2011 14:38:37 -0500 Subject: [PATCH 01/25] initial fudging in of swap disk --- nova/tests/xenapi/stubs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 4833ccb0..d9306900 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -37,7 +37,7 @@ def stubout_instance_snapshot(stubs): sr_ref=sr_ref, sharable=False) vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) vdi_uuid = vdi_rec['uuid'] - return vdi_uuid + return dict(primary_vdi_uuid=vdi_uuid, swap_vdi_uuid=None) stubs.Set(vm_utils.VMHelper, 'fetch_image', fake_fetch_image) From fc60fe8cbe2eaf1c0a8afb16e87980eaa2370929 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Mon, 23 May 2011 16:51:28 -0500 Subject: [PATCH 02/25] fix tests, have glance plugin return json encoded string of vdi uuids --- nova/tests/xenapi/stubs.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index d9306900..9f6f6431 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -17,6 +17,7 @@ """Stubouts, mocks and fixtures for the test suite""" import eventlet +import json from nova.virt import xenapi_conn from nova.virt.xenapi import fake from nova.virt.xenapi import volume_utils @@ -37,7 +38,7 @@ def stubout_instance_snapshot(stubs): sr_ref=sr_ref, sharable=False) vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) vdi_uuid = vdi_rec['uuid'] - return dict(primary_vdi_uuid=vdi_uuid, swap_vdi_uuid=None) + return {'primary_vdi_uuid': vdi_uuid} stubs.Set(vm_utils.VMHelper, 'fetch_image', fake_fetch_image) @@ -132,10 +133,16 @@ class FakeSessionForVMTests(fake.SessionBase): def __init__(self, uri): super(FakeSessionForVMTests, self).__init__(uri) - def host_call_plugin(self, _1, _2, _3, _4, _5): + def host_call_plugin(self, _1, _2, plugin, method, _5): sr_ref = fake.get_all('SR')[0] vdi_ref = fake.create_vdi('', False, sr_ref, False) vdi_rec = fake.get_record('VDI', vdi_ref) + if plugin == "glance" and method == "download_vhd": + swap_vdi_ref = fake.create_vdi('', False, sr_ref, False) + swap_vdi_rec = fake.get_record('VDI', swap_vdi_ref) + return '%s' % json.dumps( + {'primary_vdi_uuid': vdi_rec['uuid'], + 'swap_vdi_uuid': swap_vdi_rec['uuid']}) return '%s' % vdi_rec['uuid'] def VM_start(self, _1, ref, _2, _3): From ebf75e3798ad6aa1575df311a89afb3dd55bd7ac Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 03/25] add support to rpc for multicall --- nova/rpc.py | 99 +++++++++++++++++++++++++++++++----------- nova/tests/test_rpc.py | 17 ++++++++ 2 files changed, 90 insertions(+), 26 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 2116f22c..04198a4a 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -32,8 +32,11 @@ import uuid from carrot import connection as carrot_connection from carrot import messaging +import eventlet from eventlet import greenpool from eventlet import greenthread +from eventlet import queue + from nova import context from nova import exception @@ -131,7 +134,8 @@ class Consumer(messaging.Consumer): self.connection = Connection.recreate() self.backend = self.connection.create_backend() self.declare() - super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) + return super(Consumer, self).fetch( + no_ack, auto_ack, enable_callbacks) if self.failed_connection: LOG.error(_('Reconnected to queue')) self.failed_connection = False @@ -347,8 +351,9 @@ def _unpack_context(msg): if key.startswith('_context_'): value = msg.pop(key) context_dict[key[9:]] = value + context_dict['msg_id'] = msg.pop('_msg_id', None) LOG.debug(_('unpacked context: %s'), context_dict) - return context.RequestContext.from_dict(context_dict) + return RpcContext.from_dict(context_dict) def _pack_context(msg, context): @@ -365,26 +370,27 @@ def _pack_context(msg, context): msg.update(context) -def call(context, topic, msg): - """Sends a message on a topic and wait for a response.""" +class RpcContext(context.RequestContext): + def __init__(self, *args, **kwargs): + msg_id = kwargs.pop('msg_id', None) + self.msg_id = msg_id + super(RpcContext, self).__init__(*args, **kwargs) + + def reply(self, *args, **kwargs): + msg_reply(self.msg_id, *args, **kwargs) + + +def multicall(context, topic, msg): + """Make a call that returns multiple times.""" LOG.debug(_('Making asynchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) _pack_context(msg, context) - class WaitMessage(object): - def __call__(self, data, message): - """Acks message and sets result.""" - message.ack() - if data['failure']: - self.result = RemoteError(*data['failure']) - else: - self.result = data['result'] - - wait_msg = WaitMessage() conn = Connection.instance() consumer = DirectConsumer(connection=conn, msg_id=msg_id) + wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) conn = Connection.instance() @@ -392,18 +398,59 @@ def call(context, topic, msg): publisher.send(msg) publisher.close() - try: - consumer.wait(limit=1) - except StopIteration: - pass - consumer.close() - # NOTE(termie): this is a little bit of a change from the original - # non-eventlet code where returning a Failure - # instance from a deferred call is very similar to - # raising an exception - if isinstance(wait_msg.result, Exception): - raise wait_msg.result - return wait_msg.result + return wait_msg + + +class MulticallWaiter(object): + def __init__(self, consumer): + self._consumer = consumer + self._results = queue.Queue() + self._closed = False + + def close(self): + self._closed = True + self._consumer.close() + + def __call__(self, data, message): + """Acks message and sets result.""" + message.ack() + if data['failure']: + self._results.put(RemoteError(*data['failure'])) + else: + self._results.put(data['result']) + + def __iter__(self): + return self.wait() + + def wait(self): + # TODO(termie): This is probably really a much simpler issue but am + # trying to solve the problem quickly. This works but + # I'd prefer to dig in and do it the best way later on. + + def _waiter(): + while not self._closed: + try: + self._consumer.wait(limit=1) + except StopIteration: + pass + eventlet.spawn(_waiter) + + while True: + result = self._results.get() + if isinstance(result, Exception): + raise result + if result == None: + self.close() + raise StopIteration + yield result + + +def call(context, topic, msg): + """Sends a message on a topic and wait for a response.""" + rv = multicall(context, topic, msg) + for x in rv: + rv.close() + return x def cast(context, topic, msg): diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 44d7c91e..92ddfcff 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -49,6 +49,17 @@ class RpcTestCase(test.TestCase): "args": {"value": value}}) self.assertEqual(value, result) + def test_multicall_succeed_three_times(self): + """Get a value through rpc call""" + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo_three_times", + "args": {"value": value}}) + + for x in result: + self.assertEqual(value, x) + def test_context_passed(self): """Makes sure a context is passed through rpc call""" value = 42 @@ -126,6 +137,12 @@ class TestReceiver(object): LOG.debug(_("Received %s"), context) return context.to_dict() + @staticmethod + def echo_three_times(context, value): + context.reply(value) + context.reply(value) + context.reply(value) + @staticmethod def fail(context, value): """Raises an exception with the value sent in""" From 7eb27bdc028354e5f72f3391fbe1b062aac6ceb9 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 04/25] make the test more expicit --- nova/tests/test_rpc.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 92ddfcff..acab3e75 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -56,9 +56,10 @@ class RpcTestCase(test.TestCase): 'test', {"method": "echo_three_times", "args": {"value": value}}) - + i = 0 for x in result: - self.assertEqual(value, x) + self.assertEqual(value + i, x) + i += 1 def test_context_passed(self): """Makes sure a context is passed through rpc call""" @@ -140,8 +141,8 @@ class TestReceiver(object): @staticmethod def echo_three_times(context, value): context.reply(value) - context.reply(value) - context.reply(value) + context.reply(value + 1) + context.reply(value + 2) @staticmethod def fail(context, value): From 0bb2db220529030b14df5d8c93fa440cb94c4286 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 05/25] add commented out unworking code for yield-based returns --- nova/rpc.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nova/rpc.py b/nova/rpc.py index 04198a4a..f43291c4 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -201,6 +201,11 @@ class AdapterConsumer(Consumer): try: rval = node_func(context=ctxt, **node_args) if msg_id: + # TODO(termie): re-enable when fix the yielding issue + #if hasattr(rval, 'send'): + # logging.error('rval! %s', rval) + # for x in rval: + # msg_reply(msg_id, x, None) msg_reply(msg_id, rval, None) except Exception as e: logging.exception('Exception during message handling') From f9e1125b4fafbe8a35646c5d32c8c87688ff0271 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 06/25] Add a connection pool for rpc cast/call Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each. --- nova/rpc.py | 96 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 72 insertions(+), 24 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index f43291c4..62590ca9 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -35,9 +35,9 @@ from carrot import messaging import eventlet from eventlet import greenpool from eventlet import greenthread +from eventlet import pools from eventlet import queue - from nova import context from nova import exception from nova import fakerabbit @@ -92,6 +92,11 @@ class Connection(carrot_connection.BrokerConnection): pass return cls.instance() +class Pool(pools.Pool): + def create(self): + return Connection.instance(new=True) + +ConnectionPool = Pool(max_size=20) class Consumer(messaging.Consumer): """Consumer base class. @@ -163,21 +168,9 @@ class AdapterConsumer(Consumer): self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) super(AdapterConsumer, self).__init__(connection=connection, topic=topic) + self.register_callback(self.process_data) - def receive(self, *args, **kwargs): - self.pool.spawn_n(self._receive, *args, **kwargs) - - @exception.wrap_exception - def _receive(self, message_data, message): - """Magically looks for a method on the proxy object and calls it. - - Message data should be a dictionary with two keys: - method: string representing the method to call - args: dictionary of arg: value - - Example: {'method': 'echo', 'args': {'value': 42}} - - """ + def process_data(self, message_data, message): LOG.debug(_('received %s') % message_data) msg_id = message_data.pop('_msg_id', None) @@ -194,6 +187,19 @@ class AdapterConsumer(Consumer): LOG.warn(_('no method for message: %s') % message_data) msg_reply(msg_id, _('No method for message: %s') % message_data) return + self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args) + + @exception.wrap_exception + def _process_data(self, msg_id, ctxt, method, args): + """Magically looks for a method on the proxy object and calls it. + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + + """ node_func = getattr(self.proxy, str(method)) node_args = dict((str(k), v) for k, v in args.iteritems()) @@ -214,11 +220,6 @@ class AdapterConsumer(Consumer): return -class Publisher(messaging.Publisher): - """Publisher base class.""" - pass - - class TopicAdapterConsumer(AdapterConsumer): """Consumes messages on a specific topic.""" @@ -251,6 +252,50 @@ class FanoutAdapterConsumer(AdapterConsumer): topic=topic, proxy=proxy) +class ConsumerSet(object): + """Groups consumers to listen on together on a single connection""" + + def __init__(self, conn, consumer_list): + self.consumer_list = set(consumer_list) + self.consumer_set = None + self.init(conn) + + def init(self, conn): + if not conn: + conn = Connection.instance(new=True) + if self.consumer_set: + self.consumer_set.close() + self.consumer_set = messaging.ConsumerSet(conn) + for consumer in self.consumer_list: + consumer.connection = conn + # consumer.backend is set for us + self.consumer_set.add_consumer(consumer) + + def reconnect(self): + self.init(None) + + def wait(self, limit=None): + while True: + it = self.consumer_set.iterconsume(limit=limit) + while True: + try: + it.next() + except StopIteration: + return + except Exception as e: + LOG.error(_("Received exception %s " % str(e) + \ + "while processing consumer")) + fuck + self.reconnect() + # Break to outer loop + break + + +class Publisher(messaging.Publisher): + """Publisher base class.""" + pass + + class TopicPublisher(Publisher): """Publishes messages on a specific topic.""" @@ -315,7 +360,7 @@ def msg_reply(msg_id, reply=None, failure=None): LOG.error(_("Returning exception %s to caller"), message) LOG.error(tb) failure = (failure[0].__name__, str(failure[1]), tb) - conn = Connection.instance() + conn = ConnectionPool.get() publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: publisher.send({'result': reply, 'failure': failure}) @@ -324,7 +369,9 @@ def msg_reply(msg_id, reply=None, failure=None): {'result': dict((k, repr(v)) for k, v in reply.__dict__.iteritems()), 'failure': failure}) + publisher.close() + ConnectionPool.put(conn) class RemoteError(exception.Error): @@ -393,12 +440,11 @@ def multicall(context, topic, msg): LOG.debug(_('MSG_ID is %s') % (msg_id)) _pack_context(msg, context) - conn = Connection.instance() + conn = ConnectionPool.get() consumer = DirectConsumer(connection=conn, msg_id=msg_id) wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) - conn = Connection.instance() publisher = TopicPublisher(connection=conn, topic=topic) publisher.send(msg) publisher.close() @@ -462,10 +508,11 @@ def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) _pack_context(msg, context) - conn = Connection.instance() + conn = ConnectionPool.get() publisher = TopicPublisher(connection=conn, topic=topic) publisher.send(msg) publisher.close() + ConnectionPool.put(conn) def fanout_cast(context, topic, msg): @@ -511,6 +558,7 @@ def send_message(topic, message, wait=True): if wait: consumer.wait() + consumer.close() if __name__ == '__main__': From 58ac7dd16b6db9a33998bcfb0d379c248fa29fce Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 07/25] pep8 and comment fixes --- nova/rpc.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 62590ca9..db5aec82 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -92,12 +92,16 @@ class Connection(carrot_connection.BrokerConnection): pass return cls.instance() + class Pool(pools.Pool): + """Class that implements a Pool of Connections""" + def create(self): return Connection.instance(new=True) ConnectionPool = Pool(max_size=20) + class Consumer(messaging.Consumer): """Consumer base class. @@ -171,6 +175,16 @@ class AdapterConsumer(Consumer): self.register_callback(self.process_data) def process_data(self, message_data, message): + """Consumer callback that parses the message for validity and + fires off a thread to call the proxy object method. + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + + """ LOG.debug(_('received %s') % message_data) msg_id = message_data.pop('_msg_id', None) @@ -191,14 +205,8 @@ class AdapterConsumer(Consumer): @exception.wrap_exception def _process_data(self, msg_id, ctxt, method, args): - """Magically looks for a method on the proxy object and calls it. - - Message data should be a dictionary with two keys: - method: string representing the method to call - args: dictionary of arg: value - - Example: {'method': 'echo', 'args': {'value': 42}} - + """Thread that maigcally looks for a method on the proxy + object and calls it. """ node_func = getattr(self.proxy, str(method)) @@ -285,7 +293,6 @@ class ConsumerSet(object): except Exception as e: LOG.error(_("Received exception %s " % str(e) + \ "while processing consumer")) - fuck self.reconnect() # Break to outer loop break From 1b75e5eba049951c093ae5a25a59f0225f54f473 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 08/25] convert fanout_cast to ConnectionPool --- nova/rpc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nova/rpc.py b/nova/rpc.py index db5aec82..fdb22869 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -526,10 +526,11 @@ def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) _pack_context(msg, context) - conn = Connection.instance() + conn = ConnectionPool.get() publisher = FanoutPublisher(topic, connection=conn) publisher.send(msg) publisher.close() + ConnectionPool.put(conn) def generic_response(message_data, message): From 2368bc4fe37ae2f708ebf2b9a69edb1777f7af3c Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 09/25] fakerabbit's declare_consumer should support more than 1 consumer. also: make fakerabbit Backend.consume be an iterator like it should be.. --- nova/fakerabbit.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index a7dee8ca..a29ba9d8 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -77,6 +77,10 @@ class Queue(object): class Backend(base.BaseBackend): + def __init__(self, connection, **kwargs): + super(Backend, self).__init__(connection, **kwargs) + self.consumers = [] + def queue_declare(self, queue, **kwargs): global QUEUES if queue not in QUEUES: @@ -97,16 +101,20 @@ class Backend(base.BaseBackend): EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) def declare_consumer(self, queue, callback, *args, **kwargs): - self.current_queue = queue - self.current_callback = callback + self.consumers.append((queue, callback)) def consume(self, limit=None): + num = 0 while True: - item = self.get(self.current_queue) - if item: - self.current_callback(item) - raise StopIteration() - greenthread.sleep(0) + for (queue, callback) in self.consumers: + item = self.get(queue) + if item: + callback(item) + num += 1 + yield + if limit and num == limit: + raise StopIteration() + greenthread.sleep(0.1) def get(self, queue, no_ack=False): global QUEUES From 1da5df1d8e6fac63106ca5f0eb839add3407d4fd Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 10/25] fix consumers to actually be deleted and clean up cloud test --- nova/fakerabbit.py | 13 +++++++++---- nova/rpc.py | 13 ++++++++++--- nova/tests/test_cloud.py | 26 ++++++++++---------------- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index a29ba9d8..5f3e75c4 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -79,7 +79,7 @@ class Queue(object): class Backend(base.BaseBackend): def __init__(self, connection, **kwargs): super(Backend, self).__init__(connection, **kwargs) - self.consumers = [] + self.consumers = {} def queue_declare(self, queue, **kwargs): global QUEUES @@ -100,13 +100,18 @@ class Backend(base.BaseBackend): ' key %(routing_key)s') % locals()) EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) - def declare_consumer(self, queue, callback, *args, **kwargs): - self.consumers.append((queue, callback)) + def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): + LOG.debug("Adding consumer %s", consumer_tag) + self.consumers[consumer_tag] = (queue, callback) + + def cancel(self, consumer_tag): + LOG.debug("Removing consumer %s", consumer_tag) + del self.consumers[consumer_tag] def consume(self, limit=None): num = 0 while True: - for (queue, callback) in self.consumers: + for (queue, callback) in self.consumers.itervalues(): item = self.get(queue) if item: callback(item) diff --git a/nova/rpc.py b/nova/rpc.py index fdb22869..e2e962fc 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -30,11 +30,11 @@ import time import traceback import uuid +import greenlet from carrot import connection as carrot_connection from carrot import messaging import eventlet from eventlet import greenpool -from eventlet import greenthread from eventlet import pools from eventlet import queue @@ -266,6 +266,7 @@ class ConsumerSet(object): def __init__(self, conn, consumer_list): self.consumer_list = set(consumer_list) self.consumer_set = None + self.enabled = True self.init(conn) def init(self, conn): @@ -283,15 +284,21 @@ class ConsumerSet(object): self.init(None) def wait(self, limit=None): - while True: + running = True + while running: it = self.consumer_set.iterconsume(limit=limit) + if not it: + break while True: try: it.next() except StopIteration: return + except greenlet.GreenletExit: + running = False + break except Exception as e: - LOG.error(_("Received exception %s " % str(e) + \ + LOG.error(_("Received exception %s " % type(e) + \ "while processing consumer")) self.reconnect() # Break to outer loop diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index 54c0454d..1e14c327 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -17,13 +17,8 @@ # under the License. from base64 import b64decode -import json from M2Crypto import BIO from M2Crypto import RSA -import os -import shutil -import tempfile -import time from eventlet import greenthread @@ -33,12 +28,10 @@ from nova import db from nova import flags from nova import log as logging from nova import rpc -from nova import service from nova import test from nova import utils from nova import exception from nova.auth import manager -from nova.compute import power_state from nova.api.ec2 import cloud from nova.api.ec2 import ec2utils from nova.image import local @@ -79,6 +72,15 @@ class CloudTestCase(test.TestCase): self.stubs.Set(local.LocalImageService, 'show', fake_show) self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show) + # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish + rpc_cast = rpc.cast + + def finish_cast(*args, **kwargs): + rpc_cast(*args, **kwargs) + greenthread.sleep(0.2) + + self.stubs.Set(rpc, 'cast', finish_cast) + def tearDown(self): network_ref = db.project_get_network(self.context, self.project.id) @@ -113,7 +115,6 @@ class CloudTestCase(test.TestCase): self.cloud.describe_addresses(self.context) self.cloud.release_address(self.context, public_ip=address) - greenthread.sleep(0.3) db.floating_ip_destroy(self.context, address) def test_associate_disassociate_address(self): @@ -129,12 +130,10 @@ class CloudTestCase(test.TestCase): self.cloud.associate_address(self.context, instance_id=ec2_id, public_ip=address) - greenthread.sleep(0.3) self.cloud.disassociate_address(self.context, public_ip=address) self.cloud.release_address(self.context, public_ip=address) - greenthread.sleep(0.3) self.network.deallocate_fixed_ip(self.context, fixed) db.instance_destroy(self.context, inst['id']) db.floating_ip_destroy(self.context, address) @@ -306,31 +305,26 @@ class CloudTestCase(test.TestCase): 'instance_type': instance_type, 'max_count': max_count} rv = self.cloud.run_instances(self.context, **kwargs) - greenthread.sleep(0.3) instance_id = rv['instancesSet'][0]['instanceId'] output = self.cloud.get_console_output(context=self.context, instance_id=[instance_id]) self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT') # TODO(soren): We need this until we can stop polling in the rpc code # for unit tests. - greenthread.sleep(0.3) rv = self.cloud.terminate_instances(self.context, [instance_id]) - greenthread.sleep(0.3) def test_ajax_console(self): + kwargs = {'image_id': 'ami-1'} rv = self.cloud.run_instances(self.context, **kwargs) instance_id = rv['instancesSet'][0]['instanceId'] - greenthread.sleep(0.3) output = self.cloud.get_ajax_console(context=self.context, instance_id=[instance_id]) self.assertEquals(output['url'], '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url) # TODO(soren): We need this until we can stop polling in the rpc code # for unit tests. - greenthread.sleep(0.3) rv = self.cloud.terminate_instances(self.context, [instance_id]) - greenthread.sleep(0.3) def test_key_generation(self): result = self._create_key('test') From 26daa48eb527c9f69b488a4eebfb6fafac1f2e08 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 11/25] catch greenlet.GreenletExit when shutting service down --- nova/rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nova/rpc.py b/nova/rpc.py index e2e962fc..02052ecf 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -24,13 +24,13 @@ No fan-out support yet. """ +import greenlet import json import sys import time import traceback import uuid -import greenlet from carrot import connection as carrot_connection from carrot import messaging import eventlet From 0e22560ea2de3ea6cef9494ed2af8fc58eca8148 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 12/25] Add rpc_conn_pool_size flag for the new connection pool --- nova/rpc.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 02052ecf..82869fc4 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -50,7 +50,10 @@ LOG = logging.getLogger('nova.rpc') FLAGS = flags.FLAGS -flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool') +flags.DEFINE_integer('rpc_thread_pool_size', 1024, + 'Size of RPC thread pool') +flags.DEFINE_integer('rpc_conn_pool_size', 30, + 'Size of RPC connection pool') class Connection(carrot_connection.BrokerConnection): @@ -99,7 +102,7 @@ class Pool(pools.Pool): def create(self): return Connection.instance(new=True) -ConnectionPool = Pool(max_size=20) +ConnectionPool = Pool(max_size=FLAGS.rpc_conn_pool_size) class Consumer(messaging.Consumer): From a3a605534df98c482ab5364ce6881ffec1e87705 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 13/25] connection pool tests and make the pool LIFO --- nova/rpc.py | 8 +++++++- nova/tests/test_rpc.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/nova/rpc.py b/nova/rpc.py index 82869fc4..3cc0dadd 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -99,10 +99,16 @@ class Connection(carrot_connection.BrokerConnection): class Pool(pools.Pool): """Class that implements a Pool of Connections""" + # TODO(comstud): Timeout connections not used in a while def create(self): return Connection.instance(new=True) -ConnectionPool = Pool(max_size=FLAGS.rpc_conn_pool_size) +# Create a ConnectionPool to use for RPC calls. We'll order the +# pool as a stack (LIFO), so that we can potentially loop through and +# timeout old unused connections at some point +ConnectionPool = Pool( + max_size=FLAGS.rpc_conn_pool_size, + order_as_stack=True) class Consumer(messaging.Consumer): diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index acab3e75..f6420959 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -120,6 +120,48 @@ class RpcTestCase(test.TestCase): "value": value}}) self.assertEqual(value, result) + def test_connectionpool_single(self): + """Test that ConnectionPool recycles a single connection""" + + conn1 = rpc.ConnectionPool.get() + rpc.ConnectionPool.put(conn1) + conn2 = rpc.ConnectionPool.get() + rpc.ConnectionPool.put(conn2) + self.assertEqual(conn1, conn2) + + def test_connectionpool_double(self): + """Test that ConnectionPool returns 2 separate connections + when called consecutively and the pool returns connections LIFO + """ + + conn1 = rpc.ConnectionPool.get() + conn2 = rpc.ConnectionPool.get() + + self.assertNotEqual(conn1, conn2) + rpc.ConnectionPool.put(conn1) + rpc.ConnectionPool.put(conn2) + + conn3 = rpc.ConnectionPool.get() + conn4 = rpc.ConnectionPool.get() + self.assertEqual(conn2, conn3) + self.assertEqual(conn1, conn4) + + def test_connectionpool_limit(self): + """Test connection pool limit and verify all connections + are unique + """ + + max_size = FLAGS.rpc_conn_pool_size + conns = [] + + for i in xrange(max_size): + conns.append(rpc.ConnectionPool.get()) + + self.assertFalse(rpc.ConnectionPool.free_items) + self.assertEqual(rpc.ConnectionPool.current_size, + rpc.ConnectionPool.max_size) + self.assertEqual(len(set(conns)), max_size) + class TestReceiver(object): """Simple Proxy class so the consumer has methods to call From 86d53fa64dba97cba05c2e60c1b38f6d8625180e Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 14/25] bring back commits lost in merge --- nova/rpc.py | 107 ++++++++++++++++++++++++----------------- nova/tests/test_rpc.py | 19 ++++++++ 2 files changed, 82 insertions(+), 44 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 3cc0dadd..d7d7bb01 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -35,6 +35,7 @@ from carrot import connection as carrot_connection from carrot import messaging import eventlet from eventlet import greenpool +from eventlet import greenthread from eventlet import pools from eventlet import queue @@ -140,30 +141,30 @@ class Consumer(messaging.Consumer): FLAGS.rabbit_max_retries) sys.exit(1) - def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): - """Wraps the parent fetch with some logic for failed connection.""" - # TODO(vish): the logic for failed connections and logging should be - # refactored into some sort of connection manager object - try: - if self.failed_connection: - # NOTE(vish): connection is defined in the parent class, we can - # recreate it as long as we create the backend too - # pylint: disable=W0201 - self.connection = Connection.recreate() - self.backend = self.connection.create_backend() - self.declare() - return super(Consumer, self).fetch( - no_ack, auto_ack, enable_callbacks) - if self.failed_connection: - LOG.error(_('Reconnected to queue')) - self.failed_connection = False - # NOTE(vish): This is catching all errors because we really don't - # want exceptions to be logged 10 times a second if some - # persistent failure occurs. - except Exception, e: # pylint: disable=W0703 - if not self.failed_connection: - LOG.exception(_('Failed to fetch message from queue: %s' % e)) - self.failed_connection = True + #def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): + # """Wraps the parent fetch with some logic for failed connection.""" + # # TODO(vish): the logic for failed connections and logging should be + # # refactored into some sort of connection manager object + # try: + # if self.failed_connection: + # # NOTE(vish): connection is defined in the parent class, we can + # # recreate it as long as we create the backend too + # # pylint: disable=W0201 + # self.connection = Connection.recreate() + # self.backend = self.connection.create_backend() + # self.declare() + # return super(Consumer, self).fetch( + # no_ack, auto_ack, enable_callbacks) + # if self.failed_connection: + # LOG.error(_('Reconnected to queue')) + # self.failed_connection = False + # # NOTE(vish): This is catching all errors because we really don't + # # want exceptions to be logged 10 times a second if some + # # persistent failure occurs. + # except Exception, e: # pylint: disable=W0703 + # if not self.failed_connection: + # LOG.exception(_('Failed to fetch message from queue: %s' % e)) + # self.failed_connection = True def attach_to_eventlet(self): """Only needed for unit tests!""" @@ -195,7 +196,7 @@ class AdapterConsumer(Consumer): """ LOG.debug(_('received %s') % message_data) - msg_id = message_data.pop('_msg_id', None) + msg_id = message_data.get('_msg_id', None) ctxt = _unpack_context(message_data) @@ -225,11 +226,14 @@ class AdapterConsumer(Consumer): rval = node_func(context=ctxt, **node_args) if msg_id: # TODO(termie): re-enable when fix the yielding issue - #if hasattr(rval, 'send'): - # logging.error('rval! %s', rval) - # for x in rval: - # msg_reply(msg_id, x, None) - msg_reply(msg_id, rval, None) + if hasattr(rval, 'send'): + logging.error('rval! %s', rval) + for x in rval: + msg_reply(msg_id, x, None) + msg_reply(msg_id, None, None) + else: + msg_reply(msg_id, rval, None) + #msg_reply(msg_id, rval, None) except Exception as e: logging.exception('Exception during message handling') if msg_id: @@ -355,7 +359,7 @@ class DirectConsumer(Consumer): self.routing_key = msg_id self.exchange = msg_id self.auto_delete = True - self.exclusive = True + self.exclusive = False super(DirectConsumer, self).__init__(connection=connection) @@ -387,7 +391,9 @@ def msg_reply(msg_id, reply=None, failure=None): publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: publisher.send({'result': reply, 'failure': failure}) + LOG.error('MSG REPLY SUCCESS') except TypeError: + LOG.error('MSG REPLY FAILURE') publisher.send( {'result': dict((k, repr(v)) for k, v in reply.__dict__.iteritems()), @@ -440,9 +446,9 @@ def _pack_context(msg, context): for args at some point. """ - context = dict([('_context_%s' % key, value) - for (key, value) in context.to_dict().iteritems()]) - msg.update(context) + context_d = dict([('_context_%s' % key, value) + for (key, value) in context.to_dict().iteritems()]) + msg.update(context_d) class RpcContext(context.RequestContext): @@ -463,12 +469,13 @@ def multicall(context, topic, msg): LOG.debug(_('MSG_ID is %s') % (msg_id)) _pack_context(msg, context) - conn = ConnectionPool.get() - consumer = DirectConsumer(connection=conn, msg_id=msg_id) + con_conn = ConnectionPool.get() + consumer = DirectConsumer(connection=con_conn, msg_id=msg_id) wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) - publisher = TopicPublisher(connection=conn, topic=topic) + pub_conn = ConnectionPool.get() + publisher = TopicPublisher(connection=pub_conn, topic=topic) publisher.send(msg) publisher.close() @@ -484,6 +491,7 @@ class MulticallWaiter(object): def close(self): self._closed = True self._consumer.close() + ConnectionPool.put(self._consumer.connection) def __call__(self, data, message): """Acks message and sets result.""" @@ -501,15 +509,26 @@ class MulticallWaiter(object): # trying to solve the problem quickly. This works but # I'd prefer to dig in and do it the best way later on. - def _waiter(): - while not self._closed: - try: - self._consumer.wait(limit=1) - except StopIteration: - pass - eventlet.spawn(_waiter) + #def _waiter(): + # i = 0 + # while not self._closed: + # LOG.error('Iteration #%s (%s)', i, self._consumer.consumer_tag) + # i += 1 + # try: + # self._consumer.wait(limit=1) + # except StopIteration: + # pass + # self._consumer.close() + # ConnectionPool.put(self._consumer.connection) + #eventlet.spawn(_waiter) while True: + rv = None + while rv is None and not self._closed: + rv = self._consumer.fetch(enable_callbacks=True) + time.sleep(0.01) + + LOG.error('RV %s', rv) result = self._results.get() if isinstance(result, Exception): raise result diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index f6420959..e5d99474 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -61,6 +61,18 @@ class RpcTestCase(test.TestCase): self.assertEqual(value + i, x) i += 1 + def test_multicall_succeed_three_times_yield(self): + """Get a value through rpc call""" + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + i = 0 + for x in result: + self.assertEqual(value + i, x) + i += 1 + def test_context_passed(self): """Makes sure a context is passed through rpc call""" value = 42 @@ -83,6 +95,7 @@ class RpcTestCase(test.TestCase): 'test', {"method": "fail", "args": {"value": value}}) + LOG.error('INNNNNNN BETTTWWWWWWWWWWEEEEEEEEEEN') try: rpc.call(self.context, 'test', @@ -186,6 +199,12 @@ class TestReceiver(object): context.reply(value + 1) context.reply(value + 2) + @staticmethod + def echo_three_times_yield(context, value): + yield value + yield value + 1 + yield value + 2 + @staticmethod def fail(context, value): """Raises an exception with the value sent in""" From ff9ba4e5d7f63ad4aab36e97f175926cdb31bed4 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 15/25] almost everything working with fake_rabbit --- nova/rpc.py | 16 +++++++++++++++- nova/tests/test_cloud.py | 4 ++-- run_tests.py | 1 + 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index d7d7bb01..e1f594a9 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -102,6 +102,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): + LOG.debug('Creating new connection') return Connection.instance(new=True) # Create a ConnectionPool to use for RPC calls. We'll order the @@ -166,6 +167,10 @@ class Consumer(messaging.Consumer): # LOG.exception(_('Failed to fetch message from queue: %s' % e)) # self.failed_connection = True + def close(self, *args, **kwargs): + LOG.debug('Closing consumer %s', self.consumer_tag) + return super(Consumer, self).close(*args, **kwargs) + def attach_to_eventlet(self): """Only needed for unit tests!""" timer = utils.LoopingCall(self.fetch, enable_callbacks=True) @@ -317,6 +322,8 @@ class ConsumerSet(object): # Break to outer loop break + def close(self): + self.consumer_set.close() class Publisher(messaging.Publisher): """Publisher base class.""" @@ -525,12 +532,19 @@ class MulticallWaiter(object): while True: rv = None while rv is None and not self._closed: - rv = self._consumer.fetch(enable_callbacks=True) + try: + rv = self._consumer.fetch(enable_callbacks=True) + except Exception: + self.close() + raise + #rv = self._consumer.fetch(enable_callbacks=True) time.sleep(0.01) LOG.error('RV %s', rv) result = self._results.get() + LOG.error('RESULT %s', result) if isinstance(result, Exception): + self.close() raise result if result == None: self.close() diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index 1e14c327..a838dd53 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -87,8 +87,8 @@ class CloudTestCase(test.TestCase): db.network_disassociate(self.context, network_ref['id']) self.manager.delete_project(self.project) self.manager.delete_user(self.user) - self.compute.kill() - self.network.kill() + #self.compute.kill() + #self.network.kill() super(CloudTestCase, self).tearDown() def _create_key(self, name): diff --git a/run_tests.py b/run_tests.py index d5d8acd1..509a60ca 100644 --- a/run_tests.py +++ b/run_tests.py @@ -285,6 +285,7 @@ if __name__ == '__main__': # If any argument looks like a test name but doesn't have "nova.tests" in # front of it, automatically add that so we don't have to type as much argv = [] + logging.getLogger('amqplib').setLevel(logging.DEBUG) for x in sys.argv: if x.startswith('test_'): argv.append('nova.tests.%s' % x) From aba7847b8aae1a8433d10331d0b9e96e0591f711 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 16/25] don't need to use a separate connection --- nova/rpc.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index e1f594a9..a212383f 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -481,8 +481,7 @@ def multicall(context, topic, msg): wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) - pub_conn = ConnectionPool.get() - publisher = TopicPublisher(connection=pub_conn, topic=topic) + publisher = TopicPublisher(connection=con_conn, topic=topic) publisher.send(msg) publisher.close() From b3338e3ca5548f66c6756b0fd065d76b11c4017b Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH 17/25] lots of fixes for rpc and extra imports --- nova/fakerabbit.py | 12 ++++++-- nova/rpc.py | 68 +++++++++++++++++----------------------------- 2 files changed, 34 insertions(+), 46 deletions(-) diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index 5f3e75c4..ff993e29 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit") EXCHANGES = {} QUEUES = {} +CONSUMERS = {} class Message(base.BaseMessage): @@ -101,17 +102,20 @@ class Backend(base.BaseBackend): EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): + global CONSUMERS LOG.debug("Adding consumer %s", consumer_tag) - self.consumers[consumer_tag] = (queue, callback) + CONSUMERS[consumer_tag] = (queue, callback) def cancel(self, consumer_tag): + global CONSUMERS LOG.debug("Removing consumer %s", consumer_tag) - del self.consumers[consumer_tag] + del CONSUMERS[consumer_tag] def consume(self, limit=None): + global CONSUMERS num = 0 while True: - for (queue, callback) in self.consumers.itervalues(): + for (queue, callback) in CONSUMERS.itervalues(): item = self.get(queue) if item: callback(item) @@ -147,5 +151,7 @@ class Backend(base.BaseBackend): def reset_all(): global EXCHANGES global QUEUES + global CONSUMERS EXCHANGES = {} QUEUES = {} + CONSUMERS = {} diff --git a/nova/rpc.py b/nova/rpc.py index a212383f..7faed4d3 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -33,9 +33,7 @@ import uuid from carrot import connection as carrot_connection from carrot import messaging -import eventlet from eventlet import greenpool -from eventlet import greenthread from eventlet import pools from eventlet import queue @@ -142,30 +140,30 @@ class Consumer(messaging.Consumer): FLAGS.rabbit_max_retries) sys.exit(1) - #def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): - # """Wraps the parent fetch with some logic for failed connection.""" - # # TODO(vish): the logic for failed connections and logging should be - # # refactored into some sort of connection manager object - # try: - # if self.failed_connection: - # # NOTE(vish): connection is defined in the parent class, we can - # # recreate it as long as we create the backend too - # # pylint: disable=W0201 - # self.connection = Connection.recreate() - # self.backend = self.connection.create_backend() - # self.declare() - # return super(Consumer, self).fetch( - # no_ack, auto_ack, enable_callbacks) - # if self.failed_connection: - # LOG.error(_('Reconnected to queue')) - # self.failed_connection = False - # # NOTE(vish): This is catching all errors because we really don't - # # want exceptions to be logged 10 times a second if some - # # persistent failure occurs. - # except Exception, e: # pylint: disable=W0703 - # if not self.failed_connection: - # LOG.exception(_('Failed to fetch message from queue: %s' % e)) - # self.failed_connection = True + def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): + """Wraps the parent fetch with some logic for failed connection.""" + # TODO(vish): the logic for failed connections and logging should be + # refactored into some sort of connection manager object + try: + if self.failed_connection: + # NOTE(vish): connection is defined in the parent class, we can + # recreate it as long as we create the backend too + # pylint: disable=W0201 + self.connection = Connection.recreate() + self.backend = self.connection.create_backend() + self.declare() + return super(Consumer, self).fetch( + no_ack, auto_ack, enable_callbacks) + if self.failed_connection: + LOG.error(_('Reconnected to queue')) + self.failed_connection = False + # NOTE(vish): This is catching all errors because we really don't + # want exceptions to be logged 10 times a second if some + # persistent failure occurs. + except Exception, e: # pylint: disable=W0703 + if not self.failed_connection: + LOG.exception(_('Failed to fetch message from queue: %s' % e)) + self.failed_connection = True def close(self, *args, **kwargs): LOG.debug('Closing consumer %s', self.consumer_tag) @@ -325,6 +323,7 @@ class ConsumerSet(object): def close(self): self.consumer_set.close() + class Publisher(messaging.Publisher): """Publisher base class.""" pass @@ -511,23 +510,6 @@ class MulticallWaiter(object): return self.wait() def wait(self): - # TODO(termie): This is probably really a much simpler issue but am - # trying to solve the problem quickly. This works but - # I'd prefer to dig in and do it the best way later on. - - #def _waiter(): - # i = 0 - # while not self._closed: - # LOG.error('Iteration #%s (%s)', i, self._consumer.consumer_tag) - # i += 1 - # try: - # self._consumer.wait(limit=1) - # except StopIteration: - # pass - # self._consumer.close() - # ConnectionPool.put(self._consumer.connection) - #eventlet.spawn(_waiter) - while True: rv = None while rv is None and not self._closed: From fa547237aea3e27fc4eaab4fb2cc144ad2d0466e Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:25 -0700 Subject: [PATCH 18/25] make sure that using multicall on a call with a single result still functions --- nova/rpc.py | 4 ++-- nova/tests/test_rpc.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 7faed4d3..84493271 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -233,10 +233,10 @@ class AdapterConsumer(Consumer): logging.error('rval! %s', rval) for x in rval: msg_reply(msg_id, x, None) - msg_reply(msg_id, None, None) else: msg_reply(msg_id, rval, None) - #msg_reply(msg_id, rval, None) + # This final None tells multicall that it is done. + msg_reply(msg_id, None, None) except Exception as e: logging.exception('Exception during message handling') if msg_id: diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index e5d99474..c1ef60ff 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -49,6 +49,35 @@ class RpcTestCase(test.TestCase): "args": {"value": value}}) self.assertEqual(value, result) + def test_call_succeed_despite_multiple_returns(self): + """Get a value through rpc call""" + value = 42 + result = rpc.call(self.context, 'test', {"method": "echo_three_times", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_call_succeed_despite_multiple_returns_yield(self): + """Get a value through rpc call""" + value = 42 + result = rpc.call(self.context, 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_multicall_succeed_once(self): + """Get a value through rpc call""" + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo", + "args": {"value": value}}) + i = 0 + for x in result: + if i > 0: + self.fail('should only receive one response') + self.assertEqual(value + i, x) + i += 1 + def test_multicall_succeed_three_times(self): """Get a value through rpc call""" value = 42 From e9f4f8b45cd9d8e1b247e2bb7c13458eb5cff829 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:49 -0700 Subject: [PATCH 19/25] cleanup the code for merging --- nova/fakerabbit.py | 4 --- nova/rpc.py | 78 ++++++++++++++++++---------------------- nova/tests/test_cloud.py | 3 -- nova/tests/test_rpc.py | 1 - run_tests.py | 1 - 5 files changed, 35 insertions(+), 52 deletions(-) diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index ff993e29..e7e9dab7 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -78,10 +78,6 @@ class Queue(object): class Backend(base.BaseBackend): - def __init__(self, connection, **kwargs): - super(Backend, self).__init__(connection, **kwargs) - self.consumers = {} - def queue_declare(self, queue, **kwargs): global QUEUES if queue not in QUEUES: diff --git a/nova/rpc.py b/nova/rpc.py index 84493271..8d14494f 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -24,7 +24,6 @@ No fan-out support yet. """ -import greenlet import json import sys import time @@ -36,6 +35,7 @@ from carrot import messaging from eventlet import greenpool from eventlet import pools from eventlet import queue +import greenlet from nova import context from nova import exception @@ -50,9 +50,9 @@ LOG = logging.getLogger('nova.rpc') FLAGS = flags.FLAGS flags.DEFINE_integer('rpc_thread_pool_size', 1024, - 'Size of RPC thread pool') + 'Size of RPC thread pool') flags.DEFINE_integer('rpc_conn_pool_size', 30, - 'Size of RPC connection pool') + 'Size of RPC connection pool') class Connection(carrot_connection.BrokerConnection): @@ -96,7 +96,7 @@ class Connection(carrot_connection.BrokerConnection): class Pool(pools.Pool): - """Class that implements a Pool of Connections""" + """Class that implements a Pool of Connections.""" # TODO(comstud): Timeout connections not used in a while def create(self): @@ -152,8 +152,9 @@ class Consumer(messaging.Consumer): self.connection = Connection.recreate() self.backend = self.connection.create_backend() self.declare() - return super(Consumer, self).fetch( - no_ack, auto_ack, enable_callbacks) + return super(Consumer, self).fetch(no_ack, + auto_ack, + enable_callbacks) if self.failed_connection: LOG.error(_('Reconnected to queue')) self.failed_connection = False @@ -165,10 +166,6 @@ class Consumer(messaging.Consumer): LOG.exception(_('Failed to fetch message from queue: %s' % e)) self.failed_connection = True - def close(self, *args, **kwargs): - LOG.debug('Closing consumer %s', self.consumer_tag) - return super(Consumer, self).close(*args, **kwargs) - def attach_to_eventlet(self): """Only needed for unit tests!""" timer = utils.LoopingCall(self.fetch, enable_callbacks=True) @@ -188,8 +185,10 @@ class AdapterConsumer(Consumer): self.register_callback(self.process_data) def process_data(self, message_data, message): - """Consumer callback that parses the message for validity and - fires off a thread to call the proxy object method. + """Consumer callback to call a method on a proxy object. + + Parses the message for validity and fires off a thread to call the + proxy object method. Message data should be a dictionary with two keys: method: string representing the method to call @@ -199,8 +198,8 @@ class AdapterConsumer(Consumer): """ LOG.debug(_('received %s') % message_data) + # This will be popped off in _unpack_context msg_id = message_data.get('_msg_id', None) - ctxt = _unpack_context(message_data) method = message_data.get('method') @@ -228,13 +227,13 @@ class AdapterConsumer(Consumer): try: rval = node_func(context=ctxt, **node_args) if msg_id: - # TODO(termie): re-enable when fix the yielding issue + # Check if the result was a generator if hasattr(rval, 'send'): - logging.error('rval! %s', rval) for x in rval: msg_reply(msg_id, x, None) else: msg_reply(msg_id, rval, None) + # This final None tells multicall that it is done. msg_reply(msg_id, None, None) except Exception as e: @@ -277,7 +276,7 @@ class FanoutAdapterConsumer(AdapterConsumer): class ConsumerSet(object): - """Groups consumers to listen on together on a single connection""" + """Groups consumers to listen on together on a single connection.""" def __init__(self, conn, consumer_list): self.consumer_list = set(consumer_list) @@ -365,7 +364,7 @@ class DirectConsumer(Consumer): self.routing_key = msg_id self.exchange = msg_id self.auto_delete = True - self.exclusive = False + self.exclusive = True super(DirectConsumer, self).__init__(connection=connection) @@ -393,20 +392,18 @@ def msg_reply(msg_id, reply=None, failure=None): LOG.error(_("Returning exception %s to caller"), message) LOG.error(tb) failure = (failure[0].__name__, str(failure[1]), tb) - conn = ConnectionPool.get() - publisher = DirectPublisher(connection=conn, msg_id=msg_id) - try: - publisher.send({'result': reply, 'failure': failure}) - LOG.error('MSG REPLY SUCCESS') - except TypeError: - LOG.error('MSG REPLY FAILURE') - publisher.send( - {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure}) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = DirectPublisher(connection=conn, msg_id=msg_id) + try: + publisher.send({'result': reply, 'failure': failure}) + except TypeError: + publisher.send( + {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure}) + + publisher.close() class RemoteError(exception.Error): @@ -518,12 +515,9 @@ class MulticallWaiter(object): except Exception: self.close() raise - #rv = self._consumer.fetch(enable_callbacks=True) time.sleep(0.01) - LOG.error('RV %s', rv) result = self._results.get() - LOG.error('RESULT %s', result) if isinstance(result, Exception): self.close() raise result @@ -545,22 +539,20 @@ def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) _pack_context(msg, context) - conn = ConnectionPool.get() - publisher = TopicPublisher(connection=conn, topic=topic) - publisher.send(msg) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = TopicPublisher(connection=conn, topic=topic) + publisher.send(msg) + publisher.close() def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) _pack_context(msg, context) - conn = ConnectionPool.get() - publisher = FanoutPublisher(topic, connection=conn) - publisher.send(msg) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = FanoutPublisher(topic, connection=conn) + publisher.send(msg) + publisher.close() def generic_response(message_data, message): diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index a838dd53..ca3ef7ff 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -87,8 +87,6 @@ class CloudTestCase(test.TestCase): db.network_disassociate(self.context, network_ref['id']) self.manager.delete_project(self.project) self.manager.delete_user(self.user) - #self.compute.kill() - #self.network.kill() super(CloudTestCase, self).tearDown() def _create_key(self, name): @@ -314,7 +312,6 @@ class CloudTestCase(test.TestCase): rv = self.cloud.terminate_instances(self.context, [instance_id]) def test_ajax_console(self): - kwargs = {'image_id': 'ami-1'} rv = self.cloud.run_instances(self.context, **kwargs) instance_id = rv['instancesSet'][0]['instanceId'] diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index c1ef60ff..fcecfb35 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -124,7 +124,6 @@ class RpcTestCase(test.TestCase): 'test', {"method": "fail", "args": {"value": value}}) - LOG.error('INNNNNNN BETTTWWWWWWWWWWEEEEEEEEEEN') try: rpc.call(self.context, 'test', diff --git a/run_tests.py b/run_tests.py index 509a60ca..d5d8acd1 100644 --- a/run_tests.py +++ b/run_tests.py @@ -285,7 +285,6 @@ if __name__ == '__main__': # If any argument looks like a test name but doesn't have "nova.tests" in # front of it, automatically add that so we don't have to type as much argv = [] - logging.getLogger('amqplib').setLevel(logging.DEBUG) for x in sys.argv: if x.startswith('test_'): argv.append('nova.tests.%s' % x) From fdb0c5a8745c3d11693274d7764ffde69efede7a Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:43:04 -0700 Subject: [PATCH 20/25] cleanups --- nova/tests/test_rpc.py | 47 +++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index fcecfb35..8523b409 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -31,7 +31,6 @@ LOG = logging.getLogger('nova.tests.rpc') class RpcTestCase(test.TestCase): - """Test cases for rpc""" def setUp(self): super(RpcTestCase, self).setUp() self.conn = rpc.Connection.instance(True) @@ -43,21 +42,18 @@ class RpcTestCase(test.TestCase): self.context = context.get_admin_context() def test_call_succeed(self): - """Get a value through rpc call""" value = 42 result = rpc.call(self.context, 'test', {"method": "echo", "args": {"value": value}}) self.assertEqual(value, result) def test_call_succeed_despite_multiple_returns(self): - """Get a value through rpc call""" value = 42 result = rpc.call(self.context, 'test', {"method": "echo_three_times", "args": {"value": value}}) self.assertEqual(value, result) def test_call_succeed_despite_multiple_returns_yield(self): - """Get a value through rpc call""" value = 42 result = rpc.call(self.context, 'test', {"method": "echo_three_times_yield", @@ -65,7 +61,6 @@ class RpcTestCase(test.TestCase): self.assertEqual(value, result) def test_multicall_succeed_once(self): - """Get a value through rpc call""" value = 42 result = rpc.multicall(self.context, 'test', @@ -79,7 +74,6 @@ class RpcTestCase(test.TestCase): i += 1 def test_multicall_succeed_three_times(self): - """Get a value through rpc call""" value = 42 result = rpc.multicall(self.context, 'test', @@ -91,7 +85,6 @@ class RpcTestCase(test.TestCase): i += 1 def test_multicall_succeed_three_times_yield(self): - """Get a value through rpc call""" value = 42 result = rpc.multicall(self.context, 'test', @@ -103,7 +96,7 @@ class RpcTestCase(test.TestCase): i += 1 def test_context_passed(self): - """Makes sure a context is passed through rpc call""" + """Makes sure a context is passed through rpc call.""" value = 42 result = rpc.call(self.context, 'test', {"method": "context", @@ -111,11 +104,12 @@ class RpcTestCase(test.TestCase): self.assertEqual(self.context.to_dict(), result) def test_call_exception(self): - """Test that exception gets passed back properly + """Test that exception gets passed back properly. rpc.call returns a RemoteError object. The value of the exception is converted to a string, so we convert it back to an int in the test. + """ value = 42 self.assertRaises(rpc.RemoteError, @@ -134,7 +128,7 @@ class RpcTestCase(test.TestCase): self.assertEqual(int(exc.value), value) def test_nested_calls(self): - """Test that we can do an rpc.call inside another call""" + """Test that we can do an rpc.call inside another call.""" class Nested(object): @staticmethod def echo(context, queue, value): @@ -162,8 +156,7 @@ class RpcTestCase(test.TestCase): self.assertEqual(value, result) def test_connectionpool_single(self): - """Test that ConnectionPool recycles a single connection""" - + """Test that ConnectionPool recycles a single connection.""" conn1 = rpc.ConnectionPool.get() rpc.ConnectionPool.put(conn1) conn2 = rpc.ConnectionPool.get() @@ -171,10 +164,13 @@ class RpcTestCase(test.TestCase): self.assertEqual(conn1, conn2) def test_connectionpool_double(self): - """Test that ConnectionPool returns 2 separate connections - when called consecutively and the pool returns connections LIFO - """ + """Test that ConnectionPool returns and reuses separate connections. + When called consecutively we should get separate connections and upon + returning them those connections should be reused for future calls + before generating a new connection. + + """ conn1 = rpc.ConnectionPool.get() conn2 = rpc.ConnectionPool.get() @@ -184,14 +180,11 @@ class RpcTestCase(test.TestCase): conn3 = rpc.ConnectionPool.get() conn4 = rpc.ConnectionPool.get() - self.assertEqual(conn2, conn3) - self.assertEqual(conn1, conn4) + self.assertEqual(conn1, conn3) + self.assertEqual(conn2, conn4) def test_connectionpool_limit(self): - """Test connection pool limit and verify all connections - are unique - """ - + """Test connection pool limit and connection uniqueness.""" max_size = FLAGS.rpc_conn_pool_size conns = [] @@ -205,19 +198,21 @@ class RpcTestCase(test.TestCase): class TestReceiver(object): - """Simple Proxy class so the consumer has methods to call + """Simple Proxy class so the consumer has methods to call. - Uses static methods because we aren't actually storing any state""" + Uses static methods because we aren't actually storing any state. + + """ @staticmethod def echo(context, value): - """Simply returns whatever value is sent in""" + """Simply returns whatever value is sent in.""" LOG.debug(_("Received %s"), value) return value @staticmethod def context(context, value): - """Returns dictionary version of context""" + """Returns dictionary version of context.""" LOG.debug(_("Received %s"), context) return context.to_dict() @@ -235,5 +230,5 @@ class TestReceiver(object): @staticmethod def fail(context, value): - """Raises an exception with the value sent in""" + """Raises an exception with the value sent in.""" raise Exception(value) From 22e1bdcda697f64775e8a10f3a0305feb1d55c30 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:43:04 -0700 Subject: [PATCH 21/25] replace removed import --- nova/tests/test_cloud.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index ca3ef7ff..b64be662 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -19,6 +19,7 @@ from base64 import b64decode from M2Crypto import BIO from M2Crypto import RSA +import os from eventlet import greenthread From bb5e80112bd4b930b46a585a2a3d470010aeb8f6 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:43:04 -0700 Subject: [PATCH 22/25] change the behavior of calling a multicall --- nova/rpc.py | 8 +++++--- nova/tests/test_rpc.py | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 8d14494f..493978e5 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -236,6 +236,9 @@ class AdapterConsumer(Consumer): # This final None tells multicall that it is done. msg_reply(msg_id, None, None) + elif hasattr(rval, 'send'): + # NOTE(vish): this iterates through the generator + list(rval) except Exception as e: logging.exception('Exception during message handling') if msg_id: @@ -530,9 +533,8 @@ class MulticallWaiter(object): def call(context, topic, msg): """Sends a message on a topic and wait for a response.""" rv = multicall(context, topic, msg) - for x in rv: - rv.close() - return x + # NOTE(vish): return the last result from the multicall + return list(rv)[-1] def cast(context, topic, msg): diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 8523b409..35f4a64d 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -51,14 +51,14 @@ class RpcTestCase(test.TestCase): value = 42 result = rpc.call(self.context, 'test', {"method": "echo_three_times", "args": {"value": value}}) - self.assertEqual(value, result) + self.assertEqual(value + 2, result) def test_call_succeed_despite_multiple_returns_yield(self): value = 42 result = rpc.call(self.context, 'test', {"method": "echo_three_times_yield", "args": {"value": value}}) - self.assertEqual(value, result) + self.assertEqual(value + 2, result) def test_multicall_succeed_once(self): value = 42 From 94dc3041b3584ee158d3c725d6c0f33daaf864fa Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Thu, 26 May 2011 19:27:27 +0000 Subject: [PATCH 23/25] Change the return from glance to be a list of dictionaries describing VDIs Fix the rest of the code to account for this Add a test for swap --- nova/tests/test_xenapi.py | 23 +++++++++++++++++++++++ nova/tests/xenapi/stubs.py | 23 ++++++++++++++++++----- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index be1e3569..18a26789 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -395,6 +395,29 @@ class XenAPIVMTestCase(test.TestCase): os_type="linux") self.check_vm_params_for_linux() + def test_spawn_vhd_glance_swapdisk(self): + # Change the default host_call_plugin to one that'll return + # a swap disk + orig_func = stubs.FakeSessionForVMTests.host_call_plugin + + stubs.FakeSessionForVMTests.host_call_plugin = \ + stubs.FakeSessionForVMTests.host_call_plugin_swap + + try: + # We'll steal the above glance linux test + self.test_spawn_vhd_glance_linux() + finally: + # Make sure to put this back + stubs.FakeSessionForVMTests.host_call_plugin = orig_func + + # We should have 2 VBDs. + self.assertEqual(len(self.vm['VBDs']), 2) + # Now test that we have 1. + self.tearDown() + self.setUp() + self.test_spawn_vhd_glance_linux() + self.assertEqual(len(self.vm['VBDs']), 1) + def test_spawn_vhd_glance_windows(self): FLAGS.xenapi_image_service = 'glance' self._test_spawn(glance_stubs.FakeGlance.IMAGE_VHD, None, None, diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 9f6f6431..35308d95 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -38,7 +38,7 @@ def stubout_instance_snapshot(stubs): sr_ref=sr_ref, sharable=False) vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) vdi_uuid = vdi_rec['uuid'] - return {'primary_vdi_uuid': vdi_uuid} + return [dict(vdi_type='os', vdi_uuid=vdi_uuid)] stubs.Set(vm_utils.VMHelper, 'fetch_image', fake_fetch_image) @@ -134,16 +134,29 @@ class FakeSessionForVMTests(fake.SessionBase): super(FakeSessionForVMTests, self).__init__(uri) def host_call_plugin(self, _1, _2, plugin, method, _5): + sr_ref = fake.get_all('SR')[0] + vdi_ref = fake.create_vdi('', False, sr_ref, False) + vdi_rec = fake.get_record('VDI', vdi_ref) + if plugin == "glance" and method == "download_vhd": + ret_str = json.dumps([dict(vdi_type='os', + vdi_uuid=vdi_rec['uuid'])]) + else: + ret_str = vdi_rec['uuid'] + return '%s' % ret_str + + def host_call_plugin_swap(self, _1, _2, plugin, method, _5): sr_ref = fake.get_all('SR')[0] vdi_ref = fake.create_vdi('', False, sr_ref, False) vdi_rec = fake.get_record('VDI', vdi_ref) if plugin == "glance" and method == "download_vhd": swap_vdi_ref = fake.create_vdi('', False, sr_ref, False) swap_vdi_rec = fake.get_record('VDI', swap_vdi_ref) - return '%s' % json.dumps( - {'primary_vdi_uuid': vdi_rec['uuid'], - 'swap_vdi_uuid': swap_vdi_rec['uuid']}) - return '%s' % vdi_rec['uuid'] + ret_str = json.dumps( + [dict(vdi_type='os', vdi_uuid=vdi_rec['uuid']), + dict(vdi_type='swap', vdi_uuid=swap_vdi_rec['uuid'])]) + else: + ret_str = vdi_rec['uuid'] + return '%s' % ret_str def VM_start(self, _1, ref, _2, _3): vm = fake.get_record('VM', ref) From 7be1954fa2ad5ff3d8c71986ad7c69d76e01d753 Mon Sep 17 00:00:00 2001 From: termie Date: Thu, 26 May 2011 15:08:53 -0700 Subject: [PATCH 24/25] changes per review --- nova/rpc.py | 17 ++++++++++------- nova/tests/test_rpc.py | 12 +++--------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 493978e5..1ec495bc 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -28,6 +28,7 @@ import json import sys import time import traceback +import types import uuid from carrot import connection as carrot_connection @@ -228,7 +229,7 @@ class AdapterConsumer(Consumer): rval = node_func(context=ctxt, **node_args) if msg_id: # Check if the result was a generator - if hasattr(rval, 'send'): + if isinstance(rval, types.GeneratorType): for x in rval: msg_reply(msg_id, x, None) else: @@ -236,7 +237,7 @@ class AdapterConsumer(Consumer): # This final None tells multicall that it is done. msg_reply(msg_id, None, None) - elif hasattr(rval, 'send'): + elif isinstance(rval, types.GeneratorType): # NOTE(vish): this iterates through the generator list(rval) except Exception as e: @@ -281,11 +282,11 @@ class FanoutAdapterConsumer(AdapterConsumer): class ConsumerSet(object): """Groups consumers to listen on together on a single connection.""" - def __init__(self, conn, consumer_list): + def __init__(self, connection, consumer_list): self.consumer_list = set(consumer_list) self.consumer_set = None self.enabled = True - self.init(conn) + self.init(connection) def init(self, conn): if not conn: @@ -316,8 +317,7 @@ class ConsumerSet(object): running = False break except Exception as e: - LOG.error(_("Received exception %s " % type(e) + \ - "while processing consumer")) + LOG.exception(_("Exception while processing consumer")) self.reconnect() # Break to outer loop break @@ -534,7 +534,10 @@ def call(context, topic, msg): """Sends a message on a topic and wait for a response.""" rv = multicall(context, topic, msg) # NOTE(vish): return the last result from the multicall - return list(rv)[-1] + rv = list(rv) + if not rv: + return + return rv[-1] def cast(context, topic, msg): diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 35f4a64d..ffd748ef 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -66,12 +66,10 @@ class RpcTestCase(test.TestCase): 'test', {"method": "echo", "args": {"value": value}}) - i = 0 - for x in result: + for i, x in enumerate(result): if i > 0: self.fail('should only receive one response') self.assertEqual(value + i, x) - i += 1 def test_multicall_succeed_three_times(self): value = 42 @@ -79,10 +77,8 @@ class RpcTestCase(test.TestCase): 'test', {"method": "echo_three_times", "args": {"value": value}}) - i = 0 - for x in result: + for i, x in enumerate(result): self.assertEqual(value + i, x) - i += 1 def test_multicall_succeed_three_times_yield(self): value = 42 @@ -90,10 +86,8 @@ class RpcTestCase(test.TestCase): 'test', {"method": "echo_three_times_yield", "args": {"value": value}}) - i = 0 - for x in result: + for i, x in enumerate(result): self.assertEqual(value + i, x) - i += 1 def test_context_passed(self): """Makes sure a context is passed through rpc call.""" From 5236b7311755025481d7bc9f7c0f62fa2642815c Mon Sep 17 00:00:00 2001 From: termie Date: Thu, 26 May 2011 17:06:52 -0700 Subject: [PATCH 25/25] fix a minor bug unrelated to this change --- nova/rpc.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nova/rpc.py b/nova/rpc.py index 1ec495bc..c5277c6a 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -212,7 +212,9 @@ class AdapterConsumer(Consumer): # we just log the message and send an error string # back to the caller LOG.warn(_('no method for message: %s') % message_data) - msg_reply(msg_id, _('No method for message: %s') % message_data) + if msg_id: + msg_reply(msg_id, + _('No method for message: %s') % message_data) return self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)