Add QManager to amqp driver

The purpose of this change is to introduce an optional mechanism to keep
the queues name consistent between service restart.
Oslo messaging is already re-using the queues while running, but the
queues are created using a random name at the beginning.

This change propose an option named use_queue_manager (default to False
- so the behavior is not changed) that can be set to True to switch to a
consistent naming based on hostname and processname.

Related-bug: #2031497

Signed-off-by: Arnaud Morin <arnaud.morin@ovhcloud.com>
Change-Id: I2acdef4e03164fdabcb50fb98a4ac14b1aefda00
This commit is contained in:
Arnaud Morin 2023-07-24 21:48:20 +02:00
parent 989dbb8aad
commit 4614132ad0
3 changed files with 126 additions and 4 deletions

View File

@ -14,12 +14,14 @@
# under the License. # under the License.
import logging import logging
import os
import queue import queue
import threading import threading
import time import time
import uuid import uuid
import cachetools import cachetools
from oslo_concurrency import lockutils
from oslo_utils import eventletutils from oslo_utils import eventletutils
from oslo_utils import timeutils from oslo_utils import timeutils
@ -40,6 +42,63 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
ACK_REQUEUE_EVERY_SECONDS_MAX = 5.0 ACK_REQUEUE_EVERY_SECONDS_MAX = 5.0
class QManager(object):
"""Queue Manager to build queue name for reply (and fanout) type.
This class is used only when use_queue_manager is set to True in config
file.
It rely on a shared memory accross processes (reading/writing data to
/dev/shm/xyz) and oslo_concurrency.lockutils to avoid assigning the same
queue name twice (or more) to different processes.
The original idea of this queue manager was to avoid random queue names,
so on service restart, the previously created queues can be reused,
avoiding deletion/creation of queues on rabbitmq side (which cost a lot
at scale).
"""
def __init__(self, hostname, processname):
# We will use hostname and processname in queue names to help identify
# them easily.
# This is also ensuring consistency between service restart.
self.hostname = hostname
self.processname = processname
# This is where the counter is kept
self.file_name = '/dev/shm/%s_%s_qmanager' % (self.hostname, # nosec
self.processname)
# We use the process group to restart the counter on service restart
self.pg = os.getpgrp()
def get(self):
lock_name = 'oslo_read_shm_%s_%s' % (self.hostname, self.processname)
@lockutils.synchronized(lock_name, external=True)
def read_from_shm():
# Grab the counter from shm
# This function is thread and process safe thanks to lockutils
try:
with open(self.file_name, 'r') as f:
pg, c = f.readline().split(':')
pg = int(pg)
c = int(c)
except (FileNotFoundError, ValueError):
pg = self.pg
c = 0
# Increment the counter
if pg == self.pg:
c += 1
else:
# The process group changed, maybe service restarted?
# Start over the counter
c = 1
# Write the new counter
with open(self.file_name, 'w') as f:
f.write(str(self.pg) + ':' + str(c))
return c
c = read_from_shm()
return self.hostname + ":" + self.processname + ":" + str(c)
class MessageOperationsHandler(object): class MessageOperationsHandler(object):
"""Queue used by message operations to ensure that all tasks are """Queue used by message operations to ensure that all tasks are
serialized and run in the same thread, since underlying drivers like kombu serialized and run in the same thread, since underlying drivers like kombu
@ -445,6 +504,7 @@ class ReplyWaiters(object):
'to message ID %s' % msg_id) 'to message ID %s' % msg_id)
def put(self, msg_id, message_data): def put(self, msg_id, message_data):
LOG.info('Received RPC response for msg %s', msg_id)
queue = self._queues.get(msg_id) queue = self._queues.get(msg_id)
if not queue: if not queue:
LOG.info('No calling threads waiting for msg_id : %s', msg_id) LOG.info('No calling threads waiting for msg_id : %s', msg_id)
@ -597,6 +657,12 @@ class AMQPDriverBase(base.BaseDriver):
self._reply_q = None self._reply_q = None
self._reply_q_conn = None self._reply_q_conn = None
self._waiter = None self._waiter = None
if conf.oslo_messaging_rabbit.use_queue_manager:
self._q_manager = QManager(
hostname=conf.oslo_messaging_rabbit.hostname,
processname=conf.oslo_messaging_rabbit.processname)
else:
self._q_manager = None
def _get_exchange(self, target): def _get_exchange(self, target):
return target.exchange or self._default_exchange return target.exchange or self._default_exchange
@ -608,10 +674,16 @@ class AMQPDriverBase(base.BaseDriver):
def _get_reply_q(self): def _get_reply_q(self):
with self._reply_q_lock: with self._reply_q_lock:
# NOTE(amorin) Re-use reply_q when it already exists
# This avoid creating too many queues on AMQP server (rabbit)
if self._reply_q is not None: if self._reply_q is not None:
return self._reply_q return self._reply_q
reply_q = 'reply_' + uuid.uuid4().hex if self._q_manager:
reply_q = 'reply_' + self._q_manager.get()
else:
reply_q = 'reply_' + uuid.uuid4().hex
LOG.info('Creating reply queue: %s', reply_q)
conn = self._get_connection(rpc_common.PURPOSE_LISTEN) conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
@ -628,12 +700,20 @@ class AMQPDriverBase(base.BaseDriver):
envelope=True, notify=False, retry=None, transport_options=None): envelope=True, notify=False, retry=None, transport_options=None):
msg = message msg = message
if 'method' in msg:
LOG.debug('Calling RPC method %s on target %s', msg.get('method'),
target.topic)
else:
LOG.debug('Sending message to topic %s', target.topic)
if wait_for_reply: if wait_for_reply:
_reply_q = self._get_reply_q()
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id}) msg.update({'_msg_id': msg_id})
msg.update({'_reply_q': self._get_reply_q()}) msg.update({'_reply_q': _reply_q})
msg.update({'_timeout': call_monitor_timeout}) msg.update({'_timeout': call_monitor_timeout})
LOG.info('Expecting reply to msg %s in queue %s', msg_id,
_reply_q)
rpc_amqp._add_unique_id(msg) rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID] unique_id = msg[rpc_amqp.UNIQUE_ID]

