Allow for object uploads > 5GB from stdin.

When uploading from standard input, swiftclient should turn the upload
into an SLO when the object is large. This patch sets the threshold to
10MB (and uses that as the default segment size). Consumers can also
supply the --segment-size option to alter both that threshold and the
SLO segment size. Note that the patch buffers one segment in memory
(which is why the 10MB default was chosen).

(test is updated)

Change-Id: Ib13e0b687bc85930c29fe9f151cf96bc53b2e594
This commit is contained in:
Timur Alperovich 2017-06-28 12:02:21 -07:00 committed by Tim Burke
parent a9b8f0a0d1
commit 2faea93287
4 changed files with 462 additions and 25 deletions

View File

@ -1502,7 +1502,8 @@ class SwiftService(object):
if hasattr(s, 'read'): if hasattr(s, 'read'):
# We've got a file like object to upload to o # We've got a file like object to upload to o
file_future = self.thread_manager.object_uu_pool.submit( file_future = self.thread_manager.object_uu_pool.submit(
self._upload_object_job, container, s, o, object_options self._upload_object_job, container, s, o, object_options,
results_queue=rq
) )
details['file'] = s details['file'] = s
details['object'] = o details['object'] = o
@ -1784,6 +1785,132 @@ class SwiftService(object):
if fp is not None: if fp is not None:
fp.close() fp.close()
@staticmethod
def _put_object(conn, container, name, content, headers=None, md5=None):
    """
    PUT ``content`` as object ``name`` in ``container`` and, when an MD5
    is supplied, check it against the ETag returned by the upload.

    The caller's ``headers`` mapping is copied, never modified. When
    ``md5`` is given it is also sent as the ``etag`` request header.

    :param conn: The Swift connection to use for uploads.
    :param container: The container to put the object into.
    :param name: The name of the object.
    :param content: Object content; ``len(content)`` is sent as the
                    content length.
    :param headers: Headers (optional) to associate with the object.
    :param md5: MD5 sum of the content (optional). If passed in, it is
                compared with the value returned by ``put_object``.
    :returns: On success, the ``response_dict`` filled in by
              ``conn.put_object`` with an extra ``success`` key set to
              True.
              On any exception (including an ETag mismatch) a dictionary
              with the following keys:
                  - success (with value False)
                  - error - the encountered exception (object)
                  - error_timestamp
                  - response_dict - whatever ``put_object`` recorded
                    before failing
                  - attempts - ``conn.attempts``
                  - traceback
    """
    put_headers = dict(headers) if headers is not None else {}
    verify_etag = md5 is not None
    if verify_etag:
        put_headers['etag'] = md5

    response = {}
    try:
        returned_etag = conn.put_object(
            container, name, content, content_length=len(content),
            headers=put_headers, response_dict=response)
        if verify_etag and returned_etag != md5:
            raise SwiftError('Upload verification failed for {0}: md5 '
                             'mismatch {1} != {2}'.format(
                                 name, md5, returned_etag))
        response['success'] = True
    except Exception as err:
        # Failures are reported through the return value, not raised.
        traceback, err_time = report_traceback()
        logger.exception(err)
        failure = {
            'success': False,
            'error': err,
            'error_timestamp': err_time,
            'response_dict': response,
            'attempts': conn.attempts,
            'traceback': traceback
        }
        return failure

    return response
