Remove deprecated AMQP1 driver

The AMQP1 driver has been deprecated for several years, since 14.4.0 and
is no longer maintained or widely used. This commit removes the AMQP1
driver from the code base.

References:
- Original deprecation commit: 0f63c227f5

Change-Id: Iaeb52791008a7a6736c99459f66d2bdbb2dea17b
This commit is contained in:
Daniel Bengtsson 2024-11-05 12:14:17 +01:00 committed by Takashi Kajinami
parent a9c03bdf1e
commit 7997a199ac
20 changed files with 16 additions and 6931 deletions

View File

@ -10,7 +10,7 @@
parent: openstack-tox-py310
vars:
tox_envlist: py310-func-scenario02
bindep_profile: rabbit kafka amqp1
bindep_profile: rabbit kafka
# Begin v3 native jobs
# See https://docs.openstack.org/devstack/latest/
@ -34,22 +34,6 @@
Run full tempest tests against rabbitmq
parent: oslo.messaging-devstack-tempest-full-base
- job:
name: oslo.messaging-src-dsvm-full-amqp1-hybrid
description: |
Run the full tempest tests using the AMQP 1.0 driver for RPC and
RabbitMQ for Notifications.
parent: oslo.messaging-devstack-tempest-full-base
required-projects:
- openstack/devstack-plugin-amqp1
vars:
devstack_localrc:
AMQP1_SERVICE: qpid-hybrid
devstack_plugins:
devstack-plugin-amqp1: https://opendev.org/openstack/devstack-plugin-amqp1
zuul_copy_output:
'{{ devstack_log_dir }}/qdrouterd.log': logs
- job:
name: oslo.messaging-src-dsvm-full-kafka-hybrid
description: |
@ -109,8 +93,6 @@
- oslo.messaging-tox-py310-func-scenario02:
voting: false
- oslo.messaging-src-dsvm-full-rabbit
- oslo.messaging-src-dsvm-full-amqp1-hybrid:
voting: false
- oslo.messaging-src-dsvm-full-kafka-hybrid:
voting: false
- oslo.messaging-grenade:

View File

@ -14,15 +14,6 @@ libffi-devel [platform:rpm]
rabbitmq-server [platform:dpkg rabbit]
rabbitmq-server [platform:rpm rabbit]
# AMQP1 dpkg
# This needs qpid/testing, will be installed by tools/test-setup.sh
# qdrouterd [platform:dpkg amqp1 test]
sasl2-bin [platform:dpkg amqp1 test]
uuid-dev [platform:dpkg amqp1 test]
swig [platform:dpkg amqp1 test]
libsasl2-modules [platform:dpkg amqp1 test]
default-jdk [platform:dpkg amqp1 test !platform:debian]
# kafka dpkg
default-jdk [platform:dpkg kafka]
librdkafka1 [platform:dpkg kafka]

View File

