From f6c7fec991939e02c78ba8f34069772027eb70b8 Mon Sep 17 00:00:00 2001
From: Samuel Merritt <sam@swiftstack.com>
Date: Tue, 12 Jun 2012 17:33:18 -0700
Subject: [PATCH] Configurable concurrency for swift client.

The 'delete', 'download', and 'upload' commands use multiple threads
for concurrency. However, the number of threads was hardcoded at
10. This patch simply makes those configurable.

For example, if I'm downloading a lot of files but I don't want to
saturate the downstream on my Internet connection, I might choose to
use only 1 or 2 threads for object downloads. Conversely, if I'm
uploading a lot of small files across a fast network, I would want
lots of threads to speed things along.

The default number of threads is 10, so the default behavior is
unchanged.

Change-Id: I64c06741b24ca97fef5ded206d7e898bf5cab3b8
---
 bin/swift | 44 +++++++++++++++++++++++++++++++++-----------
 1 file changed, 33 insertions(+), 11 deletions(-)

diff --git a/bin/swift b/bin/swift
index 059c376e..947ef1c2 100755
--- a/bin/swift
+++ b/bin/swift
@@ -100,7 +100,7 @@ class QueueFunctionThread(Thread):
                     self.queue.task_done()
 
 st_delete_help = '''
-delete --all OR delete container [--leave-segments] [object] [object] ...
+delete [options] --all OR delete container [options] [object] [object] ...
     Deletes everything in the account (with --all), or everything in a
     container, or a list of objects depending on the args given. Segments of
     manifest objects will be deleted as well, unless you specify the
@@ -114,6 +114,12 @@ def st_delete(parser, args, print_queue, error_queue):
     parser.add_option('', '--leave-segments', action='store_true',
         dest='leave_segments', default=False, help='Indicates that you want '
         'the segments of manifest objects left alone')
+    parser.add_option('', '--object-threads', type=int,
+                      default=10, help='Number of threads to use for '
+                      'deleting objects')
+    parser.add_option('', '--container-threads', type=int,
+                      default=10, help='Number of threads to use for '
+                      'deleting containers')
     (options, args) = parse_args(parser, args)
     args = args[1:]
     if (not args and not options.yes_all) or (args and options.yes_all):
@@ -152,7 +158,7 @@ def st_delete(parser, args, print_queue, error_queue):
                 if not segment_queue.empty():
                     segment_threads = [QueueFunctionThread(segment_queue,
                         _delete_segment, create_connection()) for _junk in
-                        xrange(10)]
+                        xrange(options.object_threads)]
                     for thread in segment_threads:
                         thread.start()
                     while not segment_queue.empty():
@@ -210,12 +216,15 @@ def st_delete(parser, args, print_queue, error_queue):
             error_queue.put('Container %s not found' % repr(container))
 
     create_connection = lambda: get_conn(options)
-    object_threads = [QueueFunctionThread(object_queue, _delete_object,
-        create_connection()) for _junk in xrange(10)]
+    object_threads = \
+        [QueueFunctionThread(object_queue, _delete_object, create_connection())
+         for _junk in xrange(options.object_threads)]
     for thread in object_threads:
         thread.start()
-    container_threads = [QueueFunctionThread(container_queue,
-        _delete_container, create_connection()) for _junk in xrange(10)]
+    container_threads = \
+        [QueueFunctionThread(container_queue, _delete_container,
+                             create_connection())
+         for _junk in xrange(options.container_threads)]
     for thread in container_threads:
         thread.start()
     if not args:
@@ -282,6 +291,12 @@ def st_download(parser, args, print_queue, error_queue):
         'account download')
     parser.add_option('-o', '--output', dest='out_file', help='For a single '
         'file download, stream the output to an alternate location ')
+    parser.add_option('', '--object-threads', type=int,
+                      default=10, help='Number of threads to use for '
+                      'downloading objects')
+    parser.add_option('', '--container-threads', type=int,
+                      default=10, help='Number of threads to use for '
+                      'listing containers')
     (options, args) = parse_args(parser, args)
     args = args[1:]
     if options.out_file == '-':
@@ -387,11 +402,12 @@ def st_download(parser, args, print_queue, error_queue):
 
     create_connection = lambda: get_conn(options)
     object_threads = [QueueFunctionThread(object_queue, _download_object,
-        create_connection()) for _junk in xrange(10)]
+        create_connection()) for _junk in xrange(options.object_threads)]
     for thread in object_threads:
         thread.start()
     container_threads = [QueueFunctionThread(container_queue,
-        _download_container, create_connection()) for _junk in xrange(10)]
+        _download_container, create_connection())
+        for _junk in xrange(options.container_threads)]
     for thread in container_threads:
         thread.start()
     if not args:
@@ -704,6 +720,12 @@ def st_upload(parser, args, print_queue, error_queue):
         dest='leave_segments', default=False, help='Indicates that you want '
         'the older segments of manifest objects left alone (in the case of '
         'overwrites)')
+    parser.add_option('', '--object-threads', type=int,
+                      default=10, help='Number of threads to use for '
+                      'uploading full objects')
+    parser.add_option('', '--segment-threads', type=int,
+                      default=10, help='Number of threads to use for '
+                      'uploading object segments')
     (options, args) = parse_args(parser, args)
     args = args[1:]
     if len(args) < 2:
@@ -781,7 +803,7 @@ def st_upload(parser, args, print_queue, error_queue):
                     segment_queue = Queue(10000)
                     segment_threads = [QueueFunctionThread(segment_queue,
                         _segment_job, create_connection()) for _junk in
-                        xrange(10)]
+                        xrange(options.segment_threads)]
                     for thread in segment_threads:
                         thread.start()
                     segment = 0
@@ -830,7 +852,7 @@ def st_upload(parser, args, print_queue, error_queue):
                     if not segment_queue.empty():
                         segment_threads = [QueueFunctionThread(segment_queue,
                             _segment_job, create_connection()) for _junk in
-                            xrange(10)]
+                            xrange(options.segment_threads)]
                         for thread in segment_threads:
                             thread.start()
                         while not segment_queue.empty():
@@ -865,7 +887,7 @@ def st_upload(parser, args, print_queue, error_queue):
 
     create_connection = lambda: get_conn(options)
     object_threads = [QueueFunctionThread(object_queue, _object_job,
-        create_connection()) for _junk in xrange(10)]
+        create_connection()) for _junk in xrange(options.object_threads)]
     for thread in object_threads:
         thread.start()
     conn = create_connection()