View File

@ -241,7 +241,16 @@ rabbit_opts = [
default=False, default=False,
help="Enable x-cancel-on-ha-failover flag so that " help="Enable x-cancel-on-ha-failover flag so that "
"rabbitmq server will cancel and notify consumers" "rabbitmq server will cancel and notify consumers"
"when queue is down") "when queue is down"),
cfg.BoolOpt('use_queue_manager',
default=False,
help='Should we use consistant queue names or random ones'),
cfg.StrOpt('hostname',
default=socket.gethostname(),
help='Hostname used by queue manager'),
cfg.StrOpt('processname',
default=os.path.basename(sys.argv[0]),
help='Process name used by queue manager'),
] ]
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -665,6 +674,7 @@ class Connection(object):
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
self.use_queue_manager = driver_conf.use_queue_manager
if self.heartbeat_in_pthread: if self.heartbeat_in_pthread:
# NOTE(hberaud): Experimental: threading module is in use to run # NOTE(hberaud): Experimental: threading module is in use to run
@ -844,6 +854,13 @@ class Connection(object):
self.connection.port = 1234 self.connection.port = 1234
self._poll_timeout = 0.05 self._poll_timeout = 0.05
if self.use_queue_manager:
self._q_manager = amqpdriver.QManager(
hostname=driver_conf.hostname,
processname=driver_conf.processname)
else:
self._q_manager = None
# FIXME(markmc): use oslo sslutils when it is available as a library # FIXME(markmc): use oslo sslutils when it is available as a library
_SSL_PROTOCOLS = { _SSL_PROTOCOLS = {
"tlsv1": ssl.PROTOCOL_TLSv1, "tlsv1": ssl.PROTOCOL_TLSv1,
@ -1388,9 +1405,13 @@ class Connection(object):
def declare_fanout_consumer(self, topic, callback): def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer.""" """Create a 'fanout' consumer."""
unique = uuid.uuid4().hex if self._q_manager:
unique = self._q_manager.get()
else:
unique = uuid.uuid4().hex
exchange_name = '%s_fanout' % topic exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique) queue_name = '%s_fanout_%s' % (topic, unique)
LOG.info('Creating fanout queue: %s', queue_name)
consumer = Consumer( consumer = Consumer(
exchange_name=exchange_name, exchange_name=exchange_name,

View File

@ -0,0 +1,21 @@
---
features:
- |
Add three new options (``use_queue_manager``, ``hostname``,
``processname``) to switch oslo.messaging from random queue names
(for reply_q and fanouts) to consistent naming.
The default value is False, so oslo.messaging will still use random queue
names if nothing is set in configuration file of services.
When switching use_queue_manager to True, the uuid4 random string from the
queue name is replaced with a combination of hostname, processname and
counter.
The counter will be kept in shared memory (/dev/shm/x_y_qmanager).
This way, when a service using oslo.messaging restarts (e.g. neutron),
it will re-create the queues using the same name as the previous run, so
no new queues are created and no need for rabbitmq to delete the previous
queues.
This is extremely useful for operator to debug which queue belong to which
server/process.
It's also higlhy recommended to enable this feature when using quorum
queues for transient (option named ``rabbit_transient_quorum_queue``) to
avoid consuming all erlang atoms after some time.