Follow the plan about the single reply message
This change removes the "send_single_reply" option as planned in the bp: http://specs.openstack.org/openstack/oslo-specs/specs/liberty/oslo.messaging-remove-double-reply.html Change-Id: Ib88de71cb2008a49a25f302d5e47ed587154d402
This commit is contained in:
		@@ -48,18 +48,6 @@ amqp_opts = [
 | 
			
		||||
                default=False,
 | 
			
		||||
                deprecated_group='DEFAULT',
 | 
			
		||||
                help='Auto-delete queues in AMQP.'),
 | 
			
		||||
    cfg.BoolOpt('send_single_reply',
 | 
			
		||||
                default=False,
 | 
			
		||||
                help='Send a single AMQP reply to call message. The current '
 | 
			
		||||
                     'behaviour since oslo-incubator is to send two AMQP '
 | 
			
		||||
                     'replies - first one with the payload, a second one to '
 | 
			
		||||
                     'ensure the other have finish to send the payload. We '
 | 
			
		||||
                     'are going to remove it in the N release, but we must '
 | 
			
		||||
                     'keep backward compatible at the same time. This option '
 | 
			
		||||
                     'provides such compatibility - it defaults to False in '
 | 
			
		||||
                     'Liberty and can be turned on for early adopters with a '
 | 
			
		||||
                     'new installations or for testing. Please note, that '
 | 
			
		||||
                     'this option will be removed in the Mitaka release.')
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
UNIQUE_ID = '_unique_id'
 | 
			
		||||
 
 | 
			
		||||
@@ -48,8 +48,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
 | 
			
		||||
        self.requeue_callback = message.requeue
 | 
			
		||||
        self._obsolete_reply_queues = obsolete_reply_queues
 | 
			
		||||
 | 
			
		||||
    def _send_reply(self, conn, reply=None, failure=None,
 | 
			
		||||
                    ending=False, log_failure=True):
 | 
			
		||||
    def _send_reply(self, conn, reply=None, failure=None, log_failure=True):
 | 
			
		||||
        if (self.reply_q and
 | 
			
		||||
            not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
 | 
			
		||||
                                                          self.msg_id)):
 | 
			
		||||
@@ -58,11 +57,9 @@ class AMQPIncomingMessage(base.IncomingMessage):
 | 
			
		||||
        if failure:
 | 
			
		||||
            failure = rpc_common.serialize_remote_exception(failure,
 | 
			
		||||
                                                            log_failure)
 | 
			
		||||
 | 
			
		||||
        msg = {'result': reply, 'failure': failure}
 | 
			
		||||
        if ending:
 | 
			
		||||
            msg['ending'] = True
 | 
			
		||||
 | 
			
		||||
        # NOTE(sileht): ending can be removed in N*, see Listener.wait()
 | 
			
		||||
        # for more detail.
 | 
			
		||||
        msg = {'result': reply, 'failure': failure, 'ending': True}
 | 
			
		||||
        rpc_amqp._add_unique_id(msg)
 | 
			
		||||
        unique_id = msg[rpc_amqp.UNIQUE_ID]
 | 
			
		||||
 | 
			
		||||
@@ -71,12 +68,11 @@ class AMQPIncomingMessage(base.IncomingMessage):
 | 
			
		||||
        # Otherwise use the msg_id for backward compatibility.
 | 
			
		||||
        if self.reply_q:
 | 
			
		||||
            msg['_msg_id'] = self.msg_id
 | 
			
		||||
            if ending:
 | 
			
		||||
                LOG.debug("sending reply msg_id: %(msg_id)s "
 | 
			
		||||
                          "reply queue: %(reply_q)s" % {
 | 
			
		||||
                              'msg_id': self.msg_id,
 | 
			
		||||
                              'unique_id': unique_id,
 | 
			
		||||
                              'reply_q': self.reply_q})
 | 
			
		||||
            LOG.debug("sending reply msg_id: %(msg_id)s "
 | 
			
		||||
                      "reply queue: %(reply_q)s" % {
 | 
			
		||||
                          'msg_id': self.msg_id,
 | 
			
		||||
                          'unique_id': unique_id,
 | 
			
		||||
                          'reply_q': self.reply_q})
 | 
			
		||||
            conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
 | 
			
		||||
        else:
 | 
			
		||||
            # TODO(sileht): look at which version of oslo-incubator rpc
 | 
			
		||||
