diff --git a/doc/source/transport.rst b/doc/source/transport.rst index 547198aa8..3449e9b7d 100644 --- a/doc/source/transport.rst +++ b/doc/source/transport.rst @@ -25,6 +25,4 @@ different 3rd party libraries that don't ensure that. In certain cases, with some drivers, it does work: * rabbit: works only if no connection have already been established. -* qpid: doesn't work (The qpid library has a global state that uses - file descriptors that can't be reset) * amqp1: works diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst index 23c6d4d23..da4d0abc5 100644 --- a/doc/source/zmq_driver.rst +++ b/doc/source/zmq_driver.rst @@ -45,7 +45,7 @@ Juno release, as almost all the core projects in OpenStack have switched to oslo_messaging, ZeroMQ can be the only RPC driver across the OpenStack cluster. This document provides deployment information for this driver in oslo_messaging. -Other than AMQP-based drivers, like RabbitMQ or Qpid, ZeroMQ doesn't have +Other than AMQP-based drivers, like RabbitMQ, ZeroMQ doesn't have any central brokers in oslo.messaging, instead, each host (running OpenStack services) is both ZeroMQ client and server. As a result, each host needs to listen to a certain TCP port for incoming connections and directly connect @@ -172,7 +172,6 @@ The parameters for the script oslo-messaging-zmq-receiver should be:: You can specify ZeroMQ options in /etc/oslo/zeromq.conf if necessary. - Listening Address (optional) ---------------------------- @@ -204,7 +203,7 @@ DevStack Support ZeroMQ driver has been supported by DevStack. The configuration is as follows:: - ENABLED_SERVICES+=,-rabbit,-qpid,zeromq + ENABLED_SERVICES+=,-rabbit,zeromq ZEROMQ_MATCHMAKER=redis In local.conf [localrc] section need to enable zmq plugin which lives in diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index 55fb9d49f..2308b80d1 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -19,7 +19,7 @@ Shared code between AMQP based openstack.common.rpc implementations. The code in this module is shared between the rpc implementations based on -AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also +AMQP. Specifically, this includes impl_kombu. impl_carrot also uses AMQP, but is deprecated and predates this code. """ @@ -66,7 +66,7 @@ amqp_opts = [ UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) -# NOTE(sileht): Even if rabbit/qpid have only one Connection class, +# NOTE(sileht): Even if rabbit has only one Connection class, # this connection can be used for two purposes: # * wait and receive amqp messages (only do read stuffs on the socket) # * send messages to the broker (only do write stuffs on the socket) diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py deleted file mode 100644 index e0e901968..000000000 --- a/oslo_messaging/_drivers/impl_qpid.py +++ /dev/null @@ -1,800 +0,0 @@ -# Copyright 2011 OpenStack Foundation -# Copyright 2011 - 2012, Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import functools -import itertools -import logging -import os -import random -import time -import warnings - -from oslo_config import cfg -from oslo_serialization import jsonutils -from oslo_utils import importutils -from oslo_utils import netutils -import six - -from oslo_messaging._drivers import amqp as rpc_amqp -from oslo_messaging._drivers import amqpdriver -from oslo_messaging._drivers import base -from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._i18n import _ -from oslo_messaging._i18n import _LE -from oslo_messaging._i18n import _LI -from oslo_messaging import exceptions - -qpid_codec = importutils.try_import("qpid.codec010") -qpid_messaging = importutils.try_import("qpid.messaging") -qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") - -LOG = logging.getLogger(__name__) - -qpid_opts = [ - cfg.StrOpt('qpid_hostname', - default='localhost', - deprecated_group='DEFAULT', - help='Qpid broker hostname.'), - cfg.IntOpt('qpid_port', - default=5672, - deprecated_group='DEFAULT', - help='Qpid broker port.'), - cfg.ListOpt('qpid_hosts', - default=['$qpid_hostname:$qpid_port'], - deprecated_group='DEFAULT', - help='Qpid HA cluster host:port pairs.'), - cfg.StrOpt('qpid_username', - default='', - deprecated_group='DEFAULT', - help='Username for Qpid connection.'), - cfg.StrOpt('qpid_password', - default='', - deprecated_group='DEFAULT', - help='Password for Qpid connection.', - secret=True), - cfg.StrOpt('qpid_sasl_mechanisms', - default='', - deprecated_group='DEFAULT', - help='Space separated list of SASL mechanisms to use for ' - 'auth.'), - cfg.IntOpt('qpid_heartbeat', - default=60, - deprecated_group='DEFAULT', - help='Seconds between connection keepalive heartbeats.'), - cfg.StrOpt('qpid_protocol', - default='tcp', - deprecated_group='DEFAULT', - help="Transport to use, either 'tcp' or 'ssl'."), - cfg.BoolOpt('qpid_tcp_nodelay', - default=True, - deprecated_group='DEFAULT', - help='Whether to disable the Nagle algorithm.'), - cfg.IntOpt('qpid_receiver_capacity', - default=1, - deprecated_group='DEFAULT', - help='The number of prefetched messages held by receiver.'), - # NOTE(russellb) If any additional versions are added (beyond 1 and 2), - # this file could probably use some additional refactoring so that the - # differences between each version are split into different classes. - cfg.IntOpt('qpid_topology_version', - default=1, - deprecated_group='DEFAULT', - help="The qpid topology version to use. Version 1 is what " - "was originally used by impl_qpid. Version 2 includes " - "some backwards-incompatible changes that allow broker " - "federation to work. Users should update to version 2 " - "when they are able to take everything down, as it " - "requires a clean break."), -] - -JSON_CONTENT_TYPE = 'application/json; charset=utf8' - - -def raise_invalid_topology_version(conf): - msg = (_("Invalid value for qpid_topology_version: %d") % - conf.qpid_topology_version) - LOG.error(msg) - raise Exception(msg) - - -class QpidMessage(dict): - def __init__(self, session, raw_message): - super(QpidMessage, self).__init__( - rpc_common.deserialize_msg(raw_message.content)) - self._raw_message = raw_message - self._session = session - - def acknowledge(self): - self._session.acknowledge(self._raw_message) - - def requeue(self): - pass - - -class ConsumerBase(object): - """Consumer base class.""" - - def __init__(self, conf, session, callback, node_name, node_opts, - link_name, link_opts): - """Declare a queue on an amqp session. - - 'session' is the amqp session to use - 'callback' is the callback to call when messages are received - 'node_name' is the first part of the Qpid address string, before ';' - 'node_opts' will be applied to the "x-declare" section of "node" - in the address string. - 'link_name' goes into the "name" field of the "link" in the address - string - 'link_opts' will be applied to the "x-declare" section of "link" - in the address string. - """ - self.callback = callback - self.receiver = None - self.rcv_capacity = conf.qpid_receiver_capacity - self.session = None - - if conf.qpid_topology_version == 1: - addr_opts = { - "create": "always", - "node": { - "type": "topic", - "x-declare": { - "durable": True, - "auto-delete": True, - }, - }, - "link": { - "durable": True, - "x-declare": { - "durable": False, - "auto-delete": True, - "exclusive": False, - }, - }, - } - addr_opts["node"]["x-declare"].update(node_opts) - elif conf.qpid_topology_version == 2: - addr_opts = { - "link": { - "x-declare": { - "auto-delete": True, - "exclusive": False, - }, - }, - } - else: - raise_invalid_topology_version(conf) - - addr_opts["link"]["x-declare"].update(link_opts) - if link_name: - addr_opts["link"]["name"] = link_name - - self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) - - self.connect(session) - - def connect(self, session): - """Declare the receiver on connect.""" - self._declare_receiver(session) - - def reconnect(self, session): - """Re-declare the receiver after a Qpid reconnect.""" - self._declare_receiver(session) - - def _declare_receiver(self, session): - self.session = session - self.receiver = session.receiver(self.address) - self.receiver.capacity = self.rcv_capacity - - def _unpack_json_msg(self, msg): - """Load the JSON data in msg if msg.content_type indicates that it - is necessary. Put the loaded data back into msg.content and - update msg.content_type appropriately. - - A Qpid Message containing a dict will have a content_type of - 'amqp/map', whereas one containing a string that needs to be converted - back from JSON will have a content_type of JSON_CONTENT_TYPE. - - :param msg: a Qpid Message object - :returns: None - """ - if msg.content_type == JSON_CONTENT_TYPE: - msg.content = jsonutils.loads(msg.content) - msg.content_type = 'amqp/map' - - def consume(self): - """Fetch the message and pass it to the callback object.""" - message = self.receiver.fetch() - try: - self._unpack_json_msg(message) - self.callback(QpidMessage(self.session, message)) - except Exception: - LOG.exception(_LE("Failed to process message... skipping it.")) - self.session.acknowledge(message) - - def get_receiver(self): - return self.receiver - - def get_node_name(self): - return self.address.split(';')[0] - - -class DirectConsumer(ConsumerBase): - """Queue/consumer class for 'direct'.""" - - def __init__(self, conf, session, msg_id, callback): - """Init a 'direct' queue. - - 'session' is the amqp session to use - 'msg_id' is the msg_id to listen on - 'callback' is the callback to call when messages are received - """ - - link_opts = { - "exclusive": True, - "durable": conf.amqp_durable_queues, - } - - if conf.qpid_topology_version == 1: - node_name = "%s/%s" % (msg_id, msg_id) - node_opts = {"type": "direct"} - link_name = msg_id - elif conf.qpid_topology_version == 2: - node_name = "amq.direct/%s" % msg_id - node_opts = {} - link_name = msg_id - else: - raise_invalid_topology_version(conf) - - super(DirectConsumer, self).__init__(conf, session, callback, - node_name, node_opts, link_name, - link_opts) - - -class TopicConsumer(ConsumerBase): - """Consumer class for 'topic'.""" - - def __init__(self, conf, session, topic, callback, exchange_name, - name=None): - """Init a 'topic' queue. - - :param session: the amqp session to use - :param topic: is the topic to listen on - :paramtype topic: str - :param callback: the callback to call when messages are received - :param name: optional queue name, defaults to topic - """ - - link_opts = { - "auto-delete": conf.amqp_auto_delete, - "durable": conf.amqp_durable_queues, - } - - if conf.qpid_topology_version == 1: - node_name = "%s/%s" % (exchange_name, topic) - elif conf.qpid_topology_version == 2: - node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) - else: - raise_invalid_topology_version(conf) - - super(TopicConsumer, self).__init__(conf, session, callback, node_name, - {}, name or topic, link_opts) - - -class FanoutConsumer(ConsumerBase): - """Consumer class for 'fanout'.""" - - def __init__(self, conf, session, topic, callback): - """Init a 'fanout' queue. - - 'session' is the amqp session to use - 'topic' is the topic to listen on - 'callback' is the callback to call when messages are received - """ - self.conf = conf - - link_opts = {"exclusive": True} - - if conf.qpid_topology_version == 1: - node_name = "%s_fanout" % topic - node_opts = {"durable": False, "type": "fanout"} - elif conf.qpid_topology_version == 2: - node_name = "amq.topic/fanout/%s" % topic - node_opts = {} - else: - raise_invalid_topology_version(conf) - - super(FanoutConsumer, self).__init__(conf, session, callback, - node_name, node_opts, None, - link_opts) - - -class Publisher(object): - """Base Publisher class.""" - - def __init__(self, conf, session, node_name, node_opts=None): - """Init the Publisher class with the exchange_name, routing_key, - and other options - """ - self.sender = None - self.session = session - - if conf.qpid_topology_version == 1: - addr_opts = { - "create": "always", - "node": { - "type": "topic", - "x-declare": { - "durable": False, - # auto-delete isn't implemented for exchanges in qpid, - # but put in here anyway - "auto-delete": True, - }, - }, - } - if node_opts: - addr_opts["node"]["x-declare"].update(node_opts) - - self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) - elif conf.qpid_topology_version == 2: - self.address = node_name - else: - raise_invalid_topology_version(conf) - - self.reconnect(session) - - def reconnect(self, session): - """Re-establish the Sender after a reconnection.""" - self.sender = session.sender(self.address) - - def _pack_json_msg(self, msg): - """Qpid cannot serialize dicts containing strings longer than 65535 - characters. This function dumps the message content to a JSON - string, which Qpid is able to handle. - - :param msg: May be either a Qpid Message object or a bare dict. - :returns: A Qpid Message with its content field JSON encoded. - """ - try: - msg.content = jsonutils.dumps(msg.content) - except AttributeError: - # Need to have a Qpid message so we can set the content_type. - msg = qpid_messaging.Message(jsonutils.dumps(msg)) - msg.content_type = JSON_CONTENT_TYPE - return msg - - def send(self, msg): - """Send a message.""" - try: - # Check if Qpid can encode the message - check_msg = msg - if not hasattr(check_msg, 'content_type'): - check_msg = qpid_messaging.Message(msg) - content_type = check_msg.content_type - enc, dec = qpid_messaging.message.get_codec(content_type) - enc(check_msg.content) - except qpid_codec.CodecException: - # This means the message couldn't be serialized as a dict. - msg = self._pack_json_msg(msg) - self.sender.send(msg) - - -class DirectPublisher(Publisher): - """Publisher class for 'direct'.""" - def __init__(self, conf, session, topic): - """Init a 'direct' publisher.""" - - if conf.qpid_topology_version == 1: - node_name = "%s/%s" % (topic, topic) - node_opts = {"type": "direct"} - elif conf.qpid_topology_version == 2: - node_name = "amq.direct/%s" % topic - node_opts = {} - else: - raise_invalid_topology_version(conf) - - super(DirectPublisher, self).__init__(conf, session, node_name, - node_opts) - - -class TopicPublisher(Publisher): - """Publisher class for 'topic'.""" - def __init__(self, conf, session, exchange_name, topic): - """Init a 'topic' publisher. - """ - if conf.qpid_topology_version == 1: - node_name = "%s/%s" % (exchange_name, topic) - elif conf.qpid_topology_version == 2: - node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) - else: - raise_invalid_topology_version(conf) - - super(TopicPublisher, self).__init__(conf, session, node_name) - - -class FanoutPublisher(Publisher): - """Publisher class for 'fanout'.""" - def __init__(self, conf, session, topic): - """Init a 'fanout' publisher. - """ - - if conf.qpid_topology_version == 1: - node_name = "%s_fanout" % topic - node_opts = {"type": "fanout"} - elif conf.qpid_topology_version == 2: - node_name = "amq.topic/fanout/%s" % topic - node_opts = {} - else: - raise_invalid_topology_version(conf) - - super(FanoutPublisher, self).__init__(conf, session, node_name, - node_opts) - - -class NotifyPublisher(Publisher): - """Publisher class for notifications.""" - def __init__(self, conf, session, exchange_name, topic): - """Init a 'topic' publisher. - """ - node_opts = {"durable": True} - - if conf.qpid_topology_version == 1: - node_name = "%s/%s" % (exchange_name, topic) - elif conf.qpid_topology_version == 2: - node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) - else: - raise_invalid_topology_version(conf) - - super(NotifyPublisher, self).__init__(conf, session, node_name, - node_opts) - - -class Connection(object): - """Connection object.""" - - pools = {} - - def __init__(self, conf, url, purpose): - if not qpid_messaging: - raise ImportError("Failed to import qpid.messaging") - - self.connection = None - self.session = None - self.consumers = {} - self.conf = conf - self.driver_conf = conf.oslo_messaging_qpid - - self._consume_loop_stopped = False - - self.brokers_params = [] - if url.hosts: - for host in url.hosts: - params = { - 'username': host.username or '', - 'password': host.password or '', - } - if host.port is not None: - params['host'] = '%s:%d' % (host.hostname, host.port) - else: - params['host'] = host.hostname - self.brokers_params.append(params) - else: - # Old configuration format - for adr in self.driver_conf.qpid_hosts: - hostname, port = netutils.parse_host_port( - adr, default_port=5672) - - if ':' in hostname: - hostname = '[' + hostname + ']' - - params = { - 'host': '%s:%d' % (hostname, port), - 'username': self.driver_conf.qpid_username, - 'password': self.driver_conf.qpid_password, - } - self.brokers_params.append(params) - - random.shuffle(self.brokers_params) - self.brokers = itertools.cycle(self.brokers_params) - - self._initial_pid = os.getpid() - self.reconnect() - - def _connect(self, broker): - # Create the connection - this does not open the connection - self.connection = qpid_messaging.Connection(broker['host']) - - # Check if flags are set and if so set them for the connection - # before we call open - self.connection.username = broker['username'] - self.connection.password = broker['password'] - - self.connection.sasl_mechanisms = self.driver_conf.qpid_sasl_mechanisms - # Reconnection is done by self.reconnect() - self.connection.reconnect = False - self.connection.heartbeat = self.driver_conf.qpid_heartbeat - self.connection.transport = self.driver_conf.qpid_protocol - self.connection.tcp_nodelay = self.driver_conf.qpid_tcp_nodelay - self.connection.open() - - def _register_consumer(self, consumer): - self.consumers[six.text_type(consumer.get_receiver())] = consumer - - def _lookup_consumer(self, receiver): - return self.consumers[six.text_type(receiver)] - - def _disconnect(self): - # Close the session if necessary - if self.connection is not None and self.connection.opened(): - try: - self.connection.close() - except qpid_exceptions.MessagingError: - pass - self.connection = None - - def reconnect(self, retry=None): - """Handles reconnecting and re-establishing sessions and queues. - Will retry up to retry number of times. - retry = None or -1 means to retry forever - retry = 0 means no retry - retry = N means N retries - """ - delay = 1 - attempt = 0 - loop_forever = False - if retry is None or retry < 0: - loop_forever = True - - while True: - self._disconnect() - - attempt += 1 - broker = six.next(self.brokers) - try: - self._connect(broker) - except qpid_exceptions.MessagingError as e: - msg_dict = dict(e=e, - delay=delay, - retry=retry, - broker=broker) - if not loop_forever and attempt > retry: - msg = _('Unable to connect to AMQP server on ' - '%(broker)s after %(retry)d ' - 'tries: %(e)s') % msg_dict - LOG.error(msg) - raise exceptions.MessageDeliveryFailure(msg) - else: - msg = _LE("Unable to connect to AMQP server on " - "%(broker)s: %(e)s. Sleeping %(delay)s seconds") - LOG.error(msg, msg_dict) - time.sleep(delay) - delay = min(delay + 1, 5) - else: - LOG.info(_LI('Connected to AMQP server on %s'), broker['host']) - break - - self.session = self.connection.session() - - if self.consumers: - consumers = self.consumers - self.consumers = {} - - for consumer in six.itervalues(consumers): - consumer.reconnect(self.session) - self._register_consumer(consumer) - - LOG.debug("Re-established AMQP queues") - - def ensure(self, error_callback, method, retry=None): - - current_pid = os.getpid() - if self._initial_pid != current_pid: - # NOTE(sileht): - # to get the same level of fork support that rabbit driver have - # (ie: allow fork before the first connection established) - # we could use the kombu workaround: - # https://github.com/celery/kombu/blob/master/kombu/transport/ - # qpid_patches.py#L67 - LOG.warn("Process forked! " - "This can result in unpredictable behavior. " - "See: http://docs.openstack.org/developer/" - "oslo_messaging/transport.html") - self._initial_pid = current_pid - - while True: - try: - return method() - except (qpid_exceptions.Empty, - qpid_exceptions.MessagingError) as e: - if error_callback: - error_callback(e) - self.reconnect(retry=retry) - - def close(self): - """Close/release this connection.""" - try: - self.connection.close() - except Exception: - # NOTE(dripton) Logging exceptions that happen during cleanup just - # causes confusion; there's really nothing useful we can do with - # them. - pass - self.connection = None - - def reset(self): - """Reset a connection so it can be used again.""" - self.session.close() - self.session = self.connection.session() - self.consumers = {} - - def declare_consumer(self, consumer_cls, topic, callback): - """Create a Consumer using the class that was passed in and - add it to our list of consumers - """ - def _connect_error(exc): - log_info = {'topic': topic, 'err_str': exc} - LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s"), log_info) - - def _declare_consumer(): - consumer = consumer_cls(self.driver_conf, self.session, topic, - callback) - self._register_consumer(consumer) - return consumer - - return self.ensure(_connect_error, _declare_consumer) - - def consume(self, timeout=None): - """Consume from all queues/consumers.""" - - timer = rpc_common.DecayingTimer(duration=timeout) - timer.start() - - def _raise_timeout(exc): - LOG.debug('Timed out waiting for RPC response: %s', exc) - raise rpc_common.Timeout() - - def _error_callback(exc): - timer.check_return(_raise_timeout, exc) - LOG.exception(_LE('Failed to consume message from queue: %s'), exc) - - def _consume(): - # NOTE(sileht): - # maximum value chosen according the best practice from kombu: - # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop - poll_timeout = 1 if timeout is None else min(timeout, 1) - - while True: - if self._consume_loop_stopped: - self._consume_loop_stopped = False - return - - try: - nxt_receiver = self.session.next_receiver( - timeout=poll_timeout) - except qpid_exceptions.Empty as exc: - poll_timeout = timer.check_return(_raise_timeout, exc, - maximum=1) - else: - break - - try: - self._lookup_consumer(nxt_receiver).consume() - except Exception: - LOG.exception(_LE("Error processing message. " - "Skipping it.")) - - self.ensure(_error_callback, _consume) - - def publisher_send(self, cls, topic, msg, retry=None, **kwargs): - """Send to a publisher based on the publisher class.""" - - def _connect_error(exc): - log_info = {'topic': topic, 'err_str': exc} - LOG.exception(_LE("Failed to publish message to topic " - "'%(topic)s': %(err_str)s"), log_info) - - def _publisher_send(): - publisher = cls(self.driver_conf, self.session, topic=topic, - **kwargs) - publisher.send(msg) - - return self.ensure(_connect_error, _publisher_send, retry=retry) - - def declare_direct_consumer(self, topic, callback): - """Create a 'direct' queue. - In nova's use, this is generally a msg_id queue used for - responses for call/multicall - """ - self.declare_consumer(DirectConsumer, topic, callback) - - def declare_topic_consumer(self, exchange_name, topic, callback=None, - queue_name=None): - """Create a 'topic' consumer.""" - self.declare_consumer(functools.partial(TopicConsumer, - name=queue_name, - exchange_name=exchange_name, - ), - topic, callback) - - def declare_fanout_consumer(self, topic, callback): - """Create a 'fanout' consumer.""" - self.declare_consumer(FanoutConsumer, topic, callback) - - def direct_send(self, msg_id, msg): - """Send a 'direct' message.""" - self.publisher_send(DirectPublisher, topic=msg_id, msg=msg) - - def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): - """Send a 'topic' message.""" - # - # We want to create a message with attributes, for example a TTL. We - # don't really need to keep 'msg' in its JSON format any longer - # so let's create an actual Qpid message here and get some - # value-add on the go. - # - # WARNING: Request timeout happens to be in the same units as - # Qpid's TTL (seconds). If this changes in the future, then this - # will need to be altered accordingly. - # - qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) - self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message, - exchange_name=exchange_name, retry=retry) - - def fanout_send(self, topic, msg, retry=None): - """Send a 'fanout' message.""" - self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry) - - def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): - """Send a notify message on a topic.""" - self.publisher_send(NotifyPublisher, topic=topic, msg=msg, - exchange_name=exchange_name, retry=retry) - - def stop_consuming(self): - self._consume_loop_stopped = True - - -class QpidDriver(amqpdriver.AMQPDriverBase): - """qpidd Driver - - .. deprecated:: 1.16 (Liberty) - """ - - def __init__(self, conf, url, - default_exchange=None, allowed_remote_exmods=None): - - warnings.warn(_('The Qpid driver has been deprecated. ' - 'The driver is planned to be removed during the ' - '`Mitaka` development cycle.'), - DeprecationWarning, stacklevel=2) - - opt_group = cfg.OptGroup(name='oslo_messaging_qpid', - title='QPID driver options') - conf.register_group(opt_group) - conf.register_opts(qpid_opts, group=opt_group) - conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) - conf.register_opts(base.base_opts, group=opt_group) - - connection_pool = rpc_amqp.ConnectionPool( - conf, conf.oslo_messaging_qpid.rpc_conn_pool_size, - url, Connection) - - super(QpidDriver, self).__init__( - conf, url, - connection_pool, - default_exchange, - allowed_remote_exmods, - conf.oslo_messaging_qpid.send_single_reply, - ) diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index 1312d66cf..35b6e2a27 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -50,9 +50,6 @@ class ConfFixture(fixtures.Fixture): _import_opts(self.conf, 'oslo_messaging._drivers.amqp', 'amqp_opts', 'oslo_messaging_rabbit') - _import_opts(self.conf, - 'oslo_messaging._drivers.impl_qpid', 'qpid_opts', - 'oslo_messaging_qpid') _import_opts(self.conf, 'oslo_messaging._drivers.amqp', 'amqp_opts', 'oslo_messaging_qpid') @@ -77,7 +74,7 @@ class ConfFixture(fixtures.Fixture): @property def transport_driver(self): - """The transport driver - for example 'rabbit', 'qpid' or 'fake'.""" + """The transport driver - for example 'rabbit', 'amqp' or 'fake'.""" return self.conf.rpc_backend @transport_driver.setter diff --git a/oslo_messaging/notify/logger.py b/oslo_messaging/notify/logger.py index 6b96b58d2..b4e48df0f 100644 --- a/oslo_messaging/notify/logger.py +++ b/oslo_messaging/notify/logger.py @@ -33,7 +33,7 @@ class LoggingNotificationHandler(logging.Handler): [handler_notifier] class=oslo_messaging.LoggingNotificationHandler level=ERROR - args=('qpid:///') + args=('rabbit:///') """ diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index c5856595d..263c59f68 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -22,7 +22,6 @@ import itertools from oslo_messaging._drivers import amqp from oslo_messaging._drivers import base as drivers_base -from oslo_messaging._drivers import impl_qpid from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts @@ -48,8 +47,6 @@ _opts = [ ('oslo_messaging_amqp', amqp_opts.amqp1_opts), ('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts))), - ('oslo_messaging_qpid', list(itertools.chain(amqp.amqp_opts, - impl_qpid.qpid_opts))) ] diff --git a/oslo_messaging/tests/drivers/test_impl_qpid.py b/oslo_messaging/tests/drivers/test_impl_qpid.py deleted file mode 100644 index 2eb0bb244..000000000 --- a/oslo_messaging/tests/drivers/test_impl_qpid.py +++ /dev/null @@ -1,850 +0,0 @@ -# Copyright (C) 2014 eNovance SAS -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import operator -import random -import threading -import time - -try: - import qpid -except ImportError: - qpid = None -from six.moves import _thread -import testscenarios -import testtools - -import oslo_messaging -from oslo_messaging._drivers import amqp -from oslo_messaging._drivers import impl_qpid as qpid_driver -from oslo_messaging.tests import utils as test_utils -from six.moves import mock - - -load_tests = testscenarios.load_tests_apply_scenarios - -QPID_BROKER = 'localhost:5672' - - -class TestQpidDriverLoad(test_utils.BaseTestCase): - - def setUp(self): - super(TestQpidDriverLoad, self).setUp() - self.messaging_conf.transport_driver = 'qpid' - - def test_driver_load(self): - transport = oslo_messaging.get_transport(self.conf) - self.assertIsInstance(transport._driver, qpid_driver.QpidDriver) - - -def _is_qpidd_service_running(): - - """this function checks if the qpid service is running or not.""" - - qpid_running = True - try: - broker = QPID_BROKER - connection = qpid.messaging.Connection(broker) - connection.open() - except Exception: - # qpid service is not running. - qpid_running = False - else: - connection.close() - - return qpid_running - - -class _QpidBaseTestCase(test_utils.BaseTestCase): - - @testtools.skipIf(qpid is None, "qpid not available") - def setUp(self): - super(_QpidBaseTestCase, self).setUp() - self.messaging_conf.transport_driver = 'qpid' - self.fake_qpid = not _is_qpidd_service_running() - - if self.fake_qpid: - self.session_receive = get_fake_qpid_session() - self.session_send = get_fake_qpid_session() - else: - self.broker = QPID_BROKER - # create connection from the qpid.messaging - # connection for the Consumer. - self.con_receive = qpid.messaging.Connection(self.broker) - self.con_receive.open() - # session to receive the messages - self.session_receive = self.con_receive.session() - - # connection for sending the message - self.con_send = qpid.messaging.Connection(self.broker) - self.con_send.open() - # session to send the messages - self.session_send = self.con_send.session() - - # list to store the expected messages and - # the actual received messages - self._expected = [] - self._messages = [] - self.initialized = True - - def tearDown(self): - super(_QpidBaseTestCase, self).tearDown() - - if self.initialized: - if self.fake_qpid: - _fake_session.flush_exchanges() - else: - self.con_receive.close() - self.con_send.close() - - -class TestQpidTransportURL(_QpidBaseTestCase): - - scenarios = [ - ('none', dict(url=None, - expected=[dict(host='localhost:5672', - username='', - password='')])), - ('empty', - dict(url='qpid:///', - expected=[dict(host='localhost:5672', - username='', - password='')])), - ('localhost', - dict(url='qpid://localhost/', - expected=[dict(host='localhost', - username='', - password='')])), - ('no_creds', - dict(url='qpid://host/', - expected=[dict(host='host', - username='', - password='')])), - ('no_port', - dict(url='qpid://user:password@host/', - expected=[dict(host='host', - username='user', - password='password')])), - ('full_url', - dict(url='qpid://user:password@host:10/', - expected=[dict(host='host:10', - username='user', - password='password')])), - ('full_two_url', - dict(url='qpid://user:password@host:10,' - 'user2:password2@host2:12/', - expected=[dict(host='host:10', - username='user', - password='password'), - dict(host='host2:12', - username='user2', - password='password2') - ] - )), - - ] - - @mock.patch.object(qpid_driver.Connection, 'reconnect') - def test_transport_url(self, *args): - transport = oslo_messaging.get_transport(self.conf, self.url) - self.addCleanup(transport.cleanup) - driver = transport._driver - - brokers_params = driver._get_connection().brokers_params - self.assertEqual(sorted(self.expected, - key=operator.itemgetter('host')), - sorted(brokers_params, - key=operator.itemgetter('host'))) - - -class TestQpidInvalidTopologyVersion(_QpidBaseTestCase): - """Unit test cases to test invalid qpid topology version.""" - - scenarios = [ - ('direct', dict(consumer_cls=qpid_driver.DirectConsumer, - consumer_kwargs={}, - publisher_cls=qpid_driver.DirectPublisher, - publisher_kwargs={})), - ('topic', dict(consumer_cls=qpid_driver.TopicConsumer, - consumer_kwargs={'exchange_name': 'openstack'}, - publisher_cls=qpid_driver.TopicPublisher, - publisher_kwargs={'exchange_name': 'openstack'})), - ('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer, - consumer_kwargs={}, - publisher_cls=qpid_driver.FanoutPublisher, - publisher_kwargs={})), - ] - - def setUp(self): - super(TestQpidInvalidTopologyVersion, self).setUp() - self.config(qpid_topology_version=-1, - group='oslo_messaging_qpid') - - def test_invalid_topology_version(self): - def consumer_callback(msg): - pass - - msgid_or_topic = 'test' - - # not using self.assertRaises because - # 1. qpid driver raises Exception(msg) for invalid topology version - # 2. flake8 - H202 assertRaises Exception too broad - exception_msg = ("Invalid value for qpid_topology_version: %d" % - self.conf.oslo_messaging_qpid.qpid_topology_version) - recvd_exc_msg = '' - - try: - self.consumer_cls(self.conf.oslo_messaging_qpid, - self.session_receive, - msgid_or_topic, - consumer_callback, - **self.consumer_kwargs) - except Exception as e: - recvd_exc_msg = e.message - - self.assertEqual(exception_msg, recvd_exc_msg) - - recvd_exc_msg = '' - try: - self.publisher_cls(self.conf.oslo_messaging_qpid, - self.session_send, - topic=msgid_or_topic, - **self.publisher_kwargs) - except Exception as e: - recvd_exc_msg = e.message - - self.assertEqual(exception_msg, recvd_exc_msg) - - -class TestQpidDirectConsumerPublisher(_QpidBaseTestCase): - """Unit test cases to test DirectConsumer and Direct Publisher.""" - - _n_qpid_topology = [ - ('v1', dict(qpid_topology=1)), - ('v2', dict(qpid_topology=2)), - ] - - _n_msgs = [ - ('single', dict(no_msgs=1)), - ('multiple', dict(no_msgs=10)), - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology, - cls._n_msgs) - - def consumer_callback(self, msg): - # This function will be called by the DirectConsumer - # when any message is received. - # Append the received message into the messages list - # so that the received messages can be validated - # with the expected messages - if isinstance(msg, dict): - self._messages.append(msg['content']) - else: - self._messages.append(msg) - - def test_qpid_direct_consumer_producer(self): - self.msgid = str(random.randint(1, 100)) - - # create a DirectConsumer and DirectPublisher class objects - self.dir_cons = qpid_driver.DirectConsumer( - self.conf.oslo_messaging_qpid, - self.session_receive, - self.msgid, - self.consumer_callback) - self.dir_pub = qpid_driver.DirectPublisher( - self.conf.oslo_messaging_qpid, - self.session_send, - self.msgid) - - def try_send_msg(no_msgs): - for i in range(no_msgs): - self._expected.append(str(i)) - snd_msg = {'content_type': 'text/plain', 'content': str(i)} - self.dir_pub.send(snd_msg) - - def try_receive_msg(no_msgs): - for i in range(no_msgs): - self.dir_cons.consume() - - thread1 = threading.Thread(target=try_receive_msg, - args=(self.no_msgs,)) - thread2 = threading.Thread(target=try_send_msg, - args=(self.no_msgs,)) - - thread1.start() - thread2.start() - thread1.join() - thread2.join() - - self.assertEqual(self.no_msgs, len(self._messages)) - self.assertEqual(self._expected, self._messages) - - -TestQpidDirectConsumerPublisher.generate_scenarios() - - -class TestQpidTopicAndFanout(_QpidBaseTestCase): - """Unit Test cases to test TopicConsumer and - TopicPublisher classes of the qpid driver - and FanoutConsumer and FanoutPublisher classes - of the qpid driver - """ - - _n_qpid_topology = [ - ('v1', dict(qpid_topology=1)), - ('v2', dict(qpid_topology=2)), - ] - - _n_msgs = [ - ('single', dict(no_msgs=1)), - ('multiple', dict(no_msgs=10)), - ] - - _n_senders = [ - ('single', dict(no_senders=1)), - ('multiple', dict(no_senders=10)), - ] - - _n_receivers = [ - ('single', dict(no_receivers=1)), - ] - _exchange_class = [ - ('topic', dict(consumer_cls=qpid_driver.TopicConsumer, - consumer_kwargs={'exchange_name': 'openstack'}, - publisher_cls=qpid_driver.TopicPublisher, - publisher_kwargs={'exchange_name': 'openstack'}, - topic='topictest.test', - receive_topic='topictest.test')), - ('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer, - consumer_kwargs={}, - publisher_cls=qpid_driver.FanoutPublisher, - publisher_kwargs={}, - topic='fanouttest', - receive_topic='fanouttest')), - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology, - cls._n_msgs, - cls._n_senders, - cls._n_receivers, - cls._exchange_class) - - def setUp(self): - super(TestQpidTopicAndFanout, self).setUp() - - # to store the expected messages and the - # actual received messages - # - # NOTE(dhellmann): These are dicts, where the base class uses - # lists. - self._expected = {} - self._messages = {} - - self._senders = [] - self._receivers = [] - - self._sender_threads = [] - self._receiver_threads = [] - - def consumer_callback(self, msg): - """callback function called by the ConsumerBase class of - qpid driver. - Message will be received in the format x-y - where x is the sender id and y is the msg number of the sender - extract the sender id 'x' and store the msg 'x-y' with 'x' as - the key - """ - - if isinstance(msg, dict): - msgcontent = msg['content'] - else: - msgcontent = msg - - splitmsg = msgcontent.split('-') - key = _thread.get_ident() - - if key not in self._messages: - self._messages[key] = dict() - - tdict = self._messages[key] - - if splitmsg[0] not in tdict: - tdict[splitmsg[0]] = [] - - tdict[splitmsg[0]].append(msgcontent) - - def _try_send_msg(self, sender_id, no_msgs): - for i in range(no_msgs): - sendmsg = '%s-%s' % (str(sender_id), str(i)) - key = str(sender_id) - # Store the message in the self._expected for each sender. - # This will be used later to - # validate the test by comparing it with the - # received messages by all the receivers - if key not in self._expected: - self._expected[key] = [] - self._expected[key].append(sendmsg) - send_dict = {'content_type': 'text/plain', 'content': sendmsg} - self._senders[sender_id].send(send_dict) - - def _try_receive_msg(self, receiver_id, no_msgs): - for i in range(self.no_senders * no_msgs): - no_of_attempts = 0 - - # ConsumerBase.consume blocks indefinitely until a message - # is received. - # So qpid_receiver.available() is called before calling - # ConsumerBase.consume() so that we are not - # blocked indefinitely - qpid_receiver = self._receivers[receiver_id].get_receiver() - while no_of_attempts < 50: - if qpid_receiver.available() > 0: - self._receivers[receiver_id].consume() - break - no_of_attempts += 1 - time.sleep(0.05) - - def test_qpid_topic_and_fanout(self): - for receiver_id in range(self.no_receivers): - consumer = self.consumer_cls(self.conf.oslo_messaging_qpid, - self.session_receive, - self.receive_topic, - self.consumer_callback, - **self.consumer_kwargs) - self._receivers.append(consumer) - - # create receivers threads - thread = threading.Thread(target=self._try_receive_msg, - args=(receiver_id, self.no_msgs,)) - self._receiver_threads.append(thread) - - for sender_id in range(self.no_senders): - publisher = self.publisher_cls(self.conf.oslo_messaging_qpid, - self.session_send, - topic=self.topic, - **self.publisher_kwargs) - self._senders.append(publisher) - - # create sender threads - thread = threading.Thread(target=self._try_send_msg, - args=(sender_id, self.no_msgs,)) - self._sender_threads.append(thread) - - for thread in self._receiver_threads: - thread.start() - - for thread in self._sender_threads: - thread.start() - - for thread in self._receiver_threads: - thread.join() - - for thread in self._sender_threads: - thread.join() - - # Each receiver should receive all the messages sent by - # the sender(s). - # So, Iterate through each of the receiver items in - # self._messages and compare with the expected messages - # messages. - - self.assertEqual(self.no_senders, len(self._expected)) - self.assertEqual(self.no_receivers, len(self._messages)) - - for key, messages in self._messages.iteritems(): - self.assertEqual(self._expected, messages) - -TestQpidTopicAndFanout.generate_scenarios() - - -class AddressNodeMatcher(object): - def __init__(self, node): - self.node = node - - def __eq__(self, address): - return address.split(';')[0].strip() == self.node - - -class TestDriverInterface(_QpidBaseTestCase): - """Unit Test cases to test the amqpdriver with qpid - """ - - def setUp(self): - super(TestDriverInterface, self).setUp() - self.config(qpid_topology_version=2, - group='oslo_messaging_qpid') - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - original_get_connection = self.driver._get_connection - p = mock.patch.object(self.driver, '_get_connection', - side_effect=lambda pooled=True: - original_get_connection(False)) - p.start() - self.addCleanup(p.stop) - - def test_listen_and_direct_send(self): - target = oslo_messaging.Target(exchange="exchange_test", - topic="topic_test", - server="server_test") - - with mock.patch('qpid.messaging.Connection') as conn_cls: - conn = conn_cls.return_value - session = conn.session.return_value - session.receiver.side_effect = [mock.Mock(), mock.Mock(), - mock.Mock()] - - listener = self.driver.listen(target) - listener.conn.direct_send("msg_id", {}) - - self.assertEqual(3, len(listener.conn.consumers)) - - expected_calls = [ - mock.call(AddressNodeMatcher( - 'amq.topic/topic/exchange_test/topic_test')), - mock.call(AddressNodeMatcher( - 'amq.topic/topic/exchange_test/topic_test.server_test')), - mock.call(AddressNodeMatcher('amq.topic/fanout/topic_test')), - ] - session.receiver.assert_has_calls(expected_calls) - session.sender.assert_called_with( - AddressNodeMatcher("amq.direct/msg_id")) - - def test_send(self): - target = oslo_messaging.Target(exchange="exchange_test", - topic="topic_test", - server="server_test") - with mock.patch('qpid.messaging.Connection') as conn_cls: - conn = conn_cls.return_value - session = conn.session.return_value - - self.driver.send(target, {}, {}) - session.sender.assert_called_with(AddressNodeMatcher( - "amq.topic/topic/exchange_test/topic_test.server_test")) - - def test_send_notification(self): - target = oslo_messaging.Target(exchange="exchange_test", - topic="topic_test.info") - with mock.patch('qpid.messaging.Connection') as conn_cls: - conn = conn_cls.return_value - session = conn.session.return_value - - self.driver.send_notification(target, {}, {}, "2.0") - session.sender.assert_called_with(AddressNodeMatcher( - "amq.topic/topic/exchange_test/topic_test.info")) - - -class TestQpidReconnectOrder(test_utils.BaseTestCase): - """Unit Test cases to test reconnection - """ - - @testtools.skipIf(qpid is None, "qpid not available") - def test_reconnect_order(self): - brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] - brokers_count = len(brokers) - - self.config(qpid_hosts=brokers, - group='oslo_messaging_qpid') - - with mock.patch('qpid.messaging.Connection') as conn_mock: - # starting from the first broker in the list - url = oslo_messaging.TransportURL.parse(self.conf, None) - connection = qpid_driver.Connection(self.conf, url, - amqp.PURPOSE_SEND) - - # reconnect will advance to the next broker, one broker per - # attempt, and then wrap to the start of the list once the end is - # reached - for _ in range(brokers_count): - connection.reconnect() - - expected = [] - for broker in brokers: - expected.extend([mock.call("%s:5672" % broker), - mock.call().open(), - mock.call().session(), - mock.call().opened(), - mock.call().opened().__nonzero__(), - mock.call().close()]) - - conn_mock.assert_has_calls(expected, any_order=True) - - -def synchronized(func): - func.__lock__ = threading.Lock() - - def synced_func(*args, **kws): - with func.__lock__: - return func(*args, **kws) - - return synced_func - - -class FakeQpidMsgManager(object): - def __init__(self): - self._exchanges = {} - - @synchronized - def add_exchange(self, exchange): - if exchange not in self._exchanges: - self._exchanges[exchange] = {'msgs': [], 'consumers': {}} - - @synchronized - def add_exchange_consumer(self, exchange, consumer_id): - exchange_info = self._exchanges[exchange] - cons_dict = exchange_info['consumers'] - cons_dict[consumer_id] = 0 - - @synchronized - def add_exchange_msg(self, exchange, msg): - exchange_info = self._exchanges[exchange] - exchange_info['msgs'].append(msg) - - def get_exchange_msg(self, exchange, index): - exchange_info = self._exchanges[exchange] - return exchange_info['msgs'][index] - - def get_no_exch_msgs(self, exchange): - exchange_info = self._exchanges[exchange] - return len(exchange_info['msgs']) - - def get_exch_cons_index(self, exchange, consumer_id): - exchange_info = self._exchanges[exchange] - cons_dict = exchange_info['consumers'] - return cons_dict[consumer_id] - - @synchronized - def inc_consumer_index(self, exchange, consumer_id): - exchange_info = self._exchanges[exchange] - cons_dict = exchange_info['consumers'] - cons_dict[consumer_id] += 1 - -_fake_qpid_msg_manager = FakeQpidMsgManager() - - -class FakeQpidSessionSender(object): - def __init__(self, session, id, target, options): - self.session = session - self.id = id - self.target = target - self.options = options - - @synchronized - def send(self, object, sync=True, timeout=None): - _fake_qpid_msg_manager.add_exchange_msg(self.target, object) - - def close(self, timeout=None): - pass - - -class FakeQpidSessionReceiver(object): - - def __init__(self, session, id, source, options): - self.session = session - self.id = id - self.source = source - self.options = options - - @synchronized - def fetch(self, timeout=None): - if timeout is None: - # if timeout is not given, take a default time out - # of 30 seconds to avoid indefinite loop - _timeout = 30 - else: - _timeout = timeout - - deadline = time.time() + _timeout - while time.time() <= deadline: - index = _fake_qpid_msg_manager.get_exch_cons_index(self.source, - self.id) - try: - msg = _fake_qpid_msg_manager.get_exchange_msg(self.source, - index) - except IndexError: - pass - else: - _fake_qpid_msg_manager.inc_consumer_index(self.source, - self.id) - return qpid.messaging.Message(msg) - time.sleep(0.050) - - if timeout is None: - raise Exception('timed out waiting for reply') - - def close(self, timeout=None): - pass - - @synchronized - def available(self): - no_msgs = _fake_qpid_msg_manager.get_no_exch_msgs(self.source) - index = _fake_qpid_msg_manager.get_exch_cons_index(self.source, - self.id) - if no_msgs == 0 or index >= no_msgs: - return 0 - else: - return no_msgs - index - - -class FakeQpidSession(object): - - def __init__(self, connection=None, name=None, transactional=None): - self.connection = connection - self.name = name - self.transactional = transactional - self._receivers = {} - self.conf = None - self.url = None - self._senders = {} - self._sender_id = 0 - self._receiver_id = 0 - - @synchronized - def sender(self, target, **options): - exchange_key = self._extract_exchange_key(target) - _fake_qpid_msg_manager.add_exchange(exchange_key) - - sendobj = FakeQpidSessionSender(self, self._sender_id, - exchange_key, options) - self._senders[self._sender_id] = sendobj - self._sender_id = self._sender_id + 1 - return sendobj - - @synchronized - def receiver(self, source, **options): - exchange_key = self._extract_exchange_key(source) - _fake_qpid_msg_manager.add_exchange(exchange_key) - recvobj = FakeQpidSessionReceiver(self, self._receiver_id, - exchange_key, options) - self._receivers[self._receiver_id] = recvobj - _fake_qpid_msg_manager.add_exchange_consumer(exchange_key, - self._receiver_id) - self._receiver_id += 1 - return recvobj - - def acknowledge(self, message=None, disposition=None, sync=True): - pass - - @synchronized - def flush_exchanges(self): - _fake_qpid_msg_manager._exchanges = {} - - def _extract_exchange_key(self, exchange_msg): - """This function extracts a unique key for the exchange. - This key is used in the dictionary as a 'key' for - this exchange. - Eg. if the exchange_msg (for qpid topology version 1) - is 33/33 ; {"node": {"x-declare": {"auto-delete": true, .... - then 33 is returned as the key. - Eg 2. For topology v2, if the - exchange_msg is - amq.direct/44 ; {"link": {"x-dec....... - then 44 is returned - """ - # first check for ';' - semicolon_split = exchange_msg.split(';') - - # split the first item of semicolon_split with '/' - slash_split = semicolon_split[0].split('/') - # return the last element of the list as the key - key = slash_split[-1] - return key.strip() - - def close(self): - pass - -_fake_session = FakeQpidSession() - - -def get_fake_qpid_session(): - return _fake_session - - -class QPidHATestCase(test_utils.BaseTestCase): - - @testtools.skipIf(qpid is None, "qpid not available") - def setUp(self): - super(QPidHATestCase, self).setUp() - self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] - - self.config(qpid_hosts=self.brokers, - qpid_username=None, - qpid_password=None, - group='oslo_messaging_qpid') - - hostname_sets = set() - self.info = {'attempt': 0, - 'fail': False} - - def _connect(myself, broker): - # do as little work that is enough to pass connection attempt - myself.connection = mock.Mock() - hostname = broker['host'] - self.assertNotIn(hostname, hostname_sets) - hostname_sets.add(hostname) - - self.info['attempt'] += 1 - if self.info['fail']: - raise qpid.messaging.exceptions.ConnectionError - - # just make sure connection instantiation does not fail with an - # exception - self.stubs.Set(qpid_driver.Connection, '_connect', _connect) - - # starting from the first broker in the list - url = oslo_messaging.TransportURL.parse(self.conf, None) - self.connection = qpid_driver.Connection(self.conf, url, - amqp.PURPOSE_SEND) - self.addCleanup(self.connection.close) - - self.info.update({'attempt': 0, - 'fail': True}) - hostname_sets.clear() - - def test_reconnect_order(self): - self.assertRaises(oslo_messaging.MessageDeliveryFailure, - self.connection.reconnect, - retry=len(self.brokers) - 1) - self.assertEqual(len(self.brokers), self.info['attempt']) - - def test_ensure_four_retries(self): - mock_callback = mock.Mock( - side_effect=qpid.messaging.exceptions.ConnectionError) - self.assertRaises(oslo_messaging.MessageDeliveryFailure, - self.connection.ensure, None, mock_callback, - retry=4) - self.assertEqual(5, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) - - def test_ensure_one_retry(self): - mock_callback = mock.Mock( - side_effect=qpid.messaging.exceptions.ConnectionError) - self.assertRaises(oslo_messaging.MessageDeliveryFailure, - self.connection.ensure, None, mock_callback, - retry=1) - self.assertEqual(2, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) - - def test_ensure_no_retry(self): - mock_callback = mock.Mock( - side_effect=qpid.messaging.exceptions.ConnectionError) - self.assertRaises(oslo_messaging.MessageDeliveryFailure, - self.connection.ensure, None, mock_callback, - retry=0) - self.assertEqual(1, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) diff --git a/oslo_messaging/tests/functional/gate/post_test_hook.sh b/oslo_messaging/tests/functional/gate/post_test_hook.sh index 276129cdd..23ee6ab48 100755 --- a/oslo_messaging/tests/functional/gate/post_test_hook.sh +++ b/oslo_messaging/tests/functional/gate/post_test_hook.sh @@ -46,10 +46,6 @@ case $RPC_BACKEND in sudo apt-get update -y sudo apt-get install -y redis-server python-redis ;; - qpid) - sudo apt-get update -y - sudo apt-get install -y qpidd sasl2-bin - ;; amqp1) sudo yum install -y qpid-cpp-server qpid-proton-c-devel python-qpid-proton cyrus-sasl-lib cyrus-sasl-plain ;; diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index e16145b94..931ded80f 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -32,14 +32,13 @@ class OptsTestCase(test_utils.BaseTestCase): super(OptsTestCase, self).setUp() def _test_list_opts(self, result): - self.assertEqual(5, len(result)) + self.assertEqual(4, len(result)) groups = [g for (g, l) in result] self.assertIn(None, groups) self.assertIn('matchmaker_redis', groups) self.assertIn('oslo_messaging_amqp', groups) self.assertIn('oslo_messaging_rabbit', groups) - self.assertIn('oslo_messaging_qpid', groups) opt_names = [o.name for (g, l) in result for o in l] self.assertIn('rpc_backend', opt_names) diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index 6fb3c8e42..144d1a7f3 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -43,7 +43,7 @@ _transport_opts = [ cfg.StrOpt('rpc_backend', default='rabbit', help='The messaging driver to use, defaults to rabbit. Other ' - 'drivers include qpid and zmq.'), + 'drivers include amqp and zmq.'), cfg.StrOpt('control_exchange', default='openstack', help='The default exchange under which topics are scoped. May ' @@ -232,7 +232,7 @@ class TransportURL(object): :param conf: a ConfigOpts instance :type conf: oslo.config.cfg.ConfigOpts - :param transport: a transport name for example 'rabbit' or 'qpid' + :param transport: a transport name for example 'rabbit' :type transport: str :param virtual_host: a virtual host path for example '/' :type virtual_host: str diff --git a/setup.cfg b/setup.cfg index ee63dc5bb..cbed37743 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,7 +26,6 @@ console_scripts = oslo.messaging.drivers = rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver - qpid = oslo_messaging._drivers.impl_qpid:QpidDriver zmq = oslo_messaging._drivers.impl_zmq:ZmqDriver amqp = oslo_messaging._drivers.protocols.amqp.driver:ProtonDriver diff --git a/test-requirements.txt b/test-requirements.txt index 0c0f4e884..89cda423c 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -15,9 +15,6 @@ testscenarios>=0.4 testtools>=1.4.0 oslotest>=1.10.0 # Apache-2.0 -# for test_qpid -qpid-python;python_version=='2.7' - # for test_matchmaker_redis redis>=2.10.0 diff --git a/tox.ini b/tox.ini index 8f110014d..648962712 100644 --- a/tox.ini +++ b/tox.ini @@ -22,10 +22,6 @@ commands = {posargs} [testenv:docs] commands = python setup.py build_sphinx -[testenv:py27-func-qpid] -setenv = TRANSPORT_URL=qpid://stackqpid:secretqpid@127.0.0.1:65123// -commands = {toxinidir}/setup-test-env-qpid.sh 0-10 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' - [testenv:py27-func-rabbit] commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'