@ -1,639 +0,0 @@
=========================================
AMQP 1.0 Protocol Driver Deployment Guide
=========================================
.. currentmodule:: oslo_messaging
Introduction
------------
The AMQP 1.0 Protocol Driver is a messaging transport backend
supported in oslo.messaging. The driver maps the base *oslo.messaging*
capabilities for RPC and Notification message exchange onto version
1.0 of the Advanced Message Queuing Protocol (AMQP 1.0, ISO/IEC
19464). The driver is intended to support any messaging intermediary
(e.g. broker or router) that implements version 1.0 of the AMQP
protocol.
More detail regarding the AMQP 1.0 Protocol is available from the
`AMQP specification`__.
More detail regarding the driver's implementation is available from
the `oslo specification`__.
__ http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html
__ https://opendev.org/openstack/oslo-specs/src/branch/master/specs/juno/amqp10-driver-implementation.rst
Abstract
--------
The AMQP 1.0 driver is one of a family of *oslo.messaging* backend
drivers. It currently supports two types of message intermediaries.
The first type is an AMQP 1.0 messaging broker and the second type is
an AMQP 1.0 message router. The driver should support additional
intermediary types in the future but may require additions to driver
configuration parameters in order to do so.
+--------------+-----------+------------+------------+-----------+
| Intermediary | RPC | Notify | Message | Topology |
| Type | Pattern | Pattern | Treatment | |
+--------------+-----------+------------+------------+-----------+
| Message | Yes | `Limited`_ | Direct | Single or |
| Router | | | Messaging | Mesh |
+--------------+-----------+------------+------------+-----------+
| Message | Yes | Yes | Store and | Single or |
| Broker | | | Forward | Cluster |
+--------------+-----------+------------+------------+-----------+
Direct Messaging
~~~~~~~~~~~~~~~~
The RPC messaging pattern is a synchronous exchange between
client and server that is temporally bracketed. The direct messaging
capabilities provided by the message router are optimal for the
RPC messaging pattern.
The driver can readily scale operation from working with a single
instances of a message router to working with a large scale routed
mesh interconnect topology.
Store and Forward
~~~~~~~~~~~~~~~~~
The Notification messaging pattern is an asynchronous exchange from
a notifier to a listener (e.g. consumer). The listener need not be
present when the notification is sent. Thus, the store and forwarding
capabilities provided by the message broker are required for the
Notification messaging pattern.
This driver is able to work with a single instance of a message broker
or a clustered broker deployment.
.. _Limited:
It is recommended that the message router intermediary not be used
for the Notification messaging pattern due to the consideration that
notification messages will be dropped when there is no active
consumer. The message router does not provide durability or
store-and-forward capabilities for notification messages.
Hybrid Messaging Backends
~~~~~~~~~~~~~~~~~~~~~~~~~
Oslo.messaging provides a mechanism to configure separate backends for
RPC and Notification communications. This is supported through the
specification of separate RPC and Notification `transport urls`_ in the
service configuration. This capability enables the optimal alignment
of messaging patterns to messaging backend and allows for different
messaging backend types to be deployed.
This document provides deployment and configuration information for use
of this driver in hybrid messaging configurations.
Addressing
~~~~~~~~~~
A new address syntax was added to the driver to support efficient
direct message routing. This new syntax will also work with a broker
intermediary backend but is not compatible with the address syntax
previously used by the driver. In order to allow backward compatibility,
the driver will attempt to identify the intermediary type for the
backend in use and will automatically select the 'legacy' syntax for
broker-based backends or the new 'routable' syntax for router-based
backends. An `address mode`_ configuration option is provided to
override this dynamic behavior and force the use of either the legacy
or routable address syntax.
Message Acknowledgement
~~~~~~~~~~~~~~~~~~~~~~~
A primary functional difference between a router and a
broker intermediary type is when message acknowledgement occurs.
The router does not "store" the message hence it does not generate an
acknowledgement. Instead the consuming endpoint is responsible for message
acknowledgement and the router forwards the acknowledgement back to
the sender. This is known as 'end-to-end' acknowledgement. In contrast, a
broker stores then forwards the message so that message acknowledgement is
performed in two stages. In the first stage, a message
acknowledgement occurs between the broker and the Sender. In the
second stage, an acknowledgement occurs between the Server and
the broker.
This difference affects how long the Sender waits for the message
transfer to complete.
::
+dispatch+
| (3) |
| |
| v
+--------------+ (1) +----------+ (2) +--------------+
| Client |---------->| Router |----------->| Server |
| (Sender) |<----------| (Direct) |<-----------| (Listener) |
+--------------+ (5) +----------+ (4) +--------------+
For example when a router intermediary is used, the following sequence
occurs:
1. The message is sent to the router
2. The router forwards the message to the Server
3. The Server dispatches the message to the application
4. The Server indicates the acknowledgement via the router
5. The router forwards the acknowledgement to the Sender
In this sequence, a Sender waits for the message acknowledgement until
step (5) occurs.
::
+dispatch+
| (4) |
| |
| v
+--------------+ (1) +----------+ (3) +--------------+
| Client |---------->| Broker |----------->| Server |
| (Sender) |<----------| (Queue) |<-----------| (Listener) |
+--------------+ (2) +----------+ (5) +--------------+
And when a broker intermediary is used, the following sequence occurs:
1. The message is sent to the broker
2. The broker stores the message and acknowledges the message to the
Sender
3. The broker sends the message to the Server
4. The Server dispatches the message to the application
5. The Server indicates the acknowledgement to the broker
In this sequence, a Sender waits for the message acknowledgement until
step (2) occurs.
Therefore the broker-based Sender receives the acknowledgement
earlier in the transfer than the routed case. However in the brokered
case receipt of the acknowledgement does not signify that the message
has been (or will ever be) received by the Server.
Batched Notifications **Note Well**
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
While the use of a router intermediary for oslo.messaging Notification is
currently not recommended, it should be noted that the use of a router
intermediary with batched notifications may exacerbate the acknowledgement
wait time for a Sender.
For example, when a batched notification configuration is used where
batch size is set to 100, the Server will wait until 100 notification
messages are buffered (or timeout occurs) before dispatching the
notifications to the application for message acknowledgement. Since
each notifier client can have at most one message outstanding
(e.g. pending acknowledgement), then if the total number of notifying
clients are less than 100 the batch limit will never be met. This will
effectively pause all notifying clients until the batch timeout expires.
Prerequisites
-------------
Protocol Engine
~~~~~~~~~~~~~~~
This driver uses the Apache QPID `Proton`__ AMQP 1.0 protocol engine.
This engine consists of a platform specific library and a python
binding. The driver does not directly interface with the engine API,
as the API is a very low-level interface to the AMQP protocol.
Instead, the driver uses the pure python `Pyngus`__ client API, which
is layered on top of the protocol engine.
In order to run the driver the Proton Python bindings, Proton
library, Proton header files, and Pyngus must be installed.
__ http://qpid.apache.org/proton/index.html
__ https://github.com/kgiusti/pyngus
Source packages for the `Pyngus client API`__ are available via PyPI.
__ https://pypi.org/project/pyngus
Pyngus depends on the Proton Python bindings. Source packages for the
`Proton Python bindings`__ are also available via PyPI.
__ https://pypi.org/project/python-qpid-proton
Since the AMQP 1.0 driver is an optional extension to Oslo.Messaging
these packages are not installed by default. Use the 'amqp1' extras
tag when installing Oslo.Messaging in order to pull in these extra
packages:
.. code-block:: shell
$ python -m pip install oslo.messaging[amqp1]
The Proton package includes a C extension that links to the Proton
library. The C extension is built locally when the Proton source
packages are install from PyPI. In order to build the Proton C source
locally, there are a number of tools and libraries that need to be
present on the system:
* The tools and library necessary for Python C development
* The `SWIG`__ wrapper generator
* The `OpenSSL`__ development libraries and headers
* The `Cyrus SASL`__ development libraries and headers
**Note well**: Currently the Proton PyPI package only supports building
the C extension on Linux systems.
Pre-built packages for both Pyngus and Proton engine are available for
various Linux distributions (see `packages`_ below). It is recommended
to use the pre-built packages if they are available for your platform.
__ http://www.swig.org/index.php
__ https://www.openssl.org
__ https://cyrusimap.org
Router Intermediary
~~~~~~~~~~~~~~~~~~~
This driver supports a *router* intermediary that supports version 1.0
of the AMQP protocol. The direct messaging capabilities provided by
this intermediary type are recommended for oslo.messaging RPC.
The driver has been tested with `qpid-dispatch-router`__ router in a
`devstack`_ environment. The version of qpid-dispatch-router
**must** be at least 0.7.0. The qpid-dispatch-router also uses the
Proton engine for its AMQP 1.0 support, so the Proton library must be
installed on the system hosting the qpid-dispatch-router daemon.
Pre-built packages for the router are available. See `packages`_ below.
__ http://qpid.apache.org/components/dispatch-router/
Broker Intermediary
~~~~~~~~~~~~~~~~~~~
This driver supports a *broker* intermediary that supports version 1.0
of the AMQP protocol. The store and forward capabilities provided by
this intermediary type are recommended for *oslo.messaging* Notifications.
The driver has been tested with the `qpidd`__ broker in a `devstack`_
environment. The version of qpidd **must** be at least
0.34. qpidd also uses the Proton engine for its AMQP 1.0 support, so
the Proton library must be installed on the system hosting the qpidd
daemon.
Pre-built packages for the broker are available. See `packages`_ below.
See the `oslo specification`__ for additional information regarding testing
done on the driver.
__ http://qpid.apache.org/components/cpp-broker/index.html
__ https://opendev.org/openstack/oslo-specs/src/branch/master/specs/juno/amqp10-driver-implementation.rst
Configuration
-------------
.. _transport urls:
Transport URL Enable
~~~~~~~~~~~~~~~~~~~~
In oslo.messaging, the transport_url parameters define the OpenStack service
backends for RPC and Notify. The url is of the form::
transport://user:pass@host1:port[,hostN:portN]/virtual_host
Where the transport value specifies the rpc or notification backend as
one of **amqp**, rabbit, kafka, etc.
To specify and enable the AMQP 1.0 driver for RPC, in the ``[DEFAULT]``
section of the service configuration file, specify the
``transport_url`` parameter:
.. code-block:: ini
[DEFAULT]
transport_url = amqp://username:password@routerhostname:5672
To specify and enable the AMQP 1.0 driver for Notify, in the
``[NOTIFICATIONS]`` section of the service configuration file, specify the
``transport_url`` parameter:
::
[NOTIFICATIONS]
transport_url = amqp://username:password@brokerhostname:5672
Note, that if a 'transport_url' parameter is not specified in the
[NOTIFICATIONS] section, the [DEFAULT] transport_url will be used
for both RPC and Notify backends.
Driver Options
~~~~~~~~~~~~~~
It is recommended that the default configuration options provided by
the AMQP 1.0 driver be used. The configuration options can be modified
in the :oslo.config:group:`oslo_messaging_amqp` section of the service
configuration file.
Connection Options
^^^^^^^^^^^^^^^^^^
- :oslo.config:option:`oslo_messaging_amqp.idle_timeout`
- :oslo.config:option:`oslo_messaging_amqp.connection_retry_interval`
- :oslo.config:option:`oslo_messaging_amqp.connection_retry_backoff`
- :oslo.config:option:`oslo_messaging_amqp.connection_retry_interval_max`
Message Send Options
^^^^^^^^^^^^^^^^^^^^
- :oslo.config:option:`oslo_messaging_amqp.pre_settled`
- :oslo.config:option:`oslo_messaging_amqp.link_retry_delay`
- :oslo.config:option:`oslo_messaging_amqp.default_reply_timeout`
- :oslo.config:option:`oslo_messaging_amqp.default_send_timeout`
- :oslo.config:option:`oslo_messaging_amqp.default_notify_timeout`
.. _address mode:
Addressing Options
^^^^^^^^^^^^^^^^^^
- :oslo.config:option:`oslo_messaging_amqp.addressing_mode`
- :oslo.config:option:`oslo_messaging_amqp.server_request_prefix`
- :oslo.config:option:`oslo_messaging_amqp.broadcast_prefix`
- :oslo.config:option:`oslo_messaging_amqp.group_request_prefix`
- :oslo.config:option:`oslo_messaging_amqp.rpc_address_prefix`
- :oslo.config:option:`oslo_messaging_amqp.notify_address_prefix`
- :oslo.config:option:`oslo_messaging_amqp.multicast_address`
- :oslo.config:option:`oslo_messaging_amqp.unicast_address`
- :oslo.config:option:`oslo_messaging_amqp.anycast_address`
- :oslo.config:option:`oslo_messaging_amqp.default_notification_exchange`
- :oslo.config:option:`oslo_messaging_amqp.default_rpc_exchange`
SSL Options
^^^^^^^^^^^
- :oslo.config:option:`oslo_messaging_amqp.ssl`
- :oslo.config:option:`oslo_messaging_amqp.ssl_ca_file`
- :oslo.config:option:`oslo_messaging_amqp.ssl_cert_file`
- :oslo.config:option:`oslo_messaging_amqp.ssl_key_file`
- :oslo.config:option:`oslo_messaging_amqp.ssl_key_password`
SASL Options
^^^^^^^^^^^^
- :oslo.config:option:`oslo_messaging_amqp.sasl_mechanisms`
- :oslo.config:option:`oslo_messaging_amqp.sasl_config_dir`
- :oslo.config:option:`oslo_messaging_amqp.sasl_config_name`
- :oslo.config:option:`oslo_messaging_amqp.sasl_default_realm`
AMQP Generic Options (**Note Well**)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The AMQP 1.0 driver currently does **not** support the generic *amqp*
options used by pre-1.0 drivers such as *amqp_durable_queues* or
*amqp_auto_delete*.
qpid-dispatch-router
~~~~~~~~~~~~~~~~~~~~
First, verify that the Proton library has been installed and is
imported by the ``qpid-dispatch-router`` intermediary. This can be checked
by running:
::
$ qdrouterd --help
and looking for references to ``qpid-dispatch`` include and config path
options in the help text. If no ``qpid-dispatch`` information is listed,
verify that the Proton libraries are installed and that the version of
``qdrouterd`` is greater than or equal to 0.6.0.
Second, configure the address patterns used by the driver. This is
done by adding the following to ``/etc/qpid-dispatch/qdrouterd.conf``.
If the legacy syntax for the addressing mode is required, include the
following:
::
address {
prefix: unicast
distribution: closest
}
address {
prefix: exclusive
distribution: closest
}
address {
prefix: broadcast
distribution: multicast
}
For the routable syntax addressing mode, include the following:
::
address {
prefix: openstack.org/om/rpc/multicast
distribution: multicast
}
address {
prefix: openstack.org/om/rpc/unicast
distribution: closest
}
address {
prefix: openstack.org/om/rpc/anycast
distribution: balanced
}
address {
prefix: openstack.org/om/notify/multicast
distribution: multicast
}
address {
prefix: openstack.org/om/notify/unicast
distribution: closest
}
address {
prefix: openstack.org/om/notify/anycast
distribution: balanced
}
**Note well**: For any customization of the `address mode`_ and syntax used,
it is required that the address entity configurations in the
`1`/etc/qpid-dispatch/qdrouterd.conf`` be updated.
qpidd
~~~~~
First, verify that the Proton library has been installed and is
imported by the qpidd broker. This can checked by running:
.. code-block:: shell
$ qpidd --help
and looking for the AMQP 1.0 options in the help text. If no AMQP 1.0
options are listed, verify that the Proton libraries are installed and
that the version of qpidd is greater than or equal to 0.34.
Second, configure the default address patterns used by the
driver for a broker-based backend. This is done by adding the
following to ``/etc/qpid/qpidd.conf``:
.. code-block:: ini
queue-patterns=exclusive
queue-patterns=unicast
topic-patterns=broadcast
These patterns, *exclusive*, *unicast*, and *broadcast* are the
legacy addressing values used by the driver. These can be overridden via the
driver configuration options if desired (see above). If manually overridden,
update the ``qpidd.conf`` values to match.
.. _devstack:
DevStack Support
----------------
The plugin for the AMQP 1.0 oslo.messaging driver is supported by
DevStack. The plugin supports the deployment of several different
message bus configurations.
In the ``[localrc]`` section of ``local.conf``, the `devstack-plugin-amqp1`__
plugin repository must be enabled. For example:
.. code-block:: ini
[[local|localrc]]
enable_plugin amqp1 https://opendev.org/openstack/devstack-plugin-amqp1
Set the username and password variables if needed for the
configuration:
.. code-block:: shell
AMQP1_USERNAME=queueuser
AMQP1_PASSWORD=queuepassword
The AMQP1_SERVICE variable identifies the message bus configuration that
will be used. In addition to the AMQP 1.0 driver being used for both the
RPC and Notification messaging communications, a hybrid configuration is
supported in the plugin that will deploy AMQP 1.0 for the RPC backend and
the oslo_messaging rabbit driver for the Notification backend. Additionally,
the plugin supports a setting for a pre-provisioned messaging bus that
prevents the plugin from creating the messaging bus. The setting of the
AMQP1_SERVICE variable will select which messaging intermediary will be used
for the RPC and Notification messaging backends:
+---------------+------------------+------------------+
| AMQP1_SERVICE | RPC Backend | Notify Backend |
+---------------+------------------+------------------+
| | | |
| qpid | qpidd broker | qpidd broker |
| | | |
+---------------+------------------+------------------+
| | | |
| qpid-dual | qdrouterd router | qpidd broker |
| | | |
+---------------+------------------+------------------+
| | | |
| qpid-hybrid | qdrouterd router | rabbitmq broker |
| | | |
+---------------+------------------+------------------+
| | | |
| external | pre-provisioned | pre-provisioned |
| | message bus | message bus |
| | | |
+---------------+------------------+------------------+
__ https://github.com/openstack/devstack-plugin-amqp1.git
.. _packages:
Platforms and Packages
----------------------
PyPi
~~~~
Packages for `Pyngus`__ and the `Proton`__ engine are available on PyPI.
__ https://pypi.org/project/pyngus
__ https://pypi.org/project/python-qpid-proton
RHEL and Fedora
~~~~~~~~~~~~~~~
Packages exist in EPEL for RHEL/Centos 7 and 8, and Fedora 26+.
The following packages must be installed on the system running the
``qdrouterd`` daemon:
- ``qpid-dispatch-router``
- ``python-qpid-proton``
The following packages must be installed on the system running the
``qpidd`` daemon:
- ``qpid-cpp-server`` (version 0.26+)
- ``qpid-proton-c``
The following packages must be installed on the systems running the
services that use the new driver:
- Proton libraries: ``qpid-proton-c-devel``
- Proton python bindings: ``python-qpid-proton``
- ``pyngus`` (via PyPI)
Debian and Ubuntu
~~~~~~~~~~~~~~~~~
.. todo:: Is this still true?
Packages for the Proton library, headers, and Python bindings are
available in the Debian/Testing repository. Proton packages are not
yet available in the Ubuntu repository. The version of qpidd on both
platforms is too old and does not support AMQP 1.0.
Until the proper package version arrive the latest packages can be
pulled from the `Apache Qpid PPA`__ on Launchpad:
.. code-block:: shell
$ sudo add-apt-repository ppa:qpid/released
The following packages must be installed on the system running the
``qdrouterd`` daemon:
- ``qdrouterd`` (version 0.8.0+)
The following packages must be installed on the system running the
``qpidd`` daemon:
- ``qpidd`` (version 0.34+)
The following packages must be installed on the systems running the
services that use the new driver:
- Proton libraries: ``libqpid-proton2-dev``
- Proton python bindings: ``python-qpid-proton``
- ``pyngus`` (via Pypi)
__ https://launchpad.net/~qpid/+archive/ubuntu/released

