From a8c4df98eee43b419d6dd30e80c838d9f2efd025 Mon Sep 17 00:00:00 2001
From: Joel Wright <joel.wright@sohonet.com>
Date: Tue, 14 Oct 2014 16:54:41 +0100
Subject: [PATCH] Reduce memory usage for download/delete and add --no-shuffle
 option to st_download

The current code builds a full object listing before performing either a multiple
download or delete operation (and also shuffles this complete list in the case of
a download). This patch removes the creation of the full object list and adds the
ability to turn off shuffle for files when downloading. Also added is a limit on
the number of list results that can be queued by a single call to service.list
without consuming results (reduces memory overhead for large listings).

Some tests added for service.py download and list.

Change-Id: Ie737cbb7f8b1fa8a79bbb88914730b05aa7f2906
---
 swiftclient/service.py     | 129 ++++++----
 swiftclient/shell.py       |  18 ++
 tests/unit/test_service.py | 492 +++++++++++++++++++++++++++++++++----
 tests/unit/test_shell.py   |  56 +++++
 4 files changed, 595 insertions(+), 100 deletions(-)

diff --git a/swiftclient/service.py b/swiftclient/service.py
index 7c557691..4f331c4e 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -176,7 +176,8 @@ _default_local_options = {
     'fail_fast': False,
     'human': False,
     'dir_marker': False,
-    'checksum': True
+    'checksum': True,
+    'shuffle': False
 }
 
 POLICY = 'X-Storage-Policy'
@@ -752,7 +753,7 @@ class SwiftService(object):
         else:
             options = self._options
 
-        rq = Queue()
+        rq = Queue(maxsize=10)  # Just stop list running away consuming memory
 
         if container is None:
             listing_future = self.thread_manager.container_pool.submit(
@@ -895,6 +896,7 @@ class SwiftService(object):
                                 'out_directory': None,
                                 'out_file': None,
                                 'remove_prefix': False,
+                                'shuffle' : False
                             }
 
         :returns: A generator for returning the results of the download
@@ -916,40 +918,21 @@ class SwiftService(object):
                 try:
                     options_copy = deepcopy(options)
                     options_copy["long"] = False
-                    containers = []
+
                     for part in self.list(options=options_copy):
                         if part["success"]:
-                            containers.extend([
-                                i['name'] for i in part["listing"]
-                            ])
+                            containers = [i['name'] for i in part["listing"]]
+
+                            if options['shuffle']:
+                                shuffle(containers)
+
+                            for con in containers:
+                                for res in self._download_container(
+                                        con, options_copy):
+                                    yield res
                         else:
                             raise part["error"]
 
-                    shuffle(containers)
-
-                    o_downs = []
-                    for con in containers:
-                        objs = []
-                        for part in self.list(
-                                container=con, options=options_copy):
-                            if part["success"]:
-                                objs.extend([
-                                    i['name'] for i in part["listing"]
-                                ])
-                            else:
-                                raise part["error"]
-                        shuffle(objs)
-
-                        o_downs.extend(
-                            self.thread_manager.object_dd_pool.submit(
-                                self._download_object_job, con, obj,
-                                options_copy
-                            ) for obj in objs
-                        )
-
-                    for o_down in interruptable_as_completed(o_downs):
-                        yield o_down.result()
-
                 # If we see a 404 here, the listing of the account failed
                 except ClientException as err:
                     if err.http_status != 404:
@@ -1153,14 +1136,17 @@ class SwiftService(object):
             }
             return res
 
-    def _download_container(self, container, options):
+    def _submit_page_downloads(self, container, page_generator, options):
         try:
-            objects = []
-            for part in self.list(container=container, options=options):
-                if part["success"]:
-                    objects.extend([o["name"] for o in part["listing"]])
-                else:
-                    raise part["error"]
+            list_page = next(page_generator)
+        except StopIteration:
+            return None
+
+        if list_page["success"]:
+            objects = [o["name"] for o in list_page["listing"]]
+
+            if options["shuffle"]:
+                shuffle(objects)
 
             o_downs = [
                 self.thread_manager.object_dd_pool.submit(
@@ -1168,14 +1154,60 @@ class SwiftService(object):
                 ) for obj in objects
             ]
 
-            for o_down in interruptable_as_completed(o_downs):
-                yield o_down.result()
+            return o_downs
+        else:
+            raise list_page["error"]
 
+    def _download_container(self, container, options):
+        _page_generator = self.list(container=container, options=options)
+        try:
+            next_page_downs = self._submit_page_downloads(
+                container, _page_generator, options
+            )
         except ClientException as err:
             if err.http_status != 404:
                 raise
-            raise SwiftError('Container %r not found' % container,
-                             container=container)
+            raise SwiftError(
+                'Container %r not found' % container, container=container
+            )
+
+        error = None
+        while next_page_downs:
+            page_downs = next_page_downs
+            next_page_downs = None
+
+            # Start downloading the next page of list results when
+            # we have completed 80% of the previous page
+            next_page_triggered = False
+            next_page_trigger_point = 0.8 * len(page_downs)
+
+            page_results_yielded = 0
+            for o_down in interruptable_as_completed(page_downs):
+                yield o_down.result()
+
+                # Do we need to start the next set of downloads yet?
+                if not next_page_triggered:
+                    page_results_yielded += 1
+                    if page_results_yielded >= next_page_trigger_point:
+                        try:
+                            next_page_downs = self._submit_page_downloads(
+                                container, _page_generator, options
+                            )
+                        except ClientException as err:
+                            # Allow the current page to finish downloading
+                            error = err
+                        except Exception:
+                            # Something unexpected went wrong - cancel
+                            # remaining downloads
+                            for _d in page_downs:
+                                _d.cancel()
+                            raise
+                        finally:
+                            # Stop counting and testing
+                            next_page_triggered = True
+
+        if error:
+            raise error
 
     # Upload related methods
     #
@@ -2080,17 +2112,18 @@ class SwiftService(object):
 
     def _delete_container(self, container, options):
         try:
-            objs = []
             for part in self.list(container=container):
                 if part["success"]:
-                    objs.extend([o['name'] for o in part['listing']])
+                    objs = [o['name'] for o in part['listing']]
+
+                    o_dels = self.delete(
+                        container=container, objects=objs, options=options
+                    )
+                    for res in o_dels:
+                        yield res
                 else:
                     raise part["error"]
 
-            for res in self.delete(
-                    container=container, objects=objs, options=options):
-                yield res
-
             con_del = self.thread_manager.container_pool.submit(
                 self._delete_empty_container, container
             )
diff --git a/swiftclient/shell.py b/swiftclient/shell.py
index a77ea07c..35d7c505 100755
--- a/swiftclient/shell.py
+++ b/swiftclient/shell.py
@@ -198,6 +198,14 @@ Optional arguments:
                         Example --header "content-type:text/plain"
   --skip-identical      Skip downloading files that are identical on both
                         sides.
