Merge "Compare each chunk of large objects when downloading"
This commit is contained in:
commit
49483a3b11
@ -974,8 +974,7 @@ class SwiftService(object):
|
||||
for o_down in interruptable_as_completed(o_downs):
|
||||
yield o_down.result()
|
||||
|
||||
@staticmethod
|
||||
def _download_object_job(conn, container, obj, options):
|
||||
def _download_object_job(self, conn, container, obj, options):
|
||||
out_file = options['out_file']
|
||||
results_dict = {}
|
||||
|
||||
@ -984,7 +983,10 @@ class SwiftService(object):
|
||||
pseudodir = False
|
||||
path = join(container, obj) if options['yes_all'] else obj
|
||||
path = path.lstrip(os_path_sep)
|
||||
if options['skip_identical'] and out_file != '-':
|
||||
options['skip_identical'] = (options['skip_identical'] and
|
||||
out_file != '-')
|
||||
|
||||
if options['skip_identical']:
|
||||
filename = out_file if out_file else path
|
||||
try:
|
||||
fp = open(filename, 'rb')
|
||||
@ -1002,10 +1004,55 @@ class SwiftService(object):
|
||||
|
||||
try:
|
||||
start_time = time()
|
||||
headers, body = \
|
||||
conn.get_object(container, obj, resp_chunk_size=65536,
|
||||
headers=req_headers,
|
||||
response_dict=results_dict)
|
||||
get_args = {'resp_chunk_size': 65536,
|
||||
'headers': req_headers,
|
||||
'response_dict': results_dict}
|
||||
if options['skip_identical']:
|
||||
# Assume the file is a large object; if we're wrong, the query
|
||||
# string is ignored and the If-None-Match header will trigger
|
||||
# the behavior we want
|
||||
get_args['query_string'] = 'multipart-manifest=get'
|
||||
|
||||
try:
|
||||
headers, body = conn.get_object(container, obj, **get_args)
|
||||
except ClientException as e:
|
||||
if not options['skip_identical']:
|
||||
raise
|
||||
if e.http_status != 304: # Only handling Not Modified
|
||||
raise
|
||||
|
||||
headers = results_dict['headers']
|
||||
if 'x-object-manifest' in headers:
|
||||
# DLO: most likely it has more than one page worth of
|
||||
# segments and we have an empty file locally
|
||||
body = []
|
||||
elif config_true_value(headers.get('x-static-large-object')):
|
||||
# SLO: apparently we have a copy of the manifest locally?
|
||||
# provide no chunking data to force a fresh download
|
||||
body = [b'[]']
|
||||
else:
|
||||
# Normal object: let it bubble up
|
||||
raise
|
||||
|
||||
if options['skip_identical']:
|
||||
if config_true_value(headers.get('x-static-large-object')) or \
|
||||
'x-object-manifest' in headers:
|
||||
# The request was chunked, so stitch it back together
|
||||
chunk_data = self._get_chunk_data(conn, container, obj,
|
||||
headers, b''.join(body))
|
||||
else:
|
||||
chunk_data = None
|
||||
|
||||
if chunk_data is not None:
|
||||
if self._is_identical(chunk_data, filename):
|
||||
raise ClientException('Large object is identical',
|
||||
http_status=304)
|
||||
|
||||
# Large objects are different; start the real download
|
||||
del get_args['query_string']
|
||||
get_args['response_dict'].clear()
|
||||
headers, body = conn.get_object(container, obj, **get_args)
|
||||
|
||||
headers_receipt = time()
|
||||
|
||||
obj_body = _SwiftReader(path, body, headers)
|
||||
@ -1503,7 +1550,7 @@ class SwiftService(object):
|
||||
results_queue.put(res)
|
||||
return res
|
||||
|
||||
def _get_chunk_data(self, conn, container, obj, headers):
|
||||
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
|
||||
chunks = []
|
||||
if 'x-object-manifest' in headers:
|
||||
scontainer, sprefix = headers['x-object-manifest'].split('/', 1)
|
||||
@ -1513,10 +1560,11 @@ class SwiftService(object):
|
||||
else:
|
||||
raise part["error"]
|
||||
elif config_true_value(headers.get('x-static-large-object')):
|
||||
manifest_headers, manifest_data = conn.get_object(
|
||||
container, obj, query_string='multipart-manifest=get')
|
||||
manifest_data = parse_api_response(manifest_headers, manifest_data)
|
||||
for chunk in manifest_data:
|
||||
if manifest is None:
|
||||
headers, manifest = conn.get_object(
|
||||
container, obj, query_string='multipart-manifest=get')
|
||||
manifest = parse_api_response(headers, manifest)
|
||||
for chunk in manifest:
|
||||
if chunk.get('sub_slo'):
|
||||
scont, sobj = chunk['name'].lstrip('/').split('/', 1)
|
||||
chunks.extend(self._get_chunk_data(
|
||||
|
@ -988,3 +988,368 @@ class TestServiceUpload(testtools.TestCase):
|
||||
marker="test_o/prefix/02", delimiter=None),
|
||||
]
|
||||
mock_conn.get_container.assert_has_calls(expected)
|
||||
|
||||
|
||||
class TestServiceDownload(testtools.TestCase):
|
||||
|
||||
def _assertDictEqual(self, a, b, m=None):
|
||||
# assertDictEqual is not available in py2.6 so use a shallow check
|
||||
# instead
|
||||
if not m:
|
||||
m = '{0} != {1}'.format(a, b)
|
||||
|
||||
if hasattr(self, 'assertDictEqual'):
|
||||
self.assertDictEqual(a, b, m)
|
||||
else:
|
||||
self.assertTrue(isinstance(a, dict), m)
|
||||
self.assertTrue(isinstance(b, dict), m)
|
||||
self.assertEqual(len(a), len(b), m)
|
||||
for k, v in a.items():
|
||||
self.assertIn(k, b, m)
|
||||
self.assertEqual(b[k], v, m)
|
||||
|
||||
def test_download_object_job_skip_identical(self):
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(b'a' * 30)
|
||||
f.flush()
|
||||
|
||||
err = swiftclient.ClientException('Object GET failed',
|
||||
http_status=304)
|
||||
|
||||
def fake_get(*args, **kwargs):
|
||||
kwargs['response_dict']['headers'] = {}
|
||||
raise err
|
||||
|
||||
mock_conn = mock.Mock()
|
||||
mock_conn.get_object.side_effect = fake_get
|
||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||
expected_r = {
|
||||
'action': 'download_object',
|
||||
'container': 'test_c',
|
||||
'object': 'test_o',
|
||||
'success': False,
|
||||
'error': err,
|
||||
'response_dict': {'headers': {}},
|
||||
'path': 'test_o',
|
||||
'pseudodir': False,
|
||||
'attempts': 2,
|
||||
}
|
||||
|
||||
s = SwiftService()
|
||||
r = s._download_object_job(conn=mock_conn,
|
||||
container='test_c',
|
||||
obj='test_o',
|
||||
options={'out_file': f.name,
|
||||
'header': {},
|
||||
'yes_all': False,
|
||||
'skip_identical': True})
|
||||
self._assertDictEqual(r, expected_r)
|
||||
|
||||
self.assertEqual(mock_conn.get_object.call_count, 1)
|
||||
mock_conn.get_object.assert_called_with(
|
||||
'test_c',
|
||||
'test_o',
|
||||
resp_chunk_size=65536,
|
||||
headers={'If-None-Match': md5(b'a' * 30).hexdigest()},
|
||||
query_string='multipart-manifest=get',
|
||||
response_dict=expected_r['response_dict'])
|
||||
|
||||
def test_download_object_job_skip_identical_dlo(self):
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(b'a' * 30)
|
||||
f.flush()
|
||||
on_disk_md5 = md5(b'a' * 30).hexdigest()
|
||||
segment_md5 = md5(b'a' * 10).hexdigest()
|
||||
|
||||
mock_conn = mock.Mock()
|
||||
mock_conn.get_object.return_value = (
|
||||
{'x-object-manifest': 'test_c_segments/test_o/prefix'}, [b''])
|
||||
mock_conn.get_container.side_effect = [
|
||||
(None, [{'name': 'test_o/prefix/1',
|
||||
'bytes': 10, 'hash': segment_md5},
|
||||
{'name': 'test_o/prefix/2',
|
||||
'bytes': 10, 'hash': segment_md5}]),
|
||||
(None, [{'name': 'test_o/prefix/3',
|
||||
'bytes': 10, 'hash': segment_md5}]),
|
||||
(None, [])]
|
||||
|
||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||
expected_r = {
|
||||
'action': 'download_object',
|
||||
'container': 'test_c',
|
||||
'object': 'test_o',
|
||||
'success': False,
|
||||
'response_dict': {},
|
||||
'path': 'test_o',
|
||||
'pseudodir': False,
|
||||
'attempts': 2,
|
||||
}
|
||||
|
||||
s = SwiftService()
|
||||
with mock.patch('swiftclient.service.get_conn',
|
||||
return_value=mock_conn):
|
||||
r = s._download_object_job(conn=mock_conn,
|
||||
container='test_c',
|
||||
obj='test_o',
|
||||
options={'out_file': f.name,
|
||||
'header': {},
|
||||
'yes_all': False,
|
||||
'skip_identical': True})
|
||||
|
||||
err = r.pop('error')
|
||||
self.assertEqual("Large object is identical", err.msg)
|
||||
self.assertEqual(304, err.http_status)
|
||||
|
||||
self._assertDictEqual(r, expected_r)
|
||||
|
||||
self.assertEqual(mock_conn.get_object.call_count, 1)
|
||||
mock_conn.get_object.assert_called_with(
|
||||
'test_c',
|
||||
'test_o',
|
||||
resp_chunk_size=65536,
|
||||
headers={'If-None-Match': on_disk_md5},
|
||||
query_string='multipart-manifest=get',
|
||||
response_dict=expected_r['response_dict'])
|
||||
self.assertEqual(mock_conn.get_container.mock_calls, [
|
||||
mock.call('test_c_segments',
|
||||
delimiter=None,
|
||||
prefix='test_o/prefix',
|
||||
marker=''),
|
||||
mock.call('test_c_segments',
|
||||
delimiter=None,
|
||||
prefix='test_o/prefix',
|
||||
marker='test_o/prefix/2'),
|
||||
mock.call('test_c_segments',
|
||||
delimiter=None,
|
||||
prefix='test_o/prefix',
|
||||
marker='test_o/prefix/3')])
|
||||
|
||||
def test_download_object_job_skip_identical_nested_slo(self):
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(b'a' * 30)
|
||||
f.flush()
|
||||
on_disk_md5 = md5(b'a' * 30).hexdigest()
|
||||
|
||||
seg_etag = md5(b'a' * 10).hexdigest()
|
||||
submanifest = "[%s]" % ",".join(
|
||||
['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
|
||||
submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
|
||||
manifest = "[%s]" % ",".join([
|
||||
'{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
|
||||
'"bytes":20,"hash":"%s"}' % submanifest_etag,
|
||||
'{"bytes":10,"hash":"%s"}' % seg_etag])
|
||||
|
||||
mock_conn = mock.Mock()
|
||||
mock_conn.get_object.side_effect = [
|
||||
({'x-static-large-object': True,
|
||||
'content-length': 30,
|
||||
'etag': md5(submanifest_etag.encode('ascii') +
|
||||
seg_etag.encode('ascii')).hexdigest()},
|
||||
[manifest.encode('ascii')]),
|
||||
({'x-static-large-object': True,
|
||||
'content-length': 20,
|
||||
'etag': submanifest_etag},
|
||||
submanifest.encode('ascii'))]
|
||||
|
||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||
expected_r = {
|
||||
'action': 'download_object',
|
||||
'container': 'test_c',
|
||||
'object': 'test_o',
|
||||
'success': False,
|
||||
'response_dict': {},
|
||||
'path': 'test_o',
|
||||
'pseudodir': False,
|
||||
'attempts': 2,
|
||||
}
|
||||
|
||||
s = SwiftService()
|
||||
with mock.patch('swiftclient.service.get_conn',
|
||||
return_value=mock_conn):
|
||||
r = s._download_object_job(conn=mock_conn,
|
||||
container='test_c',
|
||||
obj='test_o',
|
||||
options={'out_file': f.name,
|
||||
'header': {},
|
||||
'yes_all': False,
|
||||
'skip_identical': True})
|
||||
|
||||
err = r.pop('error')
|
||||
self.assertEqual("Large object is identical", err.msg)
|
||||
self.assertEqual(304, err.http_status)
|
||||
|
||||
self._assertDictEqual(r, expected_r)
|
||||
self.assertEqual(mock_conn.get_object.mock_calls, [
|
||||
mock.call('test_c',
|
||||
'test_o',
|
||||
resp_chunk_size=65536,
|
||||
headers={'If-None-Match': on_disk_md5},
|
||||
query_string='multipart-manifest=get',
|
||||
response_dict={}),
|
||||
mock.call('test_c_segments',
|
||||
'test_sub_slo',
|
||||
query_string='multipart-manifest=get')])
|
||||
|
||||
def test_download_object_job_skip_identical_diff_dlo(self):
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(b'a' * 30)
|
||||
f.write(b'b')
|
||||
f.flush()
|
||||
on_disk_md5 = md5(b'a' * 30 + b'b').hexdigest()
|
||||
segment_md5 = md5(b'a' * 10).hexdigest()
|
||||
|
||||
mock_conn = mock.Mock()
|
||||
mock_conn.get_object.side_effect = [
|
||||
({'x-object-manifest': 'test_c_segments/test_o/prefix'},
|
||||
[b'']),
|
||||
({'x-object-manifest': 'test_c_segments/test_o/prefix'},
|
||||
[b'a' * 30])]
|
||||
mock_conn.get_container.side_effect = [
|
||||
(None, [{'name': 'test_o/prefix/1',
|
||||
'bytes': 10, 'hash': segment_md5},
|
||||
{'name': 'test_o/prefix/2',
|
||||
'bytes': 10, 'hash': segment_md5}]),
|
||||
(None, [{'name': 'test_o/prefix/3',
|
||||
'bytes': 10, 'hash': segment_md5}]),
|
||||
(None, [])]
|
||||
|
||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||
type(mock_conn).auth_end_time = mock.PropertyMock(return_value=14)
|
||||
expected_r = {
|
||||
'action': 'download_object',
|
||||
'container': 'test_c',
|
||||
'object': 'test_o',
|
||||
'success': True,
|
||||
'response_dict': {},
|
||||
'path': 'test_o',
|
||||
'pseudodir': False,
|
||||
'read_length': 30,
|
||||
'attempts': 2,
|
||||
'start_time': 0,
|
||||
'headers_receipt': 1,
|
||||
'finish_time': 2,
|
||||
'auth_end_time': mock_conn.auth_end_time,
|
||||
}
|
||||
|
||||
s = SwiftService()
|
||||
with mock.patch('swiftclient.service.time', side_effect=range(3)):
|
||||
with mock.patch('swiftclient.service.get_conn',
|
||||
return_value=mock_conn):
|
||||
r = s._download_object_job(
|
||||
conn=mock_conn,
|
||||
container='test_c',
|
||||
obj='test_o',
|
||||
options={'out_file': f.name,
|
||||
'header': {},
|
||||
'no_download': True,
|
||||
'yes_all': False,
|
||||
'skip_identical': True})
|
||||
|
||||
self._assertDictEqual(r, expected_r)
|
||||
|
||||
self.assertEqual(mock_conn.get_container.mock_calls, [
|
||||
mock.call('test_c_segments',
|
||||
delimiter=None,
|
||||
prefix='test_o/prefix',
|
||||
marker=''),
|
||||
mock.call('test_c_segments',
|
||||
delimiter=None,
|
||||
prefix='test_o/prefix',
|
||||
marker='test_o/prefix/2'),
|
||||
mock.call('test_c_segments',
|
||||
delimiter=None,
|
||||
prefix='test_o/prefix',
|
||||
marker='test_o/prefix/3')])
|
||||
self.assertEqual(mock_conn.get_object.mock_calls, [
|
||||
mock.call('test_c',
|
||||
'test_o',
|
||||
resp_chunk_size=65536,
|
||||
headers={'If-None-Match': on_disk_md5},
|
||||
query_string='multipart-manifest=get',
|
||||
response_dict={}),
|
||||
mock.call('test_c',
|
||||
'test_o',
|
||||
resp_chunk_size=65536,
|
||||
headers={'If-None-Match': on_disk_md5},
|
||||
response_dict={})])
|
||||
|
||||
def test_download_object_job_skip_identical_diff_nested_slo(self):
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(b'a' * 29)
|
||||
f.flush()
|
||||
on_disk_md5 = md5(b'a' * 29).hexdigest()
|
||||
|
||||
seg_etag = md5(b'a' * 10).hexdigest()
|
||||
submanifest = "[%s]" % ",".join(
|
||||
['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
|
||||
submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
|
||||
manifest = "[%s]" % ",".join([
|
||||
'{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
|
||||
'"bytes":20,"hash":"%s"}' % submanifest_etag,
|
||||
'{"bytes":10,"hash":"%s"}' % seg_etag])
|
||||
|
||||
mock_conn = mock.Mock()
|
||||
mock_conn.get_object.side_effect = [
|
||||
({'x-static-large-object': True,
|
||||
'content-length': 30,
|
||||
'etag': md5(submanifest_etag.encode('ascii') +
|
||||
seg_etag.encode('ascii')).hexdigest()},
|
||||
[manifest.encode('ascii')]),
|
||||
({'x-static-large-object': True,
|
||||
'content-length': 20,
|
||||
'etag': submanifest_etag},
|
||||
submanifest.encode('ascii')),
|
||||
({'x-static-large-object': True,
|
||||
'content-length': 30,
|
||||
'etag': md5(submanifest_etag.encode('ascii') +
|
||||
seg_etag.encode('ascii')).hexdigest()},
|
||||
[b'a' * 30])]
|
||||
|
||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||
type(mock_conn).auth_end_time = mock.PropertyMock(return_value=14)
|
||||
expected_r = {
|
||||
'action': 'download_object',
|
||||
'container': 'test_c',
|
||||
'object': 'test_o',
|
||||
'success': True,
|
||||
'response_dict': {},
|
||||
'path': 'test_o',
|
||||
'pseudodir': False,
|
||||
'read_length': 30,
|
||||
'attempts': 2,
|
||||
'start_time': 0,
|
||||
'headers_receipt': 1,
|
||||
'finish_time': 2,
|
||||
'auth_end_time': mock_conn.auth_end_time,
|
||||
}
|
||||
|
||||
s = SwiftService()
|
||||
with mock.patch('swiftclient.service.time', side_effect=range(3)):
|
||||
with mock.patch('swiftclient.service.get_conn',
|
||||
return_value=mock_conn):
|
||||
r = s._download_object_job(
|
||||
conn=mock_conn,
|
||||
container='test_c',
|
||||
obj='test_o',
|
||||
options={'out_file': f.name,
|
||||
'header': {},
|
||||
'no_download': True,
|
||||
'yes_all': False,
|
||||
'skip_identical': True})
|
||||
|
||||
self._assertDictEqual(r, expected_r)
|
||||
self.assertEqual(mock_conn.get_object.mock_calls, [
|
||||
mock.call('test_c',
|
||||
'test_o',
|
||||
resp_chunk_size=65536,
|
||||
headers={'If-None-Match': on_disk_md5},
|
||||
query_string='multipart-manifest=get',
|
||||
response_dict={}),
|
||||
mock.call('test_c_segments',
|
||||
'test_sub_slo',
|
||||
query_string='multipart-manifest=get'),
|
||||
mock.call('test_c',
|
||||
'test_o',
|
||||
resp_chunk_size=65536,
|
||||
headers={'If-None-Match': on_disk_md5},
|
||||
response_dict={})])
|
||||
|
Loading…
x
Reference in New Issue
Block a user