View File

@ -6,6 +6,5 @@ Deployment Guide
:maxdepth: 2
drivers
AMQP1.0
kafka
rabbit

View File

@ -45,12 +45,11 @@ through the definition of separate RPC and notification
`transport urls`__ in the service configuration. When the Kafka driver
is deployed for *oslo.messaging* notifications, a separate driver and
messaging backend must be deployed for RPC communications. For these
hybrid messaging configurations, either the `rabbit`__ or `amqp`__
drivers can be deployed for *oslo.messaging* RPC.
hybrid messaging configurations, the `rabbit`__ drivers can be deployed for
*oslo.messaging* RPC.
__ https://docs.openstack.org/oslo.messaging/latest/reference/transport.html
__ https://docs.openstack.org/oslo.messaging/latest/admin/drivers.html#rabbit
__ https://docs.openstack.org/oslo.messaging/latest/admin/AMQP1.0.html
Topics and vhost Support
~~~~~~~~~~~~~~~~~~~~~~~~
@ -131,7 +130,7 @@ service backends for RPC and Notify. The URL is of the form::
transport://user:pass@host1:port[,hostN:portN]/virtual_host
Where the transport value specifies the RPC or notification backend as
one of ``amqp``, ``rabbit``, ``kafka``, etc. To specify and enable the
one of ``rabbit``, ``kafka``, etc. To specify and enable the
Kafka driver for notifications, in the section
``[oslo_messaging_notifications]`` of the service configuration file,
specify the ``transport_url`` parameter::
@ -207,10 +206,10 @@ the configuration
The ``RPC_`` and ``NOTIFY_`` variables will define the message bus
configuration that will be used. The hybrid configurations will allow
for the *rabbit* and *amqp* drivers to be used for the RPC transports
while the *kafka* driver will be used for the notification transport. The
setting of the service variables will select which messaging
intermediary is enabled for the configuration:
for the *rabbit* drivers to be used for the RPC transports while the
*kafka* driver will be used for the notification transport. The setting
of the service variables will select which messaging intermediary is
enabled for the configuration:
+------------+--------------------+--------------------+
| | RPC | NOTIFY |
@ -219,7 +218,5 @@ intermediary is enabled for the configuration:
+------------+-----------+--------+-----------+--------+
| Config 1 | rabbit | 5672 | kafka | 9092 |
+------------+-----------+--------+-----------+--------+
| Config 1 | amqp | 5672 | kafka | 9092 |
+------------+-----------+--------+-----------+--------+
__ https://github.com/openstack/devstack-plugin-kafka.git