+  --no-shuffle          By default, when downloading a complete account or
+                        container, download order is randomised in order to
+                        to reduce the load on individual drives when multiple
+                        clients are executed simultaneously to download the
+                        same set of objects (e.g. a nightly automated download
+                        script to multiple servers). Enable this option to
+                        submit download jobs to the thread pool in the order
+                        they are listed in the object store.
 '''.strip("\n")
 
 
@@ -247,6 +255,14 @@ def st_download(parser, args, output_manager):
         '--skip-identical', action='store_true', dest='skip_identical',
         default=False, help='Skip downloading files that are identical on '
         'both sides.')
+    parser.add_option(
+        '--no-shuffle', action='store_false', dest='shuffle',
+        default=True, help='By default, download order is randomised in order '
+        'to reduce the load on individual drives when multiple clients are '
+        'executed simultaneously to download the same set of objects (e.g. a '
+        'nightly automated download script to multiple servers). Enable this '
+        'option to submit download jobs to the thread pool in the order they '
+        'are listed in the object store.')
     (options, args) = parse_args(parser, args)
     args = args[1:]
     if options.out_file == '-':
@@ -353,6 +369,8 @@ def st_download(parser, args, output_manager):
 
         except SwiftError as e:
             output_manager.error(e.value)
+        except Exception as e:
+            output_manager.error(e)
 
 
 st_list_options = '''[--long] [--lh] [--totals] [--prefix <prefix>]
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 339aca1f..db47263f 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -14,23 +14,25 @@
 # limitations under the License.
 import mock
 import os
+import six
 import tempfile
 import testtools
 import time
+
+from concurrent.futures import Future
 from hashlib import md5
 from mock import Mock, PropertyMock
 from six.moves.queue import Queue, Empty as QueueEmptyError
 from six import BytesIO
+from time import sleep
+
 import swiftclient
 import swiftclient.utils as utils
 from swiftclient.client import Connection, ClientException
-from swiftclient.service import SwiftService, SwiftError,\
-    SwiftUploadObject
-import six
-if six.PY2:
-    import __builtin__ as builtins
-else:
-    import builtins
+from swiftclient.service import (
+    SwiftService, SwiftError, SwiftUploadObject
+)
+
 
 clean_os_environ = {}
 environ_prefixes = ('ST_', 'OS_')
@@ -39,6 +41,12 @@ for key in os.environ:
         clean_os_environ[key] = ''
 
 
+if six.PY2:
+    import __builtin__ as builtins
+else:
+    import builtins
+
+
 class TestSwiftPostObject(testtools.TestCase):
 
     def setUp(self):
@@ -142,25 +150,24 @@ class TestSwiftReader(testtools.TestCase):
                          '97ac82a5b825239e782d0339e2d7b910')
 
 
-class TestServiceDelete(testtools.TestCase):
-    def setUp(self):
-        super(TestServiceDelete, self).setUp()
-        self.opts = {'leave_segments': False, 'yes_all': False}
-        self.exc = Exception('test_exc')
-        # Base response to be copied and updated to matched the expected
-        # response for each test
-        self.expected = {
-            'action': None,   # Should be string in the form delete_XX
-            'container': 'test_c',
-            'object': 'test_o',
-            'attempts': 2,
-            'response_dict': {},
-            'success': None   # Should be a bool
-        }
+class _TestServiceBase(testtools.TestCase):
+    def _assertDictEqual(self, a, b, m=None):
+        # assertDictEqual is not available in py2.6 so use a shallow check
+        # instead
+        if hasattr(self, 'assertDictEqual'):
+            self.assertDictEqual(a, b, m)
+        else:
+            self.assertTrue(isinstance(a, dict))
+            self.assertTrue(isinstance(b, dict))
+            self.assertEqual(len(a), len(b), m)
+            for k, v in a.items():
+                self.assertTrue(k in b, m)
+                self.assertEqual(b[k], v, m)
 
     def _get_mock_connection(self, attempts=2):
         m = Mock(spec=Connection)
         type(m).attempts = PropertyMock(return_value=attempts)
+        type(m).auth_end_time = PropertyMock(return_value=4)
         return m
 
     def _get_queue(self, q):
@@ -178,18 +185,22 @@ class TestServiceDelete(testtools.TestCase):
 
         return expected
 
-    def _assertDictEqual(self, a, b, m=None):
-        # assertDictEqual is not available in py2.6 so use a shallow check
-        # instead
-        if hasattr(self, 'assertDictEqual'):
-            self.assertDictEqual(a, b, m)
-        else:
-            self.assertTrue(isinstance(a, dict))
-            self.assertTrue(isinstance(b, dict))
-            self.assertEqual(len(a), len(b), m)
-            for k, v in a.items():
-                self.assertTrue(k in b, m)
-                self.assertEqual(b[k], v, m)
+
+class TestServiceDelete(_TestServiceBase):
+    def setUp(self):
+        super(TestServiceDelete, self).setUp()
+        self.opts = {'leave_segments': False, 'yes_all': False}
+        self.exc = Exception('test_exc')
+        # Base response to be copied and updated to matched the expected
+        # response for each test
+        self.expected = {
+            'action': None,   # Should be string in the form delete_XX
+            'container': 'test_c',
+            'object': 'test_o',
+            'attempts': 2,
+            'response_dict': {},
+            'success': None   # Should be a bool
+        }
 
     def test_delete_segment(self):
         mock_q = Queue()
@@ -542,6 +553,226 @@ class TestSwiftUploadObject(testtools.TestCase):
         self.assertRaises(SwiftError, self.suo, [])
 
 
+class TestServiceList(_TestServiceBase):
+    def setUp(self):
+        super(TestServiceList, self).setUp()
+        self.opts = {'prefix': None, 'long': False, 'delimiter': ''}
+        self.exc = Exception('test_exc')
+        # Base response to be copied and updated to matched the expected
+        # response for each test
+        self.expected = {
+            'action': None,   # Should be list_X_part (account or container)
+            'container': None,   # Should be a string when listing a container
+            'prefix': None,
+            'success': None   # Should be a bool
+        }
+
+    def test_list_account(self):
+        mock_q = Queue()
+        mock_conn = self._get_mock_connection()
+        get_account_returns = [
+            (None, [{'name': 'test_c'}]),
+            (None, [])
+        ]
+        mock_conn.get_account = Mock(side_effect=get_account_returns)
+
+        expected_r = self._get_expected({
+            'action': 'list_account_part',
+            'success': True,
+            'listing': [{'name': 'test_c'}],
+            'marker': ''
+        })
+
+        SwiftService._list_account_job(
+            mock_conn, self.opts, mock_q
+        )
+        self._assertDictEqual(expected_r, self._get_queue(mock_q))
+        self.assertIsNone(self._get_queue(mock_q))
+
+        long_opts = dict(self.opts, **{'long': True})
+        mock_conn.head_container = Mock(return_value={'test_m': '1'})
+        get_account_returns = [
+            (None, [{'name': 'test_c'}]),
+            (None, [])
+        ]
+        mock_conn.get_account = Mock(side_effect=get_account_returns)
+
+        expected_r_long = self._get_expected({
+            'action': 'list_account_part',
+            'success': True,
+            'listing': [{'name': 'test_c', 'meta': {'test_m': '1'}}],
+            'marker': '',
+        })
+
+        SwiftService._list_account_job(
+            mock_conn, long_opts, mock_q
+        )
+        self._assertDictEqual(expected_r_long, self._get_queue(mock_q))
+        self.assertIsNone(self._get_queue(mock_q))
+
+    def test_list_account_exception(self):
+        mock_q = Queue()
+        mock_conn = self._get_mock_connection()
+        mock_conn.get_account = Mock(side_effect=self.exc)
+        expected_r = self._get_expected({
+            'action': 'list_account_part',
+            'success': False,
+            'error': self.exc,
+            'marker': ''
+        })
+
+        SwiftService._list_account_job(
+            mock_conn, self.opts, mock_q)
+
+        mock_conn.get_account.assert_called_once_with(
+            marker='', prefix=None
+        )
+        self._assertDictEqual(expected_r, self._get_queue(mock_q))
+        self.assertIsNone(self._get_queue(mock_q))
+
+    def test_list_container(self):
+        mock_q = Queue()
+        mock_conn = self._get_mock_connection()
+        get_container_returns = [
+            (None, [{'name': 'test_o'}]),
+            (None, [])
+        ]
+        mock_conn.get_container = Mock(side_effect=get_container_returns)
+
+        expected_r = self._get_expected({
+            'action': 'list_container_part',
+            'container': 'test_c',
+            'success': True,
+            'listing': [{'name': 'test_o'}],
+            'marker': ''
+        })
+
+        SwiftService._list_container_job(
+            mock_conn, 'test_c', self.opts, mock_q
+        )
+        self._assertDictEqual(expected_r, self._get_queue(mock_q))
+        self.assertIsNone(self._get_queue(mock_q))
+
+        long_opts = dict(self.opts, **{'long': True})
+        mock_conn.head_container = Mock(return_value={'test_m': '1'})
+        get_container_returns = [
+            (None, [{'name': 'test_o'}]),
+            (None, [])
+        ]
+        mock_conn.get_container = Mock(side_effect=get_container_returns)
+
+        expected_r_long = self._get_expected({
+            'action': 'list_container_part',
+            'container': 'test_c',
+            'success': True,
+            'listing': [{'name': 'test_o'}],
+            'marker': ''
+        })
+
+        SwiftService._list_container_job(
+            mock_conn, 'test_c', long_opts, mock_q
+        )
+        self._assertDictEqual(expected_r_long, self._get_queue(mock_q))
+        self.assertIsNone(self._get_queue(mock_q))
+
+    def test_list_container_exception(self):
+        mock_q = Queue()
+        mock_conn = self._get_mock_connection()
+        mock_conn.get_container = Mock(side_effect=self.exc)
+        expected_r = self._get_expected({
+            'action': 'list_container_part',
+            'container': 'test_c',
+            'success': False,
+            'error': self.exc,
+            'marker': ''
+        })
+
+        SwiftService._list_container_job(
+            mock_conn, 'test_c', self.opts, mock_q
+        )
+
+        mock_conn.get_container.assert_called_once_with(
+            'test_c', marker='', delimiter='', prefix=None
+        )
+        self._assertDictEqual(expected_r, self._get_queue(mock_q))
+        self.assertIsNone(self._get_queue(mock_q))
+
+    @mock.patch('swiftclient.service.get_conn')
+    def test_list_queue_size(self, mock_get_conn):
+        mock_conn = self._get_mock_connection()
+        # Return more results than should fit in the results queue
+        get_account_returns = [
+            (None, [{'name': 'container1'}]),
+            (None, [{'name': 'container2'}]),
+            (None, [{'name': 'container3'}]),
+            (None, [{'name': 'container4'}]),
+            (None, [{'name': 'container5'}]),
+            (None, [{'name': 'container6'}]),
+            (None, [{'name': 'container7'}]),
+            (None, [{'name': 'container8'}]),
+            (None, [{'name': 'container9'}]),
+            (None, [{'name': 'container10'}]),
+            (None, [{'name': 'container11'}]),
+            (None, [{'name': 'container12'}]),
+            (None, [{'name': 'container13'}]),
+            (None, [{'name': 'container14'}]),
+            (None, [])
+        ]
+        mock_conn.get_account = Mock(side_effect=get_account_returns)
+        mock_get_conn.return_value = mock_conn
+
+        s = SwiftService(options=self.opts)
+        lg = s.list()
+
+        # Start the generator
+        first_list_part = next(lg)
+
+        # Wait for the number of calls to get_account to reach our expected
+        # value, then let it run some more to make sure the value remains
+        # stable
+        count = mock_conn.get_account.call_count
+        stable = 0
+        while mock_conn.get_account.call_count != count or stable < 5:
+            if mock_conn.get_account.call_count == count:
+                stable += 1
+            else:
+                count = mock_conn.get_account.call_count
+                stable = 0
+            # The test requires a small sleep to allow other threads to
+            # execute - in this mocked environment we assume that if the call
+            # count to get_account has not changed in 0.25s then no more calls
+            # will be made.
+            sleep(0.05)
+
+        stable_get_account_call_count = mock_conn.get_account.call_count
+
+        # Collect all remaining results from the generator
+        list_results = [first_list_part] + list(lg)
+
+        # Make sure the stable call count is correct - this should be 12 calls
+        # to get_account;
+        #  1 for first_list_part
+        #  10 for the values on the queue
+        #  1 for the value blocking whilst trying to place onto the queue
+        self.assertEqual(12, stable_get_account_call_count)
+
+        # Make sure all the containers were listed and placed onto the queue
+        self.assertEqual(15, mock_conn.get_account.call_count)
+
+        # Check the results were all returned
+        observed_listing = []
+        for lir in list_results:
+            observed_listing.append(
+                [li['name'] for li in lir['listing']]
+            )
+        expected_listing = []
+        for gar in get_account_returns[:-1]:  # The empty list is not returned
+            expected_listing.append(
+                [li['name'] for li in gar[1]]
+            )
+        self.assertEqual(observed_listing, expected_listing)
+
+
 class TestService(testtools.TestCase):
 
     def test_upload_with_bad_segment_size(self):
@@ -589,23 +820,7 @@ class TestService(testtools.TestCase):
                 self.assertEqual(upload_obj_resp['path'], obj['path'])
 
 
-class TestServiceUpload(testtools.TestCase):
-
-    def _assertDictEqual(self, a, b, m=None):
-        # assertDictEqual is not available in py2.6 so use a shallow check
-        # instead
-        if not m:
-            m = '{0} != {1}'.format(a, b)
-
-        if hasattr(self, 'assertDictEqual'):
-            self.assertDictEqual(a, b, m)
-        else:
-            self.assertIsInstance(a, dict, m)
-            self.assertIsInstance(b, dict, m)
-            self.assertEqual(len(a), len(b), m)
-            for k, v in a.items():
-                self.assertIn(k, b, m)
-                self.assertEqual(b[k], v, m)
+class TestServiceUpload(_TestServiceBase):
 
     def test_upload_segment_job(self):
         with tempfile.NamedTemporaryFile() as f:
@@ -1027,7 +1242,7 @@ class TestServiceUpload(testtools.TestCase):
             mock_conn.get_container.assert_has_calls(expected)
 
 
-class TestServiceDownload(testtools.TestCase):
+class TestServiceDownload(_TestServiceBase):
 
     def setUp(self):
         super(TestServiceDownload, self).setUp()
@@ -1036,6 +1251,19 @@ class TestServiceDownload(testtools.TestCase):
         self.obj_content = b'c' * 10
         self.obj_etag = md5(self.obj_content).hexdigest()
         self.obj_len = len(self.obj_content)
+        self.exc = Exception('test_exc')
+        # Base response to be copied and updated to matched the expected
+        # response for each test
+        self.expected = {
+            'action': 'download_object',   # Should always be download_object
+            'container': 'test_c',
+            'object': 'test_o',
+            'attempts': 2,
+            'response_dict': {},
+            'path': 'test_o',
+            'pseudodir': False,
+            'success': None   # Should be a bool
+        }
 
     def _readbody(self):
         yield self.obj_content
@@ -1056,6 +1284,166 @@ class TestServiceDownload(testtools.TestCase):
                 self.assertIn(k, b, m)
                 self.assertEqual(b[k], v, m)
 
+    @mock.patch('swiftclient.service.SwiftService.list')
+    @mock.patch('swiftclient.service.SwiftService._submit_page_downloads')
+    @mock.patch('swiftclient.service.interruptable_as_completed')
+    def test_download_container_job(self, as_comp, sub_page, service_list):
+        """
+        Check that paged downloads work correctly
+        """
+        as_comp.side_effect = [
+
+        ]
+        sub_page.side_effect = [
+            range(0, 10), range(0, 10), []  # simulate multiple result pages
+        ]
+        r = Mock(spec=Future)
+        r.result.return_value = self._get_expected({
+            'success': True,
+            'start_time': 1,
+            'finish_time': 2,
+            'headers_receipt': 3,
+            'auth_end_time': 4,
+            'read_length': len(b'objcontent'),
+        })
+        as_comp.side_effect = [
+            [r for _ in range(0, 10)],
+            [r for _ in range(0, 10)]
+        ]
+
+        s = SwiftService()
+        down_gen = s._download_container('test_c', self.opts)
+        results = list(down_gen)
+        self.assertEqual(20, len(results))
+
+    @mock.patch('swiftclient.service.SwiftService.list')
+    @mock.patch('swiftclient.service.SwiftService._submit_page_downloads')
+    @mock.patch('swiftclient.service.interruptable_as_completed')
+    def test_download_container_job_error(
+            self, as_comp, sub_page, service_list):
+        """
+        Check that paged downloads work correctly
+        """
+        class BoomError(Exception):
+            def __init__(self, value):
+                self.value = value
+
+            def __str__(self):
+                return repr(self.value)
+
+        def _make_result():
+            r = Mock(spec=Future)
+            r.result.return_value = self._get_expected({
+                'success': True,
+                'start_time': 1,
+                'finish_time': 2,
+                'headers_receipt': 3,
+                'auth_end_time': 4,
+                'read_length': len(b'objcontent'),
+            })
+            return r
+
+        as_comp.side_effect = [
+
+        ]
+        # We need Futures here because the error will cause a call to .cancel()
+        sub_page_effects = [
+            [_make_result() for _ in range(0, 10)],
+            BoomError('Go Boom')
+        ]
+        sub_page.side_effect = sub_page_effects
+        # ...but we must also mock the returns to as_completed
+        as_comp.side_effect = [
+            [_make_result() for _ in range(0, 10)]
+        ]
+
+        s = SwiftService()
+        self.assertRaises(
+            BoomError,
+            lambda: list(s._download_container('test_c', self.opts))
+        )
+        # This was an unknown error, so make sure we attempt to cancel futures
+        for spe in sub_page_effects[0]:
+            spe.cancel.assert_called_once_with()
+
+        # Now test ClientException
+        sub_page_effects = [
+            [_make_result() for _ in range(0, 10)],
+            ClientException('Go Boom')
+        ]
+        sub_page.side_effect = sub_page_effects
+        as_comp.side_effect = [
+            [_make_result() for _ in range(0, 10)],
+            [_make_result() for _ in range(0, 10)]
+        ]
+        self.assertRaises(
+            ClientException,
+            lambda: list(s._download_container('test_c', self.opts))
+        )
+        # This was a ClientException, so make sure we don't cancel futures
+        for spe in sub_page_effects[0]:
+            self.assertFalse(spe.cancel.called)
+
+    def test_download_object_job(self):
+        mock_conn = self._get_mock_connection()
+        objcontent = six.BytesIO(b'objcontent')
+        mock_conn.get_object.side_effect = [
+            ({'content-type': 'text/plain',
+              'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
+             objcontent)
+        ]
+        expected_r = self._get_expected({
+            'success': True,
+            'start_time': 1,
+            'finish_time': 2,
+            'headers_receipt': 3,
+            'auth_end_time': 4,
+            'read_length': len(b'objcontent'),
+        })
+
+        with mock.patch.object(builtins, 'open') as mock_open:
+            written_content = Mock()
+            mock_open.return_value = written_content
+            s = SwiftService()
+            _opts = self.opts.copy()
+            _opts['no_download'] = False
+            actual_r = s._download_object_job(
+                mock_conn, 'test_c', 'test_o', _opts)
+            actual_r = dict(  # Need to override the times we got from the call
+                actual_r,
+                **{
+                    'start_time': 1,
+                    'finish_time': 2,
+                    'headers_receipt': 3
+                }
+            )
+            mock_open.assert_called_once_with('test_o', 'wb')
+            written_content.write.assert_called_once_with(b'objcontent')
+
+        mock_conn.get_object.assert_called_once_with(
+            'test_c', 'test_o', resp_chunk_size=65536, headers={},
+            response_dict={}
+        )
+        self._assertDictEqual(expected_r, actual_r)
+
+    def test_download_object_job_exception(self):
+        mock_conn = self._get_mock_connection()
+        mock_conn.get_object = Mock(side_effect=self.exc)
+        expected_r = self._get_expected({
+            'success': False,
+            'error': self.exc
+        })
+
+        s = SwiftService()
+        actual_r = s._download_object_job(
+            mock_conn, 'test_c', 'test_o', self.opts)
+
+        mock_conn.get_object.assert_called_once_with(
+            'test_c', 'test_o', resp_chunk_size=65536, headers={},
+            response_dict={}
+        )
+        self._assertDictEqual(expected_r, actual_r)
+
     def test_download(self):
         service = SwiftService()
         with mock.patch('swiftclient.service.Connection') as mock_conn:
diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py
index 8384f7a7..5f9d4977 100644
--- a/tests/unit/test_shell.py
+++ b/tests/unit/test_shell.py
@@ -377,6 +377,62 @@ class TestShell(unittest.TestCase):
             swiftclient.shell.main(argv)
             self.assertEqual('objcontent', output.out)
 
+    @mock.patch('swiftclient.service.shuffle')
+    @mock.patch('swiftclient.service.Connection')
+    def test_download_shuffle(self, connection, mock_shuffle):
+        # Test that the container and object lists are shuffled
+        mock_shuffle.side_effect = lambda l: l
+        connection.return_value.get_object.return_value = [
+            {'content-type': 'text/plain',
+             'etag': EMPTY_ETAG},
+            '']
+
+        connection.return_value.get_container.side_effect = [
+            (None, [{'name': 'object'}]),
+            (None, [{'name': 'pseudo/'}]),
+            (None, []),
+        ]
+        connection.return_value.auth_end_time = 0
+        connection.return_value.attempts = 0
+        connection.return_value.get_account.side_effect = [
+            (None, [{'name': 'container'}]),
+            (None, [])
+        ]
+
+        with mock.patch(BUILTIN_OPEN) as mock_open:
+            argv = ["", "download", "--all"]
+            swiftclient.shell.main(argv)
+            self.assertEqual(3, mock_shuffle.call_count)
+            mock_shuffle.assert_any_call(['container'])
+            mock_shuffle.assert_any_call(['object'])
+            mock_shuffle.assert_any_call(['pseudo/'])
+            mock_open.assert_called_once_with('container/object', 'wb')
+
+        # Test that the container and object lists are not shuffled
+        mock_shuffle.reset_mock()
+        connection.return_value.get_object.return_value = [
+            {'content-type': 'text/plain',
+             'etag': 'd41d8cd98f00b204e9800998ecf8427e'},
+            '']
+
+        connection.return_value.get_container.side_effect = [
+            (None, [{'name': 'object'}]),
+            (None, [{'name': 'pseudo/'}]),
+            (None, []),
+        ]
+        connection.return_value.auth_end_time = 0
+        connection.return_value.attempts = 0
+        connection.return_value.get_account.side_effect = [
+            (None, [{'name': 'container'}]),
+            (None, [])
+        ]
+
+        with mock.patch(BUILTIN_OPEN) as mock_open:
+            argv = ["", "download", "--all", "--no-shuffle"]
+            swiftclient.shell.main(argv)
+            self.assertEqual(0, mock_shuffle.call_count)
+            mock_open.assert_called_once_with('container/object', 'wb')
+
     @mock.patch('swiftclient.service.Connection')
     def test_download_no_content_type(self, connection):
         connection.return_value.get_object.return_value = [