diff --git a/swiftclient/multithreading.py b/swiftclient/multithreading.py index 32d8ffa8..c53d9870 100644 --- a/swiftclient/multithreading.py +++ b/swiftclient/multithreading.py @@ -45,7 +45,7 @@ class OutputManager(object): """ DEFAULT_OFFSET = 14 - def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr): + def __init__(self, print_stream=None, error_stream=None): """ :param print_stream: The stream to which :meth:`print_msg` sends formatted messages. @@ -54,9 +54,10 @@ class OutputManager(object): On Python 2, Unicode messages are encoded to utf8. """ - self.print_stream = print_stream + self.print_stream = print_stream or sys.stdout self.print_pool = ThreadPoolExecutor(max_workers=1) - self.error_stream = error_stream + + self.error_stream = error_stream or sys.stderr self.error_print_pool = ThreadPoolExecutor(max_workers=1) self.error_count = 0 diff --git a/swiftclient/service.py b/swiftclient/service.py index f24d4300..bb132ddb 100644 --- a/swiftclient/service.py +++ b/swiftclient/service.py @@ -1159,7 +1159,7 @@ class SwiftService(object): 'headers': [], 'segment_size': None, 'use_slo': False, - 'segment_container: None, + 'segment_container': None, 'leave_segments': False, 'changed': None, 'skip_identical': False, @@ -1502,6 +1502,51 @@ class SwiftService(object): results_queue.put(res) return res + def _get_chunk_data(self, conn, container, obj, headers): + chunks = [] + if 'x-object-manifest' in headers: + scontainer, sprefix = headers['x-object-manifest'].split('/', 1) + for part in self.list(scontainer, {'prefix': sprefix}): + if part["success"]: + chunks.extend(part["listing"]) + else: + raise part["error"] + elif config_true_value(headers.get('x-static-large-object')): + _, manifest_data = conn.get_object( + container, obj, query_string='multipart-manifest=get') + for chunk in json.loads(manifest_data): + if chunk.get('sub_slo'): + scont, sobj = chunk['name'].lstrip('/').split('/', 1) + chunks.extend(self._get_chunk_data( + conn, scont, 
sobj, {'x-static-large-object': True})) + else: + chunks.append(chunk) + else: + chunks.append({'hash': headers.get('etag').strip('"'), + 'bytes': int(headers.get('content-length'))}) + return chunks + + def _is_identical(self, chunk_data, path): + try: + fp = open(path, 'rb') + except IOError: + return False + + with fp: + for chunk in chunk_data: + to_read = chunk['bytes'] + md5sum = md5() + while to_read: + data = fp.read(min(65536, to_read)) + if not data: + return False + md5sum.update(data) + to_read -= len(data) + if md5sum.hexdigest() != chunk['hash']: + return False + # Each chunk is verified; check that we're at the end of the file + return not fp.read(1) + def _upload_object_job(self, conn, container, source, obj, options, results_queue=None): res = { @@ -1533,32 +1578,27 @@ class SwiftService(object): old_manifest = None old_slo_manifest_paths = [] new_slo_manifest_paths = set() + segment_size = int(0 if options['segment_size'] is None + else options['segment_size']) if (options['changed'] or options['skip_identical'] or not options['leave_segments']): - checksum = None - if options['skip_identical']: - try: - fp = open(path, 'rb') - except IOError: - pass - else: - with fp: - md5sum = md5() - while True: - data = fp.read(65536) - if not data: - break - md5sum.update(data) - checksum = md5sum.hexdigest() try: headers = conn.head_object(container, obj) - if options['skip_identical'] and checksum is not None: - if checksum == headers.get('etag'): - res.update({ - 'success': True, - 'status': 'skipped-identical' - }) - return res + is_slo = config_true_value( + headers.get('x-static-large-object')) + + if options['skip_identical'] or ( + is_slo and not options['leave_segments']): + chunk_data = self._get_chunk_data( + conn, container, obj, headers) + + if options['skip_identical'] and self._is_identical( + chunk_data, path): + res.update({ + 'success': True, + 'status': 'skipped-identical' + }) + return res cl = int(headers.get('content-length')) mt = 
headers.get('x-object-meta-mtime') @@ -1572,13 +1612,8 @@ class SwiftService(object): return res if not options['leave_segments']: old_manifest = headers.get('x-object-manifest') - if config_true_value( - headers.get('x-static-large-object')): - headers, manifest_data = conn.get_object( - container, obj, - query_string='multipart-manifest=get' - ) - for old_seg in json.loads(manifest_data): + if is_slo: + for old_seg in chunk_data: seg_path = old_seg['name'].lstrip('/') if isinstance(seg_path, text_type): seg_path = seg_path.encode('utf-8') @@ -1598,8 +1633,8 @@ class SwiftService(object): # a segment job if we're reading from a stream - we may fail if we # go over the single object limit, but this gives us a nice way # to create objects from memory - if (path is not None and options['segment_size'] - and (getsize(path) > int(options['segment_size']))): + if (path is not None and segment_size + and (getsize(path) > segment_size)): res['large_object'] = True seg_container = container + '_segments' if options['segment_container']: @@ -1612,7 +1647,6 @@ class SwiftService(object): segment_start = 0 while segment_start < full_size: - segment_size = int(options['segment_size']) if segment_start + segment_size > full_size: segment_size = full_size - segment_start if options['use_slo']: diff --git a/tests/functional/test_swiftclient.py b/tests/functional/test_swiftclient.py index f5d14aa3..4b57f1d9 100644 --- a/tests/functional/test_swiftclient.py +++ b/tests/functional/test_swiftclient.py @@ -51,8 +51,13 @@ class TestFunctional(testtools.TestCase): auth_ssl = config.getboolean('func_test', 'auth_ssl') auth_prefix = config.get('func_test', 'auth_prefix') self.auth_version = config.get('func_test', 'auth_version') - self.account = config.get('func_test', 'account') - self.username = config.get('func_test', 'username') + try: + self.account_username = config.get('func_test', + 'account_username') + except configparser.NoOptionError: + account = config.get('func_test', 
'account') + username = config.get('func_test', 'username') + self.account_username = "%s:%s" % (account, username) self.password = config.get('func_test', 'password') self.auth_url = "" if auth_ssl: @@ -62,7 +67,6 @@ class TestFunctional(testtools.TestCase): self.auth_url += "%s:%s%s" % (auth_host, auth_port, auth_prefix) if self.auth_version == "1": self.auth_url += 'v1.0' - self.account_username = "%s:%s" % (self.account, self.username) else: self.skip_tests = True diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 3309813f..073f06ea 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -817,3 +817,131 @@ class TestServiceUpload(testtools.TestCase): contents = mock_conn.put_object.call_args[0][2] self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest()) + + def test_upload_object_job_identical_etag(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + + mock_conn = mock.Mock() + mock_conn.head_object.return_value = { + 'content-length': 30, + 'etag': md5(b'a' * 30).hexdigest()} + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + + s = SwiftService() + r = s._upload_object_job(conn=mock_conn, + container='test_c', + source=f.name, + obj='test_o', + options={'changed': False, + 'skip_identical': True, + 'leave_segments': True, + 'header': '', + 'segment_size': 0}) + + self.assertTrue(r['success']) + self.assertIn('status', r) + self.assertEqual(r['status'], 'skipped-identical') + self.assertEqual(mock_conn.put_object.call_count, 0) + self.assertEqual(mock_conn.head_object.call_count, 1) + mock_conn.head_object.assert_called_with('test_c', 'test_o') + + def test_upload_object_job_identical_slo_with_nesting(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + seg_etag = md5(b'a' * 10).hexdigest() + submanifest = "[%s]" % ",".join( + ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2) + submanifest_etag = md5(seg_etag.encode('ascii') * 
2).hexdigest() + manifest = "[%s]" % ",".join([ + '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",' + '"bytes":20,"hash":"%s"}' % submanifest_etag, + '{"bytes":10,"hash":"%s"}' % seg_etag]) + + mock_conn = mock.Mock() + mock_conn.head_object.return_value = { + 'x-static-large-object': True, + 'content-length': 30, + 'etag': md5(submanifest_etag.encode('ascii') + + seg_etag.encode('ascii')).hexdigest()} + mock_conn.get_object.side_effect = [ + (None, manifest), + (None, submanifest)] + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + + s = SwiftService() + r = s._upload_object_job(conn=mock_conn, + container='test_c', + source=f.name, + obj='test_o', + options={'changed': False, + 'skip_identical': True, + 'leave_segments': True, + 'header': '', + 'segment_size': 10}) + + self.assertIsNone(r.get('error')) + self.assertTrue(r['success']) + self.assertEqual('skipped-identical', r.get('status')) + self.assertEqual(0, mock_conn.put_object.call_count) + self.assertEqual([mock.call('test_c', 'test_o')], + mock_conn.head_object.mock_calls) + self.assertEqual([ + mock.call('test_c', 'test_o', + query_string='multipart-manifest=get'), + mock.call('test_c_segments', 'test_sub_slo', + query_string='multipart-manifest=get'), + ], mock_conn.get_object.mock_calls) + + def test_upload_object_job_identical_dlo(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + segment_etag = md5(b'a' * 10).hexdigest() + + mock_conn = mock.Mock() + mock_conn.head_object.return_value = { + 'x-object-manifest': 'test_c_segments/test_o/prefix', + 'content-length': 30, + 'etag': md5(segment_etag.encode('ascii') * 3).hexdigest()} + mock_conn.get_container.side_effect = [ + (None, [{"bytes": 10, "hash": segment_etag, + "name": "test_o/prefix/00"}, + {"bytes": 10, "hash": segment_etag, + "name": "test_o/prefix/01"}]), + (None, [{"bytes": 10, "hash": segment_etag, + "name": "test_o/prefix/02"}]), + (None, {})] + type(mock_conn).attempts = 
mock.PropertyMock(return_value=2) + + s = SwiftService() + with mock.patch('swiftclient.service.get_conn', + return_value=mock_conn): + r = s._upload_object_job(conn=mock_conn, + container='test_c', + source=f.name, + obj='test_o', + options={'changed': False, + 'skip_identical': True, + 'leave_segments': True, + 'header': '', + 'segment_size': 10}) + + self.assertIsNone(r.get('error')) + self.assertTrue(r['success']) + self.assertEqual('skipped-identical', r.get('status')) + self.assertEqual(0, mock_conn.put_object.call_count) + self.assertEqual(1, mock_conn.head_object.call_count) + self.assertEqual(3, mock_conn.get_container.call_count) + mock_conn.head_object.assert_called_with('test_c', 'test_o') + expected = [ + mock.call('test_c_segments', prefix='test_o/prefix', + marker='', delimiter=None), + mock.call('test_c_segments', prefix='test_o/prefix', + marker="test_o/prefix/01", delimiter=None), + mock.call('test_c_segments', prefix='test_o/prefix', + marker="test_o/prefix/02", delimiter=None), + ] + mock_conn.get_container.assert_has_calls(expected) diff --git a/tests/unit/utils.py b/tests/unit/utils.py index 88d6d129..bb68f4fb 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -207,6 +207,12 @@ class MockHttpTest(testtools.TestCase): self.fake_connect = None self.request_log = [] + # Capture output, since the test-runner stdout/stderr monkey-patching + # won't cover the references to sys.stdout/sys.stderr in + # swiftclient.multithreading + self.capture_output = CaptureOutput() + self.capture_output.__enter__() + def fake_http_connection(*args, **kwargs): self.validateMockedRequestsConsumed() self.request_log = [] @@ -367,6 +373,7 @@ class MockHttpTest(testtools.TestCase): # un-hygienic mocking on the swiftclient.client module; which may lead # to some unfortunate test order dependency bugs by way of the broken # window theory if any other modules are similarly patched + self.capture_output.__exit__() reload_module(c) @@ -392,7 +399,7 @@ class
CaptureStream(object): self.stream = stream self._capture = six.StringIO() self._buffer = CaptureStreamBuffer(self) - self.streams = [self.stream, self._capture] + self.streams = [self._capture] @property def buffer(self):