View File

@ -23,4 +23,3 @@ different 3rd party libraries that don't ensure that. In certain
cases, with some drivers, it does work:
* rabbit: works only if no connection have already been established.
* amqp1: works

View File

@ -1,294 +0,0 @@
# Copyright 2016, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utilities that map from a Target address to a proper AMQP 1.0 address.
This module defines a utility class that translates a high level oslo.messaging
address (Target) into the message-level address used on the message bus. This
translation may be statically configured or determined when the connection to
the message bus is made.
The Target members that are used to generate the address are:
* exchange
* topic
* server flag
* fanout flag
In addition a 'service' tag is associated with the address. This tag determines
the service associated with an address (e.g. rpc or notification) so
that traffic can be partitioned based on its use.
"""
import abc
import logging
from oslo_messaging.target import Target
__all__ = [
"keyify",
"AddresserFactory",
"SERVICE_RPC",
"SERVICE_NOTIFY"
]
SERVICE_RPC = 0
SERVICE_NOTIFY = 1
LOG = logging.getLogger(__name__)
def keyify(address, service=SERVICE_RPC):
"""Create a hashable key from a Target and service that will uniquely
identify the generated address. This key is used to map the abstract
oslo.messaging address to its corresponding AMQP link(s). This mapping may
be done before the connection is established.
"""
if isinstance(address, Target):
# service is important because the resolved address may be
# different based on whether or not this Target is used for
# notifications or RPC
return ("Target:{t={%s} e={%s} s={%s} f={%s} service={%s}}" %
(address.topic, address.exchange, address.server,
address.fanout, service))
else:
# absolute address can be used without modification
return "String:{%s}" % address
class Addresser:
"""Base class message bus address generator. Used to convert an
oslo.messaging address into an AMQP 1.0 address string used over the
connection to the message bus.
"""
def __init__(self, default_exchange):
self._default_exchange = default_exchange
def resolve(self, target, service):
if not isinstance(target, Target):
# an already resolved address
return target
# Return a link address for a given target
if target.fanout:
return self.multicast_address(target, service)
elif target.server:
return self.unicast_address(target, service)
else:
return self.anycast_address(target, service)
@abc.abstractmethod
def multicast_address(self, target, service):
"""Address used to broadcast to all subscribers
"""
@abc.abstractmethod
def unicast_address(self, target, service):
"""Address used to target a specific subscriber (direct)
"""
@abc.abstractmethod
def anycast_address(self, target, service):
"""Address used for shared subscribers (competing consumers)
"""
def _concat(self, sep, items):
return sep.join(filter(bool, items))
class LegacyAddresser(Addresser):
"""Legacy addresses are in the following format:
multicast: '$broadcast_prefix[.$vhost].$exchange.$topic.all'
unicast: '$server_prefix[.$vhost].$exchange.$topic.$server'
anycast: '$group_prefix[.$vhost].$exchange.$topic'
Legacy addresses do not distinguish RPC traffic from Notification traffic
"""
def __init__(self, default_exchange, server_prefix, broadcast_prefix,
group_prefix, vhost):
super().__init__(default_exchange)
self._server_prefix = server_prefix
self._broadcast_prefix = broadcast_prefix
self._group_prefix = group_prefix
self._vhost = vhost
def multicast_address(self, target, service):
return self._concat(".",
[self._broadcast_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic,
"all"])
def unicast_address(self, target, service=SERVICE_RPC):
return self._concat(".",
[self._server_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic,
target.server])
def anycast_address(self, target, service=SERVICE_RPC):
return self._concat(".",
[self._group_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic])
# for debug:
def _is_multicast(self, address):
return address.startswith(self._broadcast_prefix)
def _is_unicast(self, address):
return address.startswith(self._server_prefix)
def _is_anycast(self, address):
return address.startswith(self._group_prefix)
def _is_service(self, address, service):
# legacy addresses are the same for RPC or Notifications
return True
class RoutableAddresser(Addresser):
"""Routable addresses have different formats based their use. It starts
with a prefix that is determined by the type of traffic (RPC or
Notifications). The prefix is followed by a description of messaging
delivery semantics. The delivery may be one of: 'multicast', 'unicast', or
'anycast'. The delivery semantics are followed by information pulled from
the Target. The template is:
$prefix/$semantics[/$vhost]/$exchange/$topic[/$server]
Examples based on the default prefix and semantic values:
rpc-unicast: "openstack.org/om/rpc/unicast/my-exchange/my-topic/my-server"
notify-anycast: "openstack.org/om/notify/anycast/my-vhost/exchange/topic"
"""
def __init__(self, default_exchange, rpc_exchange, rpc_prefix,
notify_exchange, notify_prefix, unicast_tag, multicast_tag,
anycast_tag, vhost):
super().__init__(default_exchange)
if not self._default_exchange:
self._default_exchange = "openstack"
# templates for address generation:
self._vhost = vhost
_rpc = rpc_prefix + "/"
self._rpc_prefix = _rpc
self._rpc_unicast = _rpc + unicast_tag
self._rpc_multicast = _rpc + multicast_tag
self._rpc_anycast = _rpc + anycast_tag
_notify = notify_prefix + "/"
self._notify_prefix = _notify
self._notify_unicast = _notify + unicast_tag
self._notify_multicast = _notify + multicast_tag
self._notify_anycast = _notify + anycast_tag
self._exchange = [
# SERVICE_RPC:
rpc_exchange or self._default_exchange or 'rpc',
# SERVICE_NOTIFY:
notify_exchange or self._default_exchange or 'notify'
]
def multicast_address(self, target, service=SERVICE_RPC):
if service == SERVICE_RPC:
prefix = self._rpc_multicast
else:
prefix = self._notify_multicast
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic])
def unicast_address(self, target, service=SERVICE_RPC):
if service == SERVICE_RPC:
prefix = self._rpc_unicast
else:
prefix = self._notify_unicast
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic,
target.server])
def anycast_address(self, target, service=SERVICE_RPC):
if service == SERVICE_RPC:
prefix = self._rpc_anycast
else:
prefix = self._notify_anycast
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic])
# for debug:
def _is_multicast(self, address):
return (address.startswith(self._rpc_multicast) or
address.startswith(self._notify_multicast))
def _is_unicast(self, address):
return (address.startswith(self._rpc_unicast) or
address.startswith(self._notify_unicast))
def _is_anycast(self, address):
return (address.startswith(self._rpc_anycast) or
address.startswith(self._notify_anycast))
def _is_service(self, address, service):
return address.startswith(self._rpc_prefix if service == SERVICE_RPC
else self._notify_prefix)
class AddresserFactory:
"""Generates the proper Addresser based on configuration and the type of
message bus the driver is connected to.
"""
def __init__(self, default_exchange, mode, **kwargs):
self._default_exchange = default_exchange
self._mode = mode
self._kwargs = kwargs
def __call__(self, remote_properties, vhost=None):
# for backwards compatibility use legacy if dynamic and we're connected
# to qpidd or we cannot identify the message bus. This can be
# overridden via the configuration.
product = remote_properties.get('product', 'qpid-cpp')
if self._mode == 'legacy' or (self._mode == 'dynamic' and
product == 'qpid-cpp'):
return LegacyAddresser(self._default_exchange,
self._kwargs['legacy_server_prefix'],
self._kwargs['legacy_broadcast_prefix'],
self._kwargs['legacy_group_prefix'],
vhost)
else:
return RoutableAddresser(self._default_exchange,
self._kwargs.get("rpc_exchange"),
self._kwargs["rpc_prefix"],
self._kwargs.get("notify_exchange"),
self._kwargs["notify_prefix"],
self._kwargs["unicast"],
self._kwargs["multicast"],
self._kwargs["anycast"],
vhost)

File diff suppressed because it is too large Load Diff

View File

@ -1,404 +0,0 @@
# Copyright 2014, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A thread that performs all messaging I/O and protocol event handling.
This module provides a background thread that handles messaging operations
scheduled via the Controller, and performs blocking socket I/O and timer
processing. This thread is designed to be as simple as possible - all the
protocol specific intelligence is provided by the Controller and executed on
the background thread via callables.
"""
import collections
import errno
import heapq
import logging
import math
import os
import pyngus
import select
import socket
import threading
import time
import uuid
LOG = logging.getLogger(__name__)
def compute_timeout(offset):
# minimize the timer granularity to one second so we don't have to track
# too many timers
return math.ceil(time.monotonic() + offset)
class _SocketConnection:
"""Associates a pyngus Connection with a python network socket,
and handles all connection-related I/O and timer events.
"""
def __init__(self, name, container, properties, handler):
self.name = name
self.socket = None
self.pyngus_conn = None
self._properties = properties
# The handler is a pyngus ConnectionEventHandler, which is invoked by
# pyngus on connection-related events (active, closed, error, etc).
# Currently it is the Controller object.
self._handler = handler
self._container = container
def fileno(self):
"""Allows use of a _SocketConnection in a select() call.
"""
return self.socket.fileno()
def read_socket(self):
"""Called to read from the socket."""
if self.socket:
try:
pyngus.read_socket_input(self.pyngus_conn, self.socket)
self.pyngus_conn.process(time.monotonic())
except (socket.timeout, OSError) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.pyngus_conn.close_input()
self.pyngus_conn.close_output()
self._handler.socket_error(str(e))
def write_socket(self):
"""Called to write to the socket."""
if self.socket:
try:
pyngus.write_socket_output(self.pyngus_conn, self.socket)
self.pyngus_conn.process(time.monotonic())
except (socket.timeout, OSError) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.pyngus_conn.close_output()
self.pyngus_conn.close_input()
self._handler.socket_error(str(e))
def connect(self, host):
"""Connect to host and start the AMQP protocol."""
addr = socket.getaddrinfo(host.hostname, host.port, socket.AF_UNSPEC,
socket.SOCK_STREAM)
if not addr:
key = "%s:%i" % (host.hostname, host.port)
error = "Invalid peer address '%s'" % key
LOG.error("Invalid peer address '%s'", key)
self._handler.socket_error(error)
return
my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2])
my_socket.setblocking(0) # 0=non-blocking
my_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
try:
my_socket.connect(addr[0][4])
except OSError as e:
if e.errno != errno.EINPROGRESS:
error = "Socket connect failure '%s'" % str(e)
LOG.error("Socket connect failure '%s'", str(e))
self._handler.socket_error(error)
return
self.socket = my_socket
props = self._properties.copy()
if pyngus.VERSION >= (2, 0, 0):
# configure client authentication
#
props['x-server'] = False
if host.username:
props['x-username'] = host.username
props['x-password'] = host.password or ""
self.pyngus_conn = self._container.create_connection(self.name,
self._handler,
props)
self.pyngus_conn.user_context = self
if pyngus.VERSION < (2, 0, 0):
# older versions of pyngus requires manual SASL configuration:
# determine the proper SASL mechanism: PLAIN if a username/password
# is present, else ANONYMOUS
pn_sasl = self.pyngus_conn.pn_sasl
if host.username:
password = host.password if host.password else ""
pn_sasl.plain(host.username, password)
else:
pn_sasl.mechanisms("ANONYMOUS")
pn_sasl.client()
self.pyngus_conn.open()
def reset(self, name=None):
"""Clean up the current state, expect 'connect()' to be recalled
later.
"""
# note well: since destroy() is called on the connection, do not invoke
# this method from a pyngus callback!
if self.pyngus_conn:
self.pyngus_conn.destroy()
self.pyngus_conn = None
self.close()
if name:
self.name = name
def close(self):
if self.socket:
self.socket.close()
self.socket = None
class Scheduler:
"""Schedule callables to be run in the future.
"""
class Event:
# simply hold a reference to a callback that can be set to None if the
# alarm is canceled
def __init__(self, callback):
self.callback = callback
def cancel(self):
# quicker than rebalancing the tree
self.callback = None
def __init__(self):
self._callbacks = {}
self._deadlines = []
def alarm(self, request, deadline):
"""Request a callable be executed at a specific time
"""
try:
callbacks = self._callbacks[deadline]
except KeyError:
callbacks = list()
self._callbacks[deadline] = callbacks
heapq.heappush(self._deadlines, deadline)
entry = Scheduler.Event(request)
callbacks.append(entry)
return entry
def defer(self, request, delay):
"""Request a callable be executed after delay seconds
"""
return self.alarm(request, compute_timeout(delay))
@property
def _next_deadline(self):
"""The timestamp of the next expiring event or None
"""
return self._deadlines[0] if self._deadlines else None
def _get_delay(self, max_delay=None):
"""Get the delay in milliseconds until the next callable needs to be
run, or 'max_delay' if no outstanding callables or the delay to the
next callable is > 'max_delay'.
"""
due = self._deadlines[0] if self._deadlines else None
if due is None:
return max_delay
_now = time.monotonic()
if due <= _now:
return 0
else:
return min(due - _now, max_delay) if max_delay else due - _now
def _process(self):
"""Invoke all expired callables."""
if self._deadlines:
_now = time.monotonic()
try:
while self._deadlines[0] <= _now:
deadline = heapq.heappop(self._deadlines)
callbacks = self._callbacks[deadline]
del self._callbacks[deadline]
for cb in callbacks:
cb.callback and cb.callback()
except IndexError:
pass
class Requests:
"""A queue of callables to execute from the eventloop thread's main
loop.
"""
def __init__(self):
self._requests = collections.deque()
self._wakeup_pipe = os.pipe()
self._pipe_ready = False # prevents blocking on an empty pipe
self._pipe_lock = threading.Lock()
def wakeup(self, request=None):
"""Enqueue a callable to be executed by the eventloop, and force the
eventloop thread to wake up from select().
"""
with self._pipe_lock:
if request:
self._requests.append(request)
if not self._pipe_ready:
self._pipe_ready = True
os.write(self._wakeup_pipe[1], b'!')
def fileno(self):
"""Allows this request queue to be used by select()."""
return self._wakeup_pipe[0]
def process_requests(self):
"""Invoked by the eventloop thread, execute each queued callable."""
with self._pipe_lock:
if not self._pipe_ready:
return
self._pipe_ready = False
os.read(self._wakeup_pipe[0], 512)
requests = self._requests
self._requests = collections.deque()
for r in requests:
r()
class Thread(threading.Thread):
"""Manages socket I/O and executes callables queued up by external
threads.
"""
def __init__(self, container_name, node, command, pid):
super().__init__()
# callables from other threads:
self._requests = Requests()
# delayed callables (only used on this thread for now):
self._scheduler = Scheduler()
self._connection = None
# Configure a container
if container_name is None:
container_name = ("openstack.org/om/container/%s/%s/%s/%s" %
(node, command, pid, uuid.uuid4().hex))
self._container = pyngus.Container(container_name)
self.name = "Thread for Proton container: %s" % self._container.name
self._shutdown = False
self.daemon = True
self.start()
def wakeup(self, request=None):
"""Wake up the eventloop thread, Optionally providing a callable to run
when the eventloop wakes up. Thread safe.
"""
self._requests.wakeup(request)
def shutdown(self):
"""Shutdown the eventloop thread. Thread safe.
"""
LOG.debug("eventloop shutdown requested")
self._shutdown = True
self.wakeup()
def destroy(self):
# release the container. This can only be called after the eventloop
# thread exited
self._container.destroy()
self._container = None
# the following methods are not thread safe - they must be run from the
# eventloop thread
def defer(self, request, delay):
"""Invoke request after delay seconds."""
return self._scheduler.defer(request, delay)
def alarm(self, request, deadline):
"""Invoke request at a particular time"""
return self._scheduler.alarm(request, deadline)
def connect(self, host, handler, properties):
"""Get a _SocketConnection to a peer represented by url."""
key = "openstack.org/om/connection/{}:{}/".format(
host.hostname, host.port)
# return pre-existing
conn = self._container.get_connection(key)
if conn:
return conn.user_context
# create a new connection - this will be stored in the
# container, using the specified name as the lookup key, or if
# no name was provided, the host:port combination
sc = _SocketConnection(key, self._container,
properties, handler=handler)
sc.connect(host)
self._connection = sc
return sc
def run(self):
"""Run the proton event/timer loop."""
LOG.debug("Starting Proton thread, container=%s",
self._container.name)
try:
self._main_loop()
except Exception:
# unknown error - fatal
LOG.exception("Fatal unhandled event loop error!")
raise
def _main_loop(self):
# Main event loop
while not self._shutdown:
readfds = [self._requests]
writefds = []
deadline = self._scheduler._next_deadline
pyngus_conn = self._connection and self._connection.pyngus_conn
if pyngus_conn and self._connection.socket:
if pyngus_conn.needs_input:
readfds.append(self._connection)
if pyngus_conn.has_output:
writefds.append(self._connection)
if pyngus_conn.deadline:
deadline = (pyngus_conn.deadline if not deadline else
min(deadline, pyngus_conn.deadline))
# force select to return in time to service the next expiring timer
if deadline:
_now = time.monotonic()
timeout = 0 if deadline <= _now else (deadline - _now)
else:
timeout = None
# and now we wait...
try:
select.select(readfds, writefds, [], timeout)
except OSError as serror:
if serror[0] == errno.EINTR:
LOG.warning("ignoring interrupt from select(): %s",
str(serror))
continue
raise # assuming fatal...
# Ignore the select return value - simply poll the socket for I/O.
# Testing shows that polling improves latency over checking the
# lists returned by select()
self._requests.process_requests()
self._connection.read_socket()
if pyngus_conn and pyngus_conn.deadline:
_now = time.monotonic()
if pyngus_conn.deadline <= _now:
pyngus_conn.process(_now)
self._connection.write_socket()
self._scheduler._process() # run any deferred requests
LOG.info("eventloop thread exiting, container=%s",
self._container.name)

