From a4fb70ece189aff85f234ab6b3f275b69e936c03 Mon Sep 17 00:00:00 2001
From: Tim Burke <tim.burke@gmail.com>
Date: Tue, 3 Mar 2015 12:35:03 -0800
Subject: [PATCH] Compare each chunk of large objects when uploading

Previously, we compared the ETag from Swift against the MD5 of the
entire large object. However, the ETag for large objects is generally
the MD5 of the concatenation of the ETags for each segment, unless the
object is a DLO whose segments span more than one page of a container
listing. Rather than worry about ETags, just compare each chunk of the
segmented file. This allows the use of --skip-identical when uploading
SLOs and DLOs.

Additionally, there are several test-related improvements:
 * The default arguments for OutputManager are now evaluated on
   construction, rather than on definition, so that
   TestOutputManager.test_instantiation will succeed when using nosetest
   as a test runner. (See also: bug 1251507)
 * An account_username option is now available in the functional tests
   config file for auth systems that do not follow the account:username
   format.
 * CaptureOutput no longer writes to the captured stream, and
   MockHttpTest now captures output. These were polluting test output
   unnecessarily. (See also: bug 1201376)

Change-Id: Ic484e9a0c186c9283c4012c6a2fa77b96b8edf8a
Closes-Bug: #1201376
Closes-Bug: #1379252
Related-Bug: #1251507
---
 swiftclient/multithreading.py        |   7 +-
 swiftclient/service.py               | 100 ++++++++++++++-------
 tests/functional/test_swiftclient.py |  10 ++-
 tests/unit/test_service.py           | 128 +++++++++++++++++++++++++++
 tests/unit/utils.py                  |   9 +-
 5 files changed, 214 insertions(+), 40 deletions(-)

diff --git a/swiftclient/multithreading.py b/swiftclient/multithreading.py
index 32d8ffa8..c53d9870 100644
--- a/swiftclient/multithreading.py
+++ b/swiftclient/multithreading.py
@@ -45,7 +45,7 @@ class OutputManager(object):
     """
     DEFAULT_OFFSET = 14
 
-    def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
+    def __init__(self, print_stream=None, error_stream=None):
         """
         :param print_stream: The stream to which :meth:`print_msg` sends
                              formatted messages.
