Allow retrying some failed requests

Connection Errors can be transient and there are many clients (including
auth_token middleware) that allow retrying requests that fail.

We should support this in the session, disabled by default, rather than
have multiple implementations for it.

For the moment I have deliberately not added it as an option to
Session.__init__, though I can see arguments for it. This can be added
later if a particular need arises.

I have also deliberately distinguished between Connection Errors (and
connect_retries) and HTTP errors. I don't know a good way to generalize
retrying on HTTP errors, so support for that can be added later if required.

Blueprint: session-retries
Change-Id: Ia219636663980433ddb9c00c6df7c8477df4ef99
This commit is contained in:
Jamie Lennox
2014-08-31 11:33:42 +10:00
parent 3305c7be4b
commit b5a435b9ab
3 changed files with 99 additions and 18 deletions

View File

@@ -27,7 +27,8 @@ class Adapter(object):
@utils.positional() @utils.positional()
def __init__(self, session, service_type=None, service_name=None, def __init__(self, session, service_type=None, service_name=None,
interface=None, region_name=None, endpoint_override=None, interface=None, region_name=None, endpoint_override=None,
version=None, auth=None, user_agent=None): version=None, auth=None, user_agent=None,
connect_retries=None):
"""Create a new adapter. """Create a new adapter.
:param Session session: The session object to wrap. :param Session session: The session object to wrap.
@@ -41,6 +42,10 @@ class Adapter(object):
:param auth.BaseAuthPlugin auth: An auth plugin to use instead of the :param auth.BaseAuthPlugin auth: An auth plugin to use instead of the
session one. session one.
:param str user_agent: The User-Agent string to set. :param str user_agent: The User-Agent string to set.
:param int connect_retries: the maximum number of retries that should
be attempted for connection errors.
Default None - use the session default, which
is to not retry.
""" """
self.session = session self.session = session
self.service_type = service_type self.service_type = service_type
@@ -51,6 +56,7 @@ class Adapter(object):
self.version = version self.version = version
self.user_agent = user_agent self.user_agent = user_agent
self.auth = auth self.auth = auth
self.connect_retries = connect_retries
def _set_endpoint_filter_kwargs(self, kwargs): def _set_endpoint_filter_kwargs(self, kwargs):
if self.service_type: if self.service_type:
@@ -75,6 +81,8 @@ class Adapter(object):
kwargs.setdefault('auth', self.auth) kwargs.setdefault('auth', self.auth)
if self.user_agent: if self.user_agent:
kwargs.setdefault('user_agent', self.user_agent) kwargs.setdefault('user_agent', self.user_agent)
if self.connect_retries is not None:
kwargs.setdefault('connect_retries', self.connect_retries)
return self.session.request(url, method, **kwargs) return self.session.request(url, method, **kwargs)

View File