@staticmethod
def _upload_stream_segment(conn, container, object_name,
                           segment_container, segment_name,
                           segment_size, segment_index,
                           headers, fd):
    """
    Read up to ``segment_size`` bytes from ``fd`` into memory and upload
    them.

    If this is the first segment (index 0) and the stream ends before a
    full segment is read, the data is stored directly as ``object_name``
    in ``container``. Otherwise it is stored as ``segment_name`` in
    ``segment_container``. An empty read for a later segment uploads
    nothing.

    :param conn: Swift Connection to use.
    :param container: Container in which the object would be placed.
    :param object_name: Name of the final object (used when the whole
                        stream is smaller than segment_size).
    :param segment_container: Container to hold the object segments.
    :param segment_name: The name of the segment.
    :param segment_size: Number of bytes to read for this segment.
    :param segment_index: The segment index.
    :param headers: Headers to attach to the segment/object.
    :param fd: File-like handle for the content. Must implement read().
    :returns: Dictionary holding the result of the upload (see
              _put_object) plus the following keys:
                  - complete -- whether the stream is exhausted
                  - segment_size - number of bytes actually read (may
                    be smaller than the passed in segment_size)
                  - segment_location - path to the uploaded data
                  - segment_index - index of the segment
                  - segment_etag - the MD5 hex digest of the data
                  - for_object - the final object name (absent when
                    nothing was uploaded)
    """
    pieces = []
    digest = md5()
    remaining = segment_size
    while remaining > 0:
        piece = fd.read(remaining)
        if not piece:
            # End of stream.
            break
        remaining -= len(piece)
        digest.update(piece)
        pieces.append(piece)

    payload = b''.join(pieces)
    payload_md5 = digest.hexdigest()

    if not payload and segment_index > 0:
        # Happens if the segment size aligns with the object size
        return {'complete': True,
                'segment_size': 0,
                'segment_index': None,
                'segment_etag': None,
                'segment_location': None,
                'success': True}

    payload_len = len(payload)
    if segment_index == 0 and payload_len < segment_size:
        # The entire stream fit into one buffer: store a plain object.
        target_container, target_name = container, object_name
    else:
        target_container, target_name = segment_container, segment_name

    outcome = SwiftService._put_object(
        conn, target_container, target_name, payload, headers,
        payload_md5)
    outcome['segment_location'] = '/%s/%s' % (
        target_container, target_name)
    outcome.update({
        'complete': payload_len < segment_size,
        'segment_size': payload_len,
        'segment_index': segment_index,
        'segment_etag': payload_md5,
        'for_object': object_name,
    })
    return outcome
def _get_chunk_data(self, conn, container, obj, headers, manifest=None): def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
chunks = [] chunks = []
if 'x-object-manifest' in headers: if 'x-object-manifest' in headers:
@ -1833,6 +1960,47 @@ class SwiftService(object):
# Each chunk is verified; check that we're at the end of the file # Each chunk is verified; check that we're at the end of the file
return not fp.read(1) return not fp.read(1)
@staticmethod
def _upload_slo_manifest(conn, segment_results, container, obj, headers):
    """
    Upload an SLO manifest, given the results of uploading each segment, to
    the specified container.

    :param conn: The Swift connection used to PUT the manifest.
    :param segment_results: List of per-segment result dictionaries.
                            The list is sorted in place by
                            ``segment_index``. Each entry must contain
                            the following keys:
                                - segment_location
                                - segment_etag
                                - segment_size
                                - segment_index
    :param container: The container to put the manifest into.
    :param obj: The name of the manifest object to use.
    :param headers: Optional set of headers to attach to the manifest.
    :returns: The ``response_dict`` populated by ``conn.put_object``.
    """
    if headers is None:
        headers = {}

    # Sorting in place is deliberate: callers that keep a reference to the
    # list see the segments in manifest order afterwards.
    segment_results.sort(key=lambda di: di['segment_index'])

    manifest_data = json.dumps([
        {
            'path': d['segment_location'],
            'etag': d['segment_etag'],
            'size_bytes': d['segment_size']
        } for d in segment_results
    ])

    response = {}
    conn.put_object(
        container, obj, manifest_data,
        headers=headers,
        query_string='multipart-manifest=put',
        response_dict=response)
    return response
