From 9972158baa5852736ef5669efb07354513494ede Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Mon, 12 Dec 2016 15:32:27 -0500 Subject: [PATCH] [AMQP 1.0] Propagate authentication errors to caller When the connection fails due to an authentication issue raise a MessageDeliveryFailure on all pending send requests. Include the authentication error message in the exception. Change-Id: I06b40c6c480a4d082dce64801736b3d12f696c26 Closes-Bug: 1508512 --- .../_drivers/amqp1_driver/controller.py | 68 ++++++++++--------- .../_drivers/amqp1_driver/eventloop.py | 27 +++----- .../tests/drivers/test_amqp_driver.py | 24 ++++--- 3 files changed, 61 insertions(+), 58 deletions(-) diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index 4a9361ceb..49aba929e 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -293,7 +293,7 @@ class Sender(pyngus.SenderEventHandler): if self._link: self._link.close() - def reset(self): + def reset(self, reason="Link reset"): """Called by the controller on connection failover. Release all link resources, abort any in-flight messages, and check the retry limit on all pending send requests. @@ -304,16 +304,16 @@ class Sender(pyngus.SenderEventHandler): if self._link: self._link.destroy() self._link = None - self._abort_unacked("Link reset") - self._check_retry_limit() + self._abort_unacked(reason) + self._check_retry_limit(reason) - def destroy(self): + def destroy(self, reason="Link destroyed"): """Destroy the sender and all pending messages. Called on driver shutdown. """ LOG.debug("Sender %s destroyed", self._address) - self.reset() - self._abort_pending("Link destroyed") + self.reset(reason) + self._abort_pending(reason) def send_message(self, send_task): """Send a message out the link. @@ -354,21 +354,24 @@ class Sender(pyngus.SenderEventHandler): # sender_closed() will be called once the link completes closing def sender_closed(self, sender_link): - self._abort_unacked("Sender closed") - if self._connection: - # still attached, so attempt to restart the link - self._check_retry_limit() - self._scheduler.defer(self._reopen_link, self._delay) + self._handle_sender_closed() def sender_failed(self, sender_link, error): """Protocol error occurred.""" LOG.warning(_LW("sender %(addr)s failed error=%(error)s"), {'addr': self._address, 'error': error}) - self.sender_closed(sender_link) + self._handle_sender_closed(str(error)) # end Pyngus callbacks - def _check_retry_limit(self): + def _handle_sender_closed(self, reason="Sender closed"): + self._abort_unacked(reason) + if self._connection: + # still attached, so attempt to restart the link + self._check_retry_limit(reason) + self._scheduler.defer(self._reopen_link, self._delay) + + def _check_retry_limit(self, reason): # Called on recoverable connection or link failure. Remove any pending # sends that have exhausted their retry count: expired = set() @@ -377,7 +380,7 @@ class Sender(pyngus.SenderEventHandler): send_task.retry -= 1 if send_task.retry <= 0: expired.add(send_task) - send_task._on_error("Send retries exhausted") + send_task._on_error("Message send failed: %s" % reason) while expired: self._pending_sends.remove(expired.pop()) @@ -813,7 +816,7 @@ class Controller(pyngus.ConnectionEventHandler): self._closing = False # only schedule one outstanding reconnect attempt at a time self._reconnecting = False - self._delay = 1 # seconds between retries + self._delay = self.conn_retry_interval # seconds between retries # prevent queuing up multiple requests to run _process_tasks() self._process_tasks_scheduled = False self._process_tasks_lock = threading.Lock() @@ -843,7 +846,7 @@ class Controller(pyngus.ConnectionEventHandler): self.processor.wakeup(self._start_shutdown) LOG.debug("Waiting for eventloop to exit") self.processor.join(timeout) - self._hard_reset() + self._hard_reset("Shutting down") for sender in itervalues(self._all_senders): sender.destroy() self._all_senders.clear() @@ -1013,7 +1016,7 @@ class Controller(pyngus.ConnectionEventHandler): def socket_error(self, error): """Called by eventloop when a socket error occurs.""" LOG.error(_LE("Socket failure: %s"), error) - self._handle_connection_loss() + self._handle_connection_loss(str(error)) # Pyngus connection event callbacks (and their helpers), all invoked from # the eventloop thread: @@ -1026,7 +1029,7 @@ class Controller(pyngus.ConnectionEventHandler): # pyngus bug: ignore failure callback on destroyed connections return LOG.debug("AMQP Connection failure: %s", error) - self._handle_connection_loss() + self._handle_connection_loss(str(error)) def connection_active(self, connection): """This is a Pyngus callback, invoked by Pyngus when the connection to @@ -1048,7 +1051,7 @@ class Controller(pyngus.ConnectionEventHandler): self._reply_link_ready, self._reply_link_down, self._reply_credit) - self._delay = 1 + self._delay = self.conn_retry_interval # reset # schedule periodic maintenance of sender links self._link_maint_timer = self.processor.defer(self._purge_sender_links, self._link_maint_timeout) @@ -1061,7 +1064,7 @@ class Controller(pyngus.ConnectionEventHandler): """ LOG.debug("AMQP connection closed.") # if the driver isn't being shutdown, failover and reconnect - self._handle_connection_loss() + self._handle_connection_loss("AMQP connection closed.") def connection_remote_closed(self, connection, reason): """This is a Pyngus callback, invoked by Pyngus when the peer has @@ -1089,13 +1092,14 @@ class Controller(pyngus.ConnectionEventHandler): {'hostname': self.hosts.current.hostname, 'port': self.hosts.current.port, 'username': self.hosts.current.username}) - # connection failure will be handled later + # pyngus will invoke connection_failed() eventually - def _handle_connection_loss(self): + def _handle_connection_loss(self, reason): """The connection to the messaging service has been lost. Try to reestablish the connection/failover if not shutting down the driver. """ self.addresser = None + self._socket_connection.close() if self._closing: # we're in the middle of shutting down the driver anyways, # just consider it done: @@ -1107,31 +1111,33 @@ class Controller(pyngus.ConnectionEventHandler): self._reconnecting = True LOG.info(_LI("delaying reconnect attempt for %d seconds"), self._delay) - self.processor.defer(self._do_reconnect, self._delay) - self._delay = min(self._delay * 2, 60) + self.processor.defer(lambda: self._do_reconnect(reason), + self._delay) + self._delay = min(self._delay * self.conn_retry_backoff, + self.conn_retry_interval_max) if self._link_maint_timer: self._link_maint_timer.cancel() self._link_maint_timer = None - def _do_reconnect(self): + def _do_reconnect(self, reason): """Invoked on connection/socket failure, failover and re-connect to the messaging service. """ self._reconnecting = False if not self._closing: - self._hard_reset() + self._hard_reset(reason) host = self.hosts.next() LOG.info(_LI("Reconnecting to: %(hostname)s:%(port)s"), {'hostname': host.hostname, 'port': host.port}) self._socket_connection.connect(host) - def _hard_reset(self): + def _hard_reset(self, reason): """Reset the controller to its pre-connection state""" # note well: since this method destroys the connection, it cannot be # invoked directly from a pyngus callback. Use processor.defer() to # run this method on the main loop instead. for sender in self._purged_senders: - sender.destroy() + sender.destroy(reason) del self._purged_senders[:] self._active_senders.clear() unused = [] @@ -1140,10 +1146,10 @@ class Controller(pyngus.ConnectionEventHandler): if sender.pending_messages == 0: unused.append(key) else: - sender.reset() + sender.reset(reason) self._active_senders.add(key) for key in unused: - self._all_senders[key].destroy() + self._all_senders[key].destroy(reason) del self._all_senders[key] for servers in itervalues(self._servers): for server in itervalues(servers): @@ -1170,7 +1176,7 @@ class Controller(pyngus.ConnectionEventHandler): if not self._closing: # destroy links that have already been closed for sender in self._purged_senders: - sender.destroy() + sender.destroy("Idle link purged") del self._purged_senders[:] # determine next set to purge diff --git a/oslo_messaging/_drivers/amqp1_driver/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py index 0f3b5da02..c2e16fbe0 100644 --- a/oslo_messaging/_drivers/amqp1_driver/eventloop.py +++ b/oslo_messaging/_drivers/amqp1_driver/eventloop.py @@ -69,31 +69,27 @@ class _SocketConnection(object): def read_socket(self): """Called to read from the socket.""" - while True: + if self.socket: try: - rc = pyngus.read_socket_input(self.pyngus_conn, self.socket) + pyngus.read_socket_input(self.pyngus_conn, self.socket) self.pyngus_conn.process(now()) - return rc except (socket.timeout, socket.error) as e: # pyngus handles EAGAIN/EWOULDBLOCK and EINTER self.pyngus_conn.close_input() self.pyngus_conn.close_output() self._handler.socket_error(str(e)) - return pyngus.Connection.EOS def write_socket(self): """Called to write to the socket.""" - while True: + if self.socket: try: - rc = pyngus.write_socket_output(self.pyngus_conn, self.socket) + pyngus.write_socket_output(self.pyngus_conn, self.socket) self.pyngus_conn.process(now()) - return rc except (socket.timeout, socket.error) as e: # pyngus handles EAGAIN/EWOULDBLOCK and EINTER self.pyngus_conn.close_output() self.pyngus_conn.close_input() self._handler.socket_error(str(e)) - return pyngus.Connection.EOS def connect(self, host): """Connect to host and start the AMQP protocol.""" @@ -358,7 +354,7 @@ class Thread(threading.Thread): deadline = self._scheduler._next_deadline pyngus_conn = self._connection and self._connection.pyngus_conn - if pyngus_conn: + if pyngus_conn and self._connection.socket: if pyngus_conn.needs_input: readfds.append(self._connection) if pyngus_conn.has_output: @@ -388,13 +384,12 @@ class Thread(threading.Thread): # Testing shows that polling improves latency over checking the # lists returned by select() self._requests.process_requests() - if pyngus_conn: - self._connection.read_socket() - if pyngus_conn.deadline: - _now = now() - if pyngus_conn.deadline <= _now: - pyngus_conn.process(_now) - self._connection.write_socket() + self._connection.read_socket() + if pyngus_conn and pyngus_conn.deadline: + _now = now() + if pyngus_conn.deadline <= _now: + pyngus_conn.process(_now) + self._connection.write_socket() self._scheduler._process() # run any deferred requests diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index dd1f25bac..12db37a60 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -674,12 +674,12 @@ class TestAuthentication(test_utils.BaseTestCase): target = oslo_messaging.Target(topic="test-topic") _ListenerThread( driver.listen(target, None, None)._poll_style_listener, 1) - self.assertRaises(oslo_messaging.MessagingTimeout, + self.assertRaises(oslo_messaging.MessageDeliveryFailure, driver.send, target, {"context": True}, {"method": "echo"}, wait_for_reply=True, - timeout=2.0) + retry=2) driver.cleanup() @@ -771,7 +771,6 @@ mech_list: ${mechs} """Verify that a bad password given in TransportHost is rejected by the broker. """ - addr = "amqp://joe:badpass@%s:%d" % (self._broker.host, self._broker.port) url = oslo_messaging.TransportURL.parse(self.conf, addr) @@ -779,12 +778,15 @@ mech_list: ${mechs} target = oslo_messaging.Target(topic="test-topic") _ListenerThread( driver.listen(target, None, None)._poll_style_listener, 1) - self.assertRaises(oslo_messaging.MessagingTimeout, - driver.send, - target, {"context": True}, - {"method": "echo"}, - wait_for_reply=True, - timeout=2.0) + try: + driver.send(target, {"context": True}, {"method": "echo"}, + wait_for_reply=True, retry=2) + except oslo_messaging.MessageDeliveryFailure as e: + # verify the exception indicates the failure was an authentication + # error + self.assertTrue('amqp:unauthorized-access' in str(e)) + else: + self.assertIsNone("Expected authentication failure") driver.cleanup() def test_authentication_bad_mechs(self): @@ -800,12 +802,12 @@ mech_list: ${mechs} target = oslo_messaging.Target(topic="test-topic") _ListenerThread( driver.listen(target, None, None)._poll_style_listener, 1) - self.assertRaises(oslo_messaging.MessagingTimeout, + self.assertRaises(oslo_messaging.MessageDeliveryFailure, driver.send, target, {"context": True}, {"method": "echo"}, wait_for_reply=True, - timeout=2.0) + retry=0) driver.cleanup() def test_authentication_default_username(self):