more work done to restore original rpc interfaces.

This commit is contained in:
Chris Behrens
2011-08-26 15:59:15 -07:00
parent 92cf801d69
commit ca537eebf0
3 changed files with 306 additions and 6 deletions

2
nova/rpc/FIXME Normal file
View File

@@ -0,0 +1,2 @@
Move some code duplication between carrot/kombu into common.py
The other FIXMEs in __init__.py and impl_kombu.py

View File

@@ -27,7 +27,7 @@ flags.DEFINE_string('rpc_backend',
"The messaging module to use, defaults to carrot.") "The messaging module to use, defaults to carrot.")
impl_table = {'kombu': 'nova.rpc.impl_kombu', impl_table = {'kombu': 'nova.rpc.impl_kombu',
'amqp': 'nova.rpc.impl_kombu'} 'amqp': 'nova.rpc.impl_kombu',
'carrot': 'nova.rpc.impl_carrot'} 'carrot': 'nova.rpc.impl_carrot'}
@@ -46,6 +46,7 @@ def create_consumer(conn, topic, proxy, fanout=False):
def create_consumer_set(conn, consumers): def create_consumer_set(conn, consumers):
# FIXME(comstud): replace however necessary
return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers) return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)

View File

@@ -30,6 +30,11 @@ import uuid
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.rpc') LOG = logging.getLogger('nova.rpc')
flags.DEFINE_integer('rpc_conn_pool_size', 30,
'Size of RPC connection pool')
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool')
class QueueBase(object): class QueueBase(object):
"""Queue base class.""" """Queue base class."""
@@ -298,6 +303,16 @@ class Connection(object):
self.connection = None self.connection = None
self.reconnect() self.reconnect()
@classmethod
def instance(cls, new=True):
"""Returns the instance."""
if new or not hasattr(cls, '_instance'):
if new:
return cls()
else:
cls._instance = cls()
return cls._instance
def reconnect(self): def reconnect(self):
"""Handles reconnecting and re-estblishing queues""" """Handles reconnecting and re-estblishing queues"""
if self.connection: if self.connection:
@@ -359,8 +374,10 @@ class Connection(object):
"""Create a queue using the class that was passed in and """Create a queue using the class that was passed in and
add it to our list of queues used for consuming add it to our list of queues used for consuming
""" """
self.queues.append(queue_cls(self.channel, topic, callback, queue = queue_cls(self.channel, topic, callback,
self.queue_num.next())) self.queue_num.next())
self.queues.append(queue)
return queue
def consume(self, limit=None): def consume(self, limit=None):
"""Consume from all queues""" """Consume from all queues"""
@@ -403,15 +420,15 @@ class Connection(object):
In nova's use, this is generally a msg_id queue used for In nova's use, this is generally a msg_id queue used for
responses for call/multicall responses for call/multicall
""" """
self.create_queue(DirectQueue, topic, callback) return self.create_queue(DirectQueue, topic, callback)
def topic_consumer(self, topic, callback=None): def topic_consumer(self, topic, callback=None):
"""Create a 'topic' queue.""" """Create a 'topic' queue."""
self.create_queue(TopicQueue, topic, callback) return self.create_queue(TopicQueue, topic, callback)
def fanout_consumer(self, topic, callback): def fanout_consumer(self, topic, callback):
"""Create a 'fanout' queue""" """Create a 'fanout' queue"""
self.create_queue(FanoutQueue, topic, callback) return self.create_queue(FanoutQueue, topic, callback)
def direct_send(self, msg_id, msg): def direct_send(self, msg_id, msg):
"""Send a 'direct' message""" """Send a 'direct' message"""
@@ -424,3 +441,283 @@ class Connection(object):
def fanout_send(self, topic, msg): def fanout_send(self, topic, msg):
"""Send a 'fanout' message""" """Send a 'fanout' message"""
self.publisher_send(FanoutPublisher, topic, msg) self.publisher_send(FanoutPublisher, topic, msg)
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Creating new connection')
return RPCIMPL.Connection()
# Create a ConnectionPool to use for RPC calls. We'll order the
# pool as a stack (LIFO), so that we can potentially loop through and
# timeout old unused connections at some point
ConnectionPool = Pool(
max_size=FLAGS.rpc_conn_pool_size,
order_as_stack=True)
class ConnectionContext(object):
def __init__(self, pooled=True):
self.connection = None
if pooled:
self.connection = ConnectionPool.get()
else:
self.connection = RPCIMPL.Connection()
self.pooled = pooled
def __enter__(self):
return self
def _done(self):
if self.connection:
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
self.connection.reset()
ConnectionPool.put(self.connection)
else:
try:
self.connection.close()
except Exception:
# There's apparently a bug in kombu 'memory' transport
# which causes an assert failure.
# But, we probably want to ignore all exceptions when
# trying to close a connection, anyway...
pass
self.connection = None
def __exit__(self, t, v, tb):
"""end if 'with' statement. We're done here."""
self._done()
def __del__(self):
"""Put Connection back into the pool if this ConnectionContext
is being deleted
"""
self._done()
def close(self):
self._done()
def __getattr__(self, key):
if self.connection:
return getattr(self.connection, key)
else:
raise exception.InvalidRPCConnectionReuse()
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
def __init__(self, proxy):
self.proxy = proxy
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
Parses the message for validity and fires off a thread to call the
proxy object method.
Message data should be a dictionary with two keys:
method: string representing the method to call
args: dictionary of arg: value
Example: {'method': 'echo', 'args': {'value': 42}}
"""
LOG.debug(_('received %s') % message_data)
ctxt = _unpack_context(message_data)
method = message_data.get('method')
args = message_data.get('args', {})
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data)
return
self.pool.spawn_n(self._process_data, ctxt, method, args)
@exception.wrap_exception()
def _process_data(self, ctxt, method, args):
"""Thread that maigcally looks for a method on the proxy
object and calls it.
"""
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
try:
rval = node_func(context=ctxt, **node_args)
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
ctxt.reply(x, None)
else:
ctxt.reply(rval, None)
# This final None tells multicall that it is done.
ctxt.reply(None, None)
except Exception as e:
logging.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info())
return
def _unpack_context(msg):
"""Unpack context from msg."""
context_dict = {}
for key in list(msg.keys()):
# NOTE(vish): Some versions of python don't like unicode keys
# in kwargs.
key = str(key)
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
LOG.debug(_('unpacked context: %s'), context_dict)
return RpcContext.from_dict(context_dict)
def _pack_context(msg, context):
"""Pack context into msg.
Values for message keys need to be less than 255 chars, so we pull
context out into a bunch of separate keys. If we want to support
more arguments in rabbit messages, we may want to do the same
for args at some point.
"""
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.to_dict().iteritems()])
msg.update(context_d)
class RpcContext(context.RequestContext):
def __init__(self, *args, **kwargs):
msg_id = kwargs.pop('msg_id', None)
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)
def reply(self, *args, **kwargs):
if self.msg_id:
msg_reply(self.msg_id, *args, **kwargs)
class MulticallWaiter(object):
def __init__(self, connection):
self._connection = connection
self._iterator = connection.consume()
self._result = None
self._done = False
def done(self):
self._done = True
self._connection = None
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
self._result = RemoteError(*data['failure'])
else:
self._result = data['result']
def __iter__(self):
if self._done:
raise StopIteration
while True:
self._iterator.next()
result = self._result
if isinstance(result, Exception):
self.done()
raise result
if result == None:
self.done()
raise StopIteration
yield result
def create_consumer(conn, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
return conn.fanout_consumer(topic, ProxyCallback(proxy))
else:
return conn.topic_consumer(topic, ProxyCallback(proxy))
def create_consumer_set(conn, consumers):
# FIXME(comstud): Replace this however necessary
# Returns an object that you can call .wait() on to consume
# all queues?
# Needs to have a .close() which will stop consuming?
# Needs to also have an attach_to_eventlet method for tests?
raise NotImplemented
def multicall(context, topic, msg):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
conn = ConnectionContext()
wait_msg = MulticallWaiter(conn)
conn.direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
def call(context, topic, msg):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
return
return rv[-1]
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_pack_context(msg, context)
with ConnectionContext() as conn:
conn.topic_send(topic, msg)
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_pack_context(msg, context)
with ConnectionContext() as conn:
conn.fanout_send(topic, msg)
def msg_reply(msg_id, reply=None, failure=None):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
with ConnectionContext() as conn:
if failure:
message = str(failure[1])
tb = traceback.format_exception(*failure)
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
try:
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
conn.direct_send(msg_id, msg)