def _upload_object_job(self, conn, container, source, obj, options, def _upload_object_job(self, conn, container, source, obj, options,
results_queue=None): results_queue=None):
if obj.startswith('./') or obj.startswith('.\\'): if obj.startswith('./') or obj.startswith('.\\'):
@ -1990,29 +2158,11 @@ class SwiftService(object):
res['segment_results'] = segment_results res['segment_results'] = segment_results
if options['use_slo']: if options['use_slo']:
segment_results.sort(key=lambda di: di['segment_index']) response = self._upload_slo_manifest(
for seg in segment_results: conn, segment_results, container, obj, put_headers)
seg_loc = seg['segment_location'].lstrip('/') res['manifest_response_dict'] = response
if isinstance(seg_loc, text_type): new_slo_manifest_paths = {
seg_loc = seg_loc.encode('utf-8') seg['segment_location'] for seg in segment_results}
new_slo_manifest_paths.add(seg_loc)
manifest_data = json.dumps([
{
'path': d['segment_location'],
'etag': d['segment_etag'],
'size_bytes': d['segment_size']
} for d in segment_results
])
mr = {}
conn.put_object(
container, obj, manifest_data,
headers=put_headers,
query_string='multipart-manifest=put',
response_dict=mr
)
res['manifest_response_dict'] = mr
else: else:
new_object_manifest = '%s/%s/%s/%s/%s/' % ( new_object_manifest = '%s/%s/%s/%s/%s/' % (
quote(seg_container.encode('utf8')), quote(seg_container.encode('utf8')),
@ -2030,6 +2180,51 @@ class SwiftService(object):
response_dict=mr response_dict=mr
) )
res['manifest_response_dict'] = mr res['manifest_response_dict'] = mr
elif options['use_slo'] and segment_size and not path:
segment = 0
results = []
while True:
segment_name = '%s/slo/%s/%s/%08d' % (
obj, put_headers['x-object-meta-mtime'],
segment_size, segment
)
seg_container = container + '_segments'
if options['segment_container']:
seg_container = options['segment_container']
ret = self._upload_stream_segment(
conn, container, obj,
seg_container,
segment_name,
segment_size,
segment,
put_headers,
stream
)
if not ret['success']:
return ret
if (ret['complete'] and segment == 0) or\
ret['segment_size'] > 0:
results.append(ret)
if results_queue is not None:
# Don't insert the 0-sized segments or objects
# themselves
if ret['segment_location'] != '/%s/%s' % (
container, obj) and ret['segment_size'] > 0:
results_queue.put(ret)
if ret['complete']:
break
segment += 1
if results[0]['segment_location'] != '/%s/%s' % (
container, obj):
response = self._upload_slo_manifest(
conn, results, container, obj, put_headers)
res['manifest_response_dict'] = response
new_slo_manifest_paths = {
r['segment_location'] for r in results}
res['large_object'] = True
else:
res['response_dict'] = ret
res['large_object'] = False
else: else:
res['large_object'] = False res['large_object'] = False
obr = {} obr = {}
@ -2063,7 +2258,6 @@ class SwiftService(object):
finally: finally:
if fp is not None: if fp is not None:
fp.close() fp.close()
if old_manifest or old_slo_manifest_paths: if old_manifest or old_slo_manifest_paths:
drs = [] drs = []
delobjsmap = {} delobjsmap = {}

View File

@ -947,6 +947,8 @@ Optional arguments:
def st_upload(parser, args, output_manager): def st_upload(parser, args, output_manager):
DEFAULT_STDIN_SEGMENT = 10 * 1024 * 1024
parser.add_argument( parser.add_argument(
'-c', '--changed', action='store_true', dest='changed', '-c', '--changed', action='store_true', dest='changed',
default=False, help='Only upload files that have changed since ' default=False, help='Only upload files that have changed since '
@ -1060,6 +1062,12 @@ def st_upload(parser, args, output_manager):
st_upload_help) st_upload_help)
return return
if from_stdin:
if not options['use_slo']:
options['use_slo'] = True
if not options['segment_size']:
options['segment_size'] = DEFAULT_STDIN_SEGMENT
options['object_uu_threads'] = options['object_threads'] options['object_uu_threads'] = options['object_threads']
with SwiftService(options=options) as swift: with SwiftService(options=options) as swift:
try: try:

View File

