cleanup the code for merging

This commit is contained in:
termie
2011-05-25 15:42:49 -07:00
parent da22b73052
commit 3dee8b8f9f
7 changed files with 46 additions and 66 deletions

View File

@@ -78,10 +78,6 @@ class Queue(object):
class Backend(base.BaseBackend): class Backend(base.BaseBackend):
def __init__(self, connection, **kwargs):
super(Backend, self).__init__(connection, **kwargs)
self.consumers = {}
def queue_declare(self, queue, **kwargs): def queue_declare(self, queue, **kwargs):
global QUEUES global QUEUES
if queue not in QUEUES: if queue not in QUEUES:

View File

@@ -24,7 +24,6 @@ No fan-out support yet.
""" """
import greenlet
import json import json
import sys import sys
import time import time
@@ -36,6 +35,7 @@ from carrot import messaging
from eventlet import greenpool from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import queue from eventlet import queue
import greenlet
from nova import context from nova import context
from nova import exception from nova import exception
@@ -50,9 +50,9 @@ LOG = logging.getLogger('nova.rpc')
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
flags.DEFINE_integer('rpc_thread_pool_size', 1024, flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool') 'Size of RPC thread pool')
flags.DEFINE_integer('rpc_conn_pool_size', 30, flags.DEFINE_integer('rpc_conn_pool_size', 30,
'Size of RPC connection pool') 'Size of RPC connection pool')
class Connection(carrot_connection.BrokerConnection): class Connection(carrot_connection.BrokerConnection):
@@ -96,7 +96,7 @@ class Connection(carrot_connection.BrokerConnection):
class Pool(pools.Pool): class Pool(pools.Pool):
"""Class that implements a Pool of Connections""" """Class that implements a Pool of Connections."""
# TODO(comstud): Timeout connections not used in a while # TODO(comstud): Timeout connections not used in a while
def create(self): def create(self):
@@ -152,8 +152,9 @@ class Consumer(messaging.Consumer):
self.connection = Connection.recreate() self.connection = Connection.recreate()
self.backend = self.connection.create_backend() self.backend = self.connection.create_backend()
self.declare() self.declare()
return super(Consumer, self).fetch( return super(Consumer, self).fetch(no_ack,
no_ack, auto_ack, enable_callbacks) auto_ack,
enable_callbacks)
if self.failed_connection: if self.failed_connection:
LOG.error(_('Reconnected to queue')) LOG.error(_('Reconnected to queue'))
self.failed_connection = False self.failed_connection = False
@@ -165,10 +166,6 @@ class Consumer(messaging.Consumer):
LOG.exception(_('Failed to fetch message from queue: %s' % e)) LOG.exception(_('Failed to fetch message from queue: %s' % e))
self.failed_connection = True self.failed_connection = True
def close(self, *args, **kwargs):
LOG.debug('Closing consumer %s', self.consumer_tag)
return super(Consumer, self).close(*args, **kwargs)
def attach_to_eventlet(self): def attach_to_eventlet(self):
"""Only needed for unit tests!""" """Only needed for unit tests!"""
timer = utils.LoopingCall(self.fetch, enable_callbacks=True) timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
@@ -188,8 +185,10 @@ class AdapterConsumer(Consumer):
self.register_callback(self.process_data) self.register_callback(self.process_data)
def process_data(self, message_data, message): def process_data(self, message_data, message):
"""Consumer callback that parses the message for validity and """Consumer callback to call a method on a proxy object.
fires off a thread to call the proxy object method.
Parses the message for validity and fires off a thread to call the
proxy object method.
Message data should be a dictionary with two keys: Message data should be a dictionary with two keys:
method: string representing the method to call method: string representing the method to call
@@ -199,8 +198,8 @@ class AdapterConsumer(Consumer):
""" """
LOG.debug(_('received %s') % message_data) LOG.debug(_('received %s') % message_data)
# This will be popped off in _unpack_context
msg_id = message_data.get('_msg_id', None) msg_id = message_data.get('_msg_id', None)
ctxt = _unpack_context(message_data) ctxt = _unpack_context(message_data)
method = message_data.get('method') method = message_data.get('method')
@@ -228,13 +227,13 @@ class AdapterConsumer(Consumer):
try: try:
rval = node_func(context=ctxt, **node_args) rval = node_func(context=ctxt, **node_args)
if msg_id: if msg_id:
# TODO(termie): re-enable when fix the yielding issue # Check if the result was a generator
if hasattr(rval, 'send'): if hasattr(rval, 'send'):
logging.error('rval! %s', rval)
for x in rval: for x in rval:
msg_reply(msg_id, x, None) msg_reply(msg_id, x, None)
else: else:
msg_reply(msg_id, rval, None) msg_reply(msg_id, rval, None)
# This final None tells multicall that it is done. # This final None tells multicall that it is done.
msg_reply(msg_id, None, None) msg_reply(msg_id, None, None)
except Exception as e: except Exception as e:
@@ -277,7 +276,7 @@ class FanoutAdapterConsumer(AdapterConsumer):
class ConsumerSet(object): class ConsumerSet(object):
"""Groups consumers to listen on together on a single connection""" """Groups consumers to listen on together on a single connection."""
def __init__(self, conn, consumer_list): def __init__(self, conn, consumer_list):
self.consumer_list = set(consumer_list) self.consumer_list = set(consumer_list)
@@ -365,7 +364,7 @@ class DirectConsumer(Consumer):
self.routing_key = msg_id self.routing_key = msg_id
self.exchange = msg_id self.exchange = msg_id
self.auto_delete = True self.auto_delete = True
self.exclusive = False self.exclusive = True
super(DirectConsumer, self).__init__(connection=connection) super(DirectConsumer, self).__init__(connection=connection)
@@ -393,20 +392,18 @@ def msg_reply(msg_id, reply=None, failure=None):
LOG.error(_("Returning exception %s to caller"), message) LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb) LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb) failure = (failure[0].__name__, str(failure[1]), tb)
conn = ConnectionPool.get()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
LOG.error('MSG REPLY SUCCESS')
except TypeError:
LOG.error('MSG REPLY FAILURE')
publisher.send(
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
publisher.close() with ConnectionPool.item() as conn:
ConnectionPool.put(conn) publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
except TypeError:
publisher.send(
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
publisher.close()
class RemoteError(exception.Error): class RemoteError(exception.Error):
@@ -518,12 +515,9 @@ class MulticallWaiter(object):
except Exception: except Exception:
self.close() self.close()
raise raise
#rv = self._consumer.fetch(enable_callbacks=True)
time.sleep(0.01) time.sleep(0.01)
LOG.error('RV %s', rv)
result = self._results.get() result = self._results.get()
LOG.error('RESULT %s', result)
if isinstance(result, Exception): if isinstance(result, Exception):
self.close() self.close()
raise result raise result
@@ -545,22 +539,20 @@ def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response.""" """Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic) LOG.debug(_('Making asynchronous cast on %s...'), topic)
_pack_context(msg, context) _pack_context(msg, context)
conn = ConnectionPool.get() with ConnectionPool.item() as conn:
publisher = TopicPublisher(connection=conn, topic=topic) publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg) publisher.send(msg)
publisher.close() publisher.close()
ConnectionPool.put(conn)
def fanout_cast(context, topic, msg): def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response.""" """Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...')) LOG.debug(_('Making asynchronous fanout cast...'))
_pack_context(msg, context) _pack_context(msg, context)
conn = ConnectionPool.get() with ConnectionPool.item() as conn:
publisher = FanoutPublisher(topic, connection=conn) publisher = FanoutPublisher(topic, connection=conn)
publisher.send(msg) publisher.send(msg)
publisher.close() publisher.close()
ConnectionPool.put(conn)
def generic_response(message_data, message): def generic_response(message_data, message):