View File

@ -1,270 +0,0 @@
# Copyright 2014, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
amqp1_opts = [
cfg.StrOpt('container_name',
deprecated_group='amqp1',
help='Name for the AMQP container. must be globally unique.'
' Defaults to a generated UUID'),
cfg.IntOpt('idle_timeout',
default=0, # disabled
deprecated_group='amqp1',
help='Timeout for inactive connections (in seconds)'),
cfg.BoolOpt('trace',
default=False,
deprecated_group='amqp1',
help='Debug: dump AMQP frames to stdout'),
cfg.BoolOpt('ssl',
default=False,
help=("Attempt to connect via SSL. If no other ssl-related "
"parameters are given, it will use the system's "
"CA-bundle to verify the server's certificate.")),
cfg.StrOpt('ssl_ca_file',
default='',
deprecated_group='amqp1',
help="CA certificate PEM file used to verify the server's"
' certificate'),
cfg.StrOpt('ssl_cert_file',
default='',
deprecated_group='amqp1',
help='Self-identifying certificate PEM file'
' for client authentication'),
cfg.StrOpt('ssl_key_file',
default='',
deprecated_group='amqp1',
help='Private key PEM file used to sign ssl_cert_file'
' certificate (optional)'),
cfg.StrOpt('ssl_key_password',
deprecated_group='amqp1',
secret=True,
help='Password for decrypting ssl_key_file (if encrypted)'),
cfg.BoolOpt('ssl_verify_vhost',
default=False,
help="By default SSL checks that the name in the server's"
" certificate matches the hostname in the transport_url. In"
" some configurations it may be preferable to use the virtual"
" hostname instead, for example if the server uses the Server"
" Name Indication TLS extension (rfc6066) to provide a"
" certificate per virtual host. Set ssl_verify_vhost to True"
" if the server's SSL certificate uses the virtual host name"
" instead of the DNS name."),
cfg.StrOpt('sasl_mechanisms',
default='',
deprecated_group='amqp1',
help='Space separated list of acceptable SASL mechanisms'),
cfg.StrOpt('sasl_config_dir',
default='',
deprecated_group='amqp1',
help='Path to directory that contains the SASL configuration'),
cfg.StrOpt('sasl_config_name',
default='',
deprecated_group='amqp1',
help='Name of configuration file (without .conf suffix)'),
cfg.StrOpt('sasl_default_realm',
default='',
help='SASL realm to use if no realm present in username'),
# Network connection failure retry options
cfg.IntOpt('connection_retry_interval',
default=1,
min=1,
help='Seconds to pause before attempting to re-connect.'),
cfg.IntOpt('connection_retry_backoff',
default=2,
min=0,
help='Increase the connection_retry_interval by this many'
' seconds after each unsuccessful failover attempt.'),
cfg.IntOpt('connection_retry_interval_max',
default=30,
min=1,
help='Maximum limit for connection_retry_interval'
' + connection_retry_backoff'),
# Message send retry and timeout options
cfg.IntOpt('link_retry_delay',
default=10,
min=1,
help='Time to pause between re-connecting an AMQP 1.0 link that'
' failed due to a recoverable error.'),
cfg.IntOpt('default_reply_retry',
default=0,
min=-1,
help='The maximum number of attempts to re-send a reply message'
' which failed due to a recoverable error.'),
cfg.IntOpt('default_reply_timeout',
default=30,
min=5,
help='The deadline for an rpc reply message delivery.'),
cfg.IntOpt('default_send_timeout',
default=30,
min=5,
help='The deadline for an rpc cast or call message delivery.'
' Only used when caller does not provide a timeout expiry.'),
cfg.IntOpt('default_notify_timeout',
default=30,
min=5,
help='The deadline for a sent notification message delivery.'
' Only used when caller does not provide a timeout expiry.'),
# Sender link cache maintenance:
cfg.IntOpt('default_sender_link_timeout',
default=600,
min=1,
help='The duration to schedule a purge of idle sender links.'
' Detach link after expiry.'),
# Addressing:
cfg.StrOpt('addressing_mode',
default='dynamic',
help="Indicates the addressing mode used by the driver.\n"
"Permitted values:\n"
"'legacy' - use legacy non-routable addressing\n"
"'routable' - use routable addresses\n"
"'dynamic' - use legacy addresses if the message bus does not"
" support routing otherwise use routable addressing"),
cfg.BoolOpt('pseudo_vhost',
default=True,
help="Enable virtual host support for those message buses"
" that do not natively support virtual hosting (such as"
" qpidd). When set to true the virtual host name will be"
" added to all message bus addresses, effectively creating"
" a private 'subnet' per virtual host. Set to False if the"
" message bus supports virtual hosting using the 'hostname'"
" field in the AMQP 1.0 Open performative as the name of the"
" virtual host."),
# Legacy addressing customization:
cfg.StrOpt('server_request_prefix',
default='exclusive',
deprecated_group='amqp1',
help="address prefix used when sending to a specific server"),
cfg.StrOpt('broadcast_prefix',
default='broadcast',
deprecated_group='amqp1',
help="address prefix used when broadcasting to all servers"),
cfg.StrOpt('group_request_prefix',
default='unicast',
deprecated_group='amqp1',
help="address prefix when sending to any server in group"),
# Routable addressing customization:
#
# Addresses a composed of the following string values using a template in
# the form of:
# $(address_prefix)/$(*cast)/$(exchange)/$(topic)[/$(server-name)]
# where *cast is one of the multicast/unicast/anycast values used to
# identify the delivery pattern used for the addressed message
cfg.StrOpt('rpc_address_prefix',
default='openstack.org/om/rpc',
help="Address prefix for all generated RPC addresses"),
cfg.StrOpt('notify_address_prefix',
default='openstack.org/om/notify',
help="Address prefix for all generated Notification addresses"),
cfg.StrOpt('multicast_address',
default='multicast',
help="Appended to the address prefix when sending a fanout"
" message. Used by the message bus to identify fanout"
" messages."),
cfg.StrOpt('unicast_address',
default='unicast',
help="Appended to the address prefix when sending to a"
" particular RPC/Notification server. Used by the message bus"
" to identify messages sent to a single destination."),
cfg.StrOpt('anycast_address',
default='anycast',
help="Appended to the address prefix when sending to a group of"
" consumers. Used by the message bus to identify messages that"
" should be delivered in a round-robin fashion across"
" consumers."),
cfg.StrOpt('default_notification_exchange',
help="Exchange name used in notification addresses.\n"
"Exchange name resolution precedence:\n"
"Target.exchange if set\n"
"else default_notification_exchange if set\n"
"else control_exchange if set\n"
"else 'notify'"),
cfg.StrOpt('default_rpc_exchange',
help="Exchange name used in RPC addresses.\n"
"Exchange name resolution precedence:\n"
"Target.exchange if set\n"
"else default_rpc_exchange if set\n"
"else control_exchange if set\n"
"else 'rpc'"),
# Message Credit Levels
cfg.IntOpt('reply_link_credit',
default=200,
min=1,
help='Window size for incoming RPC Reply messages.'),
cfg.IntOpt('rpc_server_credit',
default=100,
min=1,
help='Window size for incoming RPC Request messages'),
cfg.IntOpt('notify_server_credit',
default=100,
min=1,
help='Window size for incoming Notification messages'),
# Settlement control
cfg.MultiStrOpt('pre_settled',
default=['rpc-cast', 'rpc-reply'],
help="Send messages of this type pre-settled.\n"
"Pre-settled messages will not receive acknowledgement\n"
"from the peer. Note well: pre-settled messages may be\n"
"silently discarded if the delivery fails.\n"
"Permitted values:\n"
"'rpc-call' - send RPC Calls pre-settled\n"
"'rpc-reply'- send RPC Replies pre-settled\n"
"'rpc-cast' - Send RPC Casts pre-settled\n"
"'notify' - Send Notifications pre-settled\n")
]