@ -36,6 +36,8 @@ from swiftclient.service import (
SwiftService, SwiftError, SwiftUploadObject SwiftService, SwiftError, SwiftUploadObject
) )
from tests.unit import utils as test_utils
clean_os_environ = {} clean_os_environ = {}
environ_prefixes = ('ST_', 'OS_') environ_prefixes = ('ST_', 'OS_')
@ -1088,6 +1090,83 @@ class TestService(unittest.TestCase):
self.assertEqual(upload_obj_resp['path'], obj['path']) self.assertEqual(upload_obj_resp['path'], obj['path'])
self.assertTrue(mock_open.return_value.closed) self.assertTrue(mock_open.return_value.closed)
@mock.patch('swiftclient.service.Connection')
def test_upload_stream(self, mock_conn):
    """
    Upload a 2048-byte stream with a 1024-byte segment size and check
    that it is reported as a large object made of two 1024-byte
    segments.
    """
    service = SwiftService({})
    stream = test_utils.FakeStream(2048)
    segment_etag = md5(b'A' * 1024).hexdigest()

    # HEAD reports the object as missing; every PUT returns the ETag of
    # one full segment.
    mock_conn.return_value.head_object.side_effect = \
        ClientException('Not Found', http_status=404)
    mock_conn.return_value.put_object.return_value = \
        segment_etag
    options = {'use_slo': True, 'segment_size': 1024}
    resp_iter = service.upload(
        'container',
        [SwiftUploadObject(stream, object_name='streamed')],
        options)
    responses = list(resp_iter)
    for resp in responses:
        self.assertNotIn('error', resp)
        self.assertTrue(resp['success'])
    self.assertEqual(5, len(responses))

    # responses[1] is not inspected here.
    container_resp = responses[0]
    segment_response = responses[2:4]
    upload_obj_resp = responses[-1]
    self.assertEqual(container_resp['action'],
                     'create_container')
    self.assertEqual(upload_obj_resp['action'],
                     'upload_object')
    self.assertEqual(upload_obj_resp['object'],
                     'streamed')
    self.assertIsNone(upload_obj_resp['path'])
    self.assertTrue(upload_obj_resp['large_object'])
    self.assertIn('manifest_response_dict', upload_obj_resp)
    self.assertEqual(upload_obj_resp['manifest_response_dict'], {})
    for i, resp in enumerate(segment_response):
        self.assertEqual(i, resp['segment_index'])
        self.assertEqual(1024, resp['segment_size'])
        self.assertEqual('d47b127bc2de2d687ddc82dac354c415',
                         resp['segment_etag'])
        self.assertTrue(resp['segment_location'].endswith(
            '/0000000%d' % i))
        self.assertTrue(resp['segment_location'].startswith(
            '/container_segments/streamed'))
@mock.patch('swiftclient.service.Connection')
def test_upload_stream_fits_in_one_segment(self, mock_conn):
    """
    Upload a 2048-byte stream with a 10240-byte segment size and check
    that it is reported as a regular (non-large) object with no
    manifest response.
    """
    service = SwiftService({})
    stream = test_utils.FakeStream(2048)
    whole_etag = md5(b'A' * 2048).hexdigest()

    # HEAD reports the object as missing; the PUT returns the ETag of the
    # whole 2048-byte payload.
    mock_conn.return_value.head_object.side_effect = \
        ClientException('Not Found', http_status=404)
    mock_conn.return_value.put_object.return_value = \
        whole_etag
    options = {'use_slo': True, 'segment_size': 10240}
    resp_iter = service.upload(
        'container',
        [SwiftUploadObject(stream, object_name='streamed')],
        options)
    responses = list(resp_iter)
    for resp in responses:
        self.assertNotIn('error', resp)
        self.assertTrue(resp['success'])
    self.assertEqual(3, len(responses))

    # responses[1] is not inspected here.
    container_resp = responses[0]
    upload_obj_resp = responses[-1]
    self.assertEqual(container_resp['action'],
                     'create_container')
    self.assertEqual(upload_obj_resp['action'],
                     'upload_object')
    self.assertEqual(upload_obj_resp['object'],
                     'streamed')
    self.assertIsNone(upload_obj_resp['path'])
    self.assertFalse(upload_obj_resp['large_object'])
    self.assertNotIn('manifest_response_dict', upload_obj_resp)