@@ -11,8 +11,10 @@
# under the License. # under the License.
import argparse import argparse
import functools
import logging import logging
import os import os
import time
from oslo.config import cfg from oslo.config import cfg
import requests import requests
@@ -184,7 +186,7 @@ class Session(object):
user_agent=None, redirect=None, authenticated=None, user_agent=None, redirect=None, authenticated=None,
endpoint_filter=None, auth=None, requests_auth=None, endpoint_filter=None, auth=None, requests_auth=None,
raise_exc=True, allow_reauth=True, log=True, raise_exc=True, allow_reauth=True, log=True,
endpoint_override=None, **kwargs): endpoint_override=None, connect_retries=0, **kwargs):
"""Send an HTTP request with the specified characteristics. """Send an HTTP request with the specified characteristics.
Wrapper around `requests.Session.request` to handle tasks such as Wrapper around `requests.Session.request` to handle tasks such as
@@ -210,6 +212,9 @@ class Session(object):
can be followed by a request. Either an can be followed by a request. Either an
integer for a specific count or True/False integer for a specific count or True/False
for forever/never. (optional) for forever/never. (optional)
:param int connect_retries: the maximum number of retries that should
be attempted for connection errors.
(optional, defaults to 0 - never retry).
:param bool authenticated: True if a token should be attached to this :param bool authenticated: True if a token should be attached to this
request, False if not or None for attach if request, False if not or None for attach if
an auth_plugin is available. an auth_plugin is available.
@@ -322,7 +327,9 @@ class Session(object):
if redirect is None: if redirect is None:
redirect = self.redirect redirect = self.redirect
resp = self._send_request(url, method, redirect, log, **kwargs) send = functools.partial(self._send_request,
url, method, redirect, log, connect_retries)
resp = send(**kwargs)
# handle getting a 401 Unauthorized response by invalidating the plugin # handle getting a 401 Unauthorized response by invalidating the plugin
# and then retrying the request. This is only tried once. # and then retrying the request. This is only tried once.
@@ -331,8 +338,7 @@ class Session(object):
token = self.get_token(auth) token = self.get_token(auth)
if token: if token:
headers['X-Auth-Token'] = token headers['X-Auth-Token'] = token
resp = self._send_request(url, method, redirect, log, resp = send(**kwargs)
**kwargs)
if raise_exc and resp.status_code >= 400: if raise_exc and resp.status_code >= 400:
_logger.debug('Request returned failure status: %s', _logger.debug('Request returned failure status: %s',
@@ -341,12 +347,20 @@ class Session(object):
return resp return resp
def _send_request(self, url, method, redirect, log, **kwargs): def _send_request(self, url, method, redirect, log, connect_retries,
connect_retry_delay=0.5, **kwargs):
# NOTE(jamielennox): We handle redirection manually because the # NOTE(jamielennox): We handle redirection manually because the
# requests lib follows some browser patterns where it will redirect # requests lib follows some browser patterns where it will redirect
# POSTs as GETs for certain statuses which is not what we want for an # POSTs as GETs for certain statuses which is not what we want for an
# API. See: https://en.wikipedia.org/wiki/Post/Redirect/Get # API. See: https://en.wikipedia.org/wiki/Post/Redirect/Get
# NOTE(jamielennox): The interaction between retries and redirects is
# handled naively. We will attempt only a maximum number of retries and
# redirects rather than per request limits. Otherwise the extreme case
# could be redirects * retries requests. This will be sufficient in
# most cases and can be fixed properly if there's ever a need.
try:
try: try:
resp = self.session.request(method, url, **kwargs) resp = self.session.request(method, url, **kwargs)
except requests.exceptions.SSLError: except requests.exceptions.SSLError:
@@ -358,6 +372,19 @@ class Session(object):
except requests.exceptions.ConnectionError: except requests.exceptions.ConnectionError:
msg = 'Unable to establish connection to %s' % url msg = 'Unable to establish connection to %s' % url
raise exceptions.ConnectionRefused(msg) raise exceptions.ConnectionRefused(msg)
except (exceptions.RequestTimeout, exceptions.ConnectionRefused) as e:
if connect_retries <= 0:
raise
_logger.info('Failure: %s. Retrying in %.1fs.',
e, connect_retry_delay)
time.sleep(connect_retry_delay)
return self._send_request(
url, method, redirect, log,
connect_retries=connect_retries - 1,
connect_retry_delay=connect_retry_delay * 2,
**kwargs)
if log: if log:
self._http_log_response(response=resp) self._http_log_response(response=resp)
@@ -379,7 +406,11 @@ class Session(object):
_logger.warn("Failed to redirect request to %s as new " _logger.warn("Failed to redirect request to %s as new "
"location was not provided.", resp.url) "location was not provided.", resp.url)
else: else:
new_resp = self._send_request(location, method, redirect, log, # NOTE(jamielennox): We don't pass through connect_retry_delay.
# This request actually worked so we can reset the delay count.
new_resp = self._send_request(
location, method, redirect, log,
connect_retries=connect_retries,
**kwargs) **kwargs)
if not isinstance(new_resp.history, list): if not isinstance(new_resp.history, list):

View File

@@ -163,6 +163,29 @@ class SessionTests(utils.TestCase):
self.assertIn(k, self.logger.output) self.assertIn(k, self.logger.output)
self.assertNotIn(v, self.logger.output) self.assertNotIn(v, self.logger.output)
def test_connect_retries(self):
def _timeout_error(request, context):
raise requests.exceptions.Timeout()
self.stub_url('GET', text=_timeout_error)
session = client_session.Session()
retries = 3
with mock.patch('time.sleep') as m:
self.assertRaises(exceptions.RequestTimeout,
session.get,
self.TEST_URL, connect_retries=retries)
self.assertEqual(retries, m.call_count)
# 3 retries finishing with 2.0 means 0.5, 1.0 and 2.0
m.assert_called_with(2.0)
# we count retries so there will be one initial request + 3 retries
self.assertThat(self.requests.request_history,
matchers.HasLength(retries + 1))
class RedirectTests(utils.TestCase): class RedirectTests(utils.TestCase):
@@ -674,6 +697,25 @@ class AdapterTest(utils.TestCase):
self.assertEqual(self.TEST_TOKEN, adpt.get_token()) self.assertEqual(self.TEST_TOKEN, adpt.get_token())
self.assertTrue(auth.get_token_called) self.assertTrue(auth.get_token_called)
def test_adapter_connect_retries(self):
retries = 2
sess = client_session.Session()
adpt = adapter.Adapter(sess, connect_retries=retries)
def _refused_error(request, context):
raise requests.exceptions.ConnectionError()
self.stub_url('GET', text=_refused_error)
with mock.patch('time.sleep') as m:
self.assertRaises(exceptions.ConnectionRefused,
adpt.get, self.TEST_URL)
self.assertEqual(retries, m.call_count)
# we count retries so there will be one initial request + 2 retries
self.assertThat(self.requests.request_history,
matchers.HasLength(retries + 1))
class ConfLoadingTests(utils.TestCase): class ConfLoadingTests(utils.TestCase):