diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py index ff993e29..e7e9dab7 100644 --- a/nova/fakerabbit.py +++ b/nova/fakerabbit.py @@ -78,10 +78,6 @@ class Queue(object): class Backend(base.BaseBackend): - def __init__(self, connection, **kwargs): - super(Backend, self).__init__(connection, **kwargs) - self.consumers = {} - def queue_declare(self, queue, **kwargs): global QUEUES if queue not in QUEUES: diff --git a/nova/rpc.py b/nova/rpc.py index 84493271..8d14494f 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -24,7 +24,6 @@ No fan-out support yet. """ -import greenlet import json import sys import time @@ -36,6 +35,7 @@ from carrot import messaging from eventlet import greenpool from eventlet import pools from eventlet import queue +import greenlet from nova import context from nova import exception @@ -50,9 +50,9 @@ LOG = logging.getLogger('nova.rpc') FLAGS = flags.FLAGS flags.DEFINE_integer('rpc_thread_pool_size', 1024, - 'Size of RPC thread pool') + 'Size of RPC thread pool') flags.DEFINE_integer('rpc_conn_pool_size', 30, - 'Size of RPC connection pool') + 'Size of RPC connection pool') class Connection(carrot_connection.BrokerConnection): @@ -96,7 +96,7 @@ class Connection(carrot_connection.BrokerConnection): class Pool(pools.Pool): - """Class that implements a Pool of Connections""" + """Class that implements a Pool of Connections.""" # TODO(comstud): Timeout connections not used in a while def create(self): @@ -152,8 +152,9 @@ class Consumer(messaging.Consumer): self.connection = Connection.recreate() self.backend = self.connection.create_backend() self.declare() - return super(Consumer, self).fetch( - no_ack, auto_ack, enable_callbacks) + return super(Consumer, self).fetch(no_ack, + auto_ack, + enable_callbacks) if self.failed_connection: LOG.error(_('Reconnected to queue')) self.failed_connection = False @@ -165,10 +166,6 @@ class Consumer(messaging.Consumer): LOG.exception(_('Failed to fetch message from queue: %s' % e)) self.failed_connection = True - def close(self, *args, **kwargs): - LOG.debug('Closing consumer %s', self.consumer_tag) - return super(Consumer, self).close(*args, **kwargs) - def attach_to_eventlet(self): """Only needed for unit tests!""" timer = utils.LoopingCall(self.fetch, enable_callbacks=True) @@ -188,8 +185,10 @@ class AdapterConsumer(Consumer): self.register_callback(self.process_data) def process_data(self, message_data, message): - """Consumer callback that parses the message for validity and - fires off a thread to call the proxy object method. + """Consumer callback to call a method on a proxy object. + + Parses the message for validity and fires off a thread to call the + proxy object method. Message data should be a dictionary with two keys: method: string representing the method to call @@ -199,8 +198,8 @@ class AdapterConsumer(Consumer): """ LOG.debug(_('received %s') % message_data) + # This will be popped off in _unpack_context msg_id = message_data.get('_msg_id', None) - ctxt = _unpack_context(message_data) method = message_data.get('method') @@ -228,13 +227,13 @@ class AdapterConsumer(Consumer): try: rval = node_func(context=ctxt, **node_args) if msg_id: - # TODO(termie): re-enable when fix the yielding issue + # Check if the result was a generator if hasattr(rval, 'send'): - logging.error('rval! %s', rval) for x in rval: msg_reply(msg_id, x, None) else: msg_reply(msg_id, rval, None) + # This final None tells multicall that it is done. msg_reply(msg_id, None, None) except Exception as e: @@ -277,7 +276,7 @@ class FanoutAdapterConsumer(AdapterConsumer): class ConsumerSet(object): - """Groups consumers to listen on together on a single connection""" + """Groups consumers to listen on together on a single connection.""" def __init__(self, conn, consumer_list): self.consumer_list = set(consumer_list) @@ -365,7 +364,7 @@ class DirectConsumer(Consumer): self.routing_key = msg_id self.exchange = msg_id self.auto_delete = True - self.exclusive = False + self.exclusive = True super(DirectConsumer, self).__init__(connection=connection) @@ -393,20 +392,18 @@ def msg_reply(msg_id, reply=None, failure=None): LOG.error(_("Returning exception %s to caller"), message) LOG.error(tb) failure = (failure[0].__name__, str(failure[1]), tb) - conn = ConnectionPool.get() - publisher = DirectPublisher(connection=conn, msg_id=msg_id) - try: - publisher.send({'result': reply, 'failure': failure}) - LOG.error('MSG REPLY SUCCESS') - except TypeError: - LOG.error('MSG REPLY FAILURE') - publisher.send( - {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure}) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = DirectPublisher(connection=conn, msg_id=msg_id) + try: + publisher.send({'result': reply, 'failure': failure}) + except TypeError: + publisher.send( + {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure}) + + publisher.close() class RemoteError(exception.Error): @@ -518,12 +515,9 @@ class MulticallWaiter(object): except Exception: self.close() raise - #rv = self._consumer.fetch(enable_callbacks=True) time.sleep(0.01) - LOG.error('RV %s', rv) result = self._results.get() - LOG.error('RESULT %s', result) if isinstance(result, Exception): self.close() raise result @@ -545,22 +539,20 @@ def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) _pack_context(msg, context) - conn = ConnectionPool.get() - publisher = TopicPublisher(connection=conn, topic=topic) - publisher.send(msg) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = TopicPublisher(connection=conn, topic=topic) + publisher.send(msg) + publisher.close() def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) _pack_context(msg, context) - conn = ConnectionPool.get() - publisher = FanoutPublisher(topic, connection=conn) - publisher.send(msg) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = FanoutPublisher(topic, connection=conn) + publisher.send(msg) + publisher.close() def generic_response(message_data, message): diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index a838dd53..ca3ef7ff 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -87,8 +87,6 @@ class CloudTestCase(test.TestCase): db.network_disassociate(self.context, network_ref['id']) self.manager.delete_project(self.project) self.manager.delete_user(self.user) - #self.compute.kill() - #self.network.kill() super(CloudTestCase, self).tearDown() def _create_key(self, name): @@ -314,7 +312,6 @@ class CloudTestCase(test.TestCase): rv = self.cloud.terminate_instances(self.context, [instance_id]) def test_ajax_console(self): - kwargs = {'image_id': 'ami-1'} rv = self.cloud.run_instances(self.context, **kwargs) instance_id = rv['instancesSet'][0]['instanceId'] diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index c1ef60ff..fcecfb35 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -124,7 +124,6 @@ class RpcTestCase(test.TestCase): 'test', {"method": "fail", "args": {"value": value}}) - LOG.error('INNNNNNN BETTTWWWWWWWWWWEEEEEEEEEEN') try: rpc.call(self.context, 'test', diff --git a/run_tests.py b/run_tests.py index 509a60ca..d5d8acd1 100644 --- a/run_tests.py +++ b/run_tests.py @@ -285,7 +285,6 @@ if __name__ == '__main__': # If any argument looks like a test name but doesn't have "nova.tests" in # front of it, automatically add that so we don't have to type as much argv = [] - logging.getLogger('amqplib').setLevel(logging.DEBUG) for x in sys.argv: if x.startswith('test_'): argv.append('nova.tests.%s' % x)