From 70a712921f1d9253653ebe0d25a2c23d5cf5d750 Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Mon, 2 Apr 2012 18:23:09 -0400 Subject: [PATCH] Remove nova.rpc.impl_carrot. This module was marked as deprecated and scheduled for removal in Essex. Remove it now that Folsom development is open. nova.rpc.impl_kombu should be used instead. This patch also removes nova.testing.fake.rabbit, since as far as I can tell, it isn't used anymore and was the last thing still using the carrot dependency. Change-Id: I8cfb2d09ee5eed439ec1d152261f7097faf08ad6 --- nova/rpc/impl_carrot.py | 684 ---------------------------------- nova/test.py | 5 - nova/testing/fake/rabbit.py | 153 -------- nova/tests/rpc/test_carrot.py | 41 -- tools/pip-requires | 1 - 5 files changed, 884 deletions(-) delete mode 100644 nova/rpc/impl_carrot.py delete mode 100644 nova/testing/fake/rabbit.py delete mode 100644 nova/tests/rpc/test_carrot.py diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py deleted file mode 100644 index 22586b1a9a2e..000000000000 --- a/nova/rpc/impl_carrot.py +++ /dev/null @@ -1,684 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""AMQP-based RPC. - -Queues have consumers and publishers. - -No fan-out support yet. - -""" - -import inspect -import json -import sys -import time -import traceback -import uuid - -from carrot import connection as carrot_connection -from carrot import messaging -import eventlet -from eventlet import greenpool -from eventlet import pools -from eventlet import queue -import greenlet - -from nova import context -from nova import exception -from nova import flags -from nova import local -from nova import log as logging -from nova.rpc import common as rpc_common -from nova.testing import fake -from nova import utils - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -@utils.deprecated('Use of carrot will be removed in a future release. ' - 'Use kombu, instead.') -class Connection(carrot_connection.BrokerConnection, rpc_common.Connection): - """Connection instance object.""" - - def __init__(self, *args, **kwargs): - super(Connection, self).__init__(*args, **kwargs) - self._rpc_consumers = [] - self._rpc_consumer_thread = None - - @classmethod - def instance(cls, new=True): - """Returns the instance.""" - if new or not hasattr(cls, '_instance'): - params = dict(hostname=FLAGS.rabbit_host, - port=FLAGS.rabbit_port, - ssl=FLAGS.rabbit_use_ssl, - userid=FLAGS.rabbit_userid, - password=FLAGS.rabbit_password, - virtual_host=FLAGS.rabbit_virtual_host) - - if FLAGS.fake_rabbit: - params['backend_cls'] = fake.rabbit.Backend - - # NOTE(vish): magic is fun! - # pylint: disable=W0142 - if new: - return cls(**params) - else: - cls._instance = cls(**params) - return cls._instance - - @classmethod - def recreate(cls): - """Recreates the connection instance. - - This is necessary to recover from some network errors/disconnects. - - """ - try: - del cls._instance - except AttributeError, e: - # The _instance stuff is for testing purposes. Usually we don't use - # it. So don't freak out if it doesn't exist. - pass - return cls.instance() - - def close(self): - self.cancel_consumer_thread() - for consumer in self._rpc_consumers: - try: - consumer.close() - except Exception: - # ignore all errors - pass - self._rpc_consumers = [] - carrot_connection.BrokerConnection.close(self) - - def consume_in_thread(self): - """Consumer from all queues/consumers in a greenthread""" - - consumer_set = ConsumerSet(connection=self, - consumer_list=self._rpc_consumers) - - def _consumer_thread(): - try: - consumer_set.wait() - except greenlet.GreenletExit: - return - if self._rpc_consumer_thread is None: - self._rpc_consumer_thread = eventlet.spawn(_consumer_thread) - return self._rpc_consumer_thread - - def cancel_consumer_thread(self): - """Cancel a consumer thread""" - if self._rpc_consumer_thread is not None: - self._rpc_consumer_thread.kill() - try: - self._rpc_consumer_thread.wait() - except greenlet.GreenletExit: - pass - self._rpc_consumer_thread = None - - def create_consumer(self, topic, proxy, fanout=False): - """Create a consumer that calls methods in the proxy""" - if fanout: - consumer = FanoutAdapterConsumer( - connection=self, - topic=topic, - proxy=proxy) - else: - consumer = TopicAdapterConsumer( - connection=self, - topic=topic, - proxy=proxy) - self._rpc_consumers.append(consumer) - - -class Pool(pools.Pool): - """Class that implements a Pool of Connections.""" - - # TODO(comstud): Timeout connections not used in a while - def create(self): - LOG.debug('Pool creating new connection') - return Connection.instance(new=True) - -# Create a ConnectionPool to use for RPC calls. We'll order the -# pool as a stack (LIFO), so that we can potentially loop through and -# timeout old unused connections at some point -ConnectionPool = Pool( - max_size=FLAGS.rpc_conn_pool_size, - order_as_stack=True) - - -class Consumer(messaging.Consumer): - """Consumer base class. - - Contains methods for connecting the fetch method to async loops. - - """ - - def __init__(self, *args, **kwargs): - max_retries = FLAGS.rabbit_max_retries - sleep_time = FLAGS.rabbit_retry_interval - tries = 0 - while True: - tries += 1 - if tries > 1: - time.sleep(sleep_time) - # backoff for next retry attempt.. if there is one - sleep_time += FLAGS.rabbit_retry_backoff - if sleep_time > 30: - sleep_time = 30 - try: - super(Consumer, self).__init__(*args, **kwargs) - self.failed_connection = False - break - except Exception as e: # Catching all because carrot sucks - self.failed_connection = True - if max_retries > 0 and tries == max_retries: - break - fl_host = FLAGS.rabbit_host - fl_port = FLAGS.rabbit_port - fl_intv = sleep_time - LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is' - ' unreachable: %(e)s. Trying again in %(fl_intv)d' - ' seconds.') % locals()) - if self.failed_connection: - LOG.error(_('Unable to connect to AMQP server ' - 'after %(tries)d tries. Shutting down.') % locals()) - sys.exit(1) - - def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): - """Wraps the parent fetch with some logic for failed connection.""" - # TODO(vish): the logic for failed connections and logging should be - # refactored into some sort of connection manager object - try: - if self.failed_connection: - # NOTE(vish): connection is defined in the parent class, we can - # recreate it as long as we create the backend too - # pylint: disable=W0201 - self.connection = Connection.recreate() - self.backend = self.connection.create_backend() - self.declare() - return super(Consumer, self).fetch(no_ack, - auto_ack, - enable_callbacks) - if self.failed_connection: - LOG.error(_('Reconnected to queue')) - self.failed_connection = False - # NOTE(vish): This is catching all errors because we really don't - # want exceptions to be logged 10 times a second if some - # persistent failure occurs. - except Exception, e: # pylint: disable=W0703 - if not self.failed_connection: - LOG.exception(_('Failed to fetch message from queue: %s') % e) - self.failed_connection = True - - -class AdapterConsumer(Consumer): - """Calls methods on a proxy object based on method and args.""" - - def __init__(self, connection=None, topic='broadcast', proxy=None): - LOG.debug(_('Initing the Adapter Consumer for %s') % topic) - self.proxy = proxy - self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) - super(AdapterConsumer, self).__init__(connection=connection, - topic=topic) - self.register_callback(self.process_data) - - def process_data(self, message_data, message): - """Consumer callback to call a method on a proxy object. - - Parses the message for validity and fires off a thread to call the - proxy object method. - - Message data should be a dictionary with two keys: - method: string representing the method to call - args: dictionary of arg: value - - Example: {'method': 'echo', 'args': {'value': 42}} - - """ - # It is important to clear the context here, because at this point - # the previous context is stored in local.store.context - if hasattr(local.store, 'context'): - del local.store.context - rpc_common._safe_log(LOG.debug, _('received %s'), message_data) - # This will be popped off in _unpack_context - msg_id = message_data.get('_msg_id', None) - ctxt = _unpack_context(message_data) - - method = message_data.get('method') - args = message_data.get('args', {}) - message.ack() - if not method: - # NOTE(vish): we may not want to ack here, but that means that bad - # messages stay in the queue indefinitely, so for now - # we just log the message and send an error string - # back to the caller - LOG.warn(_('no method for message: %s') % message_data) - ctxt.reply(msg_id, - _('No method for message: %s') % message_data) - return - self.pool.spawn_n(self._process_data, ctxt, method, args) - - @exception.wrap_exception() - def _process_data(self, ctxt, method, args): - """Thread that magically looks for a method on the proxy - object and calls it. - """ - ctxt.update_store() - node_func = getattr(self.proxy, str(method)) - node_args = dict((str(k), v) for k, v in args.iteritems()) - # NOTE(vish): magic is fun! - try: - rval = node_func(context=ctxt, **node_args) - # Check if the result was a generator - if inspect.isgenerator(rval): - for x in rval: - ctxt.reply(x, None) - else: - ctxt.reply(rval, None) - - # This final None tells multicall that it is done. - ctxt.reply(ending=True) - except Exception as e: - LOG.exception('Exception during message handling') - ctxt.reply(None, sys.exc_info()) - return - - -class TopicAdapterConsumer(AdapterConsumer): - """Consumes messages on a specific topic.""" - - exchange_type = 'topic' - - def __init__(self, connection=None, topic='broadcast', proxy=None): - self.queue = topic - self.routing_key = topic - self.exchange = FLAGS.control_exchange - self.durable = FLAGS.rabbit_durable_queues - super(TopicAdapterConsumer, self).__init__(connection=connection, - topic=topic, proxy=proxy) - - -class FanoutAdapterConsumer(AdapterConsumer): - """Consumes messages from a fanout exchange.""" - - exchange_type = 'fanout' - - def __init__(self, connection=None, topic='broadcast', proxy=None): - self.exchange = '%s_fanout' % topic - self.routing_key = topic - unique = uuid.uuid4().hex - self.queue = '%s_fanout_%s' % (topic, unique) - self.durable = False - # Fanout creates unique queue names, so we should auto-remove - # them when done, so they're not left around on restart. - # Also, we're the only one that should be consuming. exclusive - # implies auto_delete, so we'll just set that.. - self.exclusive = True - LOG.info(_('Created "%(exchange)s" fanout exchange ' - 'with "%(key)s" routing key'), - dict(exchange=self.exchange, key=self.routing_key)) - super(FanoutAdapterConsumer, self).__init__(connection=connection, - topic=topic, proxy=proxy) - - -class ConsumerSet(object): - """Groups consumers to listen on together on a single connection.""" - - def __init__(self, connection, consumer_list): - self.consumer_list = set(consumer_list) - self.consumer_set = None - self.enabled = True - self.init(connection) - - def init(self, conn): - if not conn: - conn = Connection.instance(new=True) - if self.consumer_set: - self.consumer_set.close() - self.consumer_set = messaging.ConsumerSet(conn) - for consumer in self.consumer_list: - consumer.connection = conn - # consumer.backend is set for us - self.consumer_set.add_consumer(consumer) - - def reconnect(self): - self.init(None) - - def wait(self, limit=None): - running = True - while running: - it = self.consumer_set.iterconsume(limit=limit) - if not it: - break - while True: - try: - it.next() - except StopIteration: - return - except greenlet.GreenletExit: - running = False - break - except Exception as e: - LOG.exception(_("Exception while processing consumer")) - self.reconnect() - # Break to outer loop - break - - def close(self): - self.consumer_set.close() - - -class Publisher(messaging.Publisher): - """Publisher base class.""" - pass - - -class TopicPublisher(Publisher): - """Publishes messages on a specific topic.""" - - exchange_type = 'topic' - - def __init__(self, connection=None, topic='broadcast', durable=None): - self.routing_key = topic - self.exchange = FLAGS.control_exchange - self.durable = (FLAGS.rabbit_durable_queues if durable is None - else durable) - super(TopicPublisher, self).__init__(connection=connection) - - -class FanoutPublisher(Publisher): - """Publishes messages to a fanout exchange.""" - - exchange_type = 'fanout' - - def __init__(self, topic, connection=None): - self.exchange = '%s_fanout' % topic - self.queue = '%s_fanout' % topic - self.durable = False - self.auto_delete = True - LOG.info(_('Creating "%(exchange)s" fanout exchange'), - dict(exchange=self.exchange)) - super(FanoutPublisher, self).__init__(connection=connection) - - -class DirectConsumer(Consumer): - """Consumes messages directly on a channel specified by msg_id.""" - - exchange_type = 'direct' - - def __init__(self, connection=None, msg_id=None): - self.queue = msg_id - self.routing_key = msg_id - self.exchange = msg_id - self.durable = False - self.auto_delete = True - self.exclusive = True - super(DirectConsumer, self).__init__(connection=connection) - - -class DirectPublisher(Publisher): - """Publishes messages directly on a channel specified by msg_id.""" - - exchange_type = 'direct' - - def __init__(self, connection=None, msg_id=None): - self.routing_key = msg_id - self.exchange = msg_id - self.durable = False - self.auto_delete = True - super(DirectPublisher, self).__init__(connection=connection) - - -def msg_reply(msg_id, reply=None, failure=None, ending=False): - """Sends a reply or an error on the channel signified by msg_id. - - Failure should be a sys.exc_info() tuple. - - """ - if failure: - message = str(failure[1]) - tb = traceback.format_exception(*failure) - LOG.error(_("Returning exception %s to caller"), message) - LOG.error(tb) - failure = (failure[0].__name__, str(failure[1]), tb) - - with ConnectionPool.item() as conn: - publisher = DirectPublisher(connection=conn, msg_id=msg_id) - try: - msg = {'result': reply, 'failure': failure} - if ending: - msg['ending'] = True - publisher.send(msg) - except TypeError: - msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} - if ending: - msg['ending'] = True - publisher.send(msg) - - publisher.close() - - -def _unpack_context(msg): - """Unpack context from msg.""" - context_dict = {} - for key in list(msg.keys()): - # NOTE(vish): Some versions of python don't like unicode keys - # in kwargs. - key = str(key) - if key.startswith('_context_'): - value = msg.pop(key) - context_dict[key[9:]] = value - context_dict['msg_id'] = msg.pop('_msg_id', None) - ctx = RpcContext.from_dict(context_dict) - LOG.debug(_('unpacked context: %s'), ctx.to_dict()) - return ctx - - -def _pack_context(msg, context): - """Pack context into msg. - - Values for message keys need to be less than 255 chars, so we pull - context out into a bunch of separate keys. If we want to support - more arguments in rabbit messages, we may want to do the same - for args at some point. - - """ - context_d = dict([('_context_%s' % key, value) - for (key, value) in context.to_dict().iteritems()]) - msg.update(context_d) - - -class RpcContext(context.RequestContext): - def __init__(self, *args, **kwargs): - msg_id = kwargs.pop('msg_id', None) - self.msg_id = msg_id - super(RpcContext, self).__init__(*args, **kwargs) - - def reply(self, reply=None, failure=None, ending=False): - if self.msg_id: - msg_reply(self.msg_id, reply, failure, ending) - if ending: - self.msg_id = None - - -def multicall(context, topic, msg, timeout=None): - """Make a call that returns multiple times.""" - # NOTE(russellb): carrot doesn't support timeouts - LOG.debug(_('Making asynchronous call on %s ...'), topic) - msg_id = uuid.uuid4().hex - msg.update({'_msg_id': msg_id}) - LOG.debug(_('MSG_ID is %s') % (msg_id)) - _pack_context(msg, context) - - con_conn = ConnectionPool.get() - consumer = DirectConsumer(connection=con_conn, msg_id=msg_id) - wait_msg = MulticallWaiter(consumer) - consumer.register_callback(wait_msg) - - publisher = TopicPublisher(connection=con_conn, topic=topic) - publisher.send(msg) - publisher.close() - - return wait_msg - - -class MulticallWaiter(object): - def __init__(self, consumer): - self._consumer = consumer - self._results = queue.Queue() - self._closed = False - self._got_ending = False - - def close(self): - if self._closed: - return - self._closed = True - self._consumer.close() - ConnectionPool.put(self._consumer.connection) - - def __call__(self, data, message): - """Acks message and sets result.""" - message.ack() - if data['failure']: - self._results.put(rpc_common.RemoteError(*data['failure'])) - elif data.get('ending', False): - self._got_ending = True - else: - self._results.put(data['result']) - - def __iter__(self): - return self.wait() - - def wait(self): - while not self._closed: - try: - rv = self._consumer.fetch(enable_callbacks=True) - except Exception: - self.close() - raise - if rv is None: - time.sleep(0.01) - continue - if self._got_ending: - self.close() - raise StopIteration - result = self._results.get() - if isinstance(result, Exception): - self.close() - raise result - yield result - - -def create_connection(new=True): - """Create a connection""" - return Connection.instance(new=new) - - -def call(context, topic, msg, timeout=None): - """Sends a message on a topic and wait for a response.""" - rv = multicall(context, topic, msg, timeout) - # NOTE(vish): return the last result from the multicall - rv = list(rv) - if not rv: - return - return rv[-1] - - -def cast(context, topic, msg): - """Sends a message on a topic without waiting for a response.""" - LOG.debug(_('Making asynchronous cast on %s...'), topic) - _pack_context(msg, context) - with ConnectionPool.item() as conn: - publisher = TopicPublisher(connection=conn, topic=topic) - publisher.send(msg) - publisher.close() - - -def fanout_cast(context, topic, msg): - """Sends a message on a fanout exchange without waiting for a response.""" - LOG.debug(_('Making asynchronous fanout cast...')) - _pack_context(msg, context) - with ConnectionPool.item() as conn: - publisher = FanoutPublisher(topic, connection=conn) - publisher.send(msg) - publisher.close() - - -def notify(context, topic, msg): - """Sends a notification event on a topic.""" - LOG.debug(_('Sending notification on %s...'), topic) - _pack_context(msg, context) - with ConnectionPool.item() as conn: - publisher = TopicPublisher(connection=conn, topic=topic, - durable=True) - publisher.send(msg) - publisher.close() - - -def cleanup(): - pass - - -def generic_response(message_data, message): - """Logs a result and exits.""" - LOG.debug(_('response %s'), message_data) - message.ack() - sys.exit(0) - - -def send_message(topic, message, wait=True): - """Sends a message for testing.""" - msg_id = uuid.uuid4().hex - message.update({'_msg_id': msg_id}) - LOG.debug(_('topic is %s'), topic) - LOG.debug(_('message %s'), message) - - if wait: - consumer = messaging.Consumer(connection=Connection.instance(), - queue=msg_id, - exchange=msg_id, - auto_delete=True, - exchange_type='direct', - routing_key=msg_id) - consumer.register_callback(generic_response) - - publisher = messaging.Publisher(connection=Connection.instance(), - exchange=FLAGS.control_exchange, - durable=FLAGS.rabbit_durable_queues, - exchange_type='topic', - routing_key=topic) - publisher.send(message) - publisher.close() - - if wait: - consumer.wait() - consumer.close() - - -if __name__ == '__main__': - # You can send messages from the command line using - # topic and a json string representing a dictionary - # for the method - send_message(sys.argv[1], json.loads(sys.argv[2])) diff --git a/nova/test.py b/nova/test.py index e44ad57ea19c..c269b23f7b44 100644 --- a/nova/test.py +++ b/nova/test.py @@ -39,7 +39,6 @@ from nova import log as logging from nova.openstack.common import cfg from nova import utils from nova import service -from nova.testing.fake import rabbit from nova.tests import reset_db from nova.virt import fake @@ -149,10 +148,6 @@ class TestCase(unittest.TestCase): self.mox.VerifyAll() super(TestCase, self).tearDown() finally: - # Clean out fake_rabbit's queue if we used it - if FLAGS.fake_rabbit: - rabbit.reset_all() - if FLAGS.connection_type == 'fake': if hasattr(fake.FakeConnection, '_instance'): del fake.FakeConnection._instance diff --git a/nova/testing/fake/rabbit.py b/nova/testing/fake/rabbit.py deleted file mode 100644 index 316dc25099fe..000000000000 --- a/nova/testing/fake/rabbit.py +++ /dev/null @@ -1,153 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Based a bit on the carrot.backends.queue backend... but a lot better.""" - -import Queue as queue - -from carrot.backends import base -from eventlet import greenthread - -from nova import log as logging - - -LOG = logging.getLogger(__name__) - - -EXCHANGES = {} -QUEUES = {} -CONSUMERS = {} - - -class Message(base.BaseMessage): - pass - - -class Exchange(object): - def __init__(self, name, exchange_type): - self.name = name - self.exchange_type = exchange_type - self._queue = queue.Queue() - self._routes = {} - - def publish(self, message, routing_key=None): - nm = self.name - LOG.debug(_('(%(nm)s) publish (key: %(routing_key)s)' - ' %(message)s') % locals()) - if routing_key in self._routes: - for f in self._routes[routing_key]: - LOG.debug(_('Publishing to route %s'), f) - f(message, routing_key=routing_key) - - def bind(self, callback, routing_key): - self._routes.setdefault(routing_key, []) - self._routes[routing_key].append(callback) - - -class Queue(object): - def __init__(self, name): - self.name = name - self._queue = queue.Queue() - - def __repr__(self): - return '' % self.name - - def push(self, message, routing_key=None): - self._queue.put(message) - - def size(self): - return self._queue.qsize() - - def pop(self): - return self._queue.get() - - -class Backend(base.BaseBackend): - def queue_declare(self, queue, **kwargs): - global QUEUES - if queue not in QUEUES: - LOG.debug(_('Declaring queue %s'), queue) - QUEUES[queue] = Queue(queue) - - def exchange_declare(self, exchange, type, *args, **kwargs): - global EXCHANGES - if exchange not in EXCHANGES: - LOG.debug(_('Declaring exchange %s'), exchange) - EXCHANGES[exchange] = Exchange(exchange, type) - - def queue_bind(self, queue, exchange, routing_key, **kwargs): - global EXCHANGES - global QUEUES - LOG.debug(_('Binding %(queue)s to %(exchange)s with' - ' key %(routing_key)s') % locals()) - EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) - - def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): - global CONSUMERS - LOG.debug("Adding consumer %s", consumer_tag) - CONSUMERS[consumer_tag] = (queue, callback) - - def cancel(self, consumer_tag): - global CONSUMERS - LOG.debug("Removing consumer %s", consumer_tag) - del CONSUMERS[consumer_tag] - - def consume(self, limit=None): - global CONSUMERS - num = 0 - while True: - for (queue, callback) in CONSUMERS.itervalues(): - item = self.get(queue) - if item: - callback(item) - num += 1 - yield - if limit and num == limit: - raise StopIteration() - greenthread.sleep(0.1) - - def get(self, queue, no_ack=False): - global QUEUES - if not queue in QUEUES or not QUEUES[queue].size(): - return None - (message_data, content_type, content_encoding) = QUEUES[queue].pop() - message = Message(backend=self, body=message_data, - content_type=content_type, - content_encoding=content_encoding) - message.result = True - LOG.debug(_('Getting from %(queue)s: %(message)s') % locals()) - return message - - def prepare_message(self, message_data, delivery_mode, - content_type, content_encoding, **kwargs): - """Prepare message for sending.""" - return (message_data, content_type, content_encoding) - - def publish(self, message, exchange, routing_key, **kwargs): - global EXCHANGES - if exchange in EXCHANGES: - EXCHANGES[exchange].publish(message, routing_key=routing_key) - - -def reset_all(): - global EXCHANGES - global QUEUES - global CONSUMERS - EXCHANGES = {} - QUEUES = {} - CONSUMERS = {} diff --git a/nova/tests/rpc/test_carrot.py b/nova/tests/rpc/test_carrot.py deleted file mode 100644 index dae08e8e07e1..000000000000 --- a/nova/tests/rpc/test_carrot.py +++ /dev/null @@ -1,41 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Unit Tests for remote procedure calls using carrot -""" - -from nova import log as logging -from nova.rpc import impl_carrot -from nova.tests.rpc import common - - -LOG = logging.getLogger(__name__) - - -class RpcCarrotTestCase(common.BaseRpcTestCase): - def setUp(self): - self.rpc = impl_carrot - super(RpcCarrotTestCase, self).setUp(supports_timeouts=False) - - def test_connectionpool_single(self): - """Test that ConnectionPool recycles a single connection.""" - conn1 = self.rpc.ConnectionPool.get() - self.rpc.ConnectionPool.put(conn1) - conn2 = self.rpc.ConnectionPool.get() - self.rpc.ConnectionPool.put(conn2) - self.assertEqual(conn1, conn2) diff --git a/tools/pip-requires b/tools/pip-requires index 8949428f2e18..77ceb9b5a350 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -3,7 +3,6 @@ Cheetah==2.4.4 amqplib==0.6.1 anyjson==0.2.4 boto==2.1.1 -carrot==0.10.5 eventlet kombu==1.0.4 lockfile==0.8