View File

@@ -88,27 +88,27 @@ class Service(object):
if 'nova-compute' == self.binary: if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt) self.manager.update_available_resource(ctxt)
conn1 = rpc.Connection.instance(new=True) self.conn = rpc.Connection.instance(new=True)
logging.debug("Creating Consumer connection for Service %s" % \ logging.debug("Creating Consumer connection for Service %s" %
self.topic) self.topic)
# Share this same connection for these Consumers # Share this same connection for these Consumers
consumer_all = rpc.TopicAdapterConsumer( consumer_all = rpc.TopicAdapterConsumer(
connection=conn1, connection=self.conn,
topic=self.topic, topic=self.topic,
proxy=self) proxy=self)
consumer_node = rpc.TopicAdapterConsumer( consumer_node = rpc.TopicAdapterConsumer(
connection=conn1, connection=self.conn,
topic='%s.%s' % (self.topic, self.host), topic='%s.%s' % (self.topic, self.host),
proxy=self) proxy=self)
fanout = rpc.FanoutAdapterConsumer( fanout = rpc.FanoutAdapterConsumer(
connection=conn1, connection=self.conn,
topic=self.topic, topic=self.topic,
proxy=self) proxy=self)
cset = rpc.ConsumerSet(conn1, [consumer_all, cset = rpc.ConsumerSet(self.conn, [consumer_all,
consumer_node, consumer_node,
fanout]) fanout])
# Wait forever, processing these consumers # Wait forever, processing these consumers
def _wait(): def _wait():
@@ -119,10 +119,6 @@ class Service(object):
self.csetthread = greenthread.spawn(_wait) self.csetthread = greenthread.spawn(_wait)
#self.timers.append(consumer_all.attach_to_eventlet())
#self.timers.append(consumer_node.attach_to_eventlet())
#self.timers.append(fanout.attach_to_eventlet())
if self.report_interval: if self.report_interval:
pulse = utils.LoopingCall(self.report_state) pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False) pulse.start(interval=self.report_interval, now=False)
@@ -185,6 +181,7 @@ class Service(object):
except greenlet.GreenletExit: except greenlet.GreenletExit:
pass pass
self.stop() self.stop()
rpc.ConnectionPool.put(self.conn)
try: try:
db.service_destroy(context.get_admin_context(), self.service_id) db.service_destroy(context.get_admin_context(), self.service_id)
except exception.NotFound: except exception.NotFound:

