diff --git a/swiftclient/service.py b/swiftclient/service.py
index 7b5ecd44..ed5e9e98 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -1502,7 +1502,8 @@ class SwiftService(object):
             if hasattr(s, 'read'):
                 # We've got a file like object to upload to o
                 file_future = self.thread_manager.object_uu_pool.submit(
-                    self._upload_object_job, container, s, o, object_options
+                    self._upload_object_job, container, s, o, object_options,
+                    results_queue=rq
                 )
                 details['file'] = s
                 details['object'] = o
@@ -1784,6 +1785,132 @@ class SwiftService(object):
                 if fp is not None:
                     fp.close()

+    @staticmethod
+    def _put_object(conn, container, name, content, headers=None, md5=None):
+        """
+        Upload an object into a given container and verify the resulting
+        ETag, if the optional md5 parameter is passed.
+
+        :param conn: The Swift connection to use for uploads.
+        :param container: The container to put the object into.
+        :param name: The name of the object.
+        :param content: Object content.
+        :param headers: Headers (optional) to associate with the object.
+        :param md5: MD5 sum of the content. If passed in, will be used to
+                    verify the returned ETag.
+
+        :returns: A dictionary as the response from calling put_object.
+                  The keys are:
+                  - status
+                  - reason
+                  - headers
+                  On error, the dictionary contains the following keys:
+                  - success (with value False)
+                  - error - the encountered exception (object)
+                  - error_timestamp
+                  - response_dict - results from the put_object call, as
+                    documented above
+                  - attempts - number of attempts made
+        """
+        if headers is None:
+            headers = {}
+        else:
+            headers = dict(headers)
+        if md5 is not None:
+            headers['etag'] = md5
+        results = {}
+        try:
+            etag = conn.put_object(
+                container, name, content, content_length=len(content),
+                headers=headers, response_dict=results)
+            if md5 is not None and etag != md5:
+                raise SwiftError('Upload verification failed for {0}: md5 '
+                                 'mismatch {1} != {2}'.format(name, md5, etag))
+            results['success'] = True
+        except Exception as err:
+            traceback, err_time = report_traceback()
+            logger.exception(err)
+            return {
+                'success': False,
+                'error': err,
+                'error_timestamp': err_time,
+                'response_dict': results,
+                'attempts': conn.attempts,
+                'traceback': traceback
+            }
+        return results
+
+    @staticmethod
+    def _upload_stream_segment(conn, container, object_name,
+                               segment_container, segment_name,
+                               segment_size, segment_index,
+                               headers, fd):
+        """
+        Upload a segment from a stream, buffering it in memory first. The
+        resulting object is placed either as a segment in the segment
+        container, or, if it is smaller than a single segment, as the given
+        object name.
+
+        :param conn: Swift Connection to use.
+        :param container: Container in which the object will be placed.
+        :param object_name: Name of the final object (used in case the stream
+                            is smaller than the segment_size).
+        :param segment_container: Container to hold the object segments.
+        :param segment_name: The name of the segment.
+        :param segment_size: Minimum segment size.
+        :param segment_index: The segment index.
+        :param headers: Headers to attach to the segment/object.
+        :param fd: File-like handle for the content. Must implement read().
+
+        :returns: Dictionary, containing the following keys:
+                  - complete - whether the stream is exhausted
+                  - segment_size - the actual size of the segment (may be
+                    smaller than the passed in segment_size)
+                  - segment_location - path to the segment
+                  - segment_index - index of the segment
+                  - segment_etag - the ETag for the segment
+        """
+        buf = []
+        dgst = md5()
+        bytes_read = 0
+        while bytes_read < segment_size:
+            data = fd.read(segment_size - bytes_read)
+            if not data:
+                break
+            bytes_read += len(data)
+            dgst.update(data)
+            buf.append(data)
+        buf = b''.join(buf)
+        segment_hash = dgst.hexdigest()
+
+        if not buf and segment_index > 0:
+            # Happens if the segment size aligns with the object size
+            return {'complete': True,
+                    'segment_size': 0,
+                    'segment_index': None,
+                    'segment_etag': None,
+                    'segment_location': None,
+                    'success': True}
+
+        if segment_index == 0 and len(buf) < segment_size:
+            ret = SwiftService._put_object(
+                conn, container, object_name, buf, headers, segment_hash)
+            ret['segment_location'] = '/%s/%s' % (container, object_name)
+        else:
+            ret = SwiftService._put_object(
+                conn, segment_container, segment_name, buf, headers,
+                segment_hash)
+            ret['segment_location'] = '/%s/%s' % (
+                segment_container, segment_name)
+
+        ret.update(
+            dict(complete=len(buf) < segment_size,
+                 segment_size=len(buf),
+                 segment_index=segment_index,
+                 segment_etag=segment_hash,
+                 for_object=object_name))
+        return ret
+
     def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
         chunks = []
         if 'x-object-manifest' in headers:
@@ -1833,6 +1960,47 @@ class SwiftService(object):
             # Each chunk is verified; check that we're at the end of the file
             return not fp.read(1)

+    @staticmethod
+    def _upload_slo_manifest(conn, segment_results, container, obj, headers):
+        """
+        Upload an SLO manifest, given the results of uploading each segment,
+        to the specified container.
+
+        :param segment_results: List of response_dict structures, as populated
+                                by _upload_segment_job. Specifically, each
+                                entry must contain the following keys:
+                                - segment_location
+                                - segment_etag
+                                - segment_size
+                                - segment_index
+        :param container: The container to put the manifest into.
+        :param obj: The name of the manifest object to use.
+        :param headers: Optional set of headers to attach to the manifest.
+ """ + if headers is None: + headers = {} + segment_results.sort(key=lambda di: di['segment_index']) + for seg in segment_results: + seg_loc = seg['segment_location'].lstrip('/') + if isinstance(seg_loc, text_type): + seg_loc = seg_loc.encode('utf-8') + + manifest_data = json.dumps([ + { + 'path': d['segment_location'], + 'etag': d['segment_etag'], + 'size_bytes': d['segment_size'] + } for d in segment_results + ]) + + response = {} + conn.put_object( + container, obj, manifest_data, + headers=headers, + query_string='multipart-manifest=put', + response_dict=response) + return response + def _upload_object_job(self, conn, container, source, obj, options, results_queue=None): if obj.startswith('./') or obj.startswith('.\\'): @@ -1990,29 +2158,11 @@ class SwiftService(object): res['segment_results'] = segment_results if options['use_slo']: - segment_results.sort(key=lambda di: di['segment_index']) - for seg in segment_results: - seg_loc = seg['segment_location'].lstrip('/') - if isinstance(seg_loc, text_type): - seg_loc = seg_loc.encode('utf-8') - new_slo_manifest_paths.add(seg_loc) - - manifest_data = json.dumps([ - { - 'path': d['segment_location'], - 'etag': d['segment_etag'], - 'size_bytes': d['segment_size'] - } for d in segment_results - ]) - - mr = {} - conn.put_object( - container, obj, manifest_data, - headers=put_headers, - query_string='multipart-manifest=put', - response_dict=mr - ) - res['manifest_response_dict'] = mr + response = self._upload_slo_manifest( + conn, segment_results, container, obj, put_headers) + res['manifest_response_dict'] = response + new_slo_manifest_paths = { + seg['segment_location'] for seg in segment_results} else: new_object_manifest = '%s/%s/%s/%s/%s/' % ( quote(seg_container.encode('utf8')), @@ -2030,6 +2180,51 @@ class SwiftService(object): response_dict=mr ) res['manifest_response_dict'] = mr + elif options['use_slo'] and segment_size and not path: + segment = 0 + results = [] + while True: + segment_name = '%s/slo/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + segment_size, segment + ) + seg_container = container + '_segments' + if options['segment_container']: + seg_container = options['segment_container'] + ret = self._upload_stream_segment( + conn, container, obj, + seg_container, + segment_name, + segment_size, + segment, + put_headers, + stream + ) + if not ret['success']: + return ret + if (ret['complete'] and segment == 0) or\ + ret['segment_size'] > 0: + results.append(ret) + if results_queue is not None: + # Don't insert the 0-sized segments or objects + # themselves + if ret['segment_location'] != '/%s/%s' % ( + container, obj) and ret['segment_size'] > 0: + results_queue.put(ret) + if ret['complete']: + break + segment += 1 + if results[0]['segment_location'] != '/%s/%s' % ( + container, obj): + response = self._upload_slo_manifest( + conn, results, container, obj, put_headers) + res['manifest_response_dict'] = response + new_slo_manifest_paths = { + r['segment_location'] for r in results} + res['large_object'] = True + else: + res['response_dict'] = ret + res['large_object'] = False else: res['large_object'] = False obr = {} @@ -2063,7 +2258,6 @@ class SwiftService(object): finally: if fp is not None: fp.close() - if old_manifest or old_slo_manifest_paths: drs = [] delobjsmap = {} diff --git a/swiftclient/shell.py b/swiftclient/shell.py index 43fcf475..d02c709f 100755 --- a/swiftclient/shell.py +++ b/swiftclient/shell.py @@ -947,6 +947,8 @@ Optional arguments: def st_upload(parser, args, output_manager): + 
DEFAULT_STDIN_SEGMENT = 10 * 1024 * 1024 + parser.add_argument( '-c', '--changed', action='store_true', dest='changed', default=False, help='Only upload files that have changed since ' @@ -1060,6 +1062,12 @@ def st_upload(parser, args, output_manager): st_upload_help) return + if from_stdin: + if not options['use_slo']: + options['use_slo'] = True + if not options['segment_size']: + options['segment_size'] = DEFAULT_STDIN_SEGMENT + options['object_uu_threads'] = options['object_threads'] with SwiftService(options=options) as swift: try: diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 5ccc0813..12fbaa00 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -36,6 +36,8 @@ from swiftclient.service import ( SwiftService, SwiftError, SwiftUploadObject ) +from tests.unit import utils as test_utils + clean_os_environ = {} environ_prefixes = ('ST_', 'OS_') @@ -1088,6 +1090,83 @@ class TestService(unittest.TestCase): self.assertEqual(upload_obj_resp['path'], obj['path']) self.assertTrue(mock_open.return_value.closed) + @mock.patch('swiftclient.service.Connection') + def test_upload_stream(self, mock_conn): + service = SwiftService({}) + + stream = test_utils.FakeStream(2048) + segment_etag = md5(b'A' * 1024).hexdigest() + + mock_conn.return_value.head_object.side_effect = \ + ClientException('Not Found', http_status=404) + mock_conn.return_value.put_object.return_value = \ + segment_etag + options = {'use_slo': True, 'segment_size': 1024} + resp_iter = service.upload( + 'container', + [SwiftUploadObject(stream, object_name='streamed')], + options) + responses = [x for x in resp_iter] + for resp in responses: + self.assertFalse('error' in resp) + self.assertTrue(resp['success']) + self.assertEqual(5, len(responses)) + container_resp, segment_container_resp = responses[0:2] + segment_response = responses[2:4] + upload_obj_resp = responses[-1] + self.assertEqual(container_resp['action'], + 'create_container') + self.assertEqual(upload_obj_resp['action'], + 'upload_object') + self.assertEqual(upload_obj_resp['object'], + 'streamed') + self.assertTrue(upload_obj_resp['path'] is None) + self.assertTrue(upload_obj_resp['large_object']) + self.assertIn('manifest_response_dict', upload_obj_resp) + self.assertEqual(upload_obj_resp['manifest_response_dict'], {}) + for i, resp in enumerate(segment_response): + self.assertEqual(i, resp['segment_index']) + self.assertEqual(1024, resp['segment_size']) + self.assertEqual('d47b127bc2de2d687ddc82dac354c415', + resp['segment_etag']) + self.assertTrue(resp['segment_location'].endswith( + '/0000000%d' % i)) + self.assertTrue(resp['segment_location'].startswith( + '/container_segments/streamed')) + + @mock.patch('swiftclient.service.Connection') + def test_upload_stream_fits_in_one_segment(self, mock_conn): + service = SwiftService({}) + + stream = test_utils.FakeStream(2048) + whole_etag = md5(b'A' * 2048).hexdigest() + + mock_conn.return_value.head_object.side_effect = \ + ClientException('Not Found', http_status=404) + mock_conn.return_value.put_object.return_value = \ + whole_etag + options = {'use_slo': True, 'segment_size': 10240} + resp_iter = service.upload( + 'container', + [SwiftUploadObject(stream, object_name='streamed')], + options) + responses = [x for x in resp_iter] + for resp in responses: + self.assertNotIn('error', resp) + self.assertTrue(resp['success']) + self.assertEqual(3, len(responses)) + container_resp, segment_container_resp = responses[0:2] + upload_obj_resp = responses[-1] + 
self.assertEqual(container_resp['action'], + 'create_container') + self.assertEqual(upload_obj_resp['action'], + 'upload_object') + self.assertEqual(upload_obj_resp['object'], + 'streamed') + self.assertTrue(upload_obj_resp['path'] is None) + self.assertFalse(upload_obj_resp['large_object']) + self.assertNotIn('manifest_response_dict', upload_obj_resp) + class TestServiceUpload(_TestServiceBase): @@ -1226,6 +1305,141 @@ class TestServiceUpload(_TestServiceBase): self.assertIsInstance(contents, utils.LengthWrapper) self.assertEqual(len(contents), 10) + def test_upload_stream_segment(self): + common_params = { + 'segment_container': 'segments', + 'segment_name': 'test_stream_2', + 'container': 'test_stream', + 'object': 'stream_object', + } + tests = [ + {'test_params': { + 'segment_size': 1024, + 'segment_index': 2, + 'content_size': 1024}, + 'put_object_args': { + 'container': 'segments', + 'object': 'test_stream_2'}, + 'expected': { + 'complete': False, + 'segment_etag': md5(b'A' * 1024).hexdigest()}}, + {'test_params': { + 'segment_size': 2048, + 'segment_index': 0, + 'content_size': 512}, + 'put_object_args': { + 'container': 'test_stream', + 'object': 'stream_object'}, + 'expected': { + 'complete': True, + 'segment_etag': md5(b'A' * 512).hexdigest()}}, + # 0-sized segment should not be uploaded + {'test_params': { + 'segment_size': 1024, + 'segment_index': 1, + 'content_size': 0}, + 'put_object_args': {}, + 'expected': { + 'complete': True}}, + # 0-sized objects should be uploaded + {'test_params': { + 'segment_size': 1024, + 'segment_index': 0, + 'content_size': 0}, + 'put_object_args': { + 'container': 'test_stream', + 'object': 'stream_object'}, + 'expected': { + 'complete': True, + 'segment_etag': md5(b'').hexdigest()}}, + # Test boundary conditions + {'test_params': { + 'segment_size': 1024, + 'segment_index': 1, + 'content_size': 1023}, + 'put_object_args': { + 'container': 'segments', + 'object': 'test_stream_2'}, + 'expected': { + 'complete': True, + 'segment_etag': md5(b'A' * 1023).hexdigest()}}, + {'test_params': { + 'segment_size': 2048, + 'segment_index': 0, + 'content_size': 2047}, + 'put_object_args': { + 'container': 'test_stream', + 'object': 'stream_object'}, + 'expected': { + 'complete': True, + 'segment_etag': md5(b'A' * 2047).hexdigest()}}, + {'test_params': { + 'segment_size': 1024, + 'segment_index': 2, + 'content_size': 1025}, + 'put_object_args': { + 'container': 'segments', + 'object': 'test_stream_2'}, + 'expected': { + 'complete': False, + 'segment_etag': md5(b'A' * 1024).hexdigest()}}, + ] + + for test_args in tests: + params = test_args['test_params'] + stream = test_utils.FakeStream(params['content_size']) + segment_size = params['segment_size'] + segment_index = params['segment_index'] + + def _fake_put_object(*args, **kwargs): + contents = args[2] + # Consume and compute md5 + return md5(contents).hexdigest() + + mock_conn = mock.Mock() + mock_conn.put_object.side_effect = _fake_put_object + + s = SwiftService() + resp = s._upload_stream_segment( + conn=mock_conn, + container=common_params['container'], + object_name=common_params['object'], + segment_container=common_params['segment_container'], + segment_name=common_params['segment_name'], + segment_size=segment_size, + segment_index=segment_index, + headers={}, + fd=stream) + expected_args = test_args['expected'] + put_args = test_args['put_object_args'] + expected_response = { + 'segment_size': min(len(stream), segment_size), + 'complete': expected_args['complete'], + 'success': True, + } + if 
len(stream) or segment_index == 0: + segment_location = '/%s/%s' % (put_args['container'], + put_args['object']) + expected_response.update( + {'segment_index': segment_index, + 'segment_location': segment_location, + 'segment_etag': expected_args['segment_etag'], + 'for_object': common_params['object']}) + mock_conn.put_object.assert_called_once_with( + put_args['container'], + put_args['object'], + mock.ANY, + content_length=min(len(stream), segment_size), + headers={'etag': expected_args['segment_etag']}, + response_dict=mock.ANY) + else: + self.assertEqual([], mock_conn.put_object.mock_calls) + expected_response.update( + {'segment_index': None, + 'segment_location': None, + 'segment_etag': None}) + self.assertEqual(expected_response, resp) + def test_etag_mismatch_with_ignore_checksum(self): def _consuming_conn(*a, **kw): contents = a[2] diff --git a/tests/unit/utils.py b/tests/unit/utils.py index c05146ec..2def73f5 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -548,3 +548,24 @@ def _make_fake_import_keystone_client(fake_import): return fake_import, fake_import return _fake_import_keystone_client + + +class FakeStream(object): + def __init__(self, size): + self.bytes_read = 0 + self.size = size + + def read(self, size=-1): + if self.bytes_read == self.size: + return b'' + + if size == -1 or size + self.bytes_read > self.size: + remaining = self.size - self.bytes_read + self.bytes_read = self.size + return b'A' * remaining + + self.bytes_read += size + return b'A' * size + + def __len__(self): + return self.size
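
For reviewers who want to exercise the new streaming path end to end, the easiest route is to drive SwiftService directly, the same way test_upload_stream does. The sketch below is illustrative only and is not part of the patch: the container name, object name, and 1 MiB segment size are invented, and it assumes a working swiftclient install with auth settings already present in the environment.

import sys

from swiftclient.service import SwiftService, SwiftUploadObject

# Stream stdin into Swift as an SLO, buffering 1 MiB per segment. This is
# roughly what st_upload now arranges automatically for stdin sources
# (use_slo is forced on and segment_size gets a 10 MiB default when unset).
options = {'use_slo': True, 'segment_size': 1024 * 1024}

with SwiftService(options=options) as swift:
    stream = getattr(sys.stdin, 'buffer', sys.stdin)  # binary stream on py3
    uploads = [SwiftUploadObject(stream, object_name='streamed-object')]
    for result in swift.upload('my-container', uploads):
        if not result['success']:
            # Failed results carry the original exception object.
            raise result['error']
        if result.get('for_object'):
            # Per-segment results pushed onto the results queue by
            # _upload_stream_segment.
            print('segment %d: %d bytes' % (result['segment_index'],
                                            result['segment_size']))
        elif result.get('action') == 'upload_object':
            print('uploaded %s (large_object=%s)' % (
                result['object'], result['large_object']))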
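For reference, once the stream is exhausted the streaming path finishes exactly like the existing SLO path: _upload_slo_manifest sorts the per-segment results by segment_index and PUTs a standard SLO manifest to the final object name. The snippet below only illustrates the shape of that manifest body; every value in it is invented, and the real segment names are built as '<object>/slo/<x-object-meta-mtime>/<segment_size>/<8-digit index>' under '<container>_segments' (or options['segment_container'] when set).

import json

# Shape of the body sent with query_string='multipart-manifest=put'.
# Locations, etags and sizes below are invented for illustration.
segment_results = [
    {'segment_index': 0,
     'segment_location': '/container_segments/streamed-object/slo/'
                         '1400000000.000000/1048576/00000000',
     'segment_etag': '0f343b0931126a20f133d67c2b018a3b',
     'segment_size': 1048576},
    {'segment_index': 1,
     'segment_location': '/container_segments/streamed-object/slo/'
                         '1400000000.000000/1048576/00000001',
     'segment_etag': '5f363e0e58a95f06cbe9bbc662c5dfb6',
     'segment_size': 512},
]

manifest_data = json.dumps([
    {'path': d['segment_location'],
     'etag': d['segment_etag'],
     'size_bytes': d['segment_size']}
    for d in sorted(segment_results, key=lambda d: d['segment_index'])
])
# Swift validates each listed segment's etag and size when it accepts the
# manifest, so a short or corrupted segment fails the final PUT instead of
# surfacing later on read.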