@@ -54,9 +54,10 @@ class OutputManager(object):
 
         On Python 2, Unicode messages are encoded to utf8.
         """
-        self.print_stream = print_stream
+        self.print_stream = print_stream or sys.stdout
         self.print_pool = ThreadPoolExecutor(max_workers=1)
-        self.error_stream = error_stream
+
+        self.error_stream = error_stream or sys.stderr
         self.error_print_pool = ThreadPoolExecutor(max_workers=1)
         self.error_count = 0
 
diff --git a/swiftclient/service.py b/swiftclient/service.py
index f24d4300..bb132ddb 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -1159,7 +1159,7 @@ class SwiftService(object):
                                 'headers': [],
                                 'segment_size': None,
                                 'use_slo': False,
-                                'segment_container: None,
+                                'segment_container': None,
                                 'leave_segments': False,
                                 'changed': None,
                                 'skip_identical': False,
@@ -1502,6 +1502,51 @@ class SwiftService(object):
                 results_queue.put(res)
             return res
 
+    def _get_chunk_data(self, conn, container, obj, headers):
+        chunks = []
+        if 'x-object-manifest' in headers:
+            scontainer, sprefix = headers['x-object-manifest'].split('/', 1)
+            for part in self.list(scontainer, {'prefix': sprefix}):
+                if part["success"]:
+                    chunks.extend(part["listing"])
+                else:
+                    raise part["error"]
+        elif config_true_value(headers.get('x-static-large-object')):
+            _, manifest_data = conn.get_object(
+                container, obj, query_string='multipart-manifest=get')
+            for chunk in json.loads(manifest_data):
+                if chunk.get('sub_slo'):
+                    scont, sobj = chunk['name'].lstrip('/').split('/', 1)
+                    chunks.extend(self._get_chunk_data(
+                        conn, scont, sobj, {'x-static-large-object': True}))
+                else:
+                    chunks.append(chunk)
+        else:
+            chunks.append({'hash': headers.get('etag').strip('"'),
+                           'bytes': int(headers.get('content-length'))})
+        return chunks
+
+    def _is_identical(self, chunk_data, path):
+        try:
+            fp = open(path, 'rb')
+        except IOError:
+            return False
+
+        with fp:
+            for chunk in chunk_data:
+                to_read = chunk['bytes']
+                md5sum = md5()
+                while to_read:
+                    data = fp.read(min(65536, to_read))
+                    if not data:
+                        return False
+                    md5sum.update(data)
+                    to_read -= len(data)
+                if md5sum.hexdigest() != chunk['hash']:
+                    return False
+            # Each chunk is verified; check that we're at the end of the file
+            return not fp.read(1)
+
     def _upload_object_job(self, conn, container, source, obj, options,
                            results_queue=None):
         res = {
@@ -1533,32 +1578,27 @@ class SwiftService(object):
             old_manifest = None
             old_slo_manifest_paths = []
             new_slo_manifest_paths = set()
+            segment_size = int(0 if options['segment_size'] is None
+                               else options['segment_size'])
             if (options['changed'] or options['skip_identical']
                     or not options['leave_segments']):
-                checksum = None
-                if options['skip_identical']:
-                    try:
-                        fp = open(path, 'rb')
-                    except IOError:
-                        pass
-                    else:
-                        with fp:
-                            md5sum = md5()
-                            while True:
-                                data = fp.read(65536)
-                                if not data:
-                                    break
-                                md5sum.update(data)
-                        checksum = md5sum.hexdigest()
                 try:
                     headers = conn.head_object(container, obj)
-                    if options['skip_identical'] and checksum is not None:
-                        if checksum == headers.get('etag'):
-                            res.update({
-                                'success': True,
-                                'status': 'skipped-identical'
-                            })
-                            return res
+                    is_slo = config_true_value(
+                        headers.get('x-static-large-object'))
+
+                    if options['skip_identical'] or (
+                            is_slo and not options['leave_segments']):
+                        chunk_data = self._get_chunk_data(
+                            conn, container, obj, headers)
+
+                    if options['skip_identical'] and self._is_identical(
+                            chunk_data, path):
+                        res.update({
+                            'success': True,
+                            'status': 'skipped-identical'
+                        })
+                        return res
 
                     cl = int(headers.get('content-length'))
                     mt = headers.get('x-object-meta-mtime')
@@ -1572,13 +1612,8 @@ class SwiftService(object):
                         return res
                     if not options['leave_segments']:
                         old_manifest = headers.get('x-object-manifest')
-                        if config_true_value(
-                                headers.get('x-static-large-object')):
-                            headers, manifest_data = conn.get_object(
-                                container, obj,
-                                query_string='multipart-manifest=get'
-                            )
-                            for old_seg in json.loads(manifest_data):
+                        if is_slo:
+                            for old_seg in chunk_data:
                                 seg_path = old_seg['name'].lstrip('/')
                                 if isinstance(seg_path, text_type):
                                     seg_path = seg_path.encode('utf-8')
@@ -1598,8 +1633,8 @@ class SwiftService(object):
             # a segment job if we're reading from a stream - we may fail if we
             # go over the single object limit, but this gives us a nice way
             # to create objects from memory
-            if (path is not None and options['segment_size']
-                    and (getsize(path) > int(options['segment_size']))):
+            if (path is not None and segment_size
+                    and (getsize(path) > segment_size)):
                 res['large_object'] = True
                 seg_container = container + '_segments'
                 if options['segment_container']:
@@ -1612,7 +1647,6 @@ class SwiftService(object):
                 segment_start = 0
 
                 while segment_start < full_size:
-                    segment_size = int(options['segment_size'])
                     if segment_start + segment_size > full_size:
                         segment_size = full_size - segment_start
                     if options['use_slo']:
diff --git a/tests/functional/test_swiftclient.py b/tests/functional/test_swiftclient.py
index f5d14aa3..4b57f1d9 100644
--- a/tests/functional/test_swiftclient.py
+++ b/tests/functional/test_swiftclient.py
@@ -51,8 +51,13 @@ class TestFunctional(testtools.TestCase):
             auth_ssl = config.getboolean('func_test', 'auth_ssl')
             auth_prefix = config.get('func_test', 'auth_prefix')
             self.auth_version = config.get('func_test', 'auth_version')
-            self.account = config.get('func_test', 'account')
-            self.username = config.get('func_test', 'username')
+            try:
+                self.account_username = config.get('func_test',
+                                                   'account_username')
+            except configparser.NoOptionError:
+                account = config.get('func_test', 'account')
+                username = config.get('func_test', 'username')
+                self.account_username = "%s:%s" % (account, username)
             self.password = config.get('func_test', 'password')
             self.auth_url = ""
             if auth_ssl:
@@ -62,7 +67,6 @@ class TestFunctional(testtools.TestCase):
             self.auth_url += "%s:%s%s" % (auth_host, auth_port, auth_prefix)
             if self.auth_version == "1":
                 self.auth_url += 'v1.0'
-            self.account_username = "%s:%s" % (self.account, self.username)
 
         else:
             self.skip_tests = True
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 3309813f..073f06ea 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -817,3 +817,131 @@ class TestServiceUpload(testtools.TestCase):
 
             contents = mock_conn.put_object.call_args[0][2]
             self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
+
+    def test_upload_object_job_identical_etag(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+
+            mock_conn = mock.Mock()
+            mock_conn.head_object.return_value = {
+                'content-length': 30,
+                'etag': md5(b'a' * 30).hexdigest()}
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            r = s._upload_object_job(conn=mock_conn,
+                                     container='test_c',
+                                     source=f.name,
+                                     obj='test_o',
+                                     options={'changed': False,
+                                              'skip_identical': True,
+                                              'leave_segments': True,
+                                              'header': '',
+                                              'segment_size': 0})
+
+            self.assertTrue(r['success'])
+            self.assertIn('status', r)
+            self.assertEqual(r['status'], 'skipped-identical')
+            self.assertEqual(mock_conn.put_object.call_count, 0)
+            self.assertEqual(mock_conn.head_object.call_count, 1)
+            mock_conn.head_object.assert_called_with('test_c', 'test_o')
+
+    def test_upload_object_job_identical_slo_with_nesting(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+            seg_etag = md5(b'a' * 10).hexdigest()
+            submanifest = "[%s]" % ",".join(
+                ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
+            submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
+            manifest = "[%s]" % ",".join([
+                '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
+                '"bytes":20,"hash":"%s"}' % submanifest_etag,
+                '{"bytes":10,"hash":"%s"}' % seg_etag])
+
+            mock_conn = mock.Mock()
+            mock_conn.head_object.return_value = {
+                'x-static-large-object': True,
+                'content-length': 30,
+                'etag': md5(submanifest_etag.encode('ascii') +
+                            seg_etag.encode('ascii')).hexdigest()}
+            mock_conn.get_object.side_effect = [
+                (None, manifest),
+                (None, submanifest)]
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            r = s._upload_object_job(conn=mock_conn,
+                                     container='test_c',
+                                     source=f.name,
+                                     obj='test_o',
+                                     options={'changed': False,
+                                              'skip_identical': True,
+                                              'leave_segments': True,
+                                              'header': '',
+                                              'segment_size': 10})
+
+            self.assertIsNone(r.get('error'))
+            self.assertTrue(r['success'])
+            self.assertEqual('skipped-identical', r.get('status'))
+            self.assertEqual(0, mock_conn.put_object.call_count)
+            self.assertEqual([mock.call('test_c', 'test_o')],
+                             mock_conn.head_object.mock_calls)
+            self.assertEqual([
+                mock.call('test_c', 'test_o',
+                          query_string='multipart-manifest=get'),
+                mock.call('test_c_segments', 'test_sub_slo',
+                          query_string='multipart-manifest=get'),
+            ], mock_conn.get_object.mock_calls)
+
+    def test_upload_object_job_identical_dlo(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+            segment_etag = md5(b'a' * 10).hexdigest()
+
+            mock_conn = mock.Mock()
+            mock_conn.head_object.return_value = {
+                'x-object-manifest': 'test_c_segments/test_o/prefix',
+                'content-length': 30,
+                'etag': md5(segment_etag.encode('ascii') * 3).hexdigest()}
+            mock_conn.get_container.side_effect = [
+                (None, [{"bytes": 10, "hash": segment_etag,
+                         "name": "test_o/prefix/00"},
+                        {"bytes": 10, "hash": segment_etag,
+                         "name": "test_o/prefix/01"}]),
+                (None, [{"bytes": 10, "hash": segment_etag,
+                         "name": "test_o/prefix/02"}]),
+                (None, {})]
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            with mock.patch('swiftclient.service.get_conn',
+                            return_value=mock_conn):
+                r = s._upload_object_job(conn=mock_conn,
+                                         container='test_c',
+                                         source=f.name,
+                                         obj='test_o',
+                                         options={'changed': False,
+                                                  'skip_identical': True,
+                                                  'leave_segments': True,
+                                                  'header': '',
+                                                  'segment_size': 10})
+
+            self.assertIsNone(r.get('error'))
+            self.assertTrue(r['success'])
+            self.assertEqual('skipped-identical', r.get('status'))
+            self.assertEqual(0, mock_conn.put_object.call_count)
+            self.assertEqual(1, mock_conn.head_object.call_count)
+            self.assertEqual(3, mock_conn.get_container.call_count)
+            mock_conn.head_object.assert_called_with('test_c', 'test_o')
+            expected = [
+                mock.call('test_c_segments', prefix='test_o/prefix',
+                          marker='', delimiter=None),
+                mock.call('test_c_segments', prefix='test_o/prefix',
+                          marker="test_o/prefix/01", delimiter=None),
+                mock.call('test_c_segments', prefix='test_o/prefix',
+                          marker="test_o/prefix/02", delimiter=None),
+            ]
+            mock_conn.get_container.assert_has_calls(expected)
diff --git a/tests/unit/utils.py b/tests/unit/utils.py
index 88d6d129..bb68f4fb 100644
--- a/tests/unit/utils.py
+++ b/tests/unit/utils.py
@@ -207,6 +207,12 @@ class MockHttpTest(testtools.TestCase):
         self.fake_connect = None
         self.request_log = []
 
+        # Capture output, since the test-runner stdout/stderr moneky-patching
+        # won't cover the references to sys.stdout/sys.stderr in
+        # swiftclient.multithreading
+        self.capture_output = CaptureOutput()
+        self.capture_output.__enter__()
+
         def fake_http_connection(*args, **kwargs):
             self.validateMockedRequestsConsumed()
             self.request_log = []
@@ -367,6 +373,7 @@ class MockHttpTest(testtools.TestCase):
         # un-hygienic mocking on the swiftclient.client module; which may lead
         # to some unfortunate test order dependency bugs by way of the broken
         # window theory if any other modules are similarly patched
+        self.capture_output.__exit__()
         reload_module(c)
 
 
@@ -392,7 +399,7 @@ class CaptureStream(object):
         self.stream = stream
         self._capture = six.StringIO()
         self._buffer = CaptureStreamBuffer(self)
-        self.streams = [self.stream, self._capture]
+        self.streams = [self._capture]
 
     @property
     def buffer(self):