amqp1: fix race when reconnecting

Currently this is how reconnect works:

- pyngus detects failure and invokes callback
  Controller.connection_failed() which in turn calls
  Controller._handle_connection_loss()

- The first thing that _handle_connection_loss does is to set
  self.addresser to None (important later)

- Then it defers _do_reconnect after a delay (normally 1 second)

- (1 second passes)

- _do_reconnect calls _hard_reset which resets the controller state

However, there is a race here.  This can happen:

- The steps above run up to the point where _do_reconnect is deferred
  and the 1 second wait begins

- Controller.send() is invoked on a task

- A new Sender is created, and critically because self.reply_link
  still exists and is active, we call sender.attach and pass in
  self.addresser.  Remember _handle_connection_loss sets
  self.addresser to None.

- Eventually Sender.attach raises an AttributeError because it
  attempts to call addresser.resolve() but addresser is None

This happens because, although the connection is dead, the controller
state is still half-alive: _hard_reset has not run yet, since it is
called from _do_reconnect, which is deferred by one second.
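
The failure mode can be reproduced with a toy model.  This is only a
sketch: the Addresser, Sender and Controller classes below are
hypothetical stand-ins that keep just the state involved in the race,
and reply_link_active is an assumed simplification of the check on
self.reply_link.  None of this is the driver's real code.

```python
class Addresser:
    """Hypothetical stand-in for the driver's addresser."""

    def resolve(self, target):
        return "addr/%s" % target


class Sender:
    """Hypothetical stand-in for the driver's Sender."""

    def attach(self, addresser, target):
        # Same shape as the failure: resolve() is called on whatever
        # object was passed in, including None.
        self.address = addresser.resolve(target)


class Controller:
    """Toy controller holding only the state that matters for the race."""

    def __init__(self):
        self.addresser = Addresser()
        self.reply_link_active = True  # assumed proxy for self.reply_link

    def handle_connection_loss(self):
        # Pre-fix ordering: only the addresser is cleared right away; the
        # rest of the state waits for the deferred reconnect.
        self.addresser = None

    def hard_reset(self):
        self.reply_link_active = False

    def send(self, target):
        sender = Sender()
        if self.reply_link_active:
            sender.attach(self.addresser, target)
        return sender


ctrl = Controller()
ctrl.handle_connection_loss()

# A send() that lands inside the delay window hits the half-alive state.
try:
    ctrl.send("topic")
    race_error = None
except AttributeError as exc:
    race_error = exc

# Once the reset has run, send() no longer tries to attach.
ctrl.hard_reset()
sender_after_reset = ctrl.send("topic")
```

In this model, running hard_reset() before any send() is enough to
avoid the AttributeError.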

The fix here is to move _hard_reset out of _do_reconnect and directly
into _handle_connection_loss.  The event loop is woken up immediately
to process _hard_reset, but _do_reconnect is still deferred as before
so as to retain the desired reconnect backoff behavior.
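
The intended ordering can be sketched with a toy scheduler.  This is an
illustration under assumptions: ToyProcessor is hypothetical and only
mimics the relative ordering of wakeup() (run as soon as possible) and
defer() (run after a delay) on a simulated clock.  It is not the
driver's event loop.

```python
import heapq
import itertools


class ToyProcessor:
    """Hypothetical scheduler that mimics wakeup()/defer() ordering."""

    def __init__(self):
        self._queue = []
        self._seq = itertools.count()  # tie-breaker, keeps FIFO order
        self.now = 0.0                 # simulated clock, in seconds

    def wakeup(self, request):
        # Schedule the request with no delay.
        heapq.heappush(self._queue, (self.now, next(self._seq), request))

    def defer(self, request, delay):
        # Schedule the request `delay` simulated seconds from now.
        heapq.heappush(self._queue,
                       (self.now + delay, next(self._seq), request))

    def run(self):
        # Pop requests in time order, advancing the simulated clock.
        while self._queue:
            self.now, _, request = heapq.heappop(self._queue)
            request()


events = []
processor = ToyProcessor()


def handle_connection_loss(delay=1):
    # Post-fix ordering: reset immediately, reconnect after the delay.
    processor.wakeup(lambda: events.append(("hard_reset", processor.now)))
    processor.defer(lambda: events.append(("do_reconnect", processor.now)),
                    delay)


handle_connection_loss()
processor.run()
```

The reset is recorded at simulated time 0 and the reconnect at time 1,
so the backoff delay is unchanged while the stale state is cleared
before the window in which send() could run.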

Closes-Bug: #1941652
Change-Id: Ife62a7d76022908f0dc6a77f1ad607cb2fbd3e8f
(cherry picked from commit 02a38f507d)
John Eckersberg 2021-11-08 15:19:45 -05:00 committed by Hervé Beraud
parent b657365568
commit d57eccd862
1 changed file with 2 additions and 1 deletion


@@ -1245,6 +1245,7 @@ class Controller(pyngus.ConnectionEventHandler):
             # service. Try to re-establish the connection:
             if not self._reconnecting:
                 self._reconnecting = True
+                self.processor.wakeup(lambda: self._hard_reset(reason))
                 LOG.info("Delaying reconnect attempt for %d seconds",
                          self._delay)
                 self.processor.defer(lambda: self._do_reconnect(reason),
@@ -1261,7 +1262,6 @@ class Controller(pyngus.ConnectionEventHandler):
         """
         self._reconnecting = False
         if not self._closing:
-            self._hard_reset(reason)
             host = self.hosts.next()
             LOG.info("Reconnecting to: %(hostname)s:%(port)s",
                      {'hostname': host.hostname, 'port': host.port})
@@ -1331,4 +1331,5 @@ class Controller(pyngus.ConnectionEventHandler):
     def _active(self):
         # Is the connection up
         return (self._socket_connection and
+                self._socket_connection.pyngus_conn and
                 self._socket_connection.pyngus_conn.active)