Merge "Use SLO by default for segmented uploads if the cluster supports it"
This commit is contained in:
commit
31c279ff0e
@ -196,7 +196,7 @@ _default_global_options = _build_default_global_options()
|
||||
_default_local_options = {
|
||||
'sync_to': None,
|
||||
'sync_key': None,
|
||||
'use_slo': False,
|
||||
'use_slo': None,
|
||||
'segment_size': None,
|
||||
'segment_container': None,
|
||||
'leave_segments': False,
|
||||
@ -1467,7 +1467,7 @@ class SwiftService:
|
||||
'meta': [],
|
||||
'header': [],
|
||||
'segment_size': None,
|
||||
'use_slo': False,
|
||||
'use_slo': True,
|
||||
'segment_container': None,
|
||||
'leave_segments': False,
|
||||
'changed': None,
|
||||
@ -1493,6 +1493,18 @@ class SwiftService:
|
||||
except ValueError:
|
||||
raise SwiftError('Segment size should be an integer value')
|
||||
|
||||
if segment_size and options['use_slo'] is None:
|
||||
try:
|
||||
cap_result = self.capabilities()
|
||||
except ClientException:
|
||||
# pre-info swift, maybe? assume no slo middleware
|
||||
options['use_slo'] = False
|
||||
else:
|
||||
if not cap_result['success']:
|
||||
options['use_slo'] = False
|
||||
else:
|
||||
options['use_slo'] = 'slo' in cap_result['capabilities']
|
||||
|
||||
# Incase we have a psudeo-folder path for <container> arg, derive
|
||||
# the container name from the top path and prepend the rest to
|
||||
# the object name. (same as passing --object-name).
|
||||
|
@ -978,8 +978,8 @@ def st_copy(parser, args, output_manager, return_parser=False):
|
||||
st_upload_options = '''[--changed] [--skip-identical] [--segment-size <size>]
|
||||
[--segment-container <container>] [--leave-segments]
|
||||
[--object-threads <thread>] [--segment-threads <threads>]
|
||||
[--meta <name:value>] [--header <header>]
|
||||
[--use-slo] [--ignore-checksum] [--skip-container-put]
|
||||
[--meta <name:value>] [--header <header>] [--use-slo]
|
||||
[--use-dlo] [--ignore-checksum] [--skip-container-put]
|
||||
[--object-name <object-name>]
|
||||
<container> <file_or_directory> [<file_or_directory>] [...]
|
||||
'''
|
||||
@ -1024,8 +1024,11 @@ Optional arguments:
|
||||
repeated. Example: -H "content-type:text/plain"
|
||||
-H "Content-Length: 4000".
|
||||
--use-slo When used in conjunction with --segment-size it will
|
||||
create a Static Large Object instead of the default
|
||||
Dynamic Large Object.
|
||||
create a Static Large Object. Deprecated; this is now
|
||||
the default behavior when the cluster supports it.
|
||||
--use-dlo When used in conjunction with --segment-size it will
|
||||
create a Dynamic Large Object. May be useful with old
|
||||
swift clusters.
|
||||
--ignore-checksum Turn off checksum validation for uploads.
|
||||
--skip-container-put Assume all necessary containers already exist; don't
|
||||
automatically try to create them.
|
||||
@ -1087,10 +1090,13 @@ def st_upload(parser, args, output_manager, return_parser=False):
|
||||
' This option may be repeated. Example: -H "content-type:text/plain" '
|
||||
'-H "Content-Length: 4000"')
|
||||
parser.add_argument(
|
||||
'--use-slo', action='store_true', default=False,
|
||||
'--use-slo', action='store_true', default=None,
|
||||
help='When used in conjunction with --segment-size, it will '
|
||||
'create a Static Large Object instead of the default '
|
||||
'Dynamic Large Object.')
|
||||
'create a Static Large Object.')
|
||||
parser.add_argument(
|
||||
'--use-dlo', action='store_false', dest="use_slo", default=None,
|
||||
help='When used in conjunction with --segment-size, it will '
|
||||
'create a Dynamic Large Object.')
|
||||
parser.add_argument(
|
||||
'--object-name', dest='object_name',
|
||||
help='Upload file and name object to <object-name> or upload dir and '
|
||||
|
@ -1410,7 +1410,7 @@ class TestServiceUpload(_TestServiceBase):
|
||||
self.assertTrue(fp.closed,
|
||||
'Failed to close open(%s)' % formatted_args)
|
||||
|
||||
def test_upload_object_job_file_with_unicode_path(self):
|
||||
def test_upload_dlo_job_file_with_unicode_path(self):
|
||||
# Uploading a file results in the file object being wrapped in a
|
||||
# LengthWrapper. This test sets the options in such a way that much
|
||||
# of _upload_object_job is skipped bringing the critical path down
|
||||
@ -1471,6 +1471,84 @@ class TestServiceUpload(_TestServiceBase):
|
||||
headers={},
|
||||
response_dict={})
|
||||
|
||||
def test_upload_slo_job_file_with_unicode_path(self):
|
||||
# Uploading a file results in the file object being wrapped in a
|
||||
# LengthWrapper. This test sets the options in such a way that much
|
||||
# of _upload_object_job is skipped bringing the critical path down
|
||||
# to around 60 lines to ease testing.
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(b'a' * 30)
|
||||
f.flush()
|
||||
expected_r = {
|
||||
'action': 'upload_object',
|
||||
'attempts': 2,
|
||||
'container': 'test_c',
|
||||
'headers': {},
|
||||
'large_object': True,
|
||||
'object': 'テスト/dummy.dat',
|
||||
'manifest_response_dict': {},
|
||||
'segment_results': [
|
||||
{'action': 'upload_segment',
|
||||
'success': True,
|
||||
'segment_index': i,
|
||||
'segment_location': 'test_c+segments/slo/%d' % i,
|
||||
'segment_etag': 'part-etag',
|
||||
'segment_size': 1000 + i}
|
||||
for i in range(3)],
|
||||
'status': 'uploaded',
|
||||
'success': True,
|
||||
}
|
||||
expected_mtime = '%f' % os.path.getmtime(f.name)
|
||||
|
||||
mock_conn = mock.Mock()
|
||||
mock_conn.put_object.return_value = ''
|
||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||
|
||||
s = SwiftService()
|
||||
with mock.patch.object(s, '_upload_segment_job') as mock_job:
|
||||
mock_job.side_effect = [
|
||||
{'action': 'upload_segment',
|
||||
'success': True,
|
||||
'segment_index': i,
|
||||
'segment_location': 'test_c+segments/slo/%d' % i,
|
||||
'segment_etag': 'part-etag',
|
||||
'segment_size': 1000 + i}
|
||||
for i in range(3)]
|
||||
|
||||
r = s._upload_object_job(conn=mock_conn,
|
||||
container='test_c',
|
||||
source=f.name,
|
||||
obj='テスト/dummy.dat',
|
||||
options=dict(s._options,
|
||||
segment_size=10,
|
||||
leave_segments=True,
|
||||
use_slo=True))
|
||||
|
||||
mtime = r['headers']['x-object-meta-mtime']
|
||||
self.assertEqual(expected_mtime, mtime)
|
||||
del r['headers']['x-object-meta-mtime']
|
||||
|
||||
self.assertEqual(r['path'], f.name)
|
||||
del r['path']
|
||||
|
||||
self.assertEqual(r, expected_r)
|
||||
self.assertEqual(mock_conn.put_object.call_count, 1)
|
||||
mock_conn.put_object.assert_called_with(
|
||||
'test_c',
|
||||
'テスト/dummy.dat',
|
||||
' '.join([
|
||||
'[{"path": "test_c+segments/slo/0",',
|
||||
'"etag": "part-etag", "size_bytes": 1000},',
|
||||
'{"path": "test_c+segments/slo/1",',
|
||||
'"etag": "part-etag", "size_bytes": 1001},',
|
||||
'{"path": "test_c+segments/slo/2",',
|
||||
'"etag": "part-etag", "size_bytes": 1002}]',
|
||||
]),
|
||||
query_string='multipart-manifest=put',
|
||||
headers={},
|
||||
response_dict={},
|
||||
)
|
||||
|
||||
def test_upload_segment_job(self):
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
f.write(b'a' * 10)
|
||||
|
@ -861,7 +861,8 @@ class TestShell(unittest.TestCase):
|
||||
# Upload in segments
|
||||
connection.return_value.head_container.return_value = {
|
||||
'x-storage-policy': 'one'}
|
||||
argv = ["", "upload", "container", self.tmpfile, "-S", "10"]
|
||||
argv = ["", "upload", "container", self.tmpfile,
|
||||
"-S", "10", "--use-dlo"]
|
||||
with open(self.tmpfile, "wb") as fh:
|
||||
fh.write(b'12345678901234567890')
|
||||
swiftclient.shell.main(argv)
|
||||
@ -907,6 +908,62 @@ class TestShell(unittest.TestCase):
|
||||
query_string='multipart-manifest=put',
|
||||
response_dict=mock.ANY)
|
||||
|
||||
# detect no SLO
|
||||
connection.reset_mock()
|
||||
connection.return_value.head_container.return_value = {
|
||||
'x-storage-policy': 'one'}
|
||||
argv = ["", "upload", "container/pseudo-folder/nested",
|
||||
self.tmpfile, "-S", "10"]
|
||||
with open(self.tmpfile, "wb") as fh:
|
||||
fh.write(b'12345678901234567890')
|
||||
connection.return_value.get_capabilities.return_value = {}
|
||||
swiftclient.shell.main(argv)
|
||||
expected_calls = [mock.call('container',
|
||||
{},
|
||||
response_dict={}),
|
||||
mock.call('container_segments',
|
||||
{'X-Storage-Policy': 'one'},
|
||||
response_dict={})]
|
||||
connection.return_value.put_container.assert_has_calls(expected_calls)
|
||||
connection.return_value.put_object.assert_called_with(
|
||||
'container',
|
||||
'pseudo-folder/nested' + self.tmpfile,
|
||||
'',
|
||||
content_length=0,
|
||||
headers={'x-object-manifest': mock.ANY,
|
||||
'x-object-meta-mtime': mock.ANY},
|
||||
response_dict={})
|
||||
|
||||
# detect SLO
|
||||
connection.reset_mock()
|
||||
connection.return_value.head_container.return_value = {
|
||||
'x-storage-policy': 'one'}
|
||||
argv = ["", "upload", "container/pseudo-folder/nested",
|
||||
self.tmpfile, "-S", "10"]
|
||||
with open(self.tmpfile, "wb") as fh:
|
||||
fh.write(b'12345678901234567890')
|
||||
connection.return_value.get_capabilities.return_value = {
|
||||
'slo': {},
|
||||
}
|
||||
swiftclient.shell.main(argv)
|
||||
expected_calls = [mock.call('container',
|
||||
{},
|
||||
response_dict={}),
|
||||
mock.call('container_segments',
|
||||
{'X-Storage-Policy': 'one'},
|
||||
response_dict={})]
|
||||
print(connection.return_value.mock_calls)
|
||||
connection.return_value.put_container.assert_has_calls(expected_calls)
|
||||
connection.return_value.put_object.assert_called_with(
|
||||
'container',
|
||||
'pseudo-folder/nested' + self.tmpfile,
|
||||
mock.ANY,
|
||||
headers={
|
||||
'x-object-meta-mtime': mock.ANY,
|
||||
},
|
||||
query_string='multipart-manifest=put',
|
||||
response_dict=mock.ANY)
|
||||
|
||||
@mock.patch('swiftclient.shell.walk')
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_upload_skip_container_put(self, connection, walk):
|
||||
@ -934,7 +991,7 @@ class TestShell(unittest.TestCase):
|
||||
connection.return_value.head_container.return_value = {
|
||||
'x-storage-policy': 'one'}
|
||||
argv = ["", "upload", "container", "--skip-container-put",
|
||||
self.tmpfile, "-S", "10"]
|
||||
self.tmpfile, "-S", "10", "--use-dlo"]
|
||||
with open(self.tmpfile, "wb") as fh:
|
||||
fh.write(b'12345678901234567890')
|
||||
swiftclient.shell.main(argv)
|
||||
@ -1187,7 +1244,7 @@ class TestShell(unittest.TestCase):
|
||||
connection.return_value.attempts = 0
|
||||
connection.return_value.put_object.return_value = EMPTY_ETAG
|
||||
argv = ["", "upload", "container", self.tmpfile, "-S", "10",
|
||||
"-C", "container"]
|
||||
"-C", "container", "--use-dlo"]
|
||||
with open(self.tmpfile, "wb") as fh:
|
||||
fh.write(b'12345678901234567890')
|
||||
swiftclient.shell.main(argv)
|
||||
@ -3581,6 +3638,7 @@ class TestCrossAccountObjectAccess(TestBase, MockHttpTest):
|
||||
|
||||
args, env = self._make_cmd('upload',
|
||||
cmd_args=[self.cont, self.obj,
|
||||
'--use-dlo',
|
||||
'--leave-segments',
|
||||
'--segment-size=10',
|
||||
'--segment-container=%s'
|
||||
@ -3621,6 +3679,7 @@ class TestCrossAccountObjectAccess(TestBase, MockHttpTest):
|
||||
|
||||
args, env = self._make_cmd('upload',
|
||||
cmd_args=[self.cont, self.obj,
|
||||
'--use-dlo',
|
||||
'--leave-segments',
|
||||
'--segment-size=10'])
|
||||
with mock.patch('swiftclient.client.ksclient_v3', self.fake_ks):
|
||||
|
Loading…
x
Reference in New Issue
Block a user