View File

@ -1,449 +0,0 @@
# Copyright 2014, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Driver for the 'amqp' transport.
This module provides a transport driver that speaks version 1.0 of the AMQP
messaging protocol. The driver sends messages and creates subscriptions via
'tasks' that are performed on its behalf via the controller module.
"""
import collections
import logging
import os
import threading
import uuid
import warnings
from debtcollector import removals
from oslo_config import cfg
from oslo_messaging.target import Target
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_messaging._drivers.amqp1_driver.eventloop import compute_timeout
from oslo_messaging._drivers.amqp1_driver import opts
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common
warnings.simplefilter('always')
proton = importutils.try_import('proton')
controller = importutils.try_import(
'oslo_messaging._drivers.amqp1_driver.controller'
)
LOG = logging.getLogger(__name__)
# Build/Decode RPC Response messages
# Body Format - json string containing a map with keys:
# 'failure' - (optional) serialized exception from remote
# 'response' - (if no failure provided) data returned by call
def marshal_response(reply, failure):
# TODO(grs): do replies have a context?
# NOTE(flaper87): Set inferred to True since rabbitmq-amqp-1.0 doesn't
# have support for vbin8.
msg = proton.Message(inferred=True)
if failure:
failure = common.serialize_remote_exception(failure)
data = {"failure": failure}
else:
data = {"response": reply}
msg.body = jsonutils.dumps(data)
return msg
def unmarshal_response(message, allowed):
# TODO(kgiusti) This may fail to unpack and raise an exception. Need to
# communicate this to the caller!
data = jsonutils.loads(message.body)
failure = data.get('failure')
if failure is not None:
raise common.deserialize_remote_exception(failure, allowed)
return data.get("response")
# Build/Decode RPC Request and Notification messages
# Body Format: json string containing a map with keys:
# 'request' - possibly serialized application data
# 'context' - context provided by the application
# 'call_monitor_timeout' - optional time in seconds for RPC call monitoring
def marshal_request(request, context, envelope=False,
call_monitor_timeout=None):
# NOTE(flaper87): Set inferred to True since rabbitmq-amqp-1.0 doesn't
# have support for vbin8.
msg = proton.Message(inferred=True)
if envelope:
request = common.serialize_msg(request)
data = {
"request": request,
"context": context
}
if call_monitor_timeout is not None:
data["call_monitor_timeout"] = call_monitor_timeout
msg.body = jsonutils.dumps(data)
return msg
def unmarshal_request(message):
data = jsonutils.loads(message.body)
msg = common.deserialize_msg(data.get("request"))
return (msg, data.get("context"), data.get("call_monitor_timeout"))
@removals.removed_class("ProtonIncomingMessage")
class ProtonIncomingMessage(base.RpcIncomingMessage):
def __init__(self, listener, message, disposition):
request, ctxt, client_timeout = unmarshal_request(message)
super().__init__(ctxt, request)
self.listener = listener
self.client_timeout = client_timeout
self._reply_to = message.reply_to
self._correlation_id = message.id
self._disposition = disposition
def heartbeat(self):
# heartbeats are sent "worst effort": non-blocking, no retries,
# pre-settled (no blocking for acks). We don't want the server thread
# being blocked because it is unable to send a heartbeat.
if not self._reply_to:
LOG.warning("Cannot send RPC heartbeat: no reply-to provided")
return
# send a null msg (no body). This will cause the client to simply reset
# its timeout (the null message is dropped). Use time-to-live to
# prevent stale heartbeats from building up on the message bus
msg = proton.Message()
msg.correlation_id = self._correlation_id
msg.ttl = self.client_timeout
task = controller.SendTask("RPC KeepAlive", msg, self._reply_to,
deadline=None, retry=0, wait_for_ack=False)
self.listener.driver._ctrl.add_task(task)
task.wait()
def reply(self, reply=None, failure=None):
"""Schedule an RPCReplyTask to send the reply."""
if self._reply_to:
response = marshal_response(reply, failure)
response.correlation_id = self._correlation_id
driver = self.listener.driver
deadline = compute_timeout(driver._default_reply_timeout)
ack = not driver._pre_settle_reply
task = controller.SendTask("RPC Reply", response, self._reply_to,
# analogous to kombu missing dest t/o:
deadline,
retry=driver._default_reply_retry,
wait_for_ack=ack)
driver._ctrl.add_task(task)
rc = task.wait()
if rc:
# something failed. Not much we can do at this point but log
LOG.debug("RPC Reply failed to send: %s", str(rc))
else:
LOG.debug("Ignoring reply as no reply address available")
def acknowledge(self):
"""Schedule a MessageDispositionTask to send the settlement."""
task = controller.MessageDispositionTask(self._disposition,
released=False)
self.listener.driver._ctrl.add_task(task)
def requeue(self):
"""Schedule a MessageDispositionTask to release the message"""
task = controller.MessageDispositionTask(self._disposition,
released=True)
self.listener.driver._ctrl.add_task(task)
@removals.removed_class("Queue")
class Queue:
def __init__(self):
self._queue = collections.deque()
self._lock = threading.Lock()
self._pop_wake_condition = threading.Condition(self._lock)
self._started = True
def put(self, item):
with self._lock:
self._queue.appendleft(item)
self._pop_wake_condition.notify()
def pop(self, timeout):
with timeutils.StopWatch(timeout) as stop_watcher:
with self._lock:
while len(self._queue) == 0:
if stop_watcher.expired() or not self._started:
return None
self._pop_wake_condition.wait(
stop_watcher.leftover(return_none=True)
)
return self._queue.pop()
def stop(self):
with self._lock:
self._started = False
self._pop_wake_condition.notify_all()
@removals.removed_class("ProtonListener")
class ProtonListener(base.PollStyleListener):
def __init__(self, driver):
super().__init__(driver.prefetch_size)
self.driver = driver
self.incoming = Queue()
self.id = uuid.uuid4().hex
def stop(self):
self.incoming.stop()
@base.batch_poll_helper
def poll(self, timeout=None):
qentry = self.incoming.pop(timeout)
if qentry is None:
return None
return ProtonIncomingMessage(self,
qentry['message'],
qentry['disposition'])
@removals.removed_class("ProtonDriver")
class ProtonDriver(base.BaseDriver):
"""AMQP 1.0 Driver
See :doc:`AMQP1.0` for details.
"""
def __init__(self, conf, url,
default_exchange=None, allowed_remote_exmods=[]):
if proton is None or controller is None:
raise NotImplementedError("Proton AMQP C libraries not installed")
super().__init__(conf, url, default_exchange,
allowed_remote_exmods)
opt_group = cfg.OptGroup(name='oslo_messaging_amqp',
title='AMQP 1.0 driver options')
conf.register_group(opt_group)
conf.register_opts(opts.amqp1_opts, group=opt_group)
conf = common.ConfigOptsProxy(conf, url, opt_group.name)
self._conf = conf
self._default_exchange = default_exchange
# lazy connection setup - don't create the controller until
# after the first messaging request:
self._ctrl = None
self._pid = None
self._lock = threading.Lock()
# timeout for message acknowledgement
opt_name = conf.oslo_messaging_amqp
self._default_reply_timeout = opt_name.default_reply_timeout
self._default_send_timeout = opt_name.default_send_timeout
self._default_notify_timeout = opt_name.default_notify_timeout
self._default_reply_retry = opt_name.default_reply_retry
# which message types should be sent pre-settled?
ps = [s.lower() for s in opt_name.pre_settled]
self._pre_settle_call = 'rpc-call' in ps
self._pre_settle_reply = 'rpc-reply' in ps
self._pre_settle_cast = 'rpc-cast' in ps
self._pre_settle_notify = 'notify' in ps
bad_opts = set(ps).difference(['rpc-call', 'rpc-reply',
'rpc-cast', 'notify'])
if bad_opts:
LOG.warning("Ignoring unrecognized pre_settle value(s): %s",
" ".join(bad_opts))
def _ensure_connect_called(func):
"""Causes a new controller to be created when the messaging service is
first used by the current process. It is safe to push tasks to it
whether connected or not, but those tasks won't be processed until
connection completes.
"""
def wrap(self, *args, **kws):
with self._lock:
# check to see if a fork was done after the Controller and its
# I/O thread was spawned. old_pid will be None the first time
# this is called which will cause the Controller to be created.
old_pid = self._pid
self._pid = os.getpid()
if old_pid != self._pid:
if self._ctrl is not None:
# fork was called after the Controller was created, and
# we are now executing as the child process. Do not
# touch the existing Controller - it is owned by the
# parent. Best we can do here is simply drop it and
# hope we get lucky.
LOG.warning("Process forked after connection "
"established!")
self._ctrl = None
# Create a Controller that connects to the messaging
# service:
self._ctrl = controller.Controller(self._url,
self._default_exchange,
self._conf)
self._ctrl.connect()
return func(self, *args, **kws)
return wrap
@_ensure_connect_called
def send(self, target, ctxt, message,
wait_for_reply=False,
timeout=None, call_monitor_timeout=None,
retry=None, transport_options=None):
"""Send a message to the given target.
:param target: destination for message
:type target: oslo_messaging.Target
:param ctxt: message context
:type ctxt: dict
:param message: message payload
:type message: dict
:param wait_for_reply: expects a reply message, wait for it
:type wait_for_reply: bool
:param timeout: raise exception if send does not complete within
timeout seconds. None == no timeout.
:type timeout: float
:param call_monitor_timeout: Maximum time the client will wait for the
call to complete or receive a message heartbeat indicating the
remote side is still executing.
:type call_monitor_timeout: float
:param retry: (optional) maximum re-send attempts on recoverable error
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
:param transport_options: transport-specific options to apply to the
sending of the message (TBD)
:type transport_options: dictionary
"""
request = marshal_request(message, ctxt, None,
call_monitor_timeout)
if timeout:
expire = compute_timeout(timeout)
request.ttl = timeout
request.expiry_time = compute_timeout(timeout)
else:
# no timeout provided by application. If the backend is queueless
# this could lead to a hang - provide a default to prevent this
# TODO(kgiusti) only do this if brokerless backend
expire = compute_timeout(self._default_send_timeout)
if wait_for_reply:
ack = not self._pre_settle_call
if call_monitor_timeout is None:
task = controller.RPCCallTask(target, request, expire, retry,
wait_for_ack=ack)
else:
task = controller.RPCMonitoredCallTask(target, request, expire,
call_monitor_timeout,
retry, wait_for_ack=ack)
else:
ack = not self._pre_settle_cast
task = controller.SendTask("RPC Cast", request, target, expire,
retry, wait_for_ack=ack)
self._ctrl.add_task(task)
reply = task.wait()
if isinstance(reply, Exception):
raise reply
if reply:
# TODO(kgiusti) how to handle failure to un-marshal?
# Must log, and determine best way to communicate this failure
# back up to the caller
reply = unmarshal_response(reply, self._allowed_remote_exmods)
return reply
@_ensure_connect_called
def send_notification(self, target, ctxt, message, version,
retry=None):
"""Send a notification message to the given target.
:param target: destination for message
:type target: oslo_messaging.Target
:param ctxt: message context
:type ctxt: dict
:param message: message payload
:type message: dict
:param version: message envelope version
:type version: float
:param retry: (optional) maximum re-send attempts on recoverable error
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
request = marshal_request(message, ctxt, envelope=(version == 2.0))
# no timeout is applied to notifications, however if the backend is
# queueless this could lead to a hang - provide a default to prevent
# this
# TODO(kgiusti) should raise NotImplemented if not broker backend
deadline = compute_timeout(self._default_notify_timeout)
ack = not self._pre_settle_notify
task = controller.SendTask("Notify", request, target,
deadline, retry, wait_for_ack=ack,
notification=True)
self._ctrl.add_task(task)
rc = task.wait()
if isinstance(rc, Exception):
raise rc
@_ensure_connect_called
def listen(self, target, batch_size, batch_timeout):
"""Construct a Listener for the given target."""
LOG.debug("Listen to %s", target)
listener = ProtonListener(self)
task = controller.SubscribeTask(target, listener)
self._ctrl.add_task(task)
task.wait()
return base.PollStyleListenerAdapter(listener, batch_size,
batch_timeout)
@_ensure_connect_called
def listen_for_notifications(self, targets_and_priorities, pool,
batch_size, batch_timeout):
"""Construct a Listener for notifications on the given target and
priority.
"""
# TODO(kgiusti) should raise NotImplemented if not broker backend
LOG.debug("Listen for notifications %s", targets_and_priorities)
if pool:
raise NotImplementedError('"pool" not implemented by '
'this transport driver')
listener = ProtonListener(self)
# this is how the destination target is created by the notifier,
# see MessagingDriver.notify in oslo_messaging/notify/messaging.py
for target, priority in targets_and_priorities:
topic = '{}.{}'.format(target.topic, priority)
# Sooo... the exchange is simply discarded? (see above comment)
task = controller.SubscribeTask(Target(topic=topic),
listener, notifications=True)
self._ctrl.add_task(task)
task.wait()
return base.PollStyleListenerAdapter(listener, batch_size,
batch_timeout)
def cleanup(self):
"""Release all resources."""
if self._ctrl:
self._ctrl.shutdown()
self._ctrl = None
LOG.info("AMQP 1.0 messaging driver shutdown")
def require_features(self, requeue=True):
pass

