Merge "Add -p option to download command."

This commit is contained in:
Jenkins 2013-07-05 17:16:34 +00:00 committed by Gerrit Code Review
commit 6f7458a290
1 changed file with 151 additions and 130 deletions

281
bin/swift
View File

@ -25,7 +25,7 @@ from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Queue from Queue import Queue
from random import shuffle from random import shuffle
from sys import argv, exc_info, exit, stderr, stdout from sys import argv, exc_info, exit, stderr, stdout
from threading import enumerate as threading_enumerate, Thread from threading import Thread
from time import sleep, time, gmtime, strftime from time import sleep, time, gmtime, strftime
from traceback import format_exception from traceback import format_exception
from urllib import quote, unquote from urllib import quote, unquote
@ -84,16 +84,6 @@ class StopWorkerThreadSignal(object):
pass pass
def shutdown_worker_threads(queue, thread_list):
for thread in [t for t in thread_list if t.isAlive()]:
queue.put(StopWorkerThreadSignal())
def immediate_exit(signum, frame):
stderr.write(" Aborted\n")
os_exit(2)
class QueueFunctionThread(Thread): class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs): def __init__(self, queue, func, *args, **kwargs):
@ -128,6 +118,24 @@ class QueueFunctionThread(Thread):
self.exc_infos.append(exc_info()) self.exc_infos.append(exc_info())
def shutdown_worker_threads(queue, thread_list):
"""
Takes a job queue and a list of associated QueueFunctionThread objects,
puts a StopWorkerThreadSignal object into the queue, and waits for the
queue to flush.
"""
for thread in [t for t in thread_list if t.isAlive()]:
queue.put(StopWorkerThreadSignal())
while any(map(QueueFunctionThread.is_alive, thread_list)):
sleep(0.05)
def immediate_exit(signum, frame):
stderr.write(" Aborted\n")
os_exit(2)
st_delete_help = ''' st_delete_help = '''
delete [options] --all OR delete container [options] [object] [object] ... delete [options] --all OR delete container [options] [object] [object] ...
Deletes everything in the account (with --all), or everything in a Deletes everything in the account (with --all), or everything in a
@ -261,47 +269,52 @@ def st_delete(parser, args, print_queue, error_queue):
for _junk in xrange(options.container_threads)] for _junk in xrange(options.container_threads)]
for thread in container_threads: for thread in container_threads:
thread.start() thread.start()
if not args:
conn = create_connection()
try:
marker = ''
while True:
containers = \
[c['name'] for c in conn.get_account(marker=marker)[1]]
if not containers:
break
for container in containers:
container_queue.put(container)
marker = containers[-1]
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
conn = create_connection()
_delete_container(args[0], conn)
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
shutdown_worker_threads(container_queue, container_threads) try:
put_errors_from_threads(container_threads, error_queue) if not args:
conn = create_connection()
try:
marker = ''
while True:
containers = [
c['name'] for c in conn.get_account(marker=marker)[1]]
if not containers:
break
for container in containers:
container_queue.put(container)
marker = containers[-1]
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might ' \
'have meant %r instead of %r.' % (
args[0].replace('/', ' ', 1), args[0])
conn = create_connection()
_delete_container(args[0], conn)
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
finally:
shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
shutdown_worker_threads(object_queue, object_threads) shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue) put_errors_from_threads(object_threads, error_queue)
st_download_help = ''' st_download_help = '''
download --all OR download container [options] [object] [object] ... download --all [options] OR download container [options] [object] [object] ...
Downloads everything in the account (with --all), or everything in a Downloads everything in the account (with --all), or everything in all
container, or a list of objects depending on the args given. For a single containers in the account matching a prefix (with --all and -p [--prefix]),
object download, you may use the -o [--output] <filename> option to or everything in a container, or a subset of a container with -p
redirect the output to a specific file or if "-" then just redirect to [--prefix], or a list of objects depending on the args given. -p or
stdout.'''.strip('\n') --prefix is an option that will only download items beginning with that
prefix. For a single object download, you may use the -o [--output]
<filename> option to redirect the output to a specific file or if "-" then
just redirect to stdout.'''.strip('\n')
def st_download(parser, args, print_queue, error_queue): def st_download(parser, args, print_queue, error_queue):
@ -313,6 +326,9 @@ def st_download(parser, args, print_queue, error_queue):
'-m', '--marker', dest='marker', '-m', '--marker', dest='marker',
default='', help='Marker to use when starting a container or ' default='', help='Marker to use when starting a container or '
'account download') 'account download')
parser.add_option(
'-p', '--prefix', dest='prefix',
help='Will only download items beginning with the prefix')
parser.add_option( parser.add_option(
'-o', '--output', dest='out_file', help='For a single ' '-o', '--output', dest='out_file', help='For a single '
'file download, stream the output to an alternate location ') 'file download, stream the output to an alternate location ')
@ -426,12 +442,14 @@ def st_download(parser, args, print_queue, error_queue):
container_queue = Queue(10000) container_queue = Queue(10000)
def _download_container(container, conn): def _download_container(container, conn, prefix=None):
try: try:
marker = options.marker marker = options.marker
while True: while True:
objects = [o['name'] for o in objects = [
conn.get_container(container, marker=marker)[1]] o['name'] for o in
conn.get_container(container, marker=marker,
prefix=prefix)[1]]
if not objects: if not objects:
break break
marker = objects[-1] marker = objects[-1]
@ -455,42 +473,50 @@ def st_download(parser, args, print_queue, error_queue):
for _junk in xrange(options.container_threads)] for _junk in xrange(options.container_threads)]
for thread in container_threads: for thread in container_threads:
thread.start() thread.start()
if not args:
conn = create_connection() # We mustn't let the main thread die with an exception while non-daemonic
try: # threads exist or the process will hang and ignore Ctrl-C. So we catch
marker = options.marker # anything and tidy up the threads in a finally block.
while True: try:
containers = [c['name'] if not args:
for c in conn.get_account(marker=marker)[1]] # --all case
if not containers: conn = create_connection()
break try:
marker = containers[-1] marker = options.marker
shuffle(containers) while True:
for container in containers: containers = [
container_queue.put(container) c['name'] for c in conn.get_account(
except ClientException as err: marker=marker, prefix=options.prefix)[1]]
if err.http_status != 404: if not containers:
raise break
error_queue.put('Account not found') marker = containers[-1]
elif len(args) == 1: shuffle(containers)
if '/' in args[0]: for container in containers:
print >> stderr, 'WARNING: / in container name; you might have ' \ container_queue.put(container)
'meant %r instead of %r.' % \ except ClientException as err:
(args[0].replace('/', ' ', 1), args[0]) if err.http_status != 404:
_download_container(args[0], create_connection()) raise
else: error_queue.put('Account not found')
if len(args) == 2: elif len(args) == 1:
obj = args[1] if '/' in args[0]:
object_queue.put((args[0], obj, options.out_file)) print >> stderr, ('WARNING: / in container name; you might '
'have meant %r instead of %r.' % (
args[0].replace('/', ' ', 1), args[0]))
_download_container(args[0], create_connection(),
options.prefix)
else: else:
for obj in args[1:]: if len(args) == 2:
object_queue.put((args[0], obj)) obj = args[1]
object_queue.put((args[0], obj, options.out_file))
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
finally:
shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
shutdown_worker_threads(container_queue, container_threads) shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(container_threads, error_queue) put_errors_from_threads(object_threads, error_queue)
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
def prt_bytes(bytes, human_flag): def prt_bytes(bytes, human_flag):
@ -550,7 +576,7 @@ def st_list(parser, args, print_queue, error_queue):
parser.add_option( parser.add_option(
'-d', '--delimiter', dest='delimiter', '-d', '--delimiter', dest='delimiter',
help='Will roll up items with the given delimiter' help='Will roll up items with the given delimiter'
' (see Cloud Files general documentation for what this means)') ' (see OpenStack Swift API documentation for what this means)')
(options, args) = parse_args(parser, args) (options, args) = parse_args(parser, args)
args = args[1:] args = args[1:]
if options.delimiter and not args: if options.delimiter and not args:
@ -1002,34 +1028,37 @@ def st_upload(parser, args, print_queue, error_queue):
for _junk in xrange(options.segment_threads)] for _junk in xrange(options.segment_threads)]
for thread in segment_threads: for thread in segment_threads:
thread.start() thread.start()
segment = 0 try:
segment_start = 0 segment = 0
while segment_start < full_size: segment_start = 0
segment_size = int(options.segment_size) while segment_start < full_size:
if segment_start + segment_size > full_size: segment_size = int(options.segment_size)
segment_size = full_size - segment_start if segment_start + segment_size > full_size:
if options.use_slo: segment_size = full_size - segment_start
segment_name = '%s/slo/%s/%s/%s/%08d' % ( if options.use_slo:
obj, put_headers['x-object-meta-mtime'], segment_name = '%s/slo/%s/%s/%s/%08d' % (
full_size, options.segment_size, segment) obj, put_headers['x-object-meta-mtime'],
else: full_size, options.segment_size, segment)
segment_name = '%s/%s/%s/%s/%08d' % ( else:
obj, put_headers['x-object-meta-mtime'], segment_name = '%s/%s/%s/%s/%08d' % (
full_size, options.segment_size, segment) obj, put_headers['x-object-meta-mtime'],
segment_queue.put( full_size, options.segment_size, segment)
{'path': path, 'obj': segment_name, segment_queue.put(
'segment_start': segment_start, {'path': path, 'obj': segment_name,
'segment_size': segment_size, 'segment_start': segment_start,
'segment_index': segment, 'segment_size': segment_size,
'log_line': '%s segment %s' % (obj, segment)}) 'segment_index': segment,
segment += 1 'log_line': '%s segment %s' % (obj, segment)})
segment_start += segment_size segment += 1
shutdown_worker_threads(segment_queue, segment_threads) segment_start += segment_size
if put_errors_from_threads(segment_threads, error_queue): finally:
raise ClientException( shutdown_worker_threads(segment_queue, segment_threads)
'Aborting manifest creation ' if put_errors_from_threads(segment_threads,
'because not all segments could be uploaded. %s/%s' error_queue):
% (container, obj)) raise ClientException(
'Aborting manifest creation '
'because not all segments could be uploaded. '
'%s/%s' % (container, obj))
if options.use_slo: if options.use_slo:
slo_segments = [] slo_segments = []
for thread in segment_threads: for thread in segment_threads:
@ -1149,19 +1178,20 @@ def st_upload(parser, args, print_queue, error_queue):
except Exception as err: except Exception as err:
error_queue.put( error_queue.put(
'Error trying to create container %r: %s' % (args[0], err)) 'Error trying to create container %r: %s' % (args[0], err))
try: try:
for arg in args[1:]: for arg in args[1:]:
if isdir(arg): if isdir(arg):
_upload_dir(arg) _upload_dir(arg)
else: else:
object_queue.put({'path': arg}) object_queue.put({'path': arg})
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
except ClientException as err: except ClientException as err:
if err.http_status != 404: if err.http_status != 404:
raise raise
error_queue.put('Account not found') error_queue.put('Account not found')
finally:
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
def split_headers(options, prefix='', error_queue=None): def split_headers(options, prefix='', error_queue=None):
@ -1395,7 +1425,7 @@ Examples:
print item print item
print_thread = QueueFunctionThread(print_queue, _print) print_thread = QueueFunctionThread(print_queue, _print)
print_thread.setDaemon(True) print_thread.start()
error_count = 0 error_count = 0
error_queue = Queue(10000) error_queue = Queue(10000)
@ -1408,7 +1438,7 @@ Examples:
print >> stderr, item print >> stderr, item
error_thread = QueueFunctionThread(error_queue, _error) error_thread = QueueFunctionThread(error_queue, _error)
error_thread.setDaemon(True) error_thread.start()
parser.usage = globals()['st_%s_help' % args[0]] parser.usage = globals()['st_%s_help' % args[0]]
try: try:
@ -1416,18 +1446,9 @@ Examples:
error_queue) error_queue)
except (ClientException, HTTPException, socket.error) as err: except (ClientException, HTTPException, socket.error) as err:
error_queue.put(str(err)) error_queue.put(str(err))
finally:
# Let other threads start working, now start print and error thread, shutdown_worker_threads(print_queue, [print_thread])
# this is to prevent the main thread shutting down the two threads prematurely shutdown_worker_threads(error_queue, [error_thread])
print_thread.start()
error_thread.start()
# If not all the worker threads have finished, then the main thread
# has to wait. Only when there are main, error and print thread left
# the main thread can proceed to finish up.
while (len(threading_enumerate()) > 3 or not error_queue.empty() or
not print_queue.empty()):
sleep(0.5)
if error_count: if error_count:
exit(1) exit(1)