Reduce memory usage for download/delete and add --no-shuffle option to st_download
The current code builds a full object listing before performing either a multiple download or delete operation (and also shuffles this complete list in the case of a download). This patch removes the creation of the full object list and adds the ability to turn off shuffle for files when downloading. Also added is a limit on the number of list results that can be queued by a single call to service.list without consuming results (reduces memory overhead for large listings). Some tests added for service.py download and list. Change-Id: Ie737cbb7f8b1fa8a79bbb88914730b05aa7f2906
This commit is contained in:
parent
63998b481c
commit
a8c4df98ee
@ -176,7 +176,8 @@ _default_local_options = {
|
||||
'fail_fast': False,
|
||||
'human': False,
|
||||
'dir_marker': False,
|
||||
'checksum': True
|
||||
'checksum': True,
|
||||
'shuffle': False
|
||||
}
|
||||
|
||||
POLICY = 'X-Storage-Policy'
|
||||
@ -752,7 +753,7 @@ class SwiftService(object):
|
||||
else:
|
||||
options = self._options
|
||||
|
||||
rq = Queue()
|
||||
rq = Queue(maxsize=10) # Just stop list running away consuming memory
|
||||
|
||||
if container is None:
|
||||
listing_future = self.thread_manager.container_pool.submit(
|
||||
@ -895,6 +896,7 @@ class SwiftService(object):
|
||||
'out_directory': None,
|
||||
'out_file': None,
|
||||
'remove_prefix': False,
|
||||
'shuffle' : False
|
||||
}
|
||||
|
||||
:returns: A generator for returning the results of the download
|
||||
@ -916,40 +918,21 @@ class SwiftService(object):
|
||||
try:
|
||||
options_copy = deepcopy(options)
|
||||
options_copy["long"] = False
|
||||
containers = []
|
||||
|
||||
for part in self.list(options=options_copy):
|
||||
if part["success"]:
|
||||
containers.extend([
|
||||
i['name'] for i in part["listing"]
|
||||
])
|
||||
containers = [i['name'] for i in part["listing"]]
|
||||
|
||||
if options['shuffle']:
|
||||
shuffle(containers)
|
||||
|
||||
for con in containers:
|
||||
for res in self._download_container(
|
||||
con, options_copy):
|
||||
yield res
|
||||
else:
|
||||
raise part["error"]
|
||||
|
||||
shuffle(containers)
|
||||
|
||||
o_downs = []
|
||||
for con in containers:
|
||||
objs = []
|
||||
for part in self.list(
|
||||
container=con, options=options_copy):
|
||||
if part["success"]:
|
||||
objs.extend([
|
||||
i['name'] for i in part["listing"]
|
||||
])
|
||||
else:
|
||||
raise part["error"]
|
||||
shuffle(objs)
|
||||
|
||||
o_downs.extend(
|
||||
self.thread_manager.object_dd_pool.submit(
|
||||
self._download_object_job, con, obj,
|
||||
options_copy
|
||||
) for obj in objs
|
||||
)
|
||||
|
||||
for o_down in interruptable_as_completed(o_downs):
|
||||
yield o_down.result()
|
||||
|
||||
# If we see a 404 here, the listing of the account failed
|
||||
except ClientException as err:
|
||||
if err.http_status != 404:
|
||||
@ -1153,14 +1136,17 @@ class SwiftService(object):
|
||||
}
|
||||
return res
|
||||
|
||||
def _download_container(self, container, options):
|
||||
def _submit_page_downloads(self, container, page_generator, options):
|
||||
try:
|
||||
objects = []
|
||||
for part in self.list(container=container, options=options):
|
||||
if part["success"]:
|
||||
objects.extend([o["name"] for o in part["listing"]])
|
||||
else:
|
||||
raise part["error"]
|
||||
list_page = next(page_generator)
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
if list_page["success"]:
|
||||
objects = [o["name"] for o in list_page["listing"]]
|
||||
|
||||
if options["shuffle"]:
|
||||
shuffle(objects)
|
||||
|
||||
o_downs = [
|
||||
self.thread_manager.object_dd_pool.submit(
|
||||
@ -1168,14 +1154,60 @@ class SwiftService(object):
|
||||
) for obj in objects
|
||||
]
|
||||
|
||||
for o_down in interruptable_as_completed(o_downs):
|
||||
yield o_down.result()
|
||||
return o_downs
|
||||
else:
|
||||
raise list_page["error"]
|
||||
|
||||
def _download_container(self, container, options):
|
||||
_page_generator = self.list(container=container, options=options)
|
||||
try:
|
||||
next_page_downs = self._submit_page_downloads(
|
||||
container, _page_generator, options
|
||||
)
|
||||
except ClientException as err:
|
||||
if err.http_status != 404:
|
||||
raise
|
||||
raise SwiftError('Container %r not found' % container,
|
||||
container=container)
|
||||
raise SwiftError(
|
||||
'Container %r not found' % container, container=container
|
||||
)
|
||||
|
||||
error = None
|
||||
while next_page_downs:
|
||||
page_downs = next_page_downs
|
||||
next_page_downs = None
|
||||
|
||||
# Start downloading the next page of list results when
|
||||
# we have completed 80% of the previous page
|
||||
next_page_triggered = False
|
||||
next_page_trigger_point = 0.8 * len(page_downs)
|
||||
|
||||
page_results_yielded = 0
|
||||
for o_down in interruptable_as_completed(page_downs):
|
||||
yield o_down.result()
|
||||
|
||||
# Do we need to start the next set of downloads yet?
|
||||
if not next_page_triggered:
|
||||
page_results_yielded += 1
|
||||
if page_results_yielded >= next_page_trigger_point:
|
||||
try:
|
||||
next_page_downs = self._submit_page_downloads(
|
||||
container, _page_generator, options
|
||||
)
|
||||
except ClientException as err:
|
||||
# Allow the current page to finish downloading
|
||||
error = err
|
||||
except Exception:
|
||||
# Something unexpected went wrong - cancel
|
||||
# remaining downloads
|
||||
for _d in page_downs:
|
||||
_d.cancel()
|
||||
raise
|
||||
finally:
|
||||
# Stop counting and testing
|
||||
next_page_triggered = True
|
||||
|
||||
if error:
|
||||
raise error
|
||||
|
||||
# Upload related methods
|
||||
#
|
||||
@ -2080,17 +2112,18 @@ class SwiftService(object):
|
||||
|
||||
def _delete_container(self, container, options):
|
||||
try:
|
||||
objs = []
|
||||
for part in self.list(container=container):
|
||||
if part["success"]:
|
||||
objs.extend([o['name'] for o in part['listing']])
|
||||
objs = [o['name'] for o in part['listing']]
|
||||
|
||||
o_dels = self.delete(
|
||||
container=container, objects=objs, options=options
|
||||
)
|
||||
for res in o_dels:
|
||||
yield res
|
||||
else:
|
||||
raise part["error"]
|
||||
|
||||
for res in self.delete(
|
||||
container=container, objects=objs, options=options):
|
||||
yield res
|
||||
|
||||
con_del = self.thread_manager.container_pool.submit(
|
||||
self._delete_empty_container, container
|
||||
)
|
||||
|
@ -198,6 +198,14 @@ Optional arguments:
|
||||
Example --header "content-type:text/plain"
|
||||
--skip-identical Skip downloading files that are identical on both
|
||||
sides.
|
||||
--no-shuffle By default, when downloading a complete account or
|
||||
container, download order is randomised in order to
|
||||
to reduce the load on individual drives when multiple
|
||||
clients are executed simultaneously to download the
|
||||
same set of objects (e.g. a nightly automated download
|
||||
script to multiple servers). Enable this option to
|
||||
submit download jobs to the thread pool in the order
|
||||
they are listed in the object store.
|
||||
'''.strip("\n")
|
||||
|
||||
|
||||
@ -247,6 +255,14 @@ def st_download(parser, args, output_manager):
|
||||
'--skip-identical', action='store_true', dest='skip_identical',
|
||||
default=False, help='Skip downloading files that are identical on '
|
||||
'both sides.')
|
||||
parser.add_option(
|
||||
'--no-shuffle', action='store_false', dest='shuffle',
|
||||
default=True, help='By default, download order is randomised in order '
|
||||
'to reduce the load on individual drives when multiple clients are '
|
||||
'executed simultaneously to download the same set of objects (e.g. a '
|
||||
'nightly automated download script to multiple servers). Enable this '
|
||||
'option to submit download jobs to the thread pool in the order they '
|
||||
'are listed in the object store.')
|
||||
(options, args) = parse_args(parser, args)
|
||||
args = args[1:]
|
||||
if options.out_file == '-':
|
||||
@ -353,6 +369,8 @@ def st_download(parser, args, output_manager):
|
||||
|
||||
except SwiftError as e:
|
||||
output_manager.error(e.value)
|
||||
except Exception as e:
|
||||
output_manager.error(e)
|
||||
|
||||
|
||||
st_list_options = '''[--long] [--lh] [--totals] [--prefix <prefix>]
|
||||
|
@ -14,23 +14,25 @@
|
||||
# limitations under the License.
|
||||
import mock
|
||||
import os
|
||||
import six
|
||||
import tempfile
|
||||
import testtools
|
||||
import time
|
||||
|
||||
from concurrent.futures import Future
|
||||
from hashlib import md5
|
||||
from mock import Mock, PropertyMock
|
||||
from six.moves.queue import Queue, Empty as QueueEmptyError
|
||||
from six import BytesIO
|
||||
from time import sleep
|
||||
|
||||
import swiftclient
|
||||
import swiftclient.utils as utils
|
||||
from swiftclient.client import Connection, ClientException
|
||||
from swiftclient.service import SwiftService, SwiftError,\
|
||||
SwiftUploadObject
|
||||
import six
|
||||
if six.PY2:
|
||||
import __builtin__ as builtins
|
||||
else:
|
||||
import builtins
|
||||
from swiftclient.service import (
|
||||
SwiftService, SwiftError, SwiftUploadObject
|
||||
)
|
||||
|
||||
|
||||
clean_os_environ = {}
|
||||
environ_prefixes = ('ST_', 'OS_')
|
||||
@ -39,6 +41,12 @@ for key in os.environ:
|
||||
clean_os_environ[key] = ''
|
||||
|
||||
|
||||
if six.PY2:
|
||||
import __builtin__ as builtins
|
||||
else:
|
||||
import builtins
|
||||
|
||||
|
||||
class TestSwiftPostObject(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -142,25 +150,24 @@ class TestSwiftReader(testtools.TestCase):
|
||||
'97ac82a5b825239e782d0339e2d7b910')
|
||||
|
||||
|
||||
class TestServiceDelete(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestServiceDelete, self).setUp()
|
||||
self.opts = {'leave_segments': False, 'yes_all': False}
|
||||
self.exc = Exception('test_exc')
|
||||
# Base response to be copied and updated to matched the expected
|
||||
# response for each test
|
||||
self.expected = {
|
||||
'action': None, # Should be string in the form delete_XX
|
||||
'container': 'test_c',
|
||||
'object': 'test_o',
|
||||
'attempts': 2,
|
||||
'response_dict': {},
|
||||
'success': None # Should be a bool
|
||||
}
|
||||
class _TestServiceBase(testtools.TestCase):
|
||||
def _assertDictEqual(self, a, b, m=None):
|
||||
# assertDictEqual is not available in py2.6 so use a shallow check
|
||||
# instead
|
||||
if hasattr(self, 'assertDictEqual'):
|
||||
self.assertDictEqual(a, b, m)
|
||||
else:
|
||||
self.assertTrue(isinstance(a, dict))
|
||||
self.assertTrue(isinstance(b, dict))
|
||||
self.assertEqual(len(a), len(b), m)
|
||||
for k, v in a.items():
|
||||
self.assertTrue(k in b, m)
|
||||
self.assertEqual(b[k], v, m)
|
||||
|
||||
def _get_mock_connection(self, attempts=2):
|
||||
m = Mock(spec=Connection)
|
||||
type(m).attempts = PropertyMock(return_value=attempts)
|
||||
type(m).auth_end_time = PropertyMock(return_value=4)
|
||||
return m
|
||||
|
||||
def _get_queue(self, q):
|
||||
@ -178,18 +185,22 @@ class TestServiceDelete(testtools.TestCase):
|
||||
|
||||
return expected
|
||||
|
||||
def _assertDictEqual(self, a, b, m=None):
|
||||
# assertDictEqual is not available in py2.6 so use a shallow check
|
||||
# instead
|
||||
if hasattr(self, 'assertDictEqual'):
|
||||
self.assertDictEqual(a, b, m)
|
||||
else:
|
||||
self.assertTrue(isinstance(a, dict))
|
||||
self.assertTrue(isinstance(b, dict))
|
||||
self.assertEqual(len(a), len(b), m)
|
||||
for k, v in a.items():
|
||||
self.assertTrue(k in b, m)
|
||||
self.assertEqual(b[k], v, m)
|
||||
|
||||
class TestServiceDelete(_TestServiceBase):
|
||||
def setUp(self):
|
||||
super(TestServiceDelete, self).setUp()
|
||||
self.opts = {'leave_segments': False, 'yes_all': False}
|
||||
self.exc = Exception('test_exc')
|
||||
# Base response to be copied and updated to matched the expected
|
||||
# response for each test
|
||||
self.expected = {
|
||||
'action': None, # Should be string in the form delete_XX
|
||||
'container': 'test_c',
|
||||
'object': 'test_o',
|
||||
'attempts': 2,
|
||||
'response_dict': {},
|
||||
'success': None # Should be a bool
|
||||
}
|
||||
|
||||
def test_delete_segment(self):
|
||||
mock_q = Queue()
|
||||
@ -542,6 +553,226 @@ class TestSwiftUploadObject(testtools.TestCase):
|
||||
self.assertRaises(SwiftError, self.suo, [])
|
||||
|
||||
|
||||
class TestServiceList(_TestServiceBase):
|
||||
def setUp(self):
|
||||
super(TestServiceList, self).setUp()
|
||||
self.opts = {'prefix': None, 'long': False, 'delimiter': ''}
|
||||
self.exc = Exception('test_exc')
|
||||
# Base response to be copied and updated to matched the expected
|
||||
# response for each test
|
||||
self.expected = {
|
||||
'action': None, # Should be list_X_part (account or container)
|
||||
'container': None, # Should be a string when listing a container
|
||||
'prefix': None,
|
||||
'success': None # Should be a bool
|
||||
}
|
||||
|
||||
def test_list_account(self):
|
||||
mock_q = Queue()
|
||||
mock_conn = self._get_mock_connection()
|
||||
get_account_returns = [
|
||||
(None, [{'name': 'test_c'}]),
|
||||
(None, [])
|
||||
]
|
||||
mock_conn.get_account = Mock(side_effect=get_account_returns)
|
||||
|
||||
expected_r = self._get_expected({
|
||||
'action': 'list_account_part',
|
||||
'success': True,
|
||||
'listing': [{'name': 'test_c'}],
|
||||
'marker': ''
|
||||
})
|
||||
|
||||
SwiftService._list_account_job(
|
||||
mock_conn, self.opts, mock_q
|
||||
)
|
||||
self._assertDictEqual(expected_r, self._get_queue(mock_q))
|
||||
self.assertIsNone(self._get_queue(mock_q))
|
||||
|
||||
long_opts = dict(self.opts, **{'long': True})
|
||||
mock_conn.head_container = Mock(return_value={'test_m': '1'})
|
||||
get_account_returns = [
|
||||
(None, [{'name': 'test_c'}]),
|
||||
(None, [])
|
||||
]
|
||||
mock_conn.get_account = Mock(side_effect=get_account_returns)
|
||||
|
||||
expected_r_long = self._get_expected({
|
||||
'action': 'list_account_part',
|
||||
'success': True,
|
||||
'listing': [{'name': 'test_c', 'meta': {'test_m': '1'}}],
|
||||
'marker': '',
|
||||
})
|
||||
|
||||
SwiftService._list_account_job(
|
||||
mock_conn, long_opts, mock_q
|
||||
)
|
||||
self._assertDictEqual(expected_r_long, self._get_queue(mock_q))
|
||||
self.assertIsNone(self._get_queue(mock_q))
|
||||
|
||||
def test_list_account_exception(self):
|
||||
mock_q = Queue()
|
||||
mock_conn = self._get_mock_connection()
|
||||
mock_conn.get_account = Mock(side_effect=self.exc)
|
||||
expected_r = self._get_expected({
|
||||
'action': 'list_account_part',
|
||||
'success': False,
|
||||
'error': self.exc,
|
||||
'marker': ''
|
||||
})
|
||||
|
||||
SwiftService._list_account_job(
|
||||
mock_conn, self.opts, mock_q)
|
||||
|
||||
mock_conn.get_account.assert_called_once_with(
|
||||
marker='', prefix=None
|
||||
)
|
||||
self._assertDictEqual(expected_r, self._get_queue(mock_q))
|
||||
self.assertIsNone(self._get_queue(mock_q))
|
||||
|
||||
def test_list_container(self):
|
||||
mock_q = Queue()
|
||||
mock_conn = self._get_mock_connection()
|
||||
get_container_returns = [
|
||||
(None, [{'name': 'test_o'}]),
|
||||
(None, [])
|
||||
]
|
||||
mock_conn.get_container = Mock(side_effect=get_container_returns)
|
||||
|
||||
expected_r = self._get_expected({
|
||||
'action': 'list_container_part',
|
||||
'container': 'test_c',
|
||||
'success': True,
|
||||
'listing': [{'name': 'test_o'}],
|
||||
'marker': ''
|
||||
})
|
||||
|
||||
SwiftService._list_container_job(
|
||||
mock_conn, 'test_c', self.opts, mock_q
|
||||
)
|
||||
self._assertDictEqual(expected_r, self._get_queue(mock_q))
|
||||
self.assertIsNone(self._get_queue(mock_q))
|
||||
|
||||
long_opts = dict(self.opts, **{'long': True})
|
||||
mock_conn.head_container = Mock(return_value={'test_m': '1'})
|
||||
get_container_returns = [
|
||||
(None, [{'name': 'test_o'}]),
|
||||
(None, [])
|
||||
]
|
||||
mock_conn.get_container = Mock(side_effect=get_container_returns)
|
||||
|
||||
expected_r_long = self._get_expected({
|
||||
'action': 'list_container_part',
|
||||
'container': 'test_c',
|
||||
'success': True,
|
||||
'listing': [{'name': 'test_o'}],
|
||||
'marker': ''
|
||||
})
|
||||
|
||||
SwiftService._list_container_job(
|
||||
mock_conn, 'test_c', long_opts, mock_q
|
||||
)
|
||||
self._assertDictEqual(expected_r_long, self._get_queue(mock_q))
|
||||
self.assertIsNone(self._get_queue(mock_q))
|
||||
|
||||
def test_list_container_exception(self):
|
||||
mock_q = Queue()
|
||||
mock_conn = self._get_mock_connection()
|
||||
mock_conn.get_container = Mock(side_effect=self.exc)
|
||||
expected_r = self._get_expected({
|
||||
'action': 'list_container_part',
|
||||
'container': 'test_c',
|
||||
'success': False,
|
||||
'error': self.exc,
|
||||
'marker': ''
|
||||
})
|
||||
|
||||
SwiftService._list_container_job(
|
||||
mock_conn, 'test_c', self.opts, mock_q
|
||||
)
|
||||
|
||||
mock_conn.get_container.assert_called_once_with(
|
||||
'test_c', marker='', delimiter='', prefix=None
|
||||
)
|
||||
self._assertDictEqual(expected_r, self._get_queue(mock_q))
|
||||
self.assertIsNone(self._get_queue(mock_q))
|
||||
|
||||
@mock.patch('swiftclient.service.get_conn')
|
||||
def test_list_queue_size(self, mock_get_conn):
|
||||
mock_conn = self._get_mock_connection()
|
||||
# Return more results than should fit in the results queue
|
||||
get_account_returns = [
|
||||
(None, [{'name': 'container1'}]),
|
||||
(None, [{'name': 'container2'}]),
|
||||
(None, [{'name': 'container3'}]),
|
||||
(None, [{'name': 'container4'}]),
|
||||
(None, [{'name': 'container5'}]),
|
||||
(None, [{'name': 'container6'}]),
|
||||
(None, [{'name': 'container7'}]),
|
||||
(None, [{'name': 'container8'}]),
|
||||
(None, [{'name': 'container9'}]),
|
||||
(None, [{'name': 'container10'}]),
|
||||
(None, [{'name': 'container11'}]),
|
||||
(None, [{'name': 'container12'}]),
|
||||
(None, [{'name': 'container13'}]),
|
||||
(None, [{'name': 'container14'}]),
|
||||
(None, [])
|
||||
]
|
||||
mock_conn.get_account = Mock(side_effect=get_account_returns)
|
||||
mock_get_conn.return_value = mock_conn
|
||||
|
||||
s = SwiftService(options=self.opts)
|
||||
lg = s.list()
|
||||
|
||||
# Start the generator
|
||||
first_list_part = next(lg)
|
||||
|
||||
# Wait for the number of calls to get_account to reach our expected
|
||||
# value, then let it run some more to make sure the value remains
|
||||
# stable
|
||||
count = mock_conn.get_account.call_count
|
||||
stable = 0
|
||||
while mock_conn.get_account.call_count != count or stable < 5:
|
||||
if mock_conn.get_account.call_count == count:
|
||||
stable += 1
|
||||
else:
|
||||
count = mock_conn.get_account.call_count
|
||||
stable = 0
|
||||
# The test requires a small sleep to allow other threads to
|
||||
# execute - in this mocked environment we assume that if the call
|
||||
# count to get_account has not changed in 0.25s then no more calls
|
||||
# will be made.
|
||||
sleep(0.05)
|
||||
|
||||
stable_get_account_call_count = mock_conn.get_account.call_count
|
||||
|
||||
# Collect all remaining results from the generator
|
||||
list_results = [first_list_part] + list(lg)
|
||||
|
||||
# Make sure the stable call count is correct - this should be 12 calls
|
||||
# to get_account;
|
||||
# 1 for first_list_part
|
||||
# 10 for the values on the queue
|
||||
# 1 for the value blocking whilst trying to place onto the queue
|
||||
self.assertEqual(12, stable_get_account_call_count)
|
||||
|
||||
# Make sure all the containers were listed and placed onto the queue
|
||||
self.assertEqual(15, mock_conn.get_account.call_count)
|
||||
|
||||
# Check the results were all returned
|
||||
observed_listing = []
|
||||
for lir in list_results:
|
||||
observed_listing.append(
|
||||
[li['name'] for li in lir['listing']]
|
||||
)
|
||||
expected_listing = []
|
||||
for gar in get_account_returns[:-1]: # The empty list is not returned
|
||||
expected_listing.append(
|
||||
[li['name'] for li in gar[1]]
|
||||
)
|
||||
self.assertEqual(observed_listing, expected_listing)
|
||||
|
||||
|
||||
class TestService(testtools.TestCase):
|
||||
|
||||
def test_upload_with_bad_segment_size(self):
|
||||
@ -589,23 +820,7 @@ class TestService(testtools.TestCase):
|
||||
self.assertEqual(upload_obj_resp['path'], obj['path'])
|
||||
|
||||
|
||||
class TestServiceUpload(testtools.TestCase):
|
||||
|
||||
def _assertDictEqual(self, a, b, m=None):
|
||||
# assertDictEqual is not available in py2.6 so use a shallow check
|
||||
# instead
|
||||
if not m:
|
||||
m = '{0} != {1}'.format(a, b)
|
||||
|
||||
if hasattr(self, 'assertDictEqual'):
|
||||
self.assertDictEqual(a, b, m)
|
||||
else:
|
||||
self.assertIsInstance(a, dict, m)
|
||||
self.assertIsInstance(b, dict, m)
|
||||
self.assertEqual(len(a), len(b), m)
|
||||
for k, v in a.items():
|
||||
self.assertIn(k, b, m)
|
||||
self.assertEqual(b[k], v, m)
|
||||
class TestServiceUpload(_TestServiceBase):
|
||||
|
||||
def test_upload_segment_job(self):
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
@ -1027,7 +1242,7 @@ class TestServiceUpload(testtools.TestCase):
|
||||
mock_conn.get_container.assert_has_calls(expected)
|
||||
|
||||
|
||||
class TestServiceDownload(testtools.TestCase):
|
||||
class TestServiceDownload(_TestServiceBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestServiceDownload, self).setUp()
|
||||
@ -1036,6 +1251,19 @@ class TestServiceDownload(testtools.TestCase):
|
||||
self.obj_content = b'c' * 10
|
||||
self.obj_etag = md5(self.obj_content).hexdigest()
|
||||
self.obj_len = len(self.obj_content)
|
||||
self.exc = Exception('test_exc')
|
||||
# Base response to be copied and updated to matched the expected
|
||||
# response for each test
|
||||
self.expected = {
|
||||
'action': 'download_object', # Should always be download_object
|
||||
'container': 'test_c',
|
||||
'object': 'test_o',
|
||||
'attempts': 2,
|
||||
'response_dict': {},
|
||||
'path': 'test_o',
|
||||
'pseudodir': False,
|
||||
'success': None # Should be a bool
|
||||
}
|
||||
|
||||
def _readbody(self):
|
||||
yield self.obj_content
|
||||
@ -1056,6 +1284,166 @@ class TestServiceDownload(testtools.TestCase):
|
||||
self.assertIn(k, b, m)
|
||||
self.assertEqual(b[k], v, m)
|
||||
|
||||
@mock.patch('swiftclient.service.SwiftService.list')
|
||||
@mock.patch('swiftclient.service.SwiftService._submit_page_downloads')
|
||||
@mock.patch('swiftclient.service.interruptable_as_completed')
|
||||
def test_download_container_job(self, as_comp, sub_page, service_list):
|
||||
"""
|
||||
Check that paged downloads work correctly
|
||||
"""
|
||||
as_comp.side_effect = [
|
||||
|
||||
]
|
||||
sub_page.side_effect = [
|
||||
range(0, 10), range(0, 10), [] # simulate multiple result pages
|
||||
]
|
||||
r = Mock(spec=Future)
|
||||
r.result.return_value = self._get_expected({
|
||||
'success': True,
|
||||
'start_time': 1,
|
||||
'finish_time': 2,
|
||||
'headers_receipt': 3,
|
||||
'auth_end_time': 4,
|
||||
'read_length': len(b'objcontent'),
|
||||
})
|
||||
as_comp.side_effect = [
|
||||
[r for _ in range(0, 10)],
|
||||
[r for _ in range(0, 10)]
|
||||
]
|
||||
|
||||
s = SwiftService()
|
||||
down_gen = s._download_container('test_c', self.opts)
|
||||
results = list(down_gen)
|
||||
self.assertEqual(20, len(results))
|
||||
|
||||
@mock.patch('swiftclient.service.SwiftService.list')
|
||||
@mock.patch('swiftclient.service.SwiftService._submit_page_downloads')
|
||||
@mock.patch('swiftclient.service.interruptable_as_completed')
|
||||
def test_download_container_job_error(
|
||||
self, as_comp, sub_page, service_list):
|
||||
"""
|
||||
Check that paged downloads work correctly
|
||||
"""
|
||||
class BoomError(Exception):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
||||
|
||||
def _make_result():
|
||||
r = Mock(spec=Future)
|
||||
r.result.return_value = self._get_expected({
|
||||
'success': True,
|
||||
'start_time': 1,
|
||||
'finish_time': 2,
|
||||
'headers_receipt': 3,
|
||||
'auth_end_time': 4,
|
||||
'read_length': len(b'objcontent'),
|
||||
})
|
||||
return r
|
||||
|
||||
as_comp.side_effect = [
|
||||
|
||||
]
|
||||
# We need Futures here because the error will cause a call to .cancel()
|
||||
sub_page_effects = [
|
||||
[_make_result() for _ in range(0, 10)],
|
||||
BoomError('Go Boom')
|
||||
]
|
||||
sub_page.side_effect = sub_page_effects
|
||||
# ...but we must also mock the returns to as_completed
|
||||
as_comp.side_effect = [
|
||||
[_make_result() for _ in range(0, 10)]
|
||||
]
|
||||
|
||||
s = SwiftService()
|
||||
self.assertRaises(
|
||||
BoomError,
|
||||
lambda: list(s._download_container('test_c', self.opts))
|
||||
)
|
||||
# This was an unknown error, so make sure we attempt to cancel futures
|
||||
for spe in sub_page_effects[0]:
|
||||
spe.cancel.assert_called_once_with()
|
||||
|
||||
# Now test ClientException
|
||||
sub_page_effects = [
|
||||
[_make_result() for _ in range(0, 10)],
|
||||
ClientException('Go Boom')
|
||||
]
|
||||
sub_page.side_effect = sub_page_effects
|
||||
as_comp.side_effect = [
|
||||
[_make_result() for _ in range(0, 10)],
|
||||
[_make_result() for _ in range(0, 10)]
|
||||
]
|
||||
self.assertRaises(
|
||||
ClientException,
|
||||
lambda: list(s._download_container('test_c', self.opts))
|
||||
)
|
||||
# This was a ClientException, so make sure we don't cancel futures
|
||||
for spe in sub_page_effects[0]:
|
||||
self.assertFalse(spe.cancel.called)
|
||||
|
||||
def test_download_object_job(self):
|
||||
mock_conn = self._get_mock_connection()
|
||||
objcontent = six.BytesIO(b'objcontent')
|
||||
mock_conn.get_object.side_effect = [
|
||||
({'content-type': 'text/plain',
|
||||
'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
|
||||
objcontent)
|
||||
]
|
||||
expected_r = self._get_expected({
|
||||
'success': True,
|
||||
'start_time': 1,
|
||||
'finish_time': 2,
|
||||
'headers_receipt': 3,
|
||||
'auth_end_time': 4,
|
||||
'read_length': len(b'objcontent'),
|
||||
})
|
||||
|
||||
with mock.patch.object(builtins, 'open') as mock_open:
|
||||
written_content = Mock()
|
||||
mock_open.return_value = written_content
|
||||
s = SwiftService()
|
||||
_opts = self.opts.copy()
|
||||
_opts['no_download'] = False
|
||||
actual_r = s._download_object_job(
|
||||
mock_conn, 'test_c', 'test_o', _opts)
|
||||
actual_r = dict( # Need to override the times we got from the call
|
||||
actual_r,
|
||||
**{
|
||||
'start_time': 1,
|
||||
'finish_time': 2,
|
||||
'headers_receipt': 3
|
||||
}
|
||||
)
|
||||
mock_open.assert_called_once_with('test_o', 'wb')
|
||||
written_content.write.assert_called_once_with(b'objcontent')
|
||||
|
||||
mock_conn.get_object.assert_called_once_with(
|
||||
'test_c', 'test_o', resp_chunk_size=65536, headers={},
|
||||
response_dict={}
|
||||
)
|
||||
self._assertDictEqual(expected_r, actual_r)
|
||||
|
||||
def test_download_object_job_exception(self):
|
||||
mock_conn = self._get_mock_connection()
|
||||
mock_conn.get_object = Mock(side_effect=self.exc)
|
||||
expected_r = self._get_expected({
|
||||
'success': False,
|
||||
'error': self.exc
|
||||
})
|
||||
|
||||
s = SwiftService()
|
||||
actual_r = s._download_object_job(
|
||||
mock_conn, 'test_c', 'test_o', self.opts)
|
||||
|
||||
mock_conn.get_object.assert_called_once_with(
|
||||
'test_c', 'test_o', resp_chunk_size=65536, headers={},
|
||||
response_dict={}
|
||||
)
|
||||
self._assertDictEqual(expected_r, actual_r)
|
||||
|
||||
def test_download(self):
|
||||
service = SwiftService()
|
||||
with mock.patch('swiftclient.service.Connection') as mock_conn:
|
||||
|
@ -377,6 +377,62 @@ class TestShell(unittest.TestCase):
|
||||
swiftclient.shell.main(argv)
|
||||
self.assertEqual('objcontent', output.out)
|
||||
|
||||
@mock.patch('swiftclient.service.shuffle')
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_download_shuffle(self, connection, mock_shuffle):
|
||||
# Test that the container and object lists are shuffled
|
||||
mock_shuffle.side_effect = lambda l: l
|
||||
connection.return_value.get_object.return_value = [
|
||||
{'content-type': 'text/plain',
|
||||
'etag': EMPTY_ETAG},
|
||||
'']
|
||||
|
||||
connection.return_value.get_container.side_effect = [
|
||||
(None, [{'name': 'object'}]),
|
||||
(None, [{'name': 'pseudo/'}]),
|
||||
(None, []),
|
||||
]
|
||||
connection.return_value.auth_end_time = 0
|
||||
connection.return_value.attempts = 0
|
||||
connection.return_value.get_account.side_effect = [
|
||||
(None, [{'name': 'container'}]),
|
||||
(None, [])
|
||||
]
|
||||
|
||||
with mock.patch(BUILTIN_OPEN) as mock_open:
|
||||
argv = ["", "download", "--all"]
|
||||
swiftclient.shell.main(argv)
|
||||
self.assertEqual(3, mock_shuffle.call_count)
|
||||
mock_shuffle.assert_any_call(['container'])
|
||||
mock_shuffle.assert_any_call(['object'])
|
||||
mock_shuffle.assert_any_call(['pseudo/'])
|
||||
mock_open.assert_called_once_with('container/object', 'wb')
|
||||
|
||||
# Test that the container and object lists are not shuffled
|
||||
mock_shuffle.reset_mock()
|
||||
connection.return_value.get_object.return_value = [
|
||||
{'content-type': 'text/plain',
|
||||
'etag': 'd41d8cd98f00b204e9800998ecf8427e'},
|
||||
'']
|
||||
|
||||
connection.return_value.get_container.side_effect = [
|
||||
(None, [{'name': 'object'}]),
|
||||
(None, [{'name': 'pseudo/'}]),
|
||||
(None, []),
|
||||
]
|
||||
connection.return_value.auth_end_time = 0
|
||||
connection.return_value.attempts = 0
|
||||
connection.return_value.get_account.side_effect = [
|
||||
(None, [{'name': 'container'}]),
|
||||
(None, [])
|
||||
]
|
||||
|
||||
with mock.patch(BUILTIN_OPEN) as mock_open:
|
||||
argv = ["", "download", "--all", "--no-shuffle"]
|
||||
swiftclient.shell.main(argv)
|
||||
self.assertEqual(0, mock_shuffle.call_count)
|
||||
mock_open.assert_called_once_with('container/object', 'wb')
|
||||
|
||||
@mock.patch('swiftclient.service.Connection')
|
||||
def test_download_no_content_type(self, connection):
|
||||
connection.return_value.get_object.return_value = [
|
||||
|
Loading…
x
Reference in New Issue
Block a user