@@ -104,21 +100,12 @@ class AMQPIncomingMessage(base.IncomingMessage):
 | 
			
		||||
        timer = rpc_common.DecayingTimer(duration=duration)
 | 
			
		||||
        timer.start()
 | 
			
		||||
 | 
			
		||||
        first_reply_sent = False
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                with self.listener.driver._get_connection(
 | 
			
		||||
                        rpc_common.PURPOSE_SEND) as conn:
 | 
			
		||||
                    if self.listener.driver.send_single_reply:
 | 
			
		||||
                        self._send_reply(conn, reply, failure,
 | 
			
		||||
                                         log_failure=log_failure,
 | 
			
		||||
                                         ending=True)
 | 
			
		||||
                    else:
 | 
			
		||||
                        if not first_reply_sent:
 | 
			
		||||
                            self._send_reply(conn, reply, failure,
 | 
			
		||||
                                             log_failure=log_failure)
 | 
			
		||||
                            first_reply_sent = True
 | 
			
		||||
                        self._send_reply(conn, ending=True)
 | 
			
		||||
                    self._send_reply(conn, reply, failure,
 | 
			
		||||
                                     log_failure=log_failure)
 | 
			
		||||
                return
 | 
			
		||||
            except rpc_amqp.AMQPDestinationNotFound:
 | 
			
		||||
                if timer.check_return() > 0:
 | 
			
		||||
@@ -378,8 +365,7 @@ class AMQPDriverBase(base.BaseDriver):
 | 
			
		||||
    missing_destination_retry_timeout = 0
 | 
			
		||||
 | 
			
		||||
    def __init__(self, conf, url, connection_pool,
 | 
			
		||||
                 default_exchange=None, allowed_remote_exmods=None,
 | 
			
		||||
                 send_single_reply=False):
 | 
			
		||||
                 default_exchange=None, allowed_remote_exmods=None):
 | 
			
		||||
        super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
 | 
			
		||||
                                             allowed_remote_exmods)
 | 
			
		||||
 | 
			
		||||
@@ -392,8 +378,6 @@ class AMQPDriverBase(base.BaseDriver):
 | 
			
		||||
        self._reply_q_conn = None
 | 
			
		||||
        self._waiter = None
 | 
			
		||||
 | 
			
		||||
        self.send_single_reply = send_single_reply
 | 
			
		||||
 | 
			
		||||
    def _get_exchange(self, target):
 | 
			
		||||
        return target.exchange or self._default_exchange
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1139,8 +1139,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
 | 
			
		||||
            conf, url,
 | 
			
		||||
            connection_pool,
 | 
			
		||||
            default_exchange,
 | 
			
		||||
            allowed_remote_exmods,
 | 
			
		||||
            conf.oslo_messaging_rabbit.send_single_reply,
 | 
			
		||||
            allowed_remote_exmods
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def require_features(self, requeue=True):
 | 
			
		||||
 
 | 
			
		||||
@@ -28,7 +28,6 @@ from oslotest import mockpatch
 | 
			
		||||
import testscenarios
 | 
			
		||||
 | 
			
		||||
import oslo_messaging
 | 
			
		||||
from oslo_messaging._drivers import amqp
 | 
			
		||||
from oslo_messaging._drivers import amqpdriver
 | 
			
		||||
from oslo_messaging._drivers import common as driver_common
 | 
			
		||||
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
 | 
			
		||||
@@ -363,11 +362,6 @@ class TestSendReceive(test_utils.BaseTestCase):
 | 
			
		||||
        ('timeout', dict(timeout=0.01)),  # FIXME(markmc): timeout=0 is broken?
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    _reply_ending = [
 | 
			
		||||
        ('old_behavior', dict(send_single_reply=False)),
 | 
			
		||||
        ('new_behavior', dict(send_single_reply=True)),
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def generate_scenarios(cls):
 | 
			
		||||
        cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
 | 
			
		||||
@@ -375,16 +369,13 @@ class TestSendReceive(test_utils.BaseTestCase):
 | 
			
		||||
                                                         cls._reply,
 | 
			
		||||
                                                         cls._reply_fail,
 | 
			
		||||
                                                         cls._failure,
 | 
			
		||||
                                                         cls._timeout,
 | 
			
		||||
                                                         cls._reply_ending)
 | 
			
		||||
                                                         cls._timeout)
 | 
			
		||||
 | 
			
		||||
    def test_send_receive(self):
 | 
			
		||||
        self.config(kombu_missing_consumer_retry_timeout=0.5,
 | 
			
		||||
                    group="oslo_messaging_rabbit")
 | 
			
		||||
        self.config(heartbeat_timeout_threshold=0,
 | 
			
		||||
                    group="oslo_messaging_rabbit")
 | 
			
		||||
        self.config(send_single_reply=self.send_single_reply,
 | 
			
		||||
                    group="oslo_messaging_rabbit")
 | 
			
		||||
        transport = oslo_messaging.get_transport(self.conf,
 | 
			
		||||
                                                 'kombu+memory:////')
 | 
			
		||||
        self.addCleanup(transport.cleanup)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user