View File

@ -55,9 +55,6 @@ class ConfFixture(fixtures.Fixture):
_import_opts(self.conf,
'oslo_messaging._drivers.amqp', 'amqp_opts',
'oslo_messaging_rabbit')
_import_opts(self.conf,
'oslo_messaging._drivers.amqp1_driver.opts',
'amqp1_opts', 'oslo_messaging_amqp')
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')
_import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts')
_import_opts(self.conf, 'oslo_messaging.rpc.dispatcher',

View File

@ -16,7 +16,6 @@ import copy
import itertools
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers.kafka_driver import kafka_options
@ -40,7 +39,6 @@ _global_opt_lists = [
_opts = [
(None, list(itertools.chain(*_global_opt_lists))),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(
itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts))),

File diff suppressed because it is too large Load Diff

View File

@ -29,11 +29,10 @@ from oslo_messaging.tests import utils as test_utils
class OptsTestCase(test_utils.BaseTestCase):
def _test_list_opts(self, result):
self.assertEqual(5, len(result))
self.assertEqual(4, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_notifications', groups)
self.assertIn('oslo_messaging_rabbit', groups)
self.assertIn('oslo_messaging_kafka', groups)

View File

@ -0,0 +1,6 @@
---
upgrade:
- |
The AMQP1 driver, which was deprecated in 14.1.0 was removed, due to
limited usage and lack of support on recent distributions. Use any of
the other supported driver, such as RabbitMQ or Kafka.

View File

@ -25,9 +25,7 @@ classifier =
[extras]
# package dependencies for optional (non-rabbitmq) messaging drivers.
# projects can test-depend on oslo.messaging[<drivers>]
# e.g.: oslo.messaging[kafka,amqp1]
amqp1 =
pyngus>=2.2.0 # Apache-2.0
# e.g.: oslo.messaging[kafka]
kafka =
confluent-kafka>=1.3.0 # Apache-2.0
@ -41,7 +39,6 @@ console_scripts =
oslo.messaging.drivers =
rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver
amqp = oslo_messaging._drivers.impl_amqp1:ProtonDriver
# This driver is supporting for only notification usage
kafka = oslo_messaging._drivers.impl_kafka:KafkaDriver

View File

@ -1,20 +0,0 @@
#!/bin/bash -xe
# This script will be run by OpenStack CI before unit tests are run,
# it sets up the test system as needed.
# Developer should setup their test systems in a similar way.
# This setup for amqp1 needs to be run by a user that can run sudo.
# qdrouterd needs to be installed from qpid/testing repo in Ubuntu.
# bindep does not allow setting up another repo, so we just install
# this package here.
# inspired from project-config install-distro-packages.sh
#if apt-get -v >/dev/null 2>&1 ; then
# sudo add-apt-repository -y ppa:qpid/testing
# sudo apt-get -qq update
# sudo PATH=/usr/sbin:/sbin:$PATH DEBIAN_FRONTEND=noninteractive \
# apt-get -q --option "Dpkg::Options::=--force-confold" \
# --assume-yes install qdrouterd
#fi