respect bulk delete page size and fix logic error
Previously, using SwiftService to delete "many" objects would use bulk delete if available, but it would not respect the bulk delete page size. If the number of objects to delete exceeded the bulk delete page size, SwiftService would ignore the error and nothing would be deleted. This patch changes _should_bulk_delete() to be _bulk_delete_page_size(); instead of returning a simple True/False, it returns the page size for the bulk deleter, or 1 if objects should be deleted one at a time. Delete SDK calls are then spread across multiple bulk DELETEs if the requested number of objects to delete exceeds the returned page size. Fixed the logic in _should_bulk_delete() so that if the object list is exactly 2x the thread count, it will not bulk delete. This is the natural conclusion following the logic that existed previously: if the delete request can be satisfied by every worker thread doing one or two tasks, don't bulk delete. But if it requires a worker thread to do three or more tasks, do a bulk delete instead. Previously, the logic would mean that if every worker thread did exactly two tasks, it would bulk delete. This patch changes a "<" to a "<=". Closes-Bug: 1679851 Change-Id: I3c18f89bac1170dc62187114ef06dbe721afcc2e
This commit is contained in:
parent
058fb0323f
commit
0cc4d8af18
@ -43,7 +43,8 @@ from swiftclient.command_helpers import (
|
||||
)
|
||||
from swiftclient.utils import (
|
||||
config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
|
||||
parse_api_response, report_traceback, n_groups, split_request_headers
|
||||
parse_api_response, report_traceback, n_groups, split_request_headers,
|
||||
n_at_a_time
|
||||
)
|
||||
from swiftclient.exceptions import ClientException
|
||||
from swiftclient.multithreading import MultiThreadingManager
|
||||
@ -2151,11 +2152,15 @@ class SwiftService(object):
|
||||
rq = Queue()
|
||||
obj_dels = {}
|
||||
|
||||
if self._should_bulk_delete(objects):
|
||||
for obj_slice in n_groups(
|
||||
objects, self._options['object_dd_threads']):
|
||||
self._bulk_delete(container, obj_slice, options,
|
||||
obj_dels)
|
||||
bulk_page_size = self._bulk_delete_page_size(objects)
|
||||
if bulk_page_size > 1:
|
||||
page_at_a_time = n_at_a_time(objects, bulk_page_size)
|
||||
for page_slice in page_at_a_time:
|
||||
for obj_slice in n_groups(
|
||||
page_slice,
|
||||
self._options['object_dd_threads']):
|
||||
self._bulk_delete(container, obj_slice, options,
|
||||
obj_dels)
|
||||
else:
|
||||
self._per_item_delete(container, objects, options,
|
||||
obj_dels, rq)
|
||||
@ -2208,23 +2213,36 @@ class SwiftService(object):
|
||||
and not res['success']):
|
||||
cancelled = True
|
||||
|
||||
def _should_bulk_delete(self, objects):
|
||||
if len(objects) < 2 * self._options['object_dd_threads']:
|
||||
def _bulk_delete_page_size(self, objects):
|
||||
'''
|
||||
Given the iterable 'objects', will return how many items should be
|
||||
deleted at a time.
|
||||
|
||||
:param objects: An iterable that supports 'len()'
|
||||
:returns: The bulk delete page size (i.e. the max number of
|
||||
objects that can be bulk deleted at once, as reported by
|
||||
the cluster). If bulk delete is disabled, return 1
|
||||
'''
|
||||
if len(objects) <= 2 * self._options['object_dd_threads']:
|
||||
# Not many objects; may as well delete one-by-one
|
||||
return False
|
||||
return 1
|
||||
|
||||
try:
|
||||
cap_result = self.capabilities()
|
||||
if not cap_result['success']:
|
||||
# This shouldn't actually happen, but just in case we start
|
||||
# being more nuanced about our capabilities result...
|
||||
return False
|
||||
return 1
|
||||
except ClientException:
|
||||
# Old swift, presumably; assume no bulk middleware
|
||||
return False
|
||||
return 1
|
||||
|
||||
swift_info = cap_result['capabilities']
|
||||
return 'bulk_delete' in swift_info
|
||||
if 'bulk_delete' in swift_info:
|
||||
return swift_info['bulk_delete'].get(
|
||||
'max_deletes_per_request', 10000)
|
||||
else:
|
||||
return 1
|
||||
|
||||
def _per_item_delete(self, container, objects, options, rdict, rq):
|
||||
for obj in objects:
|
||||
|
@ -474,6 +474,40 @@ class TestServiceDelete(_TestServiceBase):
|
||||
self.assertLessEqual(r['error_timestamp'], after)
|
||||
self.assertIn('Traceback', r['traceback'])
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, 'capabilities',
|
||||
lambda *a: {'action': 'capabilities',
|
||||
'timestamp': time.time(),
|
||||
'success': True,
|
||||
'capabilities': {
|
||||
'bulk_delete':
|
||||
{'max_deletes_per_request': 10}}
|
||||
})
|
||||
def test_bulk_delete_page_size(self):
|
||||
# make a list of 100 objects
|
||||
obj_list = ['x%02d' % i for i in range(100)]
|
||||
errors = []
|
||||
|
||||
# _bulk_delete_page_size uses 2x the number of threads to determine
|
||||
# if if there are "many" object to delete or not
|
||||
|
||||
# format is: [(thread_count, expected result), ...]
|
||||
obj_threads_exp = [
|
||||
(10, 10), # something small
|
||||
(49, 10), # just under the bounds
|
||||
(50, 1), # cutover point
|
||||
(51, 1), # just over bounds
|
||||
(100, 1), # something big
|
||||
]
|
||||
for thread_count, exp in obj_threads_exp:
|
||||
s = SwiftService(options={'object_dd_threads': thread_count})
|
||||
res = s._bulk_delete_page_size(obj_list)
|
||||
if res != exp:
|
||||
msg = 'failed for thread_count %d: got %r expected %r' % \
|
||||
(thread_count, res, exp)
|
||||
errors.append(msg)
|
||||
if errors:
|
||||
self.fail('_bulk_delete_page_size() failed\n' + '\n'.join(errors))
|
||||
|
||||
|
||||
class TestSwiftError(unittest.TestCase):
|
||||
|
||||
|
@ -903,8 +903,8 @@ class TestShell(unittest.TestCase):
|
||||
'x-object-meta-mtime': mock.ANY},
|
||||
response_dict={})
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: False)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 0)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_bad_threads(self, mock_connection):
|
||||
mock_connection.return_value.get_container.return_value = (None, [])
|
||||
@ -934,8 +934,8 @@ class TestShell(unittest.TestCase):
|
||||
check_good(["--object-threads", "1"])
|
||||
check_good(["--container-threads", "1"])
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: False)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 1)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_account(self, connection):
|
||||
connection.return_value.get_account.side_effect = [
|
||||
@ -971,8 +971,8 @@ class TestShell(unittest.TestCase):
|
||||
mock.call('container2', response_dict={}, headers={}),
|
||||
mock.call('empty_container', response_dict={}, headers={})])
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: True)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 10)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_bulk_account(self, connection):
|
||||
connection.return_value.get_account.side_effect = [
|
||||
@ -1085,8 +1085,80 @@ class TestShell(unittest.TestCase):
|
||||
self.assertEqual(connection.return_value.get_capabilities.mock_calls,
|
||||
[mock.call(None)]) # only one /info request
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: False)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_bulk_account_with_capabilities_and_pages(self, connection):
|
||||
connection.return_value.get_capabilities.return_value = {
|
||||
'bulk_delete': {
|
||||
'max_deletes_per_request': 2,
|
||||
'max_failed_deletes': 1000,
|
||||
},
|
||||
}
|
||||
connection.return_value.get_account.side_effect = [
|
||||
[None, [{'name': 'container'}]],
|
||||
[None, [{'name': 'container2'}]],
|
||||
[None, [{'name': 'empty_container'}]],
|
||||
[None, []],
|
||||
]
|
||||
connection.return_value.get_container.side_effect = [
|
||||
[None, [{'name': 'object'}, {'name': 'obj\xe9ct2'},
|
||||
{'name': 'z_object'}, {'name': 'z_obj\xe9ct2'}]],
|
||||
[None, []],
|
||||
[None, [{'name': 'object'}, {'name': 'obj\xe9ct2'},
|
||||
{'name': 'z_object'}, {'name': 'z_obj\xe9ct2'}]],
|
||||
[None, []],
|
||||
[None, []],
|
||||
]
|
||||
connection.return_value.attempts = 0
|
||||
argv = ["", "delete", "--all", "--object-threads", "1"]
|
||||
connection.return_value.post_account.return_value = {}, (
|
||||
b'{"Number Not Found": 0, "Response Status": "200 OK", '
|
||||
b'"Errors": [], "Number Deleted": 1, "Response Body": ""}')
|
||||
swiftclient.shell.main(argv)
|
||||
# check that each bulk call was only called with 2 objects
|
||||
self.assertEqual(
|
||||
connection.return_value.post_account.mock_calls, [
|
||||
mock.call(query_string='bulk-delete',
|
||||
data=b''.join([
|
||||
b'/container/object\n',
|
||||
b'/container/obj%C3%A9ct2\n',
|
||||
]),
|
||||
headers={'Content-Type': 'text/plain',
|
||||
'Accept': 'application/json'},
|
||||
response_dict={}),
|
||||
mock.call(query_string='bulk-delete',
|
||||
data=b''.join([
|
||||
b'/container/z_object\n',
|
||||
b'/container/z_obj%C3%A9ct2\n'
|
||||
]),
|
||||
headers={'Content-Type': 'text/plain',
|
||||
'Accept': 'application/json'},
|
||||
response_dict={}),
|
||||
mock.call(query_string='bulk-delete',
|
||||
data=b''.join([
|
||||
b'/container2/object\n',
|
||||
b'/container2/obj%C3%A9ct2\n',
|
||||
]),
|
||||
headers={'Content-Type': 'text/plain',
|
||||
'Accept': 'application/json'},
|
||||
response_dict={}),
|
||||
mock.call(query_string='bulk-delete',
|
||||
data=b''.join([
|
||||
b'/container2/z_object\n',
|
||||
b'/container2/z_obj%C3%A9ct2\n'
|
||||
]),
|
||||
headers={'Content-Type': 'text/plain',
|
||||
'Accept': 'application/json'},
|
||||
response_dict={})])
|
||||
self.assertEqual(
|
||||
connection.return_value.delete_container.mock_calls, [
|
||||
mock.call('container', response_dict={}, headers={}),
|
||||
mock.call('container2', response_dict={}, headers={}),
|
||||
mock.call('empty_container', response_dict={}, headers={})])
|
||||
self.assertEqual(connection.return_value.get_capabilities.mock_calls,
|
||||
[mock.call(None)]) # only one /info request
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 1)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_container(self, connection):
|
||||
connection.return_value.get_container.side_effect = [
|
||||
@ -1103,8 +1175,8 @@ class TestShell(unittest.TestCase):
|
||||
'container', 'object', query_string=None, response_dict={},
|
||||
headers={})
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: False)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 1)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_container_headers(self, connection):
|
||||
connection.return_value.get_container.side_effect = [
|
||||
@ -1122,8 +1194,8 @@ class TestShell(unittest.TestCase):
|
||||
'container', 'object', query_string=None, response_dict={},
|
||||
headers={'Skip-Middleware': 'Test'})
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: True)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 10)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_bulk_container(self, connection):
|
||||
connection.return_value.get_container.side_effect = [
|
||||
@ -1176,8 +1248,8 @@ class TestShell(unittest.TestCase):
|
||||
self.assertTrue(out.out.find(
|
||||
't\u00e9st_c [after 2 attempts]') >= 0, out)
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: False)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 1)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_per_object(self, connection):
|
||||
argv = ["", "delete", "container", "object"]
|
||||
@ -1188,8 +1260,8 @@ class TestShell(unittest.TestCase):
|
||||
'container', 'object', query_string=None, response_dict={},
|
||||
headers={})
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: True)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 10)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_delete_bulk_object(self, connection):
|
||||
argv = ["", "delete", "container", "object"]
|
||||
@ -2714,8 +2786,8 @@ class TestCrossAccountObjectAccess(TestBase, MockHttpTest):
|
||||
return status
|
||||
return on_request
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: False)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 1)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_upload_bad_threads(self, mock_connection):
|
||||
mock_connection.return_value.put_object.return_value = EMPTY_ETAG
|
||||
@ -2897,8 +2969,8 @@ class TestCrossAccountObjectAccess(TestBase, MockHttpTest):
|
||||
self.assertIn(expected_err, out.err)
|
||||
self.assertEqual('', out)
|
||||
|
||||
@mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete',
|
||||
lambda *a: False)
|
||||
@mock.patch.object(swiftclient.service.SwiftService,
|
||||
'_bulk_delete_page_size', lambda *a: 1)
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_download_bad_threads(self, mock_connection):
|
||||
mock_connection.return_value.get_object.return_value = [{}, '']
|
||||
|
Loading…
Reference in New Issue
Block a user