From d57eccd862c1a008f9b1f868c0832e938a75415e Mon Sep 17 00:00:00 2001 From: John Eckersberg Date: Mon, 8 Nov 2021 15:19:45 -0500 Subject: [PATCH] amqp1: fix race when reconnecting Currently this is how reconnect works: - pyngus detects failure and invokes callback Controller.connection_failed() which in turn calls Controller._handle_connection_loss() - The first thing that _handle_connection_loss does is to set self.addresser to None (important later) - Then it defers _do_reconnect after a delay (normally 1 second) - (1 second passes) - _do_reconnect calls _hard_reset which resets the controller state However, there is a race here. This can happen: - The above, up until it defers and waits for 1 second - Controller.send() is invoked on a task - A new Sender is created, and critically because self.reply_link still exists and is active, we call sender.attach and pass in self.addresser. Remember _handle_connection_loss sets self.addresser to None. - Eventually Sender.attach throws an AttributeError because it attempts to call addresser.resolve() but addresser is None The reason this happens is because although the connection is dead, the controller state is still half-alive because _hard_reset hasn't been called yet since it's deferred one second in _do_reconnect. The fix here is to move _hard_reset out of _do_reconnect and directly into _handle_connection_loss. The eventloop is woken up immediately to process _hard_reset but _do_reconnect is still deferred as before so as to retain the desired reconnect backoff behavior. Closes-Bug: #1941652 Change-Id: Ife62a7d76022908f0dc6a77f1ad607cb2fbd3e8f (cherry picked from commit 02a38f507d8f0c377a2ef468e3497b5e897f1b09) --- oslo_messaging/_drivers/amqp1_driver/controller.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index bba7228c9..44a6bb4b5 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -1245,6 +1245,7 @@ class Controller(pyngus.ConnectionEventHandler): # service. Try to re-establish the connection: if not self._reconnecting: self._reconnecting = True + self.processor.wakeup(lambda: self._hard_reset(reason)) LOG.info("Delaying reconnect attempt for %d seconds", self._delay) self.processor.defer(lambda: self._do_reconnect(reason), @@ -1261,7 +1262,6 @@ class Controller(pyngus.ConnectionEventHandler): """ self._reconnecting = False if not self._closing: - self._hard_reset(reason) host = self.hosts.next() LOG.info("Reconnecting to: %(hostname)s:%(port)s", {'hostname': host.hostname, 'port': host.port}) @@ -1331,4 +1331,5 @@ class Controller(pyngus.ConnectionEventHandler): def _active(self): # Is the connection up return (self._socket_connection and + self._socket_connection.pyngus_conn and self._socket_connection.pyngus_conn.active)