Compare each chunk of large objects when uploading

Previously, we compared the ETag from Swift against the MD5 of the
entire large object. However, the ETag for large objects is generally
the MD5 of the concatenation of the ETags for each segment, unless the
object is a DLO whose segments span more than one page of a container
listing. Rather than worry about ETags, just compare each chunk of the
segmented file. This allows the use of --skip-identical when uploading
SLOs and DLOs.

Additionally, there are several test-related improvements:
 * The default arguments for OutputManager are now evaluated on
   construction, rather than on definition, so that
   TestOutputManager.test_instantiation will succeed when using nosetests
   as a test runner. (See also: bug 1251507)
 * An account_username option is now available in the functional tests
   config file for auth systems that do not follow the account:username
   format.
 * CaptureOutput no longer writes to the captured stream, and
   MockHttpTest now captures output. These were polluting test output
   unnecessarily. (See also: bug 1201376)

Change-Id: Ic484e9a0c186c9283c4012c6a2fa77b96b8edf8a
Closes-Bug: #1201376
Closes-Bug: #1379252
Related-Bug: #1251507
This commit is contained in:
Tim Burke 2015-03-03 12:35:03 -08:00
parent 925c01ebfb
commit a4fb70ece1
5 changed files with 214 additions and 40 deletions

@ -45,7 +45,7 @@ class OutputManager(object):
"""
DEFAULT_OFFSET = 14
def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
def __init__(self, print_stream=None, error_stream=None):
"""
:param print_stream: The stream to which :meth:`print_msg` sends
formatted messages.
@ -54,9 +54,10 @@ class OutputManager(object):
On Python 2, Unicode messages are encoded to utf8.
"""
self.print_stream = print_stream
self.print_stream = print_stream or sys.stdout
self.print_pool = ThreadPoolExecutor(max_workers=1)
self.error_stream = error_stream
self.error_stream = error_stream or sys.stderr
self.error_print_pool = ThreadPoolExecutor(max_workers=1)
self.error_count = 0

@ -1159,7 +1159,7 @@ class SwiftService(object):
'headers': [],
'segment_size': None,
'use_slo': False,
'segment_container: None,
'segment_container': None,
'leave_segments': False,
'changed': None,
'skip_identical': False,
@ -1502,6 +1502,51 @@ class SwiftService(object):
results_queue.put(res)
return res
def _get_chunk_data(self, conn, container, obj, headers):
    """
    Build an ordered, flat list of chunk descriptors for the remote
    object described by *headers*.  Each descriptor is a dict carrying
    at least 'hash' (MD5 hexdigest) and 'bytes' (chunk length), suitable
    for chunk-by-chunk comparison against a local file.

    :param conn: connection used to fetch SLO manifests.
    :param container: container holding the object.
    :param obj: object name.
    :param headers: HEAD response headers for the object; branch choice
                    is driven entirely by these.
    :raises: the listing error when a DLO segment-listing page fails.
    """
    chunks = []
    if 'x-object-manifest' in headers:
        # DLO: segments are whatever objects currently match the
        # manifest's "<container>/<prefix>"; list them all, page by page.
        scontainer, sprefix = headers['x-object-manifest'].split('/', 1)
        for part in self.list(scontainer, {'prefix': sprefix}):
            if part["success"]:
                chunks.extend(part["listing"])
            else:
                raise part["error"]
    elif config_true_value(headers.get('x-static-large-object')):
        # SLO: fetch the manifest itself (not the concatenated body)
        # and flatten any nested sub-SLOs recursively.
        _, manifest_data = conn.get_object(
            container, obj, query_string='multipart-manifest=get')
        for chunk in json.loads(manifest_data):
            if chunk.get('sub_slo'):
                scont, sobj = chunk['name'].lstrip('/').split('/', 1)
                chunks.extend(self._get_chunk_data(
                    conn, scont, sobj, {'x-static-large-object': True}))
            else:
                chunks.append(chunk)
    else:
        # Plain object: treat the whole body as a single pseudo-chunk
        # described by its own etag (quotes stripped) and length.
        chunks.append({'hash': headers.get('etag').strip('"'),
                       'bytes': int(headers.get('content-length'))})
    return chunks
def _is_identical(self, chunk_data, path):
try:
fp = open(path, 'rb')
except IOError:
return False
with fp:
for chunk in chunk_data:
to_read = chunk['bytes']
md5sum = md5()
while to_read:
data = fp.read(min(65536, to_read))
if not data:
return False
md5sum.update(data)
to_read -= len(data)
if md5sum.hexdigest() != chunk['hash']:
return False
# Each chunk is verified; check that we're at the end of the file
return not fp.read(1)
def _upload_object_job(self, conn, container, source, obj, options,
results_queue=None):
res = {
@ -1533,32 +1578,27 @@ class SwiftService(object):
old_manifest = None
old_slo_manifest_paths = []
new_slo_manifest_paths = set()
segment_size = int(0 if options['segment_size'] is None
else options['segment_size'])
if (options['changed'] or options['skip_identical']
or not options['leave_segments']):
checksum = None
if options['skip_identical']:
try:
fp = open(path, 'rb')
except IOError:
pass
else:
with fp:
md5sum = md5()
while True:
data = fp.read(65536)
if not data:
break
md5sum.update(data)
checksum = md5sum.hexdigest()
try:
headers = conn.head_object(container, obj)
if options['skip_identical'] and checksum is not None:
if checksum == headers.get('etag'):
res.update({
'success': True,
'status': 'skipped-identical'
})
return res
is_slo = config_true_value(
headers.get('x-static-large-object'))
if options['skip_identical'] or (
is_slo and not options['leave_segments']):
chunk_data = self._get_chunk_data(
conn, container, obj, headers)
if options['skip_identical'] and self._is_identical(
chunk_data, path):
res.update({
'success': True,
'status': 'skipped-identical'
})
return res
cl = int(headers.get('content-length'))
mt = headers.get('x-object-meta-mtime')
@ -1572,13 +1612,8 @@ class SwiftService(object):
return res
if not options['leave_segments']:
old_manifest = headers.get('x-object-manifest')
if config_true_value(
headers.get('x-static-large-object')):
headers, manifest_data = conn.get_object(
container, obj,
query_string='multipart-manifest=get'
)
for old_seg in json.loads(manifest_data):
if is_slo:
for old_seg in chunk_data:
seg_path = old_seg['name'].lstrip('/')
if isinstance(seg_path, text_type):
seg_path = seg_path.encode('utf-8')
@ -1598,8 +1633,8 @@ class SwiftService(object):
# a segment job if we're reading from a stream - we may fail if we
# go over the single object limit, but this gives us a nice way
# to create objects from memory
if (path is not None and options['segment_size']
and (getsize(path) > int(options['segment_size']))):
if (path is not None and segment_size
and (getsize(path) > segment_size)):
res['large_object'] = True
seg_container = container + '_segments'
if options['segment_container']:
@ -1612,7 +1647,6 @@ class SwiftService(object):
segment_start = 0
while segment_start < full_size:
segment_size = int(options['segment_size'])
if segment_start + segment_size > full_size:
segment_size = full_size - segment_start
if options['use_slo']:

@ -51,8 +51,13 @@ class TestFunctional(testtools.TestCase):
auth_ssl = config.getboolean('func_test', 'auth_ssl')
auth_prefix = config.get('func_test', 'auth_prefix')
self.auth_version = config.get('func_test', 'auth_version')
self.account = config.get('func_test', 'account')
self.username = config.get('func_test', 'username')
try:
self.account_username = config.get('func_test',
'account_username')
except configparser.NoOptionError:
account = config.get('func_test', 'account')
username = config.get('func_test', 'username')
self.account_username = "%s:%s" % (account, username)
self.password = config.get('func_test', 'password')
self.auth_url = ""
if auth_ssl:
@ -62,7 +67,6 @@ class TestFunctional(testtools.TestCase):
self.auth_url += "%s:%s%s" % (auth_host, auth_port, auth_prefix)
if self.auth_version == "1":
self.auth_url += 'v1.0'
self.account_username = "%s:%s" % (self.account, self.username)
else:
self.skip_tests = True

@ -817,3 +817,131 @@ class TestServiceUpload(testtools.TestCase):
contents = mock_conn.put_object.call_args[0][2]
self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
def test_upload_object_job_identical_etag(self):
    # A plain (non-large) object whose remote etag equals the MD5 of the
    # local file must be skipped (no PUT) when skip_identical is set.
    with tempfile.NamedTemporaryFile() as f:
        f.write(b'a' * 30)
        f.flush()

        mock_conn = mock.Mock()
        # HEAD reports length and etag matching the local file exactly.
        mock_conn.head_object.return_value = {
            'content-length': 30,
            'etag': md5(b'a' * 30).hexdigest()}
        type(mock_conn).attempts = mock.PropertyMock(return_value=2)

        s = SwiftService()
        r = s._upload_object_job(conn=mock_conn,
                                 container='test_c',
                                 source=f.name,
                                 obj='test_o',
                                 options={'changed': False,
                                          'skip_identical': True,
                                          'leave_segments': True,
                                          'header': '',
                                          'segment_size': 0})

        self.assertTrue(r['success'])
        self.assertIn('status', r)
        self.assertEqual(r['status'], 'skipped-identical')
        # Only the HEAD should have happened; nothing was uploaded.
        self.assertEqual(mock_conn.put_object.call_count, 0)
        self.assertEqual(mock_conn.head_object.call_count, 1)
        mock_conn.head_object.assert_called_with('test_c', 'test_o')
def test_upload_object_job_identical_slo_with_nesting(self):
    # An SLO containing a nested sub-SLO must be skipped when each
    # leaf chunk's MD5 matches the corresponding span of the local file.
    with tempfile.NamedTemporaryFile() as f:
        f.write(b'a' * 30)
        f.flush()
        # Local file is three identical 10-byte chunks; the remote
        # manifest describes them as a 2-chunk sub-SLO plus one chunk.
        seg_etag = md5(b'a' * 10).hexdigest()
        submanifest = "[%s]" % ",".join(
            ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
        submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
        manifest = "[%s]" % ",".join([
            '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
            '"bytes":20,"hash":"%s"}' % submanifest_etag,
            '{"bytes":10,"hash":"%s"}' % seg_etag])

        mock_conn = mock.Mock()
        mock_conn.head_object.return_value = {
            'x-static-large-object': True,
            'content-length': 30,
            'etag': md5(submanifest_etag.encode('ascii') +
                        seg_etag.encode('ascii')).hexdigest()}
        # First GET returns the top-level manifest, second the sub-SLO.
        mock_conn.get_object.side_effect = [
            (None, manifest),
            (None, submanifest)]
        type(mock_conn).attempts = mock.PropertyMock(return_value=2)

        s = SwiftService()
        r = s._upload_object_job(conn=mock_conn,
                                 container='test_c',
                                 source=f.name,
                                 obj='test_o',
                                 options={'changed': False,
                                          'skip_identical': True,
                                          'leave_segments': True,
                                          'header': '',
                                          'segment_size': 10})

        self.assertIsNone(r.get('error'))
        self.assertTrue(r['success'])
        self.assertEqual('skipped-identical', r.get('status'))
        self.assertEqual(0, mock_conn.put_object.call_count)
        self.assertEqual([mock.call('test_c', 'test_o')],
                         mock_conn.head_object.mock_calls)
        # Both manifests must have been fetched with multipart-manifest=get.
        self.assertEqual([
            mock.call('test_c', 'test_o',
                      query_string='multipart-manifest=get'),
            mock.call('test_c_segments', 'test_sub_slo',
                      query_string='multipart-manifest=get'),
        ], mock_conn.get_object.mock_calls)
def test_upload_object_job_identical_dlo(self):
    # A DLO must be skipped when the listed segments match the local
    # file chunk-for-chunk; the segment listing deliberately spans
    # multiple container-listing pages to exercise paging.
    with tempfile.NamedTemporaryFile() as f:
        f.write(b'a' * 30)
        f.flush()
        segment_etag = md5(b'a' * 10).hexdigest()

        mock_conn = mock.Mock()
        mock_conn.head_object.return_value = {
            'x-object-manifest': 'test_c_segments/test_o/prefix',
            'content-length': 30,
            'etag': md5(segment_etag.encode('ascii') * 3).hexdigest()}
        # Three listing pages: two segments, then one, then end-of-listing.
        mock_conn.get_container.side_effect = [
            (None, [{"bytes": 10, "hash": segment_etag,
                     "name": "test_o/prefix/00"},
                    {"bytes": 10, "hash": segment_etag,
                     "name": "test_o/prefix/01"}]),
            (None, [{"bytes": 10, "hash": segment_etag,
                     "name": "test_o/prefix/02"}]),
            (None, {})]
        type(mock_conn).attempts = mock.PropertyMock(return_value=2)

        s = SwiftService()
        # The DLO path lists segments through the service, so the
        # service's own connection factory must hand back our mock.
        with mock.patch('swiftclient.service.get_conn',
                        return_value=mock_conn):
            r = s._upload_object_job(conn=mock_conn,
                                     container='test_c',
                                     source=f.name,
                                     obj='test_o',
                                     options={'changed': False,
                                              'skip_identical': True,
                                              'leave_segments': True,
                                              'header': '',
                                              'segment_size': 10})

        self.assertIsNone(r.get('error'))
        self.assertTrue(r['success'])
        self.assertEqual('skipped-identical', r.get('status'))
        self.assertEqual(0, mock_conn.put_object.call_count)
        self.assertEqual(1, mock_conn.head_object.call_count)
        self.assertEqual(3, mock_conn.get_container.call_count)
        mock_conn.head_object.assert_called_with('test_c', 'test_o')
        # Paged listing: each call resumes from the previous page's
        # last segment name as the marker.
        expected = [
            mock.call('test_c_segments', prefix='test_o/prefix',
                      marker='', delimiter=None),
            mock.call('test_c_segments', prefix='test_o/prefix',
                      marker="test_o/prefix/01", delimiter=None),
            mock.call('test_c_segments', prefix='test_o/prefix',
                      marker="test_o/prefix/02", delimiter=None),
        ]
        mock_conn.get_container.assert_has_calls(expected)

@ -207,6 +207,12 @@ class MockHttpTest(testtools.TestCase):
self.fake_connect = None
self.request_log = []
# Capture output, since the test-runner stdout/stderr monkey-patching
# won't cover the references to sys.stdout/sys.stderr in
# swiftclient.multithreading
self.capture_output = CaptureOutput()
self.capture_output.__enter__()
def fake_http_connection(*args, **kwargs):
self.validateMockedRequestsConsumed()
self.request_log = []
@ -367,6 +373,7 @@ class MockHttpTest(testtools.TestCase):
# un-hygienic mocking on the swiftclient.client module; which may lead
# to some unfortunate test order dependency bugs by way of the broken
# window theory if any other modules are similarly patched
self.capture_output.__exit__()
reload_module(c)
@ -392,7 +399,7 @@ class CaptureStream(object):
self.stream = stream
self._capture = six.StringIO()
self._buffer = CaptureStreamBuffer(self)
self.streams = [self.stream, self._capture]
self.streams = [self._capture]
@property
def buffer(self):