diff --git a/bin/swift b/bin/swift index b4440133..416b1839 100755 --- a/bin/swift +++ b/bin/swift @@ -25,7 +25,7 @@ from os.path import basename, dirname, getmtime, getsize, isdir, join from Queue import Queue from random import shuffle from sys import argv, exc_info, exit, stderr, stdout -from threading import enumerate as threading_enumerate, Thread +from threading import Thread from time import sleep, time, gmtime, strftime from traceback import format_exception from urllib import quote, unquote @@ -84,16 +84,6 @@ class StopWorkerThreadSignal(object): pass -def shutdown_worker_threads(queue, thread_list): - for thread in [t for t in thread_list if t.isAlive()]: - queue.put(StopWorkerThreadSignal()) - - -def immediate_exit(signum, frame): - stderr.write(" Aborted\n") - os_exit(2) - - class QueueFunctionThread(Thread): def __init__(self, queue, func, *args, **kwargs): @@ -128,6 +118,24 @@ class QueueFunctionThread(Thread): self.exc_infos.append(exc_info()) +def shutdown_worker_threads(queue, thread_list): + """ + Takes a job queue and a list of associated QueueFunctionThread objects, + puts a StopWorkerThreadSignal object into the queue, and waits for the + queue to flush. + """ + for thread in [t for t in thread_list if t.isAlive()]: + queue.put(StopWorkerThreadSignal()) + + while any(map(QueueFunctionThread.is_alive, thread_list)): + sleep(0.05) + + +def immediate_exit(signum, frame): + stderr.write(" Aborted\n") + os_exit(2) + + st_delete_help = ''' delete [options] --all OR delete container [options] [object] [object] ... Deletes everything in the account (with --all), or everything in a @@ -261,47 +269,52 @@ def st_delete(parser, args, print_queue, error_queue): for _junk in xrange(options.container_threads)] for thread in container_threads: thread.start() - if not args: - conn = create_connection() - try: - marker = '' - while True: - containers = \ - [c['name'] for c in conn.get_account(marker=marker)[1]] - if not containers: - break - for container in containers: - container_queue.put(container) - marker = containers[-1] - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, 'WARNING: / in container name; you might have ' \ - 'meant %r instead of %r.' % \ - (args[0].replace('/', ' ', 1), args[0]) - conn = create_connection() - _delete_container(args[0], conn) - else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - shutdown_worker_threads(container_queue, container_threads) - put_errors_from_threads(container_threads, error_queue) + try: + if not args: + conn = create_connection() + try: + marker = '' + while True: + containers = [ + c['name'] for c in conn.get_account(marker=marker)[1]] + if not containers: + break + for container in containers: + container_queue.put(container) + marker = containers[-1] + except ClientException as err: + if err.http_status != 404: + raise + error_queue.put('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might ' \ + 'have meant %r instead of %r.' % ( + args[0].replace('/', ' ', 1), args[0]) + conn = create_connection() + _delete_container(args[0], conn) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) + finally: + shutdown_worker_threads(container_queue, container_threads) + put_errors_from_threads(container_threads, error_queue) - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) st_download_help = ''' -download --all OR download container [options] [object] [object] ... - Downloads everything in the account (with --all), or everything in a - container, or a list of objects depending on the args given. For a single - object download, you may use the -o [--output] <filename> option to - redirect the output to a specific file or if "-" then just redirect to - stdout.'''.strip('\n') +download --all [options] OR download container [options] [object] [object] ... + Downloads everything in the account (with --all), or everything in all + containers in the account matching a prefix (with --all and -p [--prefix]), + or everything in a container, or a subset of a container with -p + [--prefix], or a list of objects depending on the args given. -p or + --prefix is an option that will only download items beginning with that + prefix. For a single object download, you may use the -o [--output] + <filename> option to redirect the output to a specific file or if "-" then + just redirect to stdout.'''.strip('\n') def st_download(parser, args, print_queue, error_queue): @@ -313,6 +326,9 @@ def st_download(parser, args, print_queue, error_queue): '-m', '--marker', dest='marker', default='', help='Marker to use when starting a container or ' 'account download') + parser.add_option( + '-p', '--prefix', dest='prefix', + help='Will only download items beginning with the prefix') parser.add_option( '-o', '--output', dest='out_file', help='For a single ' 'file download, stream the output to an alternate location ') @@ -426,12 +442,14 @@ def st_download(parser, args, print_queue, error_queue): container_queue = Queue(10000) - def _download_container(container, conn): + def _download_container(container, conn, prefix=None): try: marker = options.marker while True: - objects = [o['name'] for o in - conn.get_container(container, marker=marker)[1]] + objects = [ + o['name'] for o in + conn.get_container(container, marker=marker, + prefix=prefix)[1]] if not objects: break marker = objects[-1] @@ -455,42 +473,50 @@ def st_download(parser, args, print_queue, error_queue): for _junk in xrange(options.container_threads)] for thread in container_threads: thread.start() - if not args: - conn = create_connection() - try: - marker = options.marker - while True: - containers = [c['name'] - for c in conn.get_account(marker=marker)[1]] - if not containers: - break - marker = containers[-1] - shuffle(containers) - for container in containers: - container_queue.put(container) - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, 'WARNING: / in container name; you might have ' \ - 'meant %r instead of %r.' % \ - (args[0].replace('/', ' ', 1), args[0]) - _download_container(args[0], create_connection()) - else: - if len(args) == 2: - obj = args[1] - object_queue.put((args[0], obj, options.out_file)) + + # We musn't let the main thread die with an exception while non-daemonic + # threads exist or the process with hang and ignore Ctrl-C. So we catch + # anything and tidy up the threads in a finally block. + try: + if not args: + # --all case + conn = create_connection() + try: + marker = options.marker + while True: + containers = [ + c['name'] for c in conn.get_account( + marker=marker, prefix=options.prefix)[1]] + if not containers: + break + marker = containers[-1] + shuffle(containers) + for container in containers: + container_queue.put(container) + except ClientException as err: + if err.http_status != 404: + raise + error_queue.put('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, ('WARNING: / in container name; you might ' + 'have meant %r instead of %r.' % ( + args[0].replace('/', ' ', 1), args[0])) + _download_container(args[0], create_connection(), + options.prefix) else: - for obj in args[1:]: - object_queue.put((args[0], obj)) + if len(args) == 2: + obj = args[1] + object_queue.put((args[0], obj, options.out_file)) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) + finally: + shutdown_worker_threads(container_queue, container_threads) + put_errors_from_threads(container_threads, error_queue) - shutdown_worker_threads(container_queue, container_threads) - put_errors_from_threads(container_threads, error_queue) - - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) def prt_bytes(bytes, human_flag): @@ -550,7 +576,7 @@ def st_list(parser, args, print_queue, error_queue): parser.add_option( '-d', '--delimiter', dest='delimiter', help='Will roll up items with the given delimiter' - ' (see Cloud Files general documentation for what this means)') + ' (see OpenStack Swift API documentation for what this means)') (options, args) = parse_args(parser, args) args = args[1:] if options.delimiter and not args: @@ -1002,34 +1028,37 @@ def st_upload(parser, args, print_queue, error_queue): for _junk in xrange(options.segment_threads)] for thread in segment_threads: thread.start() - segment = 0 - segment_start = 0 - while segment_start < full_size: - segment_size = int(options.segment_size) - if segment_start + segment_size > full_size: - segment_size = full_size - segment_start - if options.use_slo: - segment_name = '%s/slo/%s/%s/%s/%08d' % ( - obj, put_headers['x-object-meta-mtime'], - full_size, options.segment_size, segment) - else: - segment_name = '%s/%s/%s/%s/%08d' % ( - obj, put_headers['x-object-meta-mtime'], - full_size, options.segment_size, segment) - segment_queue.put( - {'path': path, 'obj': segment_name, - 'segment_start': segment_start, - 'segment_size': segment_size, - 'segment_index': segment, - 'log_line': '%s segment %s' % (obj, segment)}) - segment += 1 - segment_start += segment_size - shutdown_worker_threads(segment_queue, segment_threads) - if put_errors_from_threads(segment_threads, error_queue): - raise ClientException( - 'Aborting manifest creation ' - 'because not all segments could be uploaded. %s/%s' - % (container, obj)) + try: + segment = 0 + segment_start = 0 + while segment_start < full_size: + segment_size = int(options.segment_size) + if segment_start + segment_size > full_size: + segment_size = full_size - segment_start + if options.use_slo: + segment_name = '%s/slo/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + else: + segment_name = '%s/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + segment_queue.put( + {'path': path, 'obj': segment_name, + 'segment_start': segment_start, + 'segment_size': segment_size, + 'segment_index': segment, + 'log_line': '%s segment %s' % (obj, segment)}) + segment += 1 + segment_start += segment_size + finally: + shutdown_worker_threads(segment_queue, segment_threads) + if put_errors_from_threads(segment_threads, + error_queue): + raise ClientException( + 'Aborting manifest creation ' + 'because not all segments could be uploaded. ' + '%s/%s' % (container, obj)) if options.use_slo: slo_segments = [] for thread in segment_threads: @@ -1149,19 +1178,20 @@ def st_upload(parser, args, print_queue, error_queue): except Exception as err: error_queue.put( 'Error trying to create container %r: %s' % (args[0], err)) + try: for arg in args[1:]: if isdir(arg): _upload_dir(arg) else: object_queue.put({'path': arg}) - - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) except ClientException as err: if err.http_status != 404: raise error_queue.put('Account not found') + finally: + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) def split_headers(options, prefix='', error_queue=None): @@ -1395,7 +1425,7 @@ Examples: print item print_thread = QueueFunctionThread(print_queue, _print) - print_thread.setDaemon(True) + print_thread.start() error_count = 0 error_queue = Queue(10000) @@ -1408,7 +1438,7 @@ Examples: print >> stderr, item error_thread = QueueFunctionThread(error_queue, _error) - error_thread.setDaemon(True) + error_thread.start() parser.usage = globals()['st_%s_help' % args[0]] try: @@ -1416,18 +1446,9 @@ Examples: error_queue) except (ClientException, HTTPException, socket.error) as err: error_queue.put(str(err)) - - # Let other threads start working, now start print and error thread, - # this is to prevent the main thread shutdown two thread prematurely - print_thread.start() - error_thread.start() - - # If not all the worker threads have finished, then the main thread - # has to wait. Only when there are main, error and print thread left - # the main thread can proceed to finish up. - while (len(threading_enumerate()) > 3 or not error_queue.empty() or - not print_queue.empty()): - sleep(0.5) + finally: + shutdown_worker_threads(print_queue, [print_thread]) + shutdown_worker_threads(error_queue, [error_thread]) if error_count: exit(1)