almost everything working with fake_rabbit

This commit is contained in:
termie
2011-05-25 15:42:24 -07:00
parent 4c065cd03d
commit 5651a2773a
5 changed files with 35 additions and 9 deletions

View File

@@ -102,6 +102,7 @@ class Pool(pools.Pool):
# TODO(comstud): Timeout connections not used in a while # TODO(comstud): Timeout connections not used in a while
def create(self): def create(self):
LOG.debug('Creating new connection')
return Connection.instance(new=True) return Connection.instance(new=True)
# Create a ConnectionPool to use for RPC calls. We'll order the # Create a ConnectionPool to use for RPC calls. We'll order the
@@ -166,6 +167,10 @@ class Consumer(messaging.Consumer):
# LOG.exception(_('Failed to fetch message from queue: %s' % e)) # LOG.exception(_('Failed to fetch message from queue: %s' % e))
# self.failed_connection = True # self.failed_connection = True
def close(self, *args, **kwargs):
LOG.debug('Closing consumer %s', self.consumer_tag)
return super(Consumer, self).close(*args, **kwargs)
def attach_to_eventlet(self): def attach_to_eventlet(self):
"""Only needed for unit tests!""" """Only needed for unit tests!"""
timer = utils.LoopingCall(self.fetch, enable_callbacks=True) timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
@@ -317,6 +322,8 @@ class ConsumerSet(object):
# Break to outer loop # Break to outer loop
break break
def close(self):
self.consumer_set.close()
class Publisher(messaging.Publisher): class Publisher(messaging.Publisher):
"""Publisher base class.""" """Publisher base class."""
@@ -525,12 +532,19 @@ class MulticallWaiter(object):
while True: while True:
rv = None rv = None
while rv is None and not self._closed: while rv is None and not self._closed:
rv = self._consumer.fetch(enable_callbacks=True) try:
rv = self._consumer.fetch(enable_callbacks=True)
except Exception:
self.close()
raise
#rv = self._consumer.fetch(enable_callbacks=True)
time.sleep(0.01) time.sleep(0.01)
LOG.error('RV %s', rv) LOG.error('RV %s', rv)
result = self._results.get() result = self._results.get()
LOG.error('RESULT %s', result)
if isinstance(result, Exception): if isinstance(result, Exception):
self.close()
raise result raise result
if result == None: if result == None:
self.close() self.close()

View File

@@ -88,29 +88,39 @@ class Service(object):
if 'nova-compute' == self.binary: if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt) self.manager.update_available_resource(ctxt)
conn = rpc.Connection.instance(new=True) conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
conn3 = rpc.Connection.instance(new=True)
logging.debug("Creating Consumer connection for Service %s" % \ logging.debug("Creating Consumer connection for Service %s" % \
self.topic) self.topic)
# Share this same connection for these Consumers # Share this same connection for these Consumers
consumer_all = rpc.TopicAdapterConsumer( consumer_all = rpc.TopicAdapterConsumer(
connection=conn, connection=conn1,
topic=self.topic, topic=self.topic,
proxy=self) proxy=self)
consumer_node = rpc.TopicAdapterConsumer( consumer_node = rpc.TopicAdapterConsumer(
connection=conn, connection=conn1,
topic='%s.%s' % (self.topic, self.host), topic='%s.%s' % (self.topic, self.host),
proxy=self) proxy=self)
fanout = rpc.FanoutAdapterConsumer( fanout = rpc.FanoutAdapterConsumer(
connection=conn, connection=conn1,
topic=self.topic, topic=self.topic,
proxy=self) proxy=self)
cset = rpc.ConsumerSet(conn, [consumer_all, cset = rpc.ConsumerSet(conn1, [consumer_all,
consumer_node, consumer_node,
fanout]) fanout])
# Wait forever, processing these consumers # Wait forever, processing these consumers
self.csetthread = greenthread.spawn(cset.wait) def _wait():
cset.wait()
cset.close()
self.csetthread = greenthread.spawn(_wait)
#self.timers.append(consumer_all.attach_to_eventlet())
#self.timers.append(consumer_node.attach_to_eventlet())
#self.timers.append(fanout.attach_to_eventlet())
if self.report_interval: if self.report_interval:
pulse = utils.LoopingCall(self.report_state) pulse = utils.LoopingCall(self.report_state)

View File

@@ -85,6 +85,7 @@ class TestCase(unittest.TestCase):
self._monkey_patch_attach() self._monkey_patch_attach()
self._monkey_patch_wsgi() self._monkey_patch_wsgi()
self._original_flags = FLAGS.FlagValuesDict() self._original_flags = FLAGS.FlagValuesDict()
rpc.ConnectionPool = rpc.Pool(max_size=30)
def tearDown(self): def tearDown(self):
"""Runs after each test method to tear down test environment.""" """Runs after each test method to tear down test environment."""

View File

@@ -87,8 +87,8 @@ class CloudTestCase(test.TestCase):
db.network_disassociate(self.context, network_ref['id']) db.network_disassociate(self.context, network_ref['id'])
self.manager.delete_project(self.project) self.manager.delete_project(self.project)
self.manager.delete_user(self.user) self.manager.delete_user(self.user)
self.compute.kill() #self.compute.kill()
self.network.kill() #self.network.kill()
super(CloudTestCase, self).tearDown() super(CloudTestCase, self).tearDown()
def _create_key(self, name): def _create_key(self, name):

View File

@@ -285,6 +285,7 @@ if __name__ == '__main__':
# If any argument looks like a test name but doesn't have "nova.tests" in # If any argument looks like a test name but doesn't have "nova.tests" in
# front of it, automatically add that so we don't have to type as much # front of it, automatically add that so we don't have to type as much
argv = [] argv = []
logging.getLogger('amqplib').setLevel(logging.DEBUG)
for x in sys.argv: for x in sys.argv:
if x.startswith('test_'): if x.startswith('test_'):
argv.append('nova.tests.%s' % x) argv.append('nova.tests.%s' % x)