View File

@@ -83,7 +83,7 @@ class TestCase(unittest.TestCase):
self._monkey_patch_attach() self._monkey_patch_attach()
self._monkey_patch_wsgi() self._monkey_patch_wsgi()
self._original_flags = FLAGS.FlagValuesDict() self._original_flags = FLAGS.FlagValuesDict()
rpc.ConnectionPool = rpc.Pool(max_size=30) rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
def tearDown(self): def tearDown(self):
"""Runs after each test method to tear down test environment.""" """Runs after each test method to tear down test environment."""

View File

@@ -87,8 +87,6 @@ class CloudTestCase(test.TestCase):
db.network_disassociate(self.context, network_ref['id']) db.network_disassociate(self.context, network_ref['id'])
self.manager.delete_project(self.project) self.manager.delete_project(self.project)
self.manager.delete_user(self.user) self.manager.delete_user(self.user)
#self.compute.kill()
#self.network.kill()
super(CloudTestCase, self).tearDown() super(CloudTestCase, self).tearDown()
def _create_key(self, name): def _create_key(self, name):
@@ -314,7 +312,6 @@ class CloudTestCase(test.TestCase):
rv = self.cloud.terminate_instances(self.context, [instance_id]) rv = self.cloud.terminate_instances(self.context, [instance_id])
def test_ajax_console(self): def test_ajax_console(self):
kwargs = {'image_id': 'ami-1'} kwargs = {'image_id': 'ami-1'}
rv = self.cloud.run_instances(self.context, **kwargs) rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId'] instance_id = rv['instancesSet'][0]['instanceId']

View File

@@ -124,7 +124,6 @@ class RpcTestCase(test.TestCase):
'test', 'test',
{"method": "fail", {"method": "fail",
"args": {"value": value}}) "args": {"value": value}})
LOG.error('INNNNNNN BETTTWWWWWWWWWWEEEEEEEEEEN')
try: try:
rpc.call(self.context, rpc.call(self.context,
'test', 'test',

View File

@@ -285,7 +285,6 @@ if __name__ == '__main__':
# If any argument looks like a test name but doesn't have "nova.tests" in # If any argument looks like a test name but doesn't have "nova.tests" in
# front of it, automatically add that so we don't have to type as much # front of it, automatically add that so we don't have to type as much
argv = [] argv = []
logging.getLogger('amqplib').setLevel(logging.DEBUG)
for x in sys.argv: for x in sys.argv:
if x.startswith('test_'): if x.startswith('test_'):
argv.append('nova.tests.%s' % x) argv.append('nova.tests.%s' % x)