From 4af623bcf171a63240849b84b9359a4f74471455 Mon Sep 17 00:00:00 2001 From: Stuart McLaren <stuart.mclaren@hp.com> Date: Wed, 4 Mar 2015 14:31:00 +0000 Subject: [PATCH] Retry download of object body Currently the swift client retries establishing a connection to the server (by default up to 5 times). However, when downloading an object, once the connection has been established and the inital headers have been returned, no attempt is made to retry. So, for example, if 99MB of a 100MB object have been downloaded and the connection is then lost, the download will fail. This patch changes the behaviour to re-establish the connection and fetch the remaining bytes using the 'Range' header to offset. Data retry is not yet supported if the original request is for a subset of the object data (ie uses the 'Range' header), or if resp_chunk_size has not been set. The object's etag is checked using If-Match to make sure the object data hasn't changed since the start of the download. Change-Id: Iab47f10081ff39f6d344dbc2479cbc3bfd1c5b29 --- swiftclient/client.py | 88 ++++++++++++++++++++++++++-- tests/functional/test_swiftclient.py | 52 ++++++++++++++++ tests/unit/test_swiftclient.py | 17 ++++++ 3 files changed, 151 insertions(+), 6 deletions(-) diff --git a/swiftclient/client.py b/swiftclient/client.py index aba986e6..406149f4 100644 --- a/swiftclient/client.py +++ b/swiftclient/client.py @@ -187,7 +187,7 @@ class _ObjectBody(object): return self def next(self): - buf = self.resp.read(self.chunk_size) + buf = self.read(self.chunk_size) if not buf: raise StopIteration() return buf @@ -196,6 +196,67 @@ class _ObjectBody(object): return self.next() +class _RetryBody(_ObjectBody): + """ + Wrapper for object body response which triggers a retry + (from offset) if the connection is dropped after partially + downloading the object. + """ + def __init__(self, resp, expected_length, etag, connection, container, obj, + resp_chunk_size=None, query_string=None, response_dict=None, + headers=None): + """ + Wrap the underlying response + + :param resp: the response to wrap + :param expected_length: the object size in bytes + :param etag: the object's etag + :param connection: Connection class instance + :param container: the name of the container the object is in + :param obj: the name of object we are downloading + :param resp_chunk_size: if defined, chunk size of data to read + :param query_string: if set will be appended with '?' to generated path + :param response_dict: an optional dictionary into which to place + the response - status, reason and headers + :param headers: an optional dictionary with additional headers to + include in the request + """ + super(_RetryBody, self).__init__(resp, resp_chunk_size) + self.expected_length = expected_length + self.expected_etag = etag + self.conn = connection + self.container = container + self.obj = obj + self.query_string = query_string + self.response_dict = response_dict + self.headers = headers if headers is not None else {} + self.bytes_read = 0 + + def read(self, length=None): + buf = None + try: + buf = self.resp.read(length) + self.bytes_read += len(buf) + except (socket.error, RequestException) as e: + if self.conn.attempts > self.conn.retries: + logger.exception(e) + raise + if (not buf and self.bytes_read < self.expected_length and + self.conn.attempts <= self.conn.retries): + self.headers['Range'] = 'bytes=%d-' % self.bytes_read + self.headers['If-Match'] = self.expected_etag + hdrs, body = self.conn._retry(None, get_object, + self.container, self.obj, + resp_chunk_size=self.chunk_size, + query_string=self.query_string, + response_dict=self.response_dict, + headers=self.headers, + attempts=self.conn.attempts) + self.resp = body + buf = self.read(length) + return buf + + class HTTPConnection(object): def __init__(self, url, proxy=None, cacert=None, insecure=False, ssl_compression=False, default_user_agent=None, timeout=None): @@ -1408,10 +1469,10 @@ class Connection(object): target_dict.update(response_dict) def _retry(self, reset_func, func, *args, **kwargs): - self.attempts = 0 retried_auth = False backoff = self.starting_backoff caller_response_dict = kwargs.pop('response_dict', None) + self.attempts = kwargs.pop('attempts', 0) while self.attempts <= self.retries: self.attempts += 1 try: @@ -1523,10 +1584,25 @@ class Connection(object): def get_object(self, container, obj, resp_chunk_size=None, query_string=None, response_dict=None, headers=None): """Wrapper for :func:`get_object`""" - return self._retry(None, get_object, container, obj, - resp_chunk_size=resp_chunk_size, - query_string=query_string, - response_dict=response_dict, headers=headers) + rheaders, body = self._retry(None, get_object, container, obj, + resp_chunk_size=resp_chunk_size, + query_string=query_string, + response_dict=response_dict, + headers=headers) + is_not_range_request = ( + not headers or 'range' not in (k.lower() for k in headers)) + retry_is_possible = ( + is_not_range_request and resp_chunk_size and + self.attempts <= self.retries) + if retry_is_possible: + body = _RetryBody(body.resp, int(rheaders['content-length']), + rheaders['etag'], + self, container, obj, + resp_chunk_size=resp_chunk_size, + query_string=query_string, + response_dict=response_dict, + headers=headers) + return rheaders, body def put_object(self, container, obj, contents, content_length=None, etag=None, chunk_size=None, content_type=None, diff --git a/tests/functional/test_swiftclient.py b/tests/functional/test_swiftclient.py index ef1b9d52..5f9e271f 100644 --- a/tests/functional/test_swiftclient.py +++ b/tests/functional/test_swiftclient.py @@ -333,6 +333,58 @@ class TestFunctional(testtools.TestCase): hdrs, body = self.conn.get_object(self.containername, self.objectname) self.assertEqual("should tolerate empty chunks", body) + def test_download_object_retry_chunked(self): + resp_chunk_size = 2 + hdrs, body = self.conn.get_object(self.containername, + self.objectname, + resp_chunk_size=resp_chunk_size) + data = next(body) + self.assertEqual(self.test_data[:resp_chunk_size], data) + self.assertTrue(1, self.conn.attempts) + for chunk in body.resp: + # Flush remaining data from underlying response + # (simulate a dropped connection) + pass + # Trigger the retry + for chunk in body: + data += chunk + self.assertEqual(self.test_data, data) + self.assertEqual(2, self.conn.attempts) + + def test_download_object_retry_chunked_auth_failure(self): + resp_chunk_size = 2 + self.conn.token = 'invalid' + hdrs, body = self.conn.get_object(self.containername, + self.objectname, + resp_chunk_size=resp_chunk_size) + self.assertEqual(2, self.conn.attempts) + for chunk in body.resp: + # Flush remaining data from underlying response + # (simulate a dropped connection) + pass + + self.conn.token = 'invalid' + data = next(body) + self.assertEqual(4, self.conn.attempts) + + for chunk in body: + data += chunk + + self.assertEqual(self.test_data, data) + self.assertEqual(4, self.conn.attempts) + + def test_download_object_non_chunked(self): + hdrs, body = self.conn.get_object(self.containername, self.objectname) + data = body + self.assertEqual(self.test_data, data) + self.assertTrue(1, self.conn.attempts) + + hdrs, body = self.conn.get_object(self.containername, self.objectname, + resp_chunk_size=0) + data = body + self.assertEqual(self.test_data, data) + self.assertTrue(1, self.conn.attempts) + def test_post_account(self): self.conn.post_account({'x-account-meta-data': 'Something'}) headers = self.conn.head_account() diff --git a/tests/unit/test_swiftclient.py b/tests/unit/test_swiftclient.py index 1f6a7707..56d3ff86 100644 --- a/tests/unit/test_swiftclient.py +++ b/tests/unit/test_swiftclient.py @@ -835,6 +835,23 @@ class TestGetObject(MockHttpTest): self.assertRaises(StopIteration, next, resp) self.assertEqual(resp.read(), '') + def test_get_object_with_resp_chunk_size_zero(self): + def get_connection(self): + def get_auth(): + return 'http://auth.test.com', 'token' + + conn = c.Connection('http://www.test.com', 'asdf', 'asdf') + self.assertIs(type(conn), c.Connection) + conn.get_auth = get_auth + self.assertEqual(conn.attempts, 0) + return conn + + with mock.patch('swiftclient.client.http_connection', + self.fake_http_connection(200)): + conn = get_connection(self) + conn.get_object('container1', 'obj1', resp_chunk_size=0) + self.assertEqual(conn.attempts, 1) + class TestHeadObject(MockHttpTest):