From 5651a2773a28f0811ffce4483b0736bdc1b49210 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: [PATCH] almost everything working with fake_rabbit --- nova/rpc.py | 16 +++++++++++++++- nova/service.py | 22 ++++++++++++++++------ nova/test.py | 1 + nova/tests/test_cloud.py | 4 ++-- run_tests.py | 1 + 5 files changed, 35 insertions(+), 9 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index d7d7bb014..e1f594a99 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -102,6 +102,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): + LOG.debug('Creating new connection') return Connection.instance(new=True) # Create a ConnectionPool to use for RPC calls. We'll order the @@ -166,6 +167,10 @@ class Consumer(messaging.Consumer): # LOG.exception(_('Failed to fetch message from queue: %s' % e)) # self.failed_connection = True + def close(self, *args, **kwargs): + LOG.debug('Closing consumer %s', self.consumer_tag) + return super(Consumer, self).close(*args, **kwargs) + def attach_to_eventlet(self): """Only needed for unit tests!""" timer = utils.LoopingCall(self.fetch, enable_callbacks=True) @@ -317,6 +322,8 @@ class ConsumerSet(object): # Break to outer loop break + def close(self): + self.consumer_set.close() class Publisher(messaging.Publisher): """Publisher base class.""" @@ -525,12 +532,19 @@ class MulticallWaiter(object): while True: rv = None while rv is None and not self._closed: - rv = self._consumer.fetch(enable_callbacks=True) + try: + rv = self._consumer.fetch(enable_callbacks=True) + except Exception: + self.close() + raise + #rv = self._consumer.fetch(enable_callbacks=True) time.sleep(0.01) LOG.error('RV %s', rv) result = self._results.get() + LOG.error('RESULT %s', result) if isinstance(result, Exception): + self.close() raise result if result == None: self.close() diff --git a/nova/service.py b/nova/service.py index 3a364b6c6..2da510140 100644 --- a/nova/service.py +++ b/nova/service.py @@ -88,29 +88,39 @@ class Service(object): if 'nova-compute' == self.binary: self.manager.update_available_resource(ctxt) - conn = rpc.Connection.instance(new=True) + conn1 = rpc.Connection.instance(new=True) + conn2 = rpc.Connection.instance(new=True) + conn3 = rpc.Connection.instance(new=True) logging.debug("Creating Consumer connection for Service %s" % \ self.topic) # Share this same connection for these Consumers consumer_all = rpc.TopicAdapterConsumer( - connection=conn, + connection=conn1, topic=self.topic, proxy=self) consumer_node = rpc.TopicAdapterConsumer( - connection=conn, + connection=conn1, topic='%s.%s' % (self.topic, self.host), proxy=self) fanout = rpc.FanoutAdapterConsumer( - connection=conn, + connection=conn1, topic=self.topic, proxy=self) - cset = rpc.ConsumerSet(conn, [consumer_all, + cset = rpc.ConsumerSet(conn1, [consumer_all, consumer_node, fanout]) # Wait forever, processing these consumers - self.csetthread = greenthread.spawn(cset.wait) + def _wait(): + cset.wait() + cset.close() + + self.csetthread = greenthread.spawn(_wait) + + #self.timers.append(consumer_all.attach_to_eventlet()) + #self.timers.append(consumer_node.attach_to_eventlet()) + #self.timers.append(fanout.attach_to_eventlet()) if self.report_interval: pulse = utils.LoopingCall(self.report_state) diff --git a/nova/test.py b/nova/test.py index 4deb2a175..7b2cf94b6 100644 --- a/nova/test.py +++ b/nova/test.py @@ -85,6 +85,7 @@ class TestCase(unittest.TestCase): self._monkey_patch_attach() self._monkey_patch_wsgi() self._original_flags = FLAGS.FlagValuesDict() + rpc.ConnectionPool = rpc.Pool(max_size=30) def tearDown(self): """Runs after each test method to tear down test environment.""" diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index 1e14c327c..a838dd530 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -87,8 +87,8 @@ class CloudTestCase(test.TestCase): db.network_disassociate(self.context, network_ref['id']) self.manager.delete_project(self.project) self.manager.delete_user(self.user) - self.compute.kill() - self.network.kill() + #self.compute.kill() + #self.network.kill() super(CloudTestCase, self).tearDown() def _create_key(self, name): diff --git a/run_tests.py b/run_tests.py index d5d8acd16..509a60caa 100644 --- a/run_tests.py +++ b/run_tests.py @@ -285,6 +285,7 @@ if __name__ == '__main__': # If any argument looks like a test name but doesn't have "nova.tests" in # front of it, automatically add that so we don't have to type as much argv = [] + logging.getLogger('amqplib').setLevel(logging.DEBUG) for x in sys.argv: if x.startswith('test_'): argv.append('nova.tests.%s' % x)