diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index ae795d837..f48c2ad94 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -48,6 +48,9 @@ from oslo_messaging._i18n import _LW from oslo_messaging import _utils from oslo_messaging import exceptions +# NOTE(sileht): don't exists in py2 socket module +TCP_USER_TIMEOUT = 18 + rabbit_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -647,6 +650,7 @@ class Connection(object): # the kombu underlying connection works self._set_current_channel(None) self.ensure(method=lambda: self.connection.connection) + self.set_transport_socket_timeout() def ensure(self, method, retry=None, recoverable_error_callback=None, error_callback=None, @@ -714,6 +718,8 @@ class Connection(object): """Callback invoked when the kombu reconnects and creates a new channel, we use it the reconfigure our consumers. """ + + self.set_transport_socket_timeout() self._set_current_channel(new_channel) for consumer in self._consumers: consumer.declare(self) @@ -834,8 +840,7 @@ class Connection(object): self._heartbeat_support_log_emitted = True return False - @contextlib.contextmanager - def _transport_socket_timeout(self, timeout): + def set_transport_socket_timeout(self, timeout=None): # NOTE(sileht): they are some case where the heartbeat check # or the producer.send return only when the system socket # timeout if reach. kombu doesn't allow use to customise this @@ -844,27 +849,37 @@ class Connection(object): # kombu==3.0.33. Once the commit below is released, we should # try to set the socket timeout in the constructor: # https://github.com/celery/py-amqp/pull/64 + + heartbeat_timeout = self.heartbeat_timeout_threshold + if self._heartbeat_supported_and_enabled(): + # NOTE(sileht): we are supposed to send heartbeat every + # heartbeat_timeout, no need to wait more otherwise will + # disconnect us, so raise timeout earlier ourself + if timeout is None: + timeout = heartbeat_timeout + else: + timeout = min(heartbeat_timeout, timeout) + try: sock = self.channel.connection.sock except AttributeError as e: # Level is set to debug because otherwise we would spam the logs LOG.debug('Failed to get socket attribute: %s' % str(e)) - sock = None - - if sock: - orig_timeout = sock.gettimeout() + else: sock.settimeout(timeout) + sock.setsockopt(socket.IPPROTO_TCP, TCP_USER_TIMEOUT, + timeout * 1000 if timeout is not None else 0) + + @contextlib.contextmanager + def _transport_socket_timeout(self, timeout): + self.set_transport_socket_timeout(timeout) yield - if sock: - sock.settimeout(orig_timeout) + self.set_transport_socket_timeout() def _heartbeat_check(self): # NOTE(sileht): we are supposed to send at least one heartbeat # every heartbeat_timeout_threshold, so no need to way more - with self._transport_socket_timeout( - self.heartbeat_timeout_threshold): - self.connection.heartbeat_check( - rate=self.heartbeat_rate) + self.connection.heartbeat_check(rate=self.heartbeat_rate) def _heartbeat_start(self): if self._heartbeat_supported_and_enabled(): @@ -1089,25 +1104,15 @@ class Connection(object): auto_declare=not exchange.passive, routing_key=routing_key) - # NOTE(sileht): no need to wait more, caller expects - # a answer before timeout is reached - transport_timeout = timeout - - heartbeat_timeout = self.heartbeat_timeout_threshold - if (self._heartbeat_supported_and_enabled() and ( - transport_timeout is None or - transport_timeout > heartbeat_timeout)): - # NOTE(sileht): we are supposed to send heartbeat every - # heartbeat_timeout, no need to wait more otherwise will - # disconnect us, so raise timeout earlier ourself - transport_timeout = heartbeat_timeout - log_info = {'msg': msg, 'who': exchange or 'default', 'key': routing_key} LOG.trace('Connection._publish: sending message %(msg)s to' ' %(who)s with routing key %(key)s', log_info) - with self._transport_socket_timeout(transport_timeout): + + # NOTE(sileht): no need to wait more, caller expects + # a answer before timeout is reached + with self._transport_socket_timeout(timeout): producer.publish(msg, expiration=self._get_expiration(timeout), compression=self.kombu_compression) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index b99ba1fe3..18181843c 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -92,11 +92,11 @@ class TestHeartbeat(test_utils.BaseTestCase): if not heartbeat_side_effect: self.assertEqual(1, fake_ensure_connection.call_count) - self.assertEqual(3, fake_logger.debug.call_count) + self.assertEqual(2, fake_logger.debug.call_count) self.assertEqual(0, fake_logger.info.call_count) else: self.assertEqual(2, fake_ensure_connection.call_count) - self.assertEqual(3, fake_logger.debug.call_count) + self.assertEqual(2, fake_logger.debug.call_count) self.assertEqual(1, fake_logger.info.call_count) self.assertIn(mock.call(info, mock.ANY), fake_logger.info.mock_calls)