class TestServiceUpload(_TestServiceBase): class TestServiceUpload(_TestServiceBase):
@ -1226,6 +1305,141 @@ class TestServiceUpload(_TestServiceBase):
self.assertIsInstance(contents, utils.LengthWrapper) self.assertIsInstance(contents, utils.LengthWrapper)
self.assertEqual(len(contents), 10) self.assertEqual(len(contents), 10)
def test_upload_stream_segment(self):
    """
    Table-driven check of SwiftService._upload_stream_segment: for each
    case, verify where (or whether) the data is PUT and what result
    dictionary comes back.
    """
    final_container = 'test_stream'
    final_object = 'stream_object'
    seg_container = 'segments'
    seg_name = 'test_stream_2'

    to_segment = (seg_container, seg_name)
    to_object = (final_container, final_object)

    # Each case: (segment_size, segment_index, content_size,
    #             expected PUT target or None, expected 'complete',
    #             expected segment etag or None)
    cases = [
        (1024, 2, 1024, to_segment, False,
         md5(b'A' * 1024).hexdigest()),
        (2048, 0, 512, to_object, True,
         md5(b'A' * 512).hexdigest()),
        # 0-sized segment should not be uploaded
        (1024, 1, 0, None, True, None),
        # 0-sized objects should be uploaded
        (1024, 0, 0, to_object, True,
         md5(b'').hexdigest()),
        # Test boundary conditions
        (1024, 1, 1023, to_segment, True,
         md5(b'A' * 1023).hexdigest()),
        (2048, 0, 2047, to_object, True,
         md5(b'A' * 2047).hexdigest()),
        (1024, 2, 1025, to_segment, False,
         md5(b'A' * 1024).hexdigest()),
    ]

    def _fake_put_object(*args, **kwargs):
        contents = args[2]
        # Consume and compute md5
        return md5(contents).hexdigest()

    for (segment_size, segment_index, content_size,
         put_target, complete, etag) in cases:
        stream = test_utils.FakeStream(content_size)

        mock_conn = mock.Mock()
        mock_conn.put_object.side_effect = _fake_put_object

        s = SwiftService()
        resp = s._upload_stream_segment(
            conn=mock_conn,
            container=final_container,
            object_name=final_object,
            segment_container=seg_container,
            segment_name=seg_name,
            segment_size=segment_size,
            segment_index=segment_index,
            headers={},
            fd=stream)

        uploaded_size = min(len(stream), segment_size)
        expected_response = {
            'segment_size': uploaded_size,
            'complete': complete,
            'success': True,
        }
        if len(stream) or segment_index == 0:
            put_container, put_object = put_target
            expected_response['segment_index'] = segment_index
            expected_response['segment_location'] = '/%s/%s' % (
                put_container, put_object)
            expected_response['segment_etag'] = etag
            expected_response['for_object'] = final_object
            mock_conn.put_object.assert_called_once_with(
                put_container,
                put_object,
                mock.ANY,
                content_length=uploaded_size,
                headers={'etag': etag},
                response_dict=mock.ANY)
        else:
            # Nothing to upload: no PUT and all segment fields are None.
            self.assertEqual([], mock_conn.put_object.mock_calls)
            expected_response['segment_index'] = None
            expected_response['segment_location'] = None
            expected_response['segment_etag'] = None
        self.assertEqual(expected_response, resp)
def test_etag_mismatch_with_ignore_checksum(self): def test_etag_mismatch_with_ignore_checksum(self):
def _consuming_conn(*a, **kw): def _consuming_conn(*a, **kw):
contents = a[2] contents = a[2]

View File

@ -548,3 +548,24 @@ def _make_fake_import_keystone_client(fake_import):
return fake_import, fake_import return fake_import, fake_import
return _fake_import_keystone_client return _fake_import_keystone_client
class FakeStream(object):
    """
    Minimal readable stream for tests that yields ``size`` bytes of
    ``b'A'`` in total.

    ``len(stream)`` reports the total size, not the bytes remaining.
    """

    def __init__(self, size):
        # Number of bytes handed out so far.
        self.bytes_read = 0
        # Total number of bytes the stream will ever produce.
        self.size = size

    def read(self, size=-1):
        """
        Return up to ``size`` bytes of ``b'A'``.

        :param size: Maximum number of bytes to return. ``None`` or any
                     negative value means "everything that is left",
                     matching the usual file-object convention.
        :returns: A bytes object; ``b''`` once the stream is exhausted.
        """
        remaining = self.size - self.bytes_read
        if size is None or size < 0 or size > remaining:
            size = remaining
        self.bytes_read += size
        return b'A' * size

    def __len__(self):
        return self.size