From ca537eebf036a590d29822f5246a3cfc38ea58fe Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Fri, 26 Aug 2011 15:59:15 -0700 Subject: [PATCH] more work done to restore original rpc interfaces. --- nova/rpc/FIXME | 2 + nova/rpc/__init__.py | 3 +- nova/rpc/impl_kombu.py | 307 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 306 insertions(+), 6 deletions(-) create mode 100644 nova/rpc/FIXME diff --git a/nova/rpc/FIXME b/nova/rpc/FIXME new file mode 100644 index 00000000..70408180 --- /dev/null +++ b/nova/rpc/FIXME @@ -0,0 +1,2 @@ +Move some code duplication between carrot/kombu into common.py +The other FIXMEs in __init__.py and impl_kombu.py diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index f102cf0f..9371c2ab 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -27,7 +27,7 @@ flags.DEFINE_string('rpc_backend', "The messaging module to use, defaults to carrot.") impl_table = {'kombu': 'nova.rpc.impl_kombu', - 'amqp': 'nova.rpc.impl_kombu'} + 'amqp': 'nova.rpc.impl_kombu', 'carrot': 'nova.rpc.impl_carrot'} @@ -46,6 +46,7 @@ def create_consumer(conn, topic, proxy, fanout=False): def create_consumer_set(conn, consumers): + # FIXME(comstud): replace however necessary return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index e609227c..a222bb88 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -30,6 +30,11 @@ import uuid FLAGS = flags.FLAGS LOG = logging.getLogger('nova.rpc') +flags.DEFINE_integer('rpc_conn_pool_size', 30, + 'Size of RPC connection pool') +flags.DEFINE_integer('rpc_thread_pool_size', 1024, + 'Size of RPC thread pool') + class QueueBase(object): """Queue base class.""" @@ -298,6 +303,16 @@ class Connection(object): self.connection = None self.reconnect() + @classmethod + def instance(cls, new=True): + """Returns the instance.""" + if new or not hasattr(cls, '_instance'): + if new: + return cls() + else: + cls._instance = cls() + return cls._instance + def reconnect(self): """Handles reconnecting and re-estblishing queues""" if self.connection: @@ -359,8 +374,10 @@ class Connection(object): """Create a queue using the class that was passed in and add it to our list of queues used for consuming """ - self.queues.append(queue_cls(self.channel, topic, callback, - self.queue_num.next())) + queue = queue_cls(self.channel, topic, callback, + self.queue_num.next()) + self.queues.append(queue) + return queue def consume(self, limit=None): """Consume from all queues""" @@ -403,15 +420,15 @@ class Connection(object): In nova's use, this is generally a msg_id queue used for responses for call/multicall """ - self.create_queue(DirectQueue, topic, callback) + return self.create_queue(DirectQueue, topic, callback) def topic_consumer(self, topic, callback=None): """Create a 'topic' queue.""" - self.create_queue(TopicQueue, topic, callback) + return self.create_queue(TopicQueue, topic, callback) def fanout_consumer(self, topic, callback): """Create a 'fanout' queue""" - self.create_queue(FanoutQueue, topic, callback) + return self.create_queue(FanoutQueue, topic, callback) def direct_send(self, msg_id, msg): """Send a 'direct' message""" @@ -424,3 +441,283 @@ class Connection(object): def fanout_send(self, topic, msg): """Send a 'fanout' message""" self.publisher_send(FanoutPublisher, topic, msg) + + +class Pool(pools.Pool): + """Class that implements a Pool of Connections.""" + + # TODO(comstud): Timeout connections not used in a while + def create(self): + LOG.debug('Creating new connection') + return RPCIMPL.Connection() + +# Create a ConnectionPool to use for RPC calls. We'll order the +# pool as a stack (LIFO), so that we can potentially loop through and +# timeout old unused connections at some point +ConnectionPool = Pool( + max_size=FLAGS.rpc_conn_pool_size, + order_as_stack=True) + + +class ConnectionContext(object): + def __init__(self, pooled=True): + self.connection = None + if pooled: + self.connection = ConnectionPool.get() + else: + self.connection = RPCIMPL.Connection() + self.pooled = pooled + + def __enter__(self): + return self + + def _done(self): + if self.connection: + if self.pooled: + # Reset the connection so it's ready for the next caller + # to grab from the pool + self.connection.reset() + ConnectionPool.put(self.connection) + else: + try: + self.connection.close() + except Exception: + # There's apparently a bug in kombu 'memory' transport + # which causes an assert failure. + # But, we probably want to ignore all exceptions when + # trying to close a connection, anyway... + pass + self.connection = None + + def __exit__(self, t, v, tb): + """end if 'with' statement. We're done here.""" + self._done() + + def __del__(self): + """Put Connection back into the pool if this ConnectionContext + is being deleted + """ + self._done() + + def close(self): + self._done() + + def __getattr__(self, key): + if self.connection: + return getattr(self.connection, key) + else: + raise exception.InvalidRPCConnectionReuse() + + +class ProxyCallback(object): + """Calls methods on a proxy object based on method and args.""" + + def __init__(self, proxy): + self.proxy = proxy + self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) + + def __call__(self, message_data): + """Consumer callback to call a method on a proxy object. + + Parses the message for validity and fires off a thread to call the + proxy object method. + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + + """ + LOG.debug(_('received %s') % message_data) + ctxt = _unpack_context(message_data) + method = message_data.get('method') + args = message_data.get('args', {}) + if not method: + LOG.warn(_('no method for message: %s') % message_data) + ctxt.reply(_('No method for message: %s') % message_data) + return + self.pool.spawn_n(self._process_data, ctxt, method, args) + + @exception.wrap_exception() + def _process_data(self, ctxt, method, args): + """Thread that maigcally looks for a method on the proxy + object and calls it. + """ + + node_func = getattr(self.proxy, str(method)) + node_args = dict((str(k), v) for k, v in args.iteritems()) + # NOTE(vish): magic is fun! + try: + rval = node_func(context=ctxt, **node_args) + # Check if the result was a generator + if isinstance(rval, types.GeneratorType): + for x in rval: + ctxt.reply(x, None) + else: + ctxt.reply(rval, None) + # This final None tells multicall that it is done. + ctxt.reply(None, None) + except Exception as e: + logging.exception('Exception during message handling') + ctxt.reply(None, sys.exc_info()) + return + + +def _unpack_context(msg): + """Unpack context from msg.""" + context_dict = {} + for key in list(msg.keys()): + # NOTE(vish): Some versions of python don't like unicode keys + # in kwargs. + key = str(key) + if key.startswith('_context_'): + value = msg.pop(key) + context_dict[key[9:]] = value + context_dict['msg_id'] = msg.pop('_msg_id', None) + LOG.debug(_('unpacked context: %s'), context_dict) + return RpcContext.from_dict(context_dict) + + +def _pack_context(msg, context): + """Pack context into msg. + + Values for message keys need to be less than 255 chars, so we pull + context out into a bunch of separate keys. If we want to support + more arguments in rabbit messages, we may want to do the same + for args at some point. + + """ + context_d = dict([('_context_%s' % key, value) + for (key, value) in context.to_dict().iteritems()]) + msg.update(context_d) + + +class RpcContext(context.RequestContext): + def __init__(self, *args, **kwargs): + msg_id = kwargs.pop('msg_id', None) + self.msg_id = msg_id + super(RpcContext, self).__init__(*args, **kwargs) + + def reply(self, *args, **kwargs): + if self.msg_id: + msg_reply(self.msg_id, *args, **kwargs) + + +class MulticallWaiter(object): + def __init__(self, connection): + self._connection = connection + self._iterator = connection.consume() + self._result = None + self._done = False + + def done(self): + self._done = True + self._connection = None + + def __call__(self, data): + """The consume() callback will call this. Store the result.""" + if data['failure']: + self._result = RemoteError(*data['failure']) + else: + self._result = data['result'] + + def __iter__(self): + if self._done: + raise StopIteration + while True: + self._iterator.next() + result = self._result + if isinstance(result, Exception): + self.done() + raise result + if result == None: + self.done() + raise StopIteration + yield result + + +def create_consumer(conn, topic, proxy, fanout=False): + """Create a consumer that calls a method in a proxy object""" + if fanout: + return conn.fanout_consumer(topic, ProxyCallback(proxy)) + else: + return conn.topic_consumer(topic, ProxyCallback(proxy)) + + +def create_consumer_set(conn, consumers): + # FIXME(comstud): Replace this however necessary + # Returns an object that you can call .wait() on to consume + # all queues? + # Needs to have a .close() which will stop consuming? + # Needs to also have an attach_to_eventlet method for tests? + raise NotImplemented + + +def multicall(context, topic, msg): + """Make a call that returns multiple times.""" + # Can't use 'with' for multicall, as it returns an iterator + # that will continue to use the connection. When it's done, + # connection.close() will get called which will put it back into + # the pool + LOG.debug(_('Making asynchronous call on %s ...'), topic) + msg_id = uuid.uuid4().hex + msg.update({'_msg_id': msg_id}) + LOG.debug(_('MSG_ID is %s') % (msg_id)) + _pack_context(msg, context) + + conn = ConnectionContext() + wait_msg = MulticallWaiter(conn) + conn.direct_consumer(msg_id, wait_msg) + conn.topic_send(topic, msg) + + return wait_msg + + +def call(context, topic, msg): + """Sends a message on a topic and wait for a response.""" + rv = multicall(context, topic, msg) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] + + +def cast(context, topic, msg): + """Sends a message on a topic without waiting for a response.""" + LOG.debug(_('Making asynchronous cast on %s...'), topic) + _pack_context(msg, context) + with ConnectionContext() as conn: + conn.topic_send(topic, msg) + + +def fanout_cast(context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response.""" + LOG.debug(_('Making asynchronous fanout cast...')) + _pack_context(msg, context) + with ConnectionContext() as conn: + conn.fanout_send(topic, msg) + + +def msg_reply(msg_id, reply=None, failure=None): + """Sends a reply or an error on the channel signified by msg_id. + + Failure should be a sys.exc_info() tuple. + + """ + with ConnectionContext() as conn: + if failure: + message = str(failure[1]) + tb = traceback.format_exception(*failure) + LOG.error(_("Returning exception %s to caller"), message) + LOG.error(tb) + failure = (failure[0].__name__, str(failure[1]), tb) + + try: + msg = {'result': reply, 'failure': failure} + except TypeError: + msg = {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure} + conn.direct_send(msg_id, msg)