Merge "Reduce memory usage for download/delete and add --no-shuffle option to st_download"

This commit is contained in:
Jenkins 2015-08-28 00:48:52 +00:00 committed by Gerrit Code Review
commit e52df5d8a5
4 changed files with 595 additions and 100 deletions

@ -176,7 +176,8 @@ _default_local_options = {
'fail_fast': False,
'human': False,
'dir_marker': False,
'checksum': True
'checksum': True,
'shuffle': False
POLICY = 'X-Storage-Policy'
@ -752,7 +753,7 @@ class SwiftService(object):
options = self._options
rq = Queue()
rq = Queue(maxsize=10) # Just stop list running away consuming memory
if container is None:
listing_future = self.thread_manager.container_pool.submit(
@ -895,6 +896,7 @@ class SwiftService(object):
'out_directory': None,
'out_file': None,
'remove_prefix': False,
'shuffle' : False
:returns: A generator for returning the results of the download
@ -916,40 +918,21 @@ class SwiftService(object):
options_copy = deepcopy(options)
options_copy["long"] = False
containers = []
for part in self.list(options=options_copy):
if part["success"]:
i['name'] for i in part["listing"]
containers = [i['name'] for i in part["listing"]]
if options['shuffle']:
for con in containers:
for res in self._download_container(
con, options_copy):
yield res
raise part["error"]
o_downs = []
for con in containers:
objs = []
for part in self.list(
container=con, options=options_copy):
if part["success"]:
i['name'] for i in part["listing"]
raise part["error"]
self._download_object_job, con, obj,
) for obj in objs
for o_down in interruptable_as_completed(o_downs):
yield o_down.result()
# If we see a 404 here, the listing of the account failed
except ClientException as err:
if err.http_status != 404:
@ -1153,14 +1136,17 @@ class SwiftService(object):
return res
def _download_container(self, container, options):
def _submit_page_downloads(self, container, page_generator, options):
objects = []
for part in self.list(container=container, options=options):
if part["success"]:
objects.extend([o["name"] for o in part["listing"]])
raise part["error"]
list_page = next(page_generator)
except StopIteration:
return None
if list_page["success"]:
objects = [o["name"] for o in list_page["listing"]]
if options["shuffle"]:
o_downs = [
@ -1168,14 +1154,60 @@ class SwiftService(object):
) for obj in objects
for o_down in interruptable_as_completed(o_downs):
yield o_down.result()
return o_downs
raise list_page["error"]
def _download_container(self, container, options):
_page_generator = self.list(container=container, options=options)
next_page_downs = self._submit_page_downloads(
container, _page_generator, options
except ClientException as err:
if err.http_status != 404:
raise SwiftError('Container %r not found' % container,
raise SwiftError(
'Container %r not found' % container, container=container
error = None
while next_page_downs:
page_downs = next_page_downs
next_page_downs = None
# Start downloading the next page of list results when
# we have completed 80% of the previous page
next_page_triggered = False
next_page_trigger_point = 0.8 * len(page_downs)
page_results_yielded = 0
for o_down in interruptable_as_completed(page_downs):
yield o_down.result()
# Do we need to start the next set of downloads yet?
if not next_page_triggered:
page_results_yielded += 1
if page_results_yielded >= next_page_trigger_point:
next_page_downs = self._submit_page_downloads(
container, _page_generator, options
except ClientException as err:
# Allow the current page to finish downloading
error = err
except Exception:
# Something unexpected went wrong - cancel
# remaining downloads
for _d in page_downs:
# Stop counting and testing
next_page_triggered = True
if error:
raise error
# Upload related methods
@ -2080,17 +2112,18 @@ class SwiftService(object):
def _delete_container(self, container, options):
objs = []
for part in self.list(container=container):
if part["success"]:
objs.extend([o['name'] for o in part['listing']])
objs = [o['name'] for o in part['listing']]
o_dels = self.delete(
container=container, objects=objs, options=options
for res in o_dels:
yield res
raise part["error"]
for res in self.delete(
container=container, objects=objs, options=options):
yield res
con_del = self.thread_manager.container_pool.submit(
self._delete_empty_container, container

@ -200,6 +200,14 @@ Optional arguments:
Example --header "content-type:text/plain"
--skip-identical Skip downloading files that are identical on both
--no-shuffle By default, when downloading a complete account or
container, download order is randomised in order to
reduce the load on individual drives when multiple
clients are executed simultaneously to download the
same set of objects (e.g. a nightly automated download
script to multiple servers). Enable this option to
submit download jobs to the thread pool in the order
they are listed in the object store.
@ -249,6 +257,14 @@ def st_download(parser, args, output_manager):
'--skip-identical', action='store_true', dest='skip_identical',
default=False, help='Skip downloading files that are identical on '
'both sides.')
'--no-shuffle', action='store_false', dest='shuffle',
default=True, help='By default, download order is randomised in order '
'to reduce the load on individual drives when multiple clients are '
'executed simultaneously to download the same set of objects (e.g. a '
'nightly automated download script to multiple servers). Enable this '
'option to submit download jobs to the thread pool in the order they '
'are listed in the object store.')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.out_file == '-':
@ -355,6 +371,8 @@ def st_download(parser, args, output_manager):
except SwiftError as e:
except Exception as e:
st_list_options = '''[--long] [--lh] [--totals] [--prefix <prefix>]

@ -14,23 +14,25 @@
# limitations under the License.
import mock
import os
import six
import tempfile
import testtools
import time
from concurrent.futures import Future
from hashlib import md5
from mock import Mock, PropertyMock
from six.moves.queue import Queue, Empty as QueueEmptyError
from six import BytesIO
from time import sleep
import swiftclient
import swiftclient.utils as utils
from swiftclient.client import Connection, ClientException
from swiftclient.service import SwiftService, SwiftError,\
import six
if six.PY2:
import __builtin__ as builtins
import builtins
from swiftclient.service import (
SwiftService, SwiftError, SwiftUploadObject
clean_os_environ = {}
environ_prefixes = ('ST_', 'OS_')
@ -39,6 +41,12 @@ for key in os.environ:
clean_os_environ[key] = ''
if six.PY2:
import __builtin__ as builtins
import builtins
class TestSwiftPostObject(testtools.TestCase):
def setUp(self):
@ -142,25 +150,24 @@ class TestSwiftReader(testtools.TestCase):
class TestServiceDelete(testtools.TestCase):
def setUp(self):
super(TestServiceDelete, self).setUp()
self.opts = {'leave_segments': False, 'yes_all': False}
self.exc = Exception('test_exc')
# Base response to be copied and updated to match the expected
# response for each test
self.expected = {
'action': None, # Should be string in the form delete_XX
'container': 'test_c',
'object': 'test_o',
'attempts': 2,
'response_dict': {},
'success': None # Should be a bool
class _TestServiceBase(testtools.TestCase):
def _assertDictEqual(self, a, b, m=None):
# assertDictEqual is not available in py2.6 so use a shallow check
# instead
if hasattr(self, 'assertDictEqual'):
self.assertDictEqual(a, b, m)
self.assertTrue(isinstance(a, dict))
self.assertTrue(isinstance(b, dict))
self.assertEqual(len(a), len(b), m)
for k, v in a.items():
self.assertTrue(k in b, m)
self.assertEqual(b[k], v, m)
def _get_mock_connection(self, attempts=2):
m = Mock(spec=Connection)
type(m).attempts = PropertyMock(return_value=attempts)
type(m).auth_end_time = PropertyMock(return_value=4)
return m
def _get_queue(self, q):
@ -178,18 +185,22 @@ class TestServiceDelete(testtools.TestCase):
return expected
def _assertDictEqual(self, a, b, m=None):
# assertDictEqual is not available in py2.6 so use a shallow check
# instead
if hasattr(self, 'assertDictEqual'):
self.assertDictEqual(a, b, m)
self.assertTrue(isinstance(a, dict))
self.assertTrue(isinstance(b, dict))
self.assertEqual(len(a), len(b), m)
for k, v in a.items():
self.assertTrue(k in b, m)
self.assertEqual(b[k], v, m)
class TestServiceDelete(_TestServiceBase):
def setUp(self):
super(TestServiceDelete, self).setUp()
self.opts = {'leave_segments': False, 'yes_all': False}
self.exc = Exception('test_exc')
# Base response to be copied and updated to match the expected
# response for each test
self.expected = {
'action': None, # Should be string in the form delete_XX
'container': 'test_c',
'object': 'test_o',
'attempts': 2,
'response_dict': {},
'success': None # Should be a bool
def test_delete_segment(self):
mock_q = Queue()
@ -542,6 +553,226 @@ class TestSwiftUploadObject(testtools.TestCase):
self.assertRaises(SwiftError, self.suo, [])
class TestServiceList(_TestServiceBase):
def setUp(self):
super(TestServiceList, self).setUp()
self.opts = {'prefix': None, 'long': False, 'delimiter': ''}
self.exc = Exception('test_exc')
# Base response to be copied and updated to match the expected
# response for each test
self.expected = {
'action': None, # Should be list_X_part (account or container)
'container': None, # Should be a string when listing a container
'prefix': None,
'success': None # Should be a bool
def test_list_account(self):
mock_q = Queue()
mock_conn = self._get_mock_connection()
get_account_returns = [
(None, [{'name': 'test_c'}]),
(None, [])
mock_conn.get_account = Mock(side_effect=get_account_returns)
expected_r = self._get_expected({
'action': 'list_account_part',
'success': True,
'listing': [{'name': 'test_c'}],
'marker': ''
mock_conn, self.opts, mock_q
self._assertDictEqual(expected_r, self._get_queue(mock_q))
long_opts = dict(self.opts, **{'long': True})
mock_conn.head_container = Mock(return_value={'test_m': '1'})
get_account_returns = [
(None, [{'name': 'test_c'}]),
(None, [])
mock_conn.get_account = Mock(side_effect=get_account_returns)
expected_r_long = self._get_expected({
'action': 'list_account_part',
'success': True,
'listing': [{'name': 'test_c', 'meta': {'test_m': '1'}}],
'marker': '',
mock_conn, long_opts, mock_q
self._assertDictEqual(expected_r_long, self._get_queue(mock_q))
def test_list_account_exception(self):
mock_q = Queue()
mock_conn = self._get_mock_connection()
mock_conn.get_account = Mock(side_effect=self.exc)
expected_r = self._get_expected({
'action': 'list_account_part',
'success': False,
'error': self.exc,
'marker': ''
mock_conn, self.opts, mock_q)
marker='', prefix=None
self._assertDictEqual(expected_r, self._get_queue(mock_q))
def test_list_container(self):
mock_q = Queue()
mock_conn = self._get_mock_connection()
get_container_returns = [
(None, [{'name': 'test_o'}]),
(None, [])
mock_conn.get_container = Mock(side_effect=get_container_returns)
expected_r = self._get_expected({
'action': 'list_container_part',
'container': 'test_c',
'success': True,
'listing': [{'name': 'test_o'}],
'marker': ''
mock_conn, 'test_c', self.opts, mock_q
self._assertDictEqual(expected_r, self._get_queue(mock_q))
long_opts = dict(self.opts, **{'long': True})
mock_conn.head_container = Mock(return_value={'test_m': '1'})
get_container_returns = [
(None, [{'name': 'test_o'}]),
(None, [])
mock_conn.get_container = Mock(side_effect=get_container_returns)
expected_r_long = self._get_expected({
'action': 'list_container_part',
'container': 'test_c',
'success': True,
'listing': [{'name': 'test_o'}],
'marker': ''
mock_conn, 'test_c', long_opts, mock_q
self._assertDictEqual(expected_r_long, self._get_queue(mock_q))
def test_list_container_exception(self):
mock_q = Queue()
mock_conn = self._get_mock_connection()
mock_conn.get_container = Mock(side_effect=self.exc)
expected_r = self._get_expected({
'action': 'list_container_part',
'container': 'test_c',
'success': False,
'error': self.exc,
'marker': ''
mock_conn, 'test_c', self.opts, mock_q
'test_c', marker='', delimiter='', prefix=None
self._assertDictEqual(expected_r, self._get_queue(mock_q))
def test_list_queue_size(self, mock_get_conn):
mock_conn = self._get_mock_connection()
# Return more results than should fit in the results queue
get_account_returns = [
(None, [{'name': 'container1'}]),
(None, [{'name': 'container2'}]),
(None, [{'name': 'container3'}]),
(None, [{'name': 'container4'}]),
(None, [{'name': 'container5'}]),
(None, [{'name': 'container6'}]),
(None, [{'name': 'container7'}]),
(None, [{'name': 'container8'}]),
(None, [{'name': 'container9'}]),
(None, [{'name': 'container10'}]),
(None, [{'name': 'container11'}]),
(None, [{'name': 'container12'}]),
(None, [{'name': 'container13'}]),
(None, [{'name': 'container14'}]),
(None, [])
mock_conn.get_account = Mock(side_effect=get_account_returns)
mock_get_conn.return_value = mock_conn
s = SwiftService(options=self.opts)
lg = s.list()
# Start the generator
first_list_part = next(lg)
# Wait for the number of calls to get_account to reach our expected
# value, then let it run some more to make sure the value remains
# stable
count = mock_conn.get_account.call_count
stable = 0
while mock_conn.get_account.call_count != count or stable < 5:
if mock_conn.get_account.call_count == count:
stable += 1
count = mock_conn.get_account.call_count
stable = 0
# The test requires a small sleep to allow other threads to
# execute - in this mocked environment we assume that if the call
# count to get_account has not changed in 0.25s then no more calls
# will be made.
stable_get_account_call_count = mock_conn.get_account.call_count
# Collect all remaining results from the generator
list_results = [first_list_part] + list(lg)
# Make sure the stable call count is correct - this should be 12 calls
# to get_account;
# 1 for first_list_part
# 10 for the values on the queue
# 1 for the value blocking whilst trying to place onto the queue
self.assertEqual(12, stable_get_account_call_count)
# Make sure all the containers were listed and placed onto the queue
self.assertEqual(15, mock_conn.get_account.call_count)
# Check the results were all returned
observed_listing = []
for lir in list_results:
[li['name'] for li in lir['listing']]
expected_listing = []
for gar in get_account_returns[:-1]: # The empty list is not returned
[li['name'] for li in gar[1]]
self.assertEqual(observed_listing, expected_listing)
class TestService(testtools.TestCase):
def test_upload_with_bad_segment_size(self):
@ -589,23 +820,7 @@ class TestService(testtools.TestCase):
self.assertEqual(upload_obj_resp['path'], obj['path'])
class TestServiceUpload(testtools.TestCase):
def _assertDictEqual(self, a, b, m=None):
# assertDictEqual is not available in py2.6 so use a shallow check
# instead
if not m:
m = '{0} != {1}'.format(a, b)
if hasattr(self, 'assertDictEqual'):
self.assertDictEqual(a, b, m)
self.assertIsInstance(a, dict, m)
self.assertIsInstance(b, dict, m)
self.assertEqual(len(a), len(b), m)
for k, v in a.items():
self.assertIn(k, b, m)
self.assertEqual(b[k], v, m)
class TestServiceUpload(_TestServiceBase):
def test_upload_segment_job(self):
with tempfile.NamedTemporaryFile() as f:
@ -1027,7 +1242,7 @@ class TestServiceUpload(testtools.TestCase):
class TestServiceDownload(testtools.TestCase):
class TestServiceDownload(_TestServiceBase):
def setUp(self):
super(TestServiceDownload, self).setUp()
@ -1036,6 +1251,19 @@ class TestServiceDownload(testtools.TestCase):
self.obj_content = b'c' * 10
self.obj_etag = md5(self.obj_content).hexdigest()
self.obj_len = len(self.obj_content)
self.exc = Exception('test_exc')
# Base response to be copied and updated to match the expected
# response for each test
self.expected = {
'action': 'download_object', # Should always be download_object
'container': 'test_c',
'object': 'test_o',
'attempts': 2,
'response_dict': {},
'path': 'test_o',
'pseudodir': False,
'success': None # Should be a bool
def _readbody(self):
yield self.obj_content
@ -1056,6 +1284,166 @@ class TestServiceDownload(testtools.TestCase):
self.assertIn(k, b, m)
self.assertEqual(b[k], v, m)
def test_download_container_job(self, as_comp, sub_page, service_list):
Check that paged downloads work correctly
as_comp.side_effect = [
sub_page.side_effect = [
range(0, 10), range(0, 10), [] # simulate multiple result pages
r = Mock(spec=Future)
r.result.return_value = self._get_expected({
'success': True,
'start_time': 1,
'finish_time': 2,
'headers_receipt': 3,
'auth_end_time': 4,
'read_length': len(b'objcontent'),
as_comp.side_effect = [
[r for _ in range(0, 10)],
[r for _ in range(0, 10)]
s = SwiftService()
down_gen = s._download_container('test_c', self.opts)
results = list(down_gen)
self.assertEqual(20, len(results))
def test_download_container_job_error(
self, as_comp, sub_page, service_list):
Check that paged downloads work correctly
class BoomError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
def _make_result():
r = Mock(spec=Future)
r.result.return_value = self._get_expected({
'success': True,
'start_time': 1,
'finish_time': 2,
'headers_receipt': 3,
'auth_end_time': 4,
'read_length': len(b'objcontent'),
return r
as_comp.side_effect = [
# We need Futures here because the error will cause a call to .cancel()
sub_page_effects = [
[_make_result() for _ in range(0, 10)],
BoomError('Go Boom')
sub_page.side_effect = sub_page_effects
# ...but we must also mock the returns to as_completed
as_comp.side_effect = [
[_make_result() for _ in range(0, 10)]
s = SwiftService()
lambda: list(s._download_container('test_c', self.opts))
# This was an unknown error, so make sure we attempt to cancel futures
for spe in sub_page_effects[0]:
# Now test ClientException
sub_page_effects = [
[_make_result() for _ in range(0, 10)],
ClientException('Go Boom')
sub_page.side_effect = sub_page_effects
as_comp.side_effect = [
[_make_result() for _ in range(0, 10)],
[_make_result() for _ in range(0, 10)]
lambda: list(s._download_container('test_c', self.opts))
# This was a ClientException, so make sure we don't cancel futures
for spe in sub_page_effects[0]:
def test_download_object_job(self):
mock_conn = self._get_mock_connection()
objcontent = six.BytesIO(b'objcontent')
mock_conn.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
expected_r = self._get_expected({
'success': True,
'start_time': 1,
'finish_time': 2,
'headers_receipt': 3,
'auth_end_time': 4,
'read_length': len(b'objcontent'),
with mock.patch.object(builtins, 'open') as mock_open:
written_content = Mock()
mock_open.return_value = written_content
s = SwiftService()
_opts = self.opts.copy()
_opts['no_download'] = False
actual_r = s._download_object_job(
mock_conn, 'test_c', 'test_o', _opts)
actual_r = dict( # Need to override the times we got from the call
'start_time': 1,
'finish_time': 2,
'headers_receipt': 3
mock_open.assert_called_once_with('test_o', 'wb')
'test_c', 'test_o', resp_chunk_size=65536, headers={},
self._assertDictEqual(expected_r, actual_r)
def test_download_object_job_exception(self):
mock_conn = self._get_mock_connection()
mock_conn.get_object = Mock(side_effect=self.exc)
expected_r = self._get_expected({
'success': False,
'error': self.exc
s = SwiftService()
actual_r = s._download_object_job(
mock_conn, 'test_c', 'test_o', self.opts)
'test_c', 'test_o', resp_chunk_size=65536, headers={},
self._assertDictEqual(expected_r, actual_r)
def test_download(self):
service = SwiftService()
with mock.patch('swiftclient.service.Connection') as mock_conn:

@ -377,6 +377,62 @@ class TestShell(unittest.TestCase):
self.assertEqual('objcontent', output.out)
def test_download_shuffle(self, connection, mock_shuffle):
# Test that the container and object lists are shuffled
mock_shuffle.side_effect = lambda l: l
connection.return_value.get_object.return_value = [
{'content-type': 'text/plain',
'etag': EMPTY_ETAG},
connection.return_value.get_container.side_effect = [
(None, [{'name': 'object'}]),
(None, [{'name': 'pseudo/'}]),
(None, []),
connection.return_value.auth_end_time = 0
connection.return_value.attempts = 0
connection.return_value.get_account.side_effect = [
(None, [{'name': 'container'}]),
(None, [])
with mock.patch(BUILTIN_OPEN) as mock_open:
argv = ["", "download", "--all"]
self.assertEqual(3, mock_shuffle.call_count)
mock_open.assert_called_once_with('container/object', 'wb')
# Test that the container and object lists are not shuffled
connection.return_value.get_object.return_value = [
{'content-type': 'text/plain',
'etag': 'd41d8cd98f00b204e9800998ecf8427e'},
connection.return_value.get_container.side_effect = [
(None, [{'name': 'object'}]),
(None, [{'name': 'pseudo/'}]),
(None, []),
connection.return_value.auth_end_time = 0
connection.return_value.attempts = 0
connection.return_value.get_account.side_effect = [
(None, [{'name': 'container'}]),
(None, [])
with mock.patch(BUILTIN_OPEN) as mock_open:
argv = ["", "download", "--all", "--no-shuffle"]
self.assertEqual(0, mock_shuffle.call_count)
mock_open.assert_called_once_with('container/object', 'wb')
def test_download_no_content_type(self, connection):
connection.return_value.get_object.return_value = [