Configurable concurrency for swift client.

The 'delete', 'download', and 'upload' commands use multiple threads
for concurrency. However, the number of threads was hardcoded at
10. This patch simply makes those configurable.

For example, if I'm downloading a lot of files but I don't want to
saturate the downstream on my Internet connection, I might choose to
use only 1 or 2 threads for object downloads. Conversely, if I'm
uploading a lot of small files across a fast network, I would want
lots of threads to speed things along.

The default number of threads is 10, so the default behavior is
unchanged.

Change-Id: I64c06741b24ca97fef5ded206d7e898bf5cab3b8
This commit is contained in:
Samuel Merritt 2012-06-12 17:33:18 -07:00 committed by Samuel Merritt
parent 0519e58e2c
commit f6c7fec991

@ -100,7 +100,7 @@ class QueueFunctionThread(Thread):
self.queue.task_done()
st_delete_help = '''
delete --all OR delete container [--leave-segments] [object] [object] ...
delete [options] --all OR delete container [options] [object] [object] ...
Deletes everything in the account (with --all), or everything in a
container, or a list of objects depending on the args given. Segments of
manifest objects will be deleted as well, unless you specify the
@ -114,6 +114,12 @@ def st_delete(parser, args, print_queue, error_queue):
parser.add_option('', '--leave-segments', action='store_true',
dest='leave_segments', default=False, help='Indicates that you want '
'the segments of manifest objects left alone')
parser.add_option('', '--object-threads', type=int,
default=10, help='Number of threads to use for '
'deleting objects')
parser.add_option('', '--container-threads', type=int,
default=10, help='Number of threads to use for '
'deleting containers')
(options, args) = parse_args(parser, args)
args = args[1:]
if (not args and not options.yes_all) or (args and options.yes_all):
@ -152,7 +158,7 @@ def st_delete(parser, args, print_queue, error_queue):
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue,
_delete_segment, create_connection()) for _junk in
xrange(10)]
xrange(options.object_threads)]
for thread in segment_threads:
thread.start()
while not segment_queue.empty():
@ -210,12 +216,15 @@ def st_delete(parser, args, print_queue, error_queue):
error_queue.put('Container %s not found' % repr(container))
create_connection = lambda: get_conn(options)
object_threads = [QueueFunctionThread(object_queue, _delete_object,
create_connection()) for _junk in xrange(10)]
object_threads = \
[QueueFunctionThread(object_queue, _delete_object, create_connection())
for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
container_threads = [QueueFunctionThread(container_queue,
_delete_container, create_connection()) for _junk in xrange(10)]
container_threads = \
[QueueFunctionThread(container_queue, _delete_container,
create_connection())
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
if not args:
@ -282,6 +291,12 @@ def st_download(parser, args, print_queue, error_queue):
'account download')
parser.add_option('-o', '--output', dest='out_file', help='For a single '
'file download, stream the output to an alternate location ')
parser.add_option('', '--object-threads', type=int,
default=10, help='Number of threads to use for '
'downloading objects')
parser.add_option('', '--container-threads', type=int,
default=10, help='Number of threads to use for '
'listing containers')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.out_file == '-':
@ -387,11 +402,12 @@ def st_download(parser, args, print_queue, error_queue):
create_connection = lambda: get_conn(options)
object_threads = [QueueFunctionThread(object_queue, _download_object,
create_connection()) for _junk in xrange(10)]
create_connection()) for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
container_threads = [QueueFunctionThread(container_queue,
_download_container, create_connection()) for _junk in xrange(10)]
_download_container, create_connection())
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
if not args:
@ -704,6 +720,12 @@ def st_upload(parser, args, print_queue, error_queue):
dest='leave_segments', default=False, help='Indicates that you want '
'the older segments of manifest objects left alone (in the case of '
'overwrites)')
parser.add_option('', '--object-threads', type=int,
default=10, help='Number of threads to use for '
'uploading full objects')
parser.add_option('', '--segment-threads', type=int,
default=10, help='Number of threads to use for '
'uploading object segments')
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
@ -781,7 +803,7 @@ def st_upload(parser, args, print_queue, error_queue):
segment_queue = Queue(10000)
segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _junk in
xrange(10)]
xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
segment = 0
@ -830,7 +852,7 @@ def st_upload(parser, args, print_queue, error_queue):
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _junk in
xrange(10)]
xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
while not segment_queue.empty():
@ -865,7 +887,7 @@ def st_upload(parser, args, print_queue, error_queue):
create_connection = lambda: get_conn(options)
object_threads = [QueueFunctionThread(object_queue, _object_job,
create_connection()) for _junk in xrange(10)]
create_connection()) for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
conn = create_connection()