Merge "Compare each chunk of large objects when uploading"
@@ -45,7 +45,7 @@ class OutputManager(object):
     """
     DEFAULT_OFFSET = 14

-    def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
+    def __init__(self, print_stream=None, error_stream=None):
         """
         :param print_stream: The stream to which :meth:`print_msg` sends
             formatted messages.
@@ -54,9 +54,10 @@ class OutputManager(object):

         On Python 2, Unicode messages are encoded to utf8.
         """
-        self.print_stream = print_stream
+        self.print_stream = print_stream or sys.stdout
         self.print_pool = ThreadPoolExecutor(max_workers=1)
-        self.error_stream = error_stream
+        self.error_stream = error_stream or sys.stderr
         self.error_print_pool = ThreadPoolExecutor(max_workers=1)
         self.error_count = 0

+
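The switch to None defaults matters because Python evaluates default arguments once, at definition time, so the old signature froze whatever sys.stdout and sys.stderr were when swiftclient.multithreading was imported; resolving the stream inside __init__ lets a later replacement of the streams (such as the CaptureOutput helper added to the tests below) take effect. A minimal sketch of the difference, with made-up function names:

import io
import sys

def frozen_default(msg, stream=sys.stdout):
    # `stream` was bound once, when the function was defined
    stream.write(msg + '\n')

def late_binding(msg, stream=None):
    # the stream is looked up at call time, like `print_stream or sys.stdout`
    (stream or sys.stdout).write(msg + '\n')

captured = io.StringIO()
real_stdout, sys.stdout = sys.stdout, captured
try:
    frozen_default('lost')      # still goes to the original stdout
    late_binding('captured')    # lands in the StringIO replacement
finally:
    sys.stdout = real_stdout

assert captured.getvalue() == 'captured\n'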
@@ -1160,7 +1160,7 @@ class SwiftService(object):
                'headers': [],
                'segment_size': None,
                'use_slo': False,
-               'segment_container: None,
+               'segment_container': None,
                'leave_segments': False,
                'changed': None,
                'skip_identical': False,
@@ -1505,6 +1505,51 @@ class SwiftService(object):
             results_queue.put(res)
         return res

+    def _get_chunk_data(self, conn, container, obj, headers):
+        chunks = []
+        if 'x-object-manifest' in headers:
+            scontainer, sprefix = headers['x-object-manifest'].split('/', 1)
+            for part in self.list(scontainer, {'prefix': sprefix}):
+                if part["success"]:
+                    chunks.extend(part["listing"])
+                else:
+                    raise part["error"]
+        elif config_true_value(headers.get('x-static-large-object')):
+            _, manifest_data = conn.get_object(
+                container, obj, query_string='multipart-manifest=get')
+            for chunk in json.loads(manifest_data):
+                if chunk.get('sub_slo'):
+                    scont, sobj = chunk['name'].lstrip('/').split('/', 1)
+                    chunks.extend(self._get_chunk_data(
+                        conn, scont, sobj, {'x-static-large-object': True}))
+                else:
+                    chunks.append(chunk)
+        else:
+            chunks.append({'hash': headers.get('etag').strip('"'),
+                           'bytes': int(headers.get('content-length'))})
+        return chunks
+
+    def _is_identical(self, chunk_data, path):
+        try:
+            fp = open(path, 'rb')
+        except IOError:
+            return False
+
+        with fp:
+            for chunk in chunk_data:
+                to_read = chunk['bytes']
+                md5sum = md5()
+                while to_read:
+                    data = fp.read(min(65536, to_read))
+                    if not data:
+                        return False
+                    md5sum.update(data)
+                    to_read -= len(data)
+                if md5sum.hexdigest() != chunk['hash']:
+                    return False
+            # Each chunk is verified; check that we're at the end of the file
+            return not fp.read(1)
+
     def _upload_object_job(self, conn, container, source, obj, options,
                            results_queue=None):
         res = {
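_get_chunk_data normalises the three object flavours into one shape: a DLO (x-object-manifest) is expanded by listing its segment container, an SLO manifest is downloaded with multipart-manifest=get and sub-SLOs are expanded recursively, and a plain object becomes a single entry built from its ETag and content length. _is_identical then walks the local file against that list. A rough standalone sketch of the comparison loop, assuming each entry carries 'hash' and 'bytes' keys as above (function and variable names here are illustrative only):

from hashlib import md5

def chunks_match(chunk_data, local_path):
    """chunk_data is a list of {'hash': <md5 hex>, 'bytes': <length>} dicts."""
    try:
        fp = open(local_path, 'rb')
    except IOError:
        return False
    with fp:
        for chunk in chunk_data:
            remaining = chunk['bytes']
            hasher = md5()
            while remaining:
                piece = fp.read(min(65536, remaining))
                if not piece:
                    return False          # local file ended too early
                hasher.update(piece)
                remaining -= len(piece)
            if hasher.hexdigest() != chunk['hash']:
                return False              # this chunk differs
        return not fp.read(1)             # identical only if nothing is left over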
@@ -1536,27 +1581,22 @@ class SwiftService(object):
         old_manifest = None
         old_slo_manifest_paths = []
         new_slo_manifest_paths = set()
+        segment_size = int(0 if options['segment_size'] is None
+                           else options['segment_size'])
         if (options['changed'] or options['skip_identical']
                 or not options['leave_segments']):
-            checksum = None
-            if options['skip_identical']:
-                try:
-                    fp = open(path, 'rb')
-                except IOError:
-                    pass
-                else:
-                    with fp:
-                        md5sum = md5()
-                        while True:
-                            data = fp.read(65536)
-                            if not data:
-                                break
-                            md5sum.update(data)
-                    checksum = md5sum.hexdigest()
             try:
                 headers = conn.head_object(container, obj)
-                if options['skip_identical'] and checksum is not None:
-                    if checksum == headers.get('etag'):
+                is_slo = config_true_value(
+                    headers.get('x-static-large-object'))
+
+                if options['skip_identical'] or (
+                        is_slo and not options['leave_segments']):
+                    chunk_data = self._get_chunk_data(
+                        conn, container, obj, headers)
+
+                    if options['skip_identical'] and self._is_identical(
+                            chunk_data, path):
                         res.update({
                             'success': True,
                             'status': 'skipped-identical'
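The removed block compared a whole-file MD5 against the object's ETag, which only works for plain objects: for segmented objects Swift reports the MD5 of the concatenated per-segment ETags rather than of the payload, which is exactly how the new unit tests below build their mocked headers. A short worked check of that mismatch:

from hashlib import md5

payload = b'a' * 30
whole_file_etag = md5(payload).hexdigest()

# The same 30 bytes uploaded as three 10-byte segments: the reported ETag is
# the MD5 of the concatenated per-segment MD5 hex digests, so the old
# whole-file comparison could never match it.
segment_etag = md5(b'a' * 10).hexdigest()
large_object_etag = md5((segment_etag * 3).encode('ascii')).hexdigest()

assert whole_file_etag != large_object_etag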
@@ -1575,13 +1615,8 @@ class SwiftService(object):
                         return res
                 if not options['leave_segments']:
                     old_manifest = headers.get('x-object-manifest')
-                    if config_true_value(
-                            headers.get('x-static-large-object')):
-                        headers, manifest_data = conn.get_object(
-                            container, obj,
-                            query_string='multipart-manifest=get'
-                        )
-                        for old_seg in json.loads(manifest_data):
+                    if is_slo:
+                        for old_seg in chunk_data:
                             seg_path = old_seg['name'].lstrip('/')
                             if isinstance(seg_path, text_type):
                                 seg_path = seg_path.encode('utf-8')
@@ -1601,8 +1636,8 @@ class SwiftService(object):
         # a segment job if we're reading from a stream - we may fail if we
         # go over the single object limit, but this gives us a nice way
         # to create objects from memory
-        if (path is not None and options['segment_size']
-                and (getsize(path) > int(options['segment_size']))):
+        if (path is not None and segment_size
+                and (getsize(path) > segment_size)):
             res['large_object'] = True
             seg_container = container + '_segments'
             if options['segment_container']:
@@ -1615,7 +1650,6 @@ class SwiftService(object):
             segment_start = 0

             while segment_start < full_size:
-                segment_size = int(options['segment_size'])
                 if segment_start + segment_size > full_size:
                     segment_size = full_size - segment_start
                 if options['use_slo']:
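Hoisting segment_size out of the loop normalises the option once: a missing value becomes 0, which is falsy, so the large-object branch is skipped, and the repeated int() conversion inside the while loop disappears. The effect in isolation:

options = {'segment_size': None}

segment_size = int(0 if options['segment_size'] is None
                   else options['segment_size'])
assert segment_size == 0           # falsy, so the large-object branch is skipped

options['segment_size'] = '1048576'
segment_size = int(0 if options['segment_size'] is None
                   else options['segment_size'])
assert segment_size == 1048576     # string values are coerced exactly once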
@@ -51,8 +51,13 @@ class TestFunctional(testtools.TestCase):
         auth_ssl = config.getboolean('func_test', 'auth_ssl')
         auth_prefix = config.get('func_test', 'auth_prefix')
         self.auth_version = config.get('func_test', 'auth_version')
-        self.account = config.get('func_test', 'account')
-        self.username = config.get('func_test', 'username')
+        try:
+            self.account_username = config.get('func_test',
+                                               'account_username')
+        except configparser.NoOptionError:
+            account = config.get('func_test', 'account')
+            username = config.get('func_test', 'username')
+            self.account_username = "%s:%s" % (account, username)
         self.password = config.get('func_test', 'password')
         self.auth_url = ""
         if auth_ssl:
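The functional tests now prefer an explicit account_username setting and fall back to joining account and username only when it is absent. A minimal sketch of that lookup, assuming a config file with a [func_test] section (the file name and values are made up):

from six.moves import configparser

config = configparser.ConfigParser()
config.read('test.conf')          # hypothetical config file

try:
    account_username = config.get('func_test', 'account_username')
except configparser.NoOptionError:
    # e.g. account = test, username = tester  ->  "test:tester"
    account = config.get('func_test', 'account')
    username = config.get('func_test', 'username')
    account_username = "%s:%s" % (account, username)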
@@ -62,7 +67,6 @@ class TestFunctional(testtools.TestCase):
             self.auth_url += "%s:%s%s" % (auth_host, auth_port, auth_prefix)
             if self.auth_version == "1":
                 self.auth_url += 'v1.0'
-            self.account_username = "%s:%s" % (self.account, self.username)

         else:
             self.skip_tests = True
@@ -860,3 +860,131 @@ class TestServiceUpload(testtools.TestCase):

            contents = mock_conn.put_object.call_args[0][2]
            self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
+
+    def test_upload_object_job_identical_etag(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+
+            mock_conn = mock.Mock()
+            mock_conn.head_object.return_value = {
+                'content-length': 30,
+                'etag': md5(b'a' * 30).hexdigest()}
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            r = s._upload_object_job(conn=mock_conn,
+                                     container='test_c',
+                                     source=f.name,
+                                     obj='test_o',
+                                     options={'changed': False,
+                                              'skip_identical': True,
+                                              'leave_segments': True,
+                                              'header': '',
+                                              'segment_size': 0})
+
+            self.assertTrue(r['success'])
+            self.assertIn('status', r)
+            self.assertEqual(r['status'], 'skipped-identical')
+            self.assertEqual(mock_conn.put_object.call_count, 0)
+            self.assertEqual(mock_conn.head_object.call_count, 1)
+            mock_conn.head_object.assert_called_with('test_c', 'test_o')
+
+    def test_upload_object_job_identical_slo_with_nesting(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+            seg_etag = md5(b'a' * 10).hexdigest()
+            submanifest = "[%s]" % ",".join(
+                ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
+            submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
+            manifest = "[%s]" % ",".join([
+                '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
+                '"bytes":20,"hash":"%s"}' % submanifest_etag,
+                '{"bytes":10,"hash":"%s"}' % seg_etag])
+
+            mock_conn = mock.Mock()
+            mock_conn.head_object.return_value = {
+                'x-static-large-object': True,
+                'content-length': 30,
+                'etag': md5(submanifest_etag.encode('ascii') +
+                            seg_etag.encode('ascii')).hexdigest()}
+            mock_conn.get_object.side_effect = [
+                (None, manifest),
+                (None, submanifest)]
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            r = s._upload_object_job(conn=mock_conn,
+                                     container='test_c',
+                                     source=f.name,
+                                     obj='test_o',
+                                     options={'changed': False,
+                                              'skip_identical': True,
+                                              'leave_segments': True,
+                                              'header': '',
+                                              'segment_size': 10})
+
+            self.assertIsNone(r.get('error'))
+            self.assertTrue(r['success'])
+            self.assertEqual('skipped-identical', r.get('status'))
+            self.assertEqual(0, mock_conn.put_object.call_count)
+            self.assertEqual([mock.call('test_c', 'test_o')],
+                             mock_conn.head_object.mock_calls)
+            self.assertEqual([
+                mock.call('test_c', 'test_o',
+                          query_string='multipart-manifest=get'),
+                mock.call('test_c_segments', 'test_sub_slo',
+                          query_string='multipart-manifest=get'),
+            ], mock_conn.get_object.mock_calls)
+
+    def test_upload_object_job_identical_dlo(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+            segment_etag = md5(b'a' * 10).hexdigest()
+
+            mock_conn = mock.Mock()
+            mock_conn.head_object.return_value = {
+                'x-object-manifest': 'test_c_segments/test_o/prefix',
+                'content-length': 30,
+                'etag': md5(segment_etag.encode('ascii') * 3).hexdigest()}
+            mock_conn.get_container.side_effect = [
+                (None, [{"bytes": 10, "hash": segment_etag,
+                         "name": "test_o/prefix/00"},
+                        {"bytes": 10, "hash": segment_etag,
+                         "name": "test_o/prefix/01"}]),
+                (None, [{"bytes": 10, "hash": segment_etag,
+                         "name": "test_o/prefix/02"}]),
+                (None, {})]
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            with mock.patch('swiftclient.service.get_conn',
+                            return_value=mock_conn):
+                r = s._upload_object_job(conn=mock_conn,
+                                         container='test_c',
+                                         source=f.name,
+                                         obj='test_o',
+                                         options={'changed': False,
+                                                  'skip_identical': True,
+                                                  'leave_segments': True,
+                                                  'header': '',
+                                                  'segment_size': 10})
+
+            self.assertIsNone(r.get('error'))
+            self.assertTrue(r['success'])
+            self.assertEqual('skipped-identical', r.get('status'))
+            self.assertEqual(0, mock_conn.put_object.call_count)
+            self.assertEqual(1, mock_conn.head_object.call_count)
+            self.assertEqual(3, mock_conn.get_container.call_count)
+            mock_conn.head_object.assert_called_with('test_c', 'test_o')
+            expected = [
+                mock.call('test_c_segments', prefix='test_o/prefix',
+                          marker='', delimiter=None),
+                mock.call('test_c_segments', prefix='test_o/prefix',
+                          marker="test_o/prefix/01", delimiter=None),
+                mock.call('test_c_segments', prefix='test_o/prefix',
+                          marker="test_o/prefix/02", delimiter=None),
+            ]
+            mock_conn.get_container.assert_has_calls(expected)
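These tests call _upload_object_job directly; through the public API the same behaviour is reached via the upload options. A hedged usage sketch (container and file names are invented, and the result keys mirror the res dict assembled in _upload_object_job above):

from swiftclient.service import SwiftService

swift = SwiftService()
for result in swift.upload('test_c', ['local-copy.dat'],
                           options={'skip_identical': True,
                                    'segment_size': 1048576}):
    if result.get('status') == 'skipped-identical':
        # the remote object already matches the local file, chunk by chunk
        print('unchanged, not re-uploaded: %s' % result['object'])
    elif not result['success']:
        print('failed: %s' % result.get('error'))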
@@ -208,6 +208,12 @@ class MockHttpTest(testtools.TestCase):
        self.fake_connect = None
        self.request_log = []

+        # Capture output, since the test-runner stdout/stderr moneky-patching
+        # won't cover the references to sys.stdout/sys.stderr in
+        # swiftclient.multithreading
+        self.capture_output = CaptureOutput()
+        self.capture_output.__enter__()
+
        def fake_http_connection(*args, **kwargs):
            self.validateMockedRequestsConsumed()
            self.request_log = []
@@ -368,6 +374,7 @@ class MockHttpTest(testtools.TestCase):
        # un-hygienic mocking on the swiftclient.client module; which may lead
        # to some unfortunate test order dependency bugs by way of the broken
        # window theory if any other modules are similarly patched
+        self.capture_output.__exit__()
        reload_module(c)


@@ -393,7 +400,7 @@ class CaptureStream(object):
        self.stream = stream
        self._capture = six.StringIO()
        self._buffer = CaptureStreamBuffer(self)
-        self.streams = [self.stream, self._capture]
+        self.streams = [self._capture]

    @property
    def buffer(self):
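With self.stream dropped from self.streams, captured output is no longer echoed to the real stdout or stderr, so test runs stay quiet while assertions can still read the buffer. The idea in miniature, as a simplified stand-in rather than the real CaptureStream:

import six

class QuietCapture(object):
    """Collects writes in memory instead of echoing them to the real stream."""

    def __init__(self, stream):
        self.stream = stream              # remembered, but no longer written to
        self._capture = six.StringIO()
        self.streams = [self._capture]    # previously: [self.stream, self._capture]

    def write(self, *args, **kwargs):
        for stream in self.streams:
            stream.write(*args, **kwargs)

    def getvalue(self):
        return self._capture.getvalue()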