Remove nova.rpc.impl_carrot.
This module was marked as deprecated and scheduled for removal in Essex. Remove it now that Folsom development is open. nova.rpc.impl_kombu should be used instead. This patch also removes nova.testing.fake.rabbit, since as far as I can tell, it isn't used anymore and was the last thing still using the carrot dependency. Change-Id: I8cfb2d09ee5eed439ec1d152261f7097faf08ad6
This commit is contained in:
parent
a8aa3ffdeb
commit
70a712921f
@ -1,684 +0,0 @@
|
|||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
||||||
|
|
||||||
# Copyright 2010 United States Government as represented by the
|
|
||||||
# Administrator of the National Aeronautics and Space Administration.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
"""AMQP-based RPC.
|
|
||||||
|
|
||||||
Queues have consumers and publishers.
|
|
||||||
|
|
||||||
No fan-out support yet.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
import inspect
|
|
||||||
import json
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import traceback
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from carrot import connection as carrot_connection
|
|
||||||
from carrot import messaging
|
|
||||||
import eventlet
|
|
||||||
from eventlet import greenpool
|
|
||||||
from eventlet import pools
|
|
||||||
from eventlet import queue
|
|
||||||
import greenlet
|
|
||||||
|
|
||||||
from nova import context
|
|
||||||
from nova import exception
|
|
||||||
from nova import flags
|
|
||||||
from nova import local
|
|
||||||
from nova import log as logging
|
|
||||||
from nova.rpc import common as rpc_common
|
|
||||||
from nova.testing import fake
|
|
||||||
from nova import utils
|
|
||||||
|
|
||||||
FLAGS = flags.FLAGS
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
@utils.deprecated('Use of carrot will be removed in a future release. '
|
|
||||||
'Use kombu, instead.')
|
|
||||||
class Connection(carrot_connection.BrokerConnection, rpc_common.Connection):
|
|
||||||
"""Connection instance object."""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(Connection, self).__init__(*args, **kwargs)
|
|
||||||
self._rpc_consumers = []
|
|
||||||
self._rpc_consumer_thread = None
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def instance(cls, new=True):
|
|
||||||
"""Returns the instance."""
|
|
||||||
if new or not hasattr(cls, '_instance'):
|
|
||||||
params = dict(hostname=FLAGS.rabbit_host,
|
|
||||||
port=FLAGS.rabbit_port,
|
|
||||||
ssl=FLAGS.rabbit_use_ssl,
|
|
||||||
userid=FLAGS.rabbit_userid,
|
|
||||||
password=FLAGS.rabbit_password,
|
|
||||||
virtual_host=FLAGS.rabbit_virtual_host)
|
|
||||||
|
|
||||||
if FLAGS.fake_rabbit:
|
|
||||||
params['backend_cls'] = fake.rabbit.Backend
|
|
||||||
|
|
||||||
# NOTE(vish): magic is fun!
|
|
||||||
# pylint: disable=W0142
|
|
||||||
if new:
|
|
||||||
return cls(**params)
|
|
||||||
else:
|
|
||||||
cls._instance = cls(**params)
|
|
||||||
return cls._instance
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def recreate(cls):
|
|
||||||
"""Recreates the connection instance.
|
|
||||||
|
|
||||||
This is necessary to recover from some network errors/disconnects.
|
|
||||||
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
del cls._instance
|
|
||||||
except AttributeError, e:
|
|
||||||
# The _instance stuff is for testing purposes. Usually we don't use
|
|
||||||
# it. So don't freak out if it doesn't exist.
|
|
||||||
pass
|
|
||||||
return cls.instance()
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self.cancel_consumer_thread()
|
|
||||||
for consumer in self._rpc_consumers:
|
|
||||||
try:
|
|
||||||
consumer.close()
|
|
||||||
except Exception:
|
|
||||||
# ignore all errors
|
|
||||||
pass
|
|
||||||
self._rpc_consumers = []
|
|
||||||
carrot_connection.BrokerConnection.close(self)
|
|
||||||
|
|
||||||
def consume_in_thread(self):
|
|
||||||
"""Consumer from all queues/consumers in a greenthread"""
|
|
||||||
|
|
||||||
consumer_set = ConsumerSet(connection=self,
|
|
||||||
consumer_list=self._rpc_consumers)
|
|
||||||
|
|
||||||
def _consumer_thread():
|
|
||||||
try:
|
|
||||||
consumer_set.wait()
|
|
||||||
except greenlet.GreenletExit:
|
|
||||||
return
|
|
||||||
if self._rpc_consumer_thread is None:
|
|
||||||
self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
|
|
||||||
return self._rpc_consumer_thread
|
|
||||||
|
|
||||||
def cancel_consumer_thread(self):
|
|
||||||
"""Cancel a consumer thread"""
|
|
||||||
if self._rpc_consumer_thread is not None:
|
|
||||||
self._rpc_consumer_thread.kill()
|
|
||||||
try:
|
|
||||||
self._rpc_consumer_thread.wait()
|
|
||||||
except greenlet.GreenletExit:
|
|
||||||
pass
|
|
||||||
self._rpc_consumer_thread = None
|
|
||||||
|
|
||||||
def create_consumer(self, topic, proxy, fanout=False):
|
|
||||||
"""Create a consumer that calls methods in the proxy"""
|
|
||||||
if fanout:
|
|
||||||
consumer = FanoutAdapterConsumer(
|
|
||||||
connection=self,
|
|
||||||
topic=topic,
|
|
||||||
proxy=proxy)
|
|
||||||
else:
|
|
||||||
consumer = TopicAdapterConsumer(
|
|
||||||
connection=self,
|
|
||||||
topic=topic,
|
|
||||||
proxy=proxy)
|
|
||||||
self._rpc_consumers.append(consumer)
|
|
||||||
|
|
||||||
|
|
||||||
class Pool(pools.Pool):
|
|
||||||
"""Class that implements a Pool of Connections."""
|
|
||||||
|
|
||||||
# TODO(comstud): Timeout connections not used in a while
|
|
||||||
def create(self):
|
|
||||||
LOG.debug('Pool creating new connection')
|
|
||||||
return Connection.instance(new=True)
|
|
||||||
|
|
||||||
# Create a ConnectionPool to use for RPC calls. We'll order the
|
|
||||||
# pool as a stack (LIFO), so that we can potentially loop through and
|
|
||||||
# timeout old unused connections at some point
|
|
||||||
ConnectionPool = Pool(
|
|
||||||
max_size=FLAGS.rpc_conn_pool_size,
|
|
||||||
order_as_stack=True)
|
|
||||||
|
|
||||||
|
|
||||||
class Consumer(messaging.Consumer):
|
|
||||||
"""Consumer base class.
|
|
||||||
|
|
||||||
Contains methods for connecting the fetch method to async loops.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
max_retries = FLAGS.rabbit_max_retries
|
|
||||||
sleep_time = FLAGS.rabbit_retry_interval
|
|
||||||
tries = 0
|
|
||||||
while True:
|
|
||||||
tries += 1
|
|
||||||
if tries > 1:
|
|
||||||
time.sleep(sleep_time)
|
|
||||||
# backoff for next retry attempt.. if there is one
|
|
||||||
sleep_time += FLAGS.rabbit_retry_backoff
|
|
||||||
if sleep_time > 30:
|
|
||||||
sleep_time = 30
|
|
||||||
try:
|
|
||||||
super(Consumer, self).__init__(*args, **kwargs)
|
|
||||||
self.failed_connection = False
|
|
||||||
break
|
|
||||||
except Exception as e: # Catching all because carrot sucks
|
|
||||||
self.failed_connection = True
|
|
||||||
if max_retries > 0 and tries == max_retries:
|
|
||||||
break
|
|
||||||
fl_host = FLAGS.rabbit_host
|
|
||||||
fl_port = FLAGS.rabbit_port
|
|
||||||
fl_intv = sleep_time
|
|
||||||
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
|
|
||||||
' unreachable: %(e)s. Trying again in %(fl_intv)d'
|
|
||||||
' seconds.') % locals())
|
|
||||||
if self.failed_connection:
|
|
||||||
LOG.error(_('Unable to connect to AMQP server '
|
|
||||||
'after %(tries)d tries. Shutting down.') % locals())
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
|
||||||
"""Wraps the parent fetch with some logic for failed connection."""
|
|
||||||
# TODO(vish): the logic for failed connections and logging should be
|
|
||||||
# refactored into some sort of connection manager object
|
|
||||||
try:
|
|
||||||
if self.failed_connection:
|
|
||||||
# NOTE(vish): connection is defined in the parent class, we can
|
|
||||||
# recreate it as long as we create the backend too
|
|
||||||
# pylint: disable=W0201
|
|
||||||
self.connection = Connection.recreate()
|
|
||||||
self.backend = self.connection.create_backend()
|
|
||||||
self.declare()
|
|
||||||
return super(Consumer, self).fetch(no_ack,
|
|
||||||
auto_ack,
|
|
||||||
enable_callbacks)
|
|
||||||
if self.failed_connection:
|
|
||||||
LOG.error(_('Reconnected to queue'))
|
|
||||||
self.failed_connection = False
|
|
||||||
# NOTE(vish): This is catching all errors because we really don't
|
|
||||||
# want exceptions to be logged 10 times a second if some
|
|
||||||
# persistent failure occurs.
|
|
||||||
except Exception, e: # pylint: disable=W0703
|
|
||||||
if not self.failed_connection:
|
|
||||||
LOG.exception(_('Failed to fetch message from queue: %s') % e)
|
|
||||||
self.failed_connection = True
|
|
||||||
|
|
||||||
|
|
||||||
class AdapterConsumer(Consumer):
|
|
||||||
"""Calls methods on a proxy object based on method and args."""
|
|
||||||
|
|
||||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
|
||||||
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
|
|
||||||
self.proxy = proxy
|
|
||||||
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
|
|
||||||
super(AdapterConsumer, self).__init__(connection=connection,
|
|
||||||
topic=topic)
|
|
||||||
self.register_callback(self.process_data)
|
|
||||||
|
|
||||||
def process_data(self, message_data, message):
|
|
||||||
"""Consumer callback to call a method on a proxy object.
|
|
||||||
|
|
||||||
Parses the message for validity and fires off a thread to call the
|
|
||||||
proxy object method.
|
|
||||||
|
|
||||||
Message data should be a dictionary with two keys:
|
|
||||||
method: string representing the method to call
|
|
||||||
args: dictionary of arg: value
|
|
||||||
|
|
||||||
Example: {'method': 'echo', 'args': {'value': 42}}
|
|
||||||
|
|
||||||
"""
|
|
||||||
# It is important to clear the context here, because at this point
|
|
||||||
# the previous context is stored in local.store.context
|
|
||||||
if hasattr(local.store, 'context'):
|
|
||||||
del local.store.context
|
|
||||||
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
|
|
||||||
# This will be popped off in _unpack_context
|
|
||||||
msg_id = message_data.get('_msg_id', None)
|
|
||||||
ctxt = _unpack_context(message_data)
|
|
||||||
|
|
||||||
method = message_data.get('method')
|
|
||||||
args = message_data.get('args', {})
|
|
||||||
message.ack()
|
|
||||||
if not method:
|
|
||||||
# NOTE(vish): we may not want to ack here, but that means that bad
|
|
||||||
# messages stay in the queue indefinitely, so for now
|
|
||||||
# we just log the message and send an error string
|
|
||||||
# back to the caller
|
|
||||||
LOG.warn(_('no method for message: %s') % message_data)
|
|
||||||
ctxt.reply(msg_id,
|
|
||||||
_('No method for message: %s') % message_data)
|
|
||||||
return
|
|
||||||
self.pool.spawn_n(self._process_data, ctxt, method, args)
|
|
||||||
|
|
||||||
@exception.wrap_exception()
|
|
||||||
def _process_data(self, ctxt, method, args):
|
|
||||||
"""Thread that magically looks for a method on the proxy
|
|
||||||
object and calls it.
|
|
||||||
"""
|
|
||||||
ctxt.update_store()
|
|
||||||
node_func = getattr(self.proxy, str(method))
|
|
||||||
node_args = dict((str(k), v) for k, v in args.iteritems())
|
|
||||||
# NOTE(vish): magic is fun!
|
|
||||||
try:
|
|
||||||
rval = node_func(context=ctxt, **node_args)
|
|
||||||
# Check if the result was a generator
|
|
||||||
if inspect.isgenerator(rval):
|
|
||||||
for x in rval:
|
|
||||||
ctxt.reply(x, None)
|
|
||||||
else:
|
|
||||||
ctxt.reply(rval, None)
|
|
||||||
|
|
||||||
# This final None tells multicall that it is done.
|
|
||||||
ctxt.reply(ending=True)
|
|
||||||
except Exception as e:
|
|
||||||
LOG.exception('Exception during message handling')
|
|
||||||
ctxt.reply(None, sys.exc_info())
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
class TopicAdapterConsumer(AdapterConsumer):
|
|
||||||
"""Consumes messages on a specific topic."""
|
|
||||||
|
|
||||||
exchange_type = 'topic'
|
|
||||||
|
|
||||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
|
||||||
self.queue = topic
|
|
||||||
self.routing_key = topic
|
|
||||||
self.exchange = FLAGS.control_exchange
|
|
||||||
self.durable = FLAGS.rabbit_durable_queues
|
|
||||||
super(TopicAdapterConsumer, self).__init__(connection=connection,
|
|
||||||
topic=topic, proxy=proxy)
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutAdapterConsumer(AdapterConsumer):
|
|
||||||
"""Consumes messages from a fanout exchange."""
|
|
||||||
|
|
||||||
exchange_type = 'fanout'
|
|
||||||
|
|
||||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
|
||||||
self.exchange = '%s_fanout' % topic
|
|
||||||
self.routing_key = topic
|
|
||||||
unique = uuid.uuid4().hex
|
|
||||||
self.queue = '%s_fanout_%s' % (topic, unique)
|
|
||||||
self.durable = False
|
|
||||||
# Fanout creates unique queue names, so we should auto-remove
|
|
||||||
# them when done, so they're not left around on restart.
|
|
||||||
# Also, we're the only one that should be consuming. exclusive
|
|
||||||
# implies auto_delete, so we'll just set that..
|
|
||||||
self.exclusive = True
|
|
||||||
LOG.info(_('Created "%(exchange)s" fanout exchange '
|
|
||||||
'with "%(key)s" routing key'),
|
|
||||||
dict(exchange=self.exchange, key=self.routing_key))
|
|
||||||
super(FanoutAdapterConsumer, self).__init__(connection=connection,
|
|
||||||
topic=topic, proxy=proxy)
|
|
||||||
|
|
||||||
|
|
||||||
class ConsumerSet(object):
|
|
||||||
"""Groups consumers to listen on together on a single connection."""
|
|
||||||
|
|
||||||
def __init__(self, connection, consumer_list):
|
|
||||||
self.consumer_list = set(consumer_list)
|
|
||||||
self.consumer_set = None
|
|
||||||
self.enabled = True
|
|
||||||
self.init(connection)
|
|
||||||
|
|
||||||
def init(self, conn):
|
|
||||||
if not conn:
|
|
||||||
conn = Connection.instance(new=True)
|
|
||||||
if self.consumer_set:
|
|
||||||
self.consumer_set.close()
|
|
||||||
self.consumer_set = messaging.ConsumerSet(conn)
|
|
||||||
for consumer in self.consumer_list:
|
|
||||||
consumer.connection = conn
|
|
||||||
# consumer.backend is set for us
|
|
||||||
self.consumer_set.add_consumer(consumer)
|
|
||||||
|
|
||||||
def reconnect(self):
|
|
||||||
self.init(None)
|
|
||||||
|
|
||||||
def wait(self, limit=None):
|
|
||||||
running = True
|
|
||||||
while running:
|
|
||||||
it = self.consumer_set.iterconsume(limit=limit)
|
|
||||||
if not it:
|
|
||||||
break
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
it.next()
|
|
||||||
except StopIteration:
|
|
||||||
return
|
|
||||||
except greenlet.GreenletExit:
|
|
||||||
running = False
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
LOG.exception(_("Exception while processing consumer"))
|
|
||||||
self.reconnect()
|
|
||||||
# Break to outer loop
|
|
||||||
break
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self.consumer_set.close()
|
|
||||||
|
|
||||||
|
|
||||||
class Publisher(messaging.Publisher):
|
|
||||||
"""Publisher base class."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class TopicPublisher(Publisher):
|
|
||||||
"""Publishes messages on a specific topic."""
|
|
||||||
|
|
||||||
exchange_type = 'topic'
|
|
||||||
|
|
||||||
def __init__(self, connection=None, topic='broadcast', durable=None):
|
|
||||||
self.routing_key = topic
|
|
||||||
self.exchange = FLAGS.control_exchange
|
|
||||||
self.durable = (FLAGS.rabbit_durable_queues if durable is None
|
|
||||||
else durable)
|
|
||||||
super(TopicPublisher, self).__init__(connection=connection)
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutPublisher(Publisher):
|
|
||||||
"""Publishes messages to a fanout exchange."""
|
|
||||||
|
|
||||||
exchange_type = 'fanout'
|
|
||||||
|
|
||||||
def __init__(self, topic, connection=None):
|
|
||||||
self.exchange = '%s_fanout' % topic
|
|
||||||
self.queue = '%s_fanout' % topic
|
|
||||||
self.durable = False
|
|
||||||
self.auto_delete = True
|
|
||||||
LOG.info(_('Creating "%(exchange)s" fanout exchange'),
|
|
||||||
dict(exchange=self.exchange))
|
|
||||||
super(FanoutPublisher, self).__init__(connection=connection)
|
|
||||||
|
|
||||||
|
|
||||||
class DirectConsumer(Consumer):
|
|
||||||
"""Consumes messages directly on a channel specified by msg_id."""
|
|
||||||
|
|
||||||
exchange_type = 'direct'
|
|
||||||
|
|
||||||
def __init__(self, connection=None, msg_id=None):
|
|
||||||
self.queue = msg_id
|
|
||||||
self.routing_key = msg_id
|
|
||||||
self.exchange = msg_id
|
|
||||||
self.durable = False
|
|
||||||
self.auto_delete = True
|
|
||||||
self.exclusive = True
|
|
||||||
super(DirectConsumer, self).__init__(connection=connection)
|
|
||||||
|
|
||||||
|
|
||||||
class DirectPublisher(Publisher):
|
|
||||||
"""Publishes messages directly on a channel specified by msg_id."""
|
|
||||||
|
|
||||||
exchange_type = 'direct'
|
|
||||||
|
|
||||||
def __init__(self, connection=None, msg_id=None):
|
|
||||||
self.routing_key = msg_id
|
|
||||||
self.exchange = msg_id
|
|
||||||
self.durable = False
|
|
||||||
self.auto_delete = True
|
|
||||||
super(DirectPublisher, self).__init__(connection=connection)
|
|
||||||
|
|
||||||
|
|
||||||
def msg_reply(msg_id, reply=None, failure=None, ending=False):
|
|
||||||
"""Sends a reply or an error on the channel signified by msg_id.
|
|
||||||
|
|
||||||
Failure should be a sys.exc_info() tuple.
|
|
||||||
|
|
||||||
"""
|
|
||||||
if failure:
|
|
||||||
message = str(failure[1])
|
|
||||||
tb = traceback.format_exception(*failure)
|
|
||||||
LOG.error(_("Returning exception %s to caller"), message)
|
|
||||||
LOG.error(tb)
|
|
||||||
failure = (failure[0].__name__, str(failure[1]), tb)
|
|
||||||
|
|
||||||
with ConnectionPool.item() as conn:
|
|
||||||
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
|
|
||||||
try:
|
|
||||||
msg = {'result': reply, 'failure': failure}
|
|
||||||
if ending:
|
|
||||||
msg['ending'] = True
|
|
||||||
publisher.send(msg)
|
|
||||||
except TypeError:
|
|
||||||
msg = {'result': dict((k, repr(v))
|
|
||||||
for k, v in reply.__dict__.iteritems()),
|
|
||||||
'failure': failure}
|
|
||||||
if ending:
|
|
||||||
msg['ending'] = True
|
|
||||||
publisher.send(msg)
|
|
||||||
|
|
||||||
publisher.close()
|
|
||||||
|
|
||||||
|
|
||||||
def _unpack_context(msg):
|
|
||||||
"""Unpack context from msg."""
|
|
||||||
context_dict = {}
|
|
||||||
for key in list(msg.keys()):
|
|
||||||
# NOTE(vish): Some versions of python don't like unicode keys
|
|
||||||
# in kwargs.
|
|
||||||
key = str(key)
|
|
||||||
if key.startswith('_context_'):
|
|
||||||
value = msg.pop(key)
|
|
||||||
context_dict[key[9:]] = value
|
|
||||||
context_dict['msg_id'] = msg.pop('_msg_id', None)
|
|
||||||
ctx = RpcContext.from_dict(context_dict)
|
|
||||||
LOG.debug(_('unpacked context: %s'), ctx.to_dict())
|
|
||||||
return ctx
|
|
||||||
|
|
||||||
|
|
||||||
def _pack_context(msg, context):
|
|
||||||
"""Pack context into msg.
|
|
||||||
|
|
||||||
Values for message keys need to be less than 255 chars, so we pull
|
|
||||||
context out into a bunch of separate keys. If we want to support
|
|
||||||
more arguments in rabbit messages, we may want to do the same
|
|
||||||
for args at some point.
|
|
||||||
|
|
||||||
"""
|
|
||||||
context_d = dict([('_context_%s' % key, value)
|
|
||||||
for (key, value) in context.to_dict().iteritems()])
|
|
||||||
msg.update(context_d)
|
|
||||||
|
|
||||||
|
|
||||||
class RpcContext(context.RequestContext):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
msg_id = kwargs.pop('msg_id', None)
|
|
||||||
self.msg_id = msg_id
|
|
||||||
super(RpcContext, self).__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, ending=False):
|
|
||||||
if self.msg_id:
|
|
||||||
msg_reply(self.msg_id, reply, failure, ending)
|
|
||||||
if ending:
|
|
||||||
self.msg_id = None
|
|
||||||
|
|
||||||
|
|
||||||
def multicall(context, topic, msg, timeout=None):
|
|
||||||
"""Make a call that returns multiple times."""
|
|
||||||
# NOTE(russellb): carrot doesn't support timeouts
|
|
||||||
LOG.debug(_('Making asynchronous call on %s ...'), topic)
|
|
||||||
msg_id = uuid.uuid4().hex
|
|
||||||
msg.update({'_msg_id': msg_id})
|
|
||||||
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
|
||||||
_pack_context(msg, context)
|
|
||||||
|
|
||||||
con_conn = ConnectionPool.get()
|
|
||||||
consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
|
|
||||||
wait_msg = MulticallWaiter(consumer)
|
|
||||||
consumer.register_callback(wait_msg)
|
|
||||||
|
|
||||||
publisher = TopicPublisher(connection=con_conn, topic=topic)
|
|
||||||
publisher.send(msg)
|
|
||||||
publisher.close()
|
|
||||||
|
|
||||||
return wait_msg
|
|
||||||
|
|
||||||
|
|
||||||
class MulticallWaiter(object):
|
|
||||||
def __init__(self, consumer):
|
|
||||||
self._consumer = consumer
|
|
||||||
self._results = queue.Queue()
|
|
||||||
self._closed = False
|
|
||||||
self._got_ending = False
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
if self._closed:
|
|
||||||
return
|
|
||||||
self._closed = True
|
|
||||||
self._consumer.close()
|
|
||||||
ConnectionPool.put(self._consumer.connection)
|
|
||||||
|
|
||||||
def __call__(self, data, message):
|
|
||||||
"""Acks message and sets result."""
|
|
||||||
message.ack()
|
|
||||||
if data['failure']:
|
|
||||||
self._results.put(rpc_common.RemoteError(*data['failure']))
|
|
||||||
elif data.get('ending', False):
|
|
||||||
self._got_ending = True
|
|
||||||
else:
|
|
||||||
self._results.put(data['result'])
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
return self.wait()
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
while not self._closed:
|
|
||||||
try:
|
|
||||||
rv = self._consumer.fetch(enable_callbacks=True)
|
|
||||||
except Exception:
|
|
||||||
self.close()
|
|
||||||
raise
|
|
||||||
if rv is None:
|
|
||||||
time.sleep(0.01)
|
|
||||||
continue
|
|
||||||
if self._got_ending:
|
|
||||||
self.close()
|
|
||||||
raise StopIteration
|
|
||||||
result = self._results.get()
|
|
||||||
if isinstance(result, Exception):
|
|
||||||
self.close()
|
|
||||||
raise result
|
|
||||||
yield result
|
|
||||||
|
|
||||||
|
|
||||||
def create_connection(new=True):
|
|
||||||
"""Create a connection"""
|
|
||||||
return Connection.instance(new=new)
|
|
||||||
|
|
||||||
|
|
||||||
def call(context, topic, msg, timeout=None):
|
|
||||||
"""Sends a message on a topic and wait for a response."""
|
|
||||||
rv = multicall(context, topic, msg, timeout)
|
|
||||||
# NOTE(vish): return the last result from the multicall
|
|
||||||
rv = list(rv)
|
|
||||||
if not rv:
|
|
||||||
return
|
|
||||||
return rv[-1]
|
|
||||||
|
|
||||||
|
|
||||||
def cast(context, topic, msg):
|
|
||||||
"""Sends a message on a topic without waiting for a response."""
|
|
||||||
LOG.debug(_('Making asynchronous cast on %s...'), topic)
|
|
||||||
_pack_context(msg, context)
|
|
||||||
with ConnectionPool.item() as conn:
|
|
||||||
publisher = TopicPublisher(connection=conn, topic=topic)
|
|
||||||
publisher.send(msg)
|
|
||||||
publisher.close()
|
|
||||||
|
|
||||||
|
|
||||||
def fanout_cast(context, topic, msg):
|
|
||||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
|
||||||
LOG.debug(_('Making asynchronous fanout cast...'))
|
|
||||||
_pack_context(msg, context)
|
|
||||||
with ConnectionPool.item() as conn:
|
|
||||||
publisher = FanoutPublisher(topic, connection=conn)
|
|
||||||
publisher.send(msg)
|
|
||||||
publisher.close()
|
|
||||||
|
|
||||||
|
|
||||||
def notify(context, topic, msg):
|
|
||||||
"""Sends a notification event on a topic."""
|
|
||||||
LOG.debug(_('Sending notification on %s...'), topic)
|
|
||||||
_pack_context(msg, context)
|
|
||||||
with ConnectionPool.item() as conn:
|
|
||||||
publisher = TopicPublisher(connection=conn, topic=topic,
|
|
||||||
durable=True)
|
|
||||||
publisher.send(msg)
|
|
||||||
publisher.close()
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup():
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def generic_response(message_data, message):
|
|
||||||
"""Logs a result and exits."""
|
|
||||||
LOG.debug(_('response %s'), message_data)
|
|
||||||
message.ack()
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
|
|
||||||
def send_message(topic, message, wait=True):
|
|
||||||
"""Sends a message for testing."""
|
|
||||||
msg_id = uuid.uuid4().hex
|
|
||||||
message.update({'_msg_id': msg_id})
|
|
||||||
LOG.debug(_('topic is %s'), topic)
|
|
||||||
LOG.debug(_('message %s'), message)
|
|
||||||
|
|
||||||
if wait:
|
|
||||||
consumer = messaging.Consumer(connection=Connection.instance(),
|
|
||||||
queue=msg_id,
|
|
||||||
exchange=msg_id,
|
|
||||||
auto_delete=True,
|
|
||||||
exchange_type='direct',
|
|
||||||
routing_key=msg_id)
|
|
||||||
consumer.register_callback(generic_response)
|
|
||||||
|
|
||||||
publisher = messaging.Publisher(connection=Connection.instance(),
|
|
||||||
exchange=FLAGS.control_exchange,
|
|
||||||
durable=FLAGS.rabbit_durable_queues,
|
|
||||||
exchange_type='topic',
|
|
||||||
routing_key=topic)
|
|
||||||
publisher.send(message)
|
|
||||||
publisher.close()
|
|
||||||
|
|
||||||
if wait:
|
|
||||||
consumer.wait()
|
|
||||||
consumer.close()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
# You can send messages from the command line using
|
|
||||||
# topic and a json string representing a dictionary
|
|
||||||
# for the method
|
|
||||||
send_message(sys.argv[1], json.loads(sys.argv[2]))
|
|
@ -39,7 +39,6 @@ from nova import log as logging
|
|||||||
from nova.openstack.common import cfg
|
from nova.openstack.common import cfg
|
||||||
from nova import utils
|
from nova import utils
|
||||||
from nova import service
|
from nova import service
|
||||||
from nova.testing.fake import rabbit
|
|
||||||
from nova.tests import reset_db
|
from nova.tests import reset_db
|
||||||
from nova.virt import fake
|
from nova.virt import fake
|
||||||
|
|
||||||
@ -149,10 +148,6 @@ class TestCase(unittest.TestCase):
|
|||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
super(TestCase, self).tearDown()
|
super(TestCase, self).tearDown()
|
||||||
finally:
|
finally:
|
||||||
# Clean out fake_rabbit's queue if we used it
|
|
||||||
if FLAGS.fake_rabbit:
|
|
||||||
rabbit.reset_all()
|
|
||||||
|
|
||||||
if FLAGS.connection_type == 'fake':
|
if FLAGS.connection_type == 'fake':
|
||||||
if hasattr(fake.FakeConnection, '_instance'):
|
if hasattr(fake.FakeConnection, '_instance'):
|
||||||
del fake.FakeConnection._instance
|
del fake.FakeConnection._instance
|
||||||
|
@ -1,153 +0,0 @@
|
|||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
||||||
|
|
||||||
# Copyright 2010 United States Government as represented by the
|
|
||||||
# Administrator of the National Aeronautics and Space Administration.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
"""Based a bit on the carrot.backends.queue backend... but a lot better."""
|
|
||||||
|
|
||||||
import Queue as queue
|
|
||||||
|
|
||||||
from carrot.backends import base
|
|
||||||
from eventlet import greenthread
|
|
||||||
|
|
||||||
from nova import log as logging
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
EXCHANGES = {}
|
|
||||||
QUEUES = {}
|
|
||||||
CONSUMERS = {}
|
|
||||||
|
|
||||||
|
|
||||||
class Message(base.BaseMessage):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class Exchange(object):
|
|
||||||
def __init__(self, name, exchange_type):
|
|
||||||
self.name = name
|
|
||||||
self.exchange_type = exchange_type
|
|
||||||
self._queue = queue.Queue()
|
|
||||||
self._routes = {}
|
|
||||||
|
|
||||||
def publish(self, message, routing_key=None):
|
|
||||||
nm = self.name
|
|
||||||
LOG.debug(_('(%(nm)s) publish (key: %(routing_key)s)'
|
|
||||||
' %(message)s') % locals())
|
|
||||||
if routing_key in self._routes:
|
|
||||||
for f in self._routes[routing_key]:
|
|
||||||
LOG.debug(_('Publishing to route %s'), f)
|
|
||||||
f(message, routing_key=routing_key)
|
|
||||||
|
|
||||||
def bind(self, callback, routing_key):
|
|
||||||
self._routes.setdefault(routing_key, [])
|
|
||||||
self._routes[routing_key].append(callback)
|
|
||||||
|
|
||||||
|
|
||||||
class Queue(object):
|
|
||||||
def __init__(self, name):
|
|
||||||
self.name = name
|
|
||||||
self._queue = queue.Queue()
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return '<Queue: %s>' % self.name
|
|
||||||
|
|
||||||
def push(self, message, routing_key=None):
|
|
||||||
self._queue.put(message)
|
|
||||||
|
|
||||||
def size(self):
|
|
||||||
return self._queue.qsize()
|
|
||||||
|
|
||||||
def pop(self):
|
|
||||||
return self._queue.get()
|
|
||||||
|
|
||||||
|
|
||||||
class Backend(base.BaseBackend):
|
|
||||||
def queue_declare(self, queue, **kwargs):
|
|
||||||
global QUEUES
|
|
||||||
if queue not in QUEUES:
|
|
||||||
LOG.debug(_('Declaring queue %s'), queue)
|
|
||||||
QUEUES[queue] = Queue(queue)
|
|
||||||
|
|
||||||
def exchange_declare(self, exchange, type, *args, **kwargs):
|
|
||||||
global EXCHANGES
|
|
||||||
if exchange not in EXCHANGES:
|
|
||||||
LOG.debug(_('Declaring exchange %s'), exchange)
|
|
||||||
EXCHANGES[exchange] = Exchange(exchange, type)
|
|
||||||
|
|
||||||
def queue_bind(self, queue, exchange, routing_key, **kwargs):
|
|
||||||
global EXCHANGES
|
|
||||||
global QUEUES
|
|
||||||
LOG.debug(_('Binding %(queue)s to %(exchange)s with'
|
|
||||||
' key %(routing_key)s') % locals())
|
|
||||||
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
|
|
||||||
|
|
||||||
def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
|
|
||||||
global CONSUMERS
|
|
||||||
LOG.debug("Adding consumer %s", consumer_tag)
|
|
||||||
CONSUMERS[consumer_tag] = (queue, callback)
|
|
||||||
|
|
||||||
def cancel(self, consumer_tag):
|
|
||||||
global CONSUMERS
|
|
||||||
LOG.debug("Removing consumer %s", consumer_tag)
|
|
||||||
del CONSUMERS[consumer_tag]
|
|
||||||
|
|
||||||
def consume(self, limit=None):
|
|
||||||
global CONSUMERS
|
|
||||||
num = 0
|
|
||||||
while True:
|
|
||||||
for (queue, callback) in CONSUMERS.itervalues():
|
|
||||||
item = self.get(queue)
|
|
||||||
if item:
|
|
||||||
callback(item)
|
|
||||||
num += 1
|
|
||||||
yield
|
|
||||||
if limit and num == limit:
|
|
||||||
raise StopIteration()
|
|
||||||
greenthread.sleep(0.1)
|
|
||||||
|
|
||||||
def get(self, queue, no_ack=False):
|
|
||||||
global QUEUES
|
|
||||||
if not queue in QUEUES or not QUEUES[queue].size():
|
|
||||||
return None
|
|
||||||
(message_data, content_type, content_encoding) = QUEUES[queue].pop()
|
|
||||||
message = Message(backend=self, body=message_data,
|
|
||||||
content_type=content_type,
|
|
||||||
content_encoding=content_encoding)
|
|
||||||
message.result = True
|
|
||||||
LOG.debug(_('Getting from %(queue)s: %(message)s') % locals())
|
|
||||||
return message
|
|
||||||
|
|
||||||
def prepare_message(self, message_data, delivery_mode,
|
|
||||||
content_type, content_encoding, **kwargs):
|
|
||||||
"""Prepare message for sending."""
|
|
||||||
return (message_data, content_type, content_encoding)
|
|
||||||
|
|
||||||
def publish(self, message, exchange, routing_key, **kwargs):
|
|
||||||
global EXCHANGES
|
|
||||||
if exchange in EXCHANGES:
|
|
||||||
EXCHANGES[exchange].publish(message, routing_key=routing_key)
|
|
||||||
|
|
||||||
|
|
||||||
def reset_all():
|
|
||||||
global EXCHANGES
|
|
||||||
global QUEUES
|
|
||||||
global CONSUMERS
|
|
||||||
EXCHANGES = {}
|
|
||||||
QUEUES = {}
|
|
||||||
CONSUMERS = {}
|
|
@ -1,41 +0,0 @@
|
|||||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
||||||
|
|
||||||
# Copyright 2010 United States Government as represented by the
|
|
||||||
# Administrator of the National Aeronautics and Space Administration.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
"""
|
|
||||||
Unit Tests for remote procedure calls using carrot
|
|
||||||
"""
|
|
||||||
|
|
||||||
from nova import log as logging
|
|
||||||
from nova.rpc import impl_carrot
|
|
||||||
from nova.tests.rpc import common
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class RpcCarrotTestCase(common.BaseRpcTestCase):
|
|
||||||
def setUp(self):
|
|
||||||
self.rpc = impl_carrot
|
|
||||||
super(RpcCarrotTestCase, self).setUp(supports_timeouts=False)
|
|
||||||
|
|
||||||
def test_connectionpool_single(self):
|
|
||||||
"""Test that ConnectionPool recycles a single connection."""
|
|
||||||
conn1 = self.rpc.ConnectionPool.get()
|
|
||||||
self.rpc.ConnectionPool.put(conn1)
|
|
||||||
conn2 = self.rpc.ConnectionPool.get()
|
|
||||||
self.rpc.ConnectionPool.put(conn2)
|
|
||||||
self.assertEqual(conn1, conn2)
|
|
@ -3,7 +3,6 @@ Cheetah==2.4.4
|
|||||||
amqplib==0.6.1
|
amqplib==0.6.1
|
||||||
anyjson==0.2.4
|
anyjson==0.2.4
|
||||||
boto==2.1.1
|
boto==2.1.1
|
||||||
carrot==0.10.5
|
|
||||||
eventlet
|
eventlet
|
||||||
kombu==1.0.4
|
kombu==1.0.4
|
||||||
lockfile==0.8
|
lockfile==0.8
|
||||||
|
Loading…
Reference in New Issue
Block a user