Retry download of object body
Currently the swift client retries establishing a connection to the server (by default up to 5 times). However, when downloading an object, once the connection has been established and the inital headers have been returned, no attempt is made to retry. So, for example, if 99MB of a 100MB object have been downloaded and the connection is then lost, the download will fail. This patch changes the behaviour to re-establish the connection and fetch the remaining bytes using the 'Range' header to offset. Data retry is not yet supported if the original request is for a subset of the object data (ie uses the 'Range' header), or if resp_chunk_size has not been set. The object's etag is checked using If-Match to make sure the object data hasn't changed since the start of the download. Change-Id: Iab47f10081ff39f6d344dbc2479cbc3bfd1c5b29
This commit is contained in:
parent
d1e3109588
commit
4af623bcf1
@ -187,7 +187,7 @@ class _ObjectBody(object):
|
||||
return self
|
||||
|
||||
def next(self):
|
||||
buf = self.resp.read(self.chunk_size)
|
||||
buf = self.read(self.chunk_size)
|
||||
if not buf:
|
||||
raise StopIteration()
|
||||
return buf
|
||||
@ -196,6 +196,67 @@ class _ObjectBody(object):
|
||||
return self.next()
|
||||
|
||||
|
||||
class _RetryBody(_ObjectBody):
|
||||
"""
|
||||
Wrapper for object body response which triggers a retry
|
||||
(from offset) if the connection is dropped after partially
|
||||
downloading the object.
|
||||
"""
|
||||
def __init__(self, resp, expected_length, etag, connection, container, obj,
|
||||
resp_chunk_size=None, query_string=None, response_dict=None,
|
||||
headers=None):
|
||||
"""
|
||||
Wrap the underlying response
|
||||
|
||||
:param resp: the response to wrap
|
||||
:param expected_length: the object size in bytes
|
||||
:param etag: the object's etag
|
||||
:param connection: Connection class instance
|
||||
:param container: the name of the container the object is in
|
||||
:param obj: the name of object we are downloading
|
||||
:param resp_chunk_size: if defined, chunk size of data to read
|
||||
:param query_string: if set will be appended with '?' to generated path
|
||||
:param response_dict: an optional dictionary into which to place
|
||||
the response - status, reason and headers
|
||||
:param headers: an optional dictionary with additional headers to
|
||||
include in the request
|
||||
"""
|
||||
super(_RetryBody, self).__init__(resp, resp_chunk_size)
|
||||
self.expected_length = expected_length
|
||||
self.expected_etag = etag
|
||||
self.conn = connection
|
||||
self.container = container
|
||||
self.obj = obj
|
||||
self.query_string = query_string
|
||||
self.response_dict = response_dict
|
||||
self.headers = headers if headers is not None else {}
|
||||
self.bytes_read = 0
|
||||
|
||||
def read(self, length=None):
|
||||
buf = None
|
||||
try:
|
||||
buf = self.resp.read(length)
|
||||
self.bytes_read += len(buf)
|
||||
except (socket.error, RequestException) as e:
|
||||
if self.conn.attempts > self.conn.retries:
|
||||
logger.exception(e)
|
||||
raise
|
||||
if (not buf and self.bytes_read < self.expected_length and
|
||||
self.conn.attempts <= self.conn.retries):
|
||||
self.headers['Range'] = 'bytes=%d-' % self.bytes_read
|
||||
self.headers['If-Match'] = self.expected_etag
|
||||
hdrs, body = self.conn._retry(None, get_object,
|
||||
self.container, self.obj,
|
||||
resp_chunk_size=self.chunk_size,
|
||||
query_string=self.query_string,
|
||||
response_dict=self.response_dict,
|
||||
headers=self.headers,
|
||||
attempts=self.conn.attempts)
|
||||
self.resp = body
|
||||
buf = self.read(length)
|
||||
return buf
|
||||
|
||||
|
||||
class HTTPConnection(object):
|
||||
def __init__(self, url, proxy=None, cacert=None, insecure=False,
|
||||
ssl_compression=False, default_user_agent=None, timeout=None):
|
||||
@ -1408,10 +1469,10 @@ class Connection(object):
|
||||
target_dict.update(response_dict)
|
||||
|
||||
def _retry(self, reset_func, func, *args, **kwargs):
|
||||
self.attempts = 0
|
||||
retried_auth = False
|
||||
backoff = self.starting_backoff
|
||||
caller_response_dict = kwargs.pop('response_dict', None)
|
||||
self.attempts = kwargs.pop('attempts', 0)
|
||||
while self.attempts <= self.retries:
|
||||
self.attempts += 1
|
||||
try:
|
||||
@ -1523,10 +1584,25 @@ class Connection(object):
|
||||
def get_object(self, container, obj, resp_chunk_size=None,
|
||||
query_string=None, response_dict=None, headers=None):
|
||||
"""Wrapper for :func:`get_object`"""
|
||||
return self._retry(None, get_object, container, obj,
|
||||
resp_chunk_size=resp_chunk_size,
|
||||
query_string=query_string,
|
||||
response_dict=response_dict, headers=headers)
|
||||
rheaders, body = self._retry(None, get_object, container, obj,
|
||||
resp_chunk_size=resp_chunk_size,
|
||||
query_string=query_string,
|
||||
response_dict=response_dict,
|
||||
headers=headers)
|
||||
is_not_range_request = (
|
||||
not headers or 'range' not in (k.lower() for k in headers))
|
||||
retry_is_possible = (
|
||||
is_not_range_request and resp_chunk_size and
|
||||
self.attempts <= self.retries)
|
||||
if retry_is_possible:
|
||||
body = _RetryBody(body.resp, int(rheaders['content-length']),
|
||||
rheaders['etag'],
|
||||
self, container, obj,
|
||||
resp_chunk_size=resp_chunk_size,
|
||||
query_string=query_string,
|
||||
response_dict=response_dict,
|
||||
headers=headers)
|
||||
return rheaders, body
|
||||
|
||||
def put_object(self, container, obj, contents, content_length=None,
|
||||
etag=None, chunk_size=None, content_type=None,
|
||||
|
@ -333,6 +333,58 @@ class TestFunctional(testtools.TestCase):
|
||||
hdrs, body = self.conn.get_object(self.containername, self.objectname)
|
||||
self.assertEqual("should tolerate empty chunks", body)
|
||||
|
||||
def test_download_object_retry_chunked(self):
|
||||
resp_chunk_size = 2
|
||||
hdrs, body = self.conn.get_object(self.containername,
|
||||
self.objectname,
|
||||
resp_chunk_size=resp_chunk_size)
|
||||
data = next(body)
|
||||
self.assertEqual(self.test_data[:resp_chunk_size], data)
|
||||
self.assertTrue(1, self.conn.attempts)
|
||||
for chunk in body.resp:
|
||||
# Flush remaining data from underlying response
|
||||
# (simulate a dropped connection)
|
||||
pass
|
||||
# Trigger the retry
|
||||
for chunk in body:
|
||||
data += chunk
|
||||
self.assertEqual(self.test_data, data)
|
||||
self.assertEqual(2, self.conn.attempts)
|
||||
|
||||
def test_download_object_retry_chunked_auth_failure(self):
|
||||
resp_chunk_size = 2
|
||||
self.conn.token = 'invalid'
|
||||
hdrs, body = self.conn.get_object(self.containername,
|
||||
self.objectname,
|
||||
resp_chunk_size=resp_chunk_size)
|
||||
self.assertEqual(2, self.conn.attempts)
|
||||
for chunk in body.resp:
|
||||
# Flush remaining data from underlying response
|
||||
# (simulate a dropped connection)
|
||||
pass
|
||||
|
||||
self.conn.token = 'invalid'
|
||||
data = next(body)
|
||||
self.assertEqual(4, self.conn.attempts)
|
||||
|
||||
for chunk in body:
|
||||
data += chunk
|
||||
|
||||
self.assertEqual(self.test_data, data)
|
||||
self.assertEqual(4, self.conn.attempts)
|
||||
|
||||
def test_download_object_non_chunked(self):
|
||||
hdrs, body = self.conn.get_object(self.containername, self.objectname)
|
||||
data = body
|
||||
self.assertEqual(self.test_data, data)
|
||||
self.assertTrue(1, self.conn.attempts)
|
||||
|
||||
hdrs, body = self.conn.get_object(self.containername, self.objectname,
|
||||
resp_chunk_size=0)
|
||||
data = body
|
||||
self.assertEqual(self.test_data, data)
|
||||
self.assertTrue(1, self.conn.attempts)
|
||||
|
||||
def test_post_account(self):
|
||||
self.conn.post_account({'x-account-meta-data': 'Something'})
|
||||
headers = self.conn.head_account()
|
||||
|
@ -835,6 +835,23 @@ class TestGetObject(MockHttpTest):
|
||||
self.assertRaises(StopIteration, next, resp)
|
||||
self.assertEqual(resp.read(), '')
|
||||
|
||||
def test_get_object_with_resp_chunk_size_zero(self):
|
||||
def get_connection(self):
|
||||
def get_auth():
|
||||
return 'http://auth.test.com', 'token'
|
||||
|
||||
conn = c.Connection('http://www.test.com', 'asdf', 'asdf')
|
||||
self.assertIs(type(conn), c.Connection)
|
||||
conn.get_auth = get_auth
|
||||
self.assertEqual(conn.attempts, 0)
|
||||
return conn
|
||||
|
||||
with mock.patch('swiftclient.client.http_connection',
|
||||
self.fake_http_connection(200)):
|
||||
conn = get_connection(self)
|
||||
conn.get_object('container1', 'obj1', resp_chunk_size=0)
|
||||
self.assertEqual(conn.attempts, 1)
|
||||
|
||||
|
||||
class TestHeadObject(MockHttpTest):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user