Move multi-threading code to a library.

This patch extracts the multi-threading code from bin/swift into
swiftclient/multithreading and adds tests.  In particular, the new
context-manager-based approach prevents non-daemonic threads from
wedging the process when unexpected exceptions happen.
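
In rough outline, the new pattern looks like the sketch below (the
worker function, connection maker, and credentials are illustrative
stand-ins for what bin/swift actually builds from get_conn(options)):

    import sys

    from swiftclient import Connection
    from swiftclient.multithreading import MultiThreadingManager

    def make_connection():
        # stand-in for bin/swift's create_connection = lambda: get_conn(options)
        return Connection('http://127.0.0.1:8080/auth/v1.0',
                          'test:tester', 'testing')

    def delete_obj((container, obj), conn):
        # worker: called once per queued item with a per-thread connection
        conn.delete_object(container, obj)

    with MultiThreadingManager() as thread_manager:
        obj_manager = thread_manager.queue_manager(
            delete_obj, 10, connection_maker=make_connection)
        with obj_manager as object_queue:
            object_queue.put(('some-container', 'some-object'))
        # Leaving the inner context stops the worker threads and reports
        # their exceptions via thread_manager.error(); leaving the outer
        # context drains the print/error threads even if an exception
        # escaped, so the process cannot wedge on non-daemonic threads.

    if thread_manager.error_count:
        sys.exit(1)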

I enabled reporting of which lines, specifically, are not covered by
unit tests (added -m option to "coverage report" in .unittests).

This patch includes a drive-by fix for uploading a segmented file with
--use-slo when that object already exists.  A key of "name" was used
instead of "path", raising a KeyError.

There is a second drive-by fix for uploading segmented objects with
--use-slo.  Commit 874e0e4427b80e1b15b74a1557b73ba9d61443ca regressed
this by removing the capture of thread-worker results in
QueueFunctionThread.run().  This patch restores that capture and, with
it, the ability to upload SLO objects.
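
Roughly, the restored capture works like this (continuing the sketch
above; segment_job and the job dict stand in for the real _segment_job
and its job dicts in bin/swift):

    slo_segments = []
    error_counter = [0]

    def segment_job(job, conn):
        # ... upload or delete one segment using conn ...
        return job  # whatever the worker returns gets collected

    segment_manager = thread_manager.queue_manager(
        segment_job, 10,
        store_results=slo_segments,     # one entry appended per job
        error_counter=error_counter,    # [0] bumped once per exception
        connection_maker=make_connection)
    with segment_manager as segment_queue:
        segment_queue.put({'obj': 'big-file', 'segment_index': 0})

    # After the context exits, slo_segments holds the workers' return
    # values; bin/swift sorts them by 'segment_index' to build the SLO
    # manifest and aborts if error_counter[0] is non-zero.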

Change-Id: I0b4f677e4a734e83d1a25088d9a74f7d46384e53
Author: Darrell Bishop
Date:   2013-06-26 22:47:49 -07:00
Parent: 5d9c6f845c
Commit: 9198e95468

9 changed files with 927 additions and 462 deletions

@@ -3,6 +3,6 @@ set -e
python setup.py testr --coverage
RET=$?
coverage report
coverage report -m
rm -f .coverage
exit $RET

bin/swift (528 changed lines)

@@ -22,12 +22,9 @@ from hashlib import md5
from optparse import OptionParser, SUPPRESS_HELP
from os import environ, listdir, makedirs, utime, _exit as os_exit
from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Queue
from random import shuffle
from sys import argv, exc_info, exit, stderr, stdout
from threading import Thread
from sys import argv, exit, stderr, stdout
from time import sleep, time, gmtime, strftime
from traceback import format_exception
from urllib import quote, unquote
try:
@@ -35,7 +32,10 @@ try:
except ImportError:
import json
from swiftclient import Connection, ClientException, HTTPException, utils
from swiftclient import Connection, HTTPException
from swiftclient.utils import config_true_value
from swiftclient.multithreading import MultiThreadingManager
from swiftclient.exceptions import ClientException
from swiftclient.version import version_info
@@ -63,75 +63,6 @@ def mkdirs(path):
raise
def put_errors_from_threads(threads, error_queue):
"""
Places any errors from the threads into error_queue.
:param threads: A list of QueueFunctionThread instances.
:param error_queue: A queue to put error strings into.
:returns: True if any errors were found.
"""
was_error = False
for thread in threads:
for info in thread.exc_infos:
was_error = True
if isinstance(info[1], ClientException):
error_queue.put(str(info[1]))
else:
error_queue.put(''.join(format_exception(*info)))
return was_error
class StopWorkerThreadSignal(object):
pass
class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs):
"""
Calls func for each item in queue; func is called with a queued
item as the first arg followed by *args and **kwargs. Use the
PriorityQueue for sending quit signal when Ctrl-C is pressed.
"""
Thread.__init__(self)
self.queue = queue
self.func = func
self.args = args
self.kwargs = kwargs
self.exc_infos = []
self.results = []
self.store_results = kwargs.pop('store_results', False)
def run(self):
while True:
try:
item = self.queue.get()
if isinstance(item, StopWorkerThreadSignal):
break
except:
# This catch is important and it may occur when ctrl-C is
# pressed, in this case simply quit the thread
break
else:
try:
self.func(item, *self.args, **self.kwargs)
except Exception:
self.exc_infos.append(exc_info())
def shutdown_worker_threads(queue, thread_list):
"""
Takes a job queue and a list of associated QueueFunctionThread objects,
puts a StopWorkerThreadSignal object into the queue, and waits for the
queue to flush.
"""
for thread in [t for t in thread_list if t.isAlive()]:
queue.put(StopWorkerThreadSignal())
while any(map(QueueFunctionThread.is_alive, thread_list)):
sleep(0.05)
def immediate_exit(signum, frame):
stderr.write(" Aborted\n")
os_exit(2)
@@ -145,7 +76,7 @@ delete [options] --all OR delete container [options] [object] [object] ...
--leave-segments option.'''.strip('\n')
def st_delete(parser, args, print_queue, error_queue):
def st_delete(parser, args, thread_manager):
parser.add_option(
'-a', '--all', action='store_true', dest='yes_all',
default=False, help='Indicates that you really want to delete '
@@ -164,20 +95,19 @@ def st_delete(parser, args, print_queue, error_queue):
(options, args) = parse_args(parser, args)
args = args[1:]
if (not args and not options.yes_all) or (args and options.yes_all):
error_queue.put('Usage: %s [options] %s' %
(basename(argv[0]), st_delete_help))
thread_manager.error('Usage: %s [options] %s',
basename(argv[0]), st_delete_help)
return
def _delete_segment((container, obj), conn):
conn.delete_object(container, obj)
if options.verbose:
if conn.attempts > 2:
print_queue.put('%s/%s [after %d attempts]' %
(container, obj, conn.attempts))
thread_manager.print_msg(
'%s/%s [after %d attempts]', container,
obj, conn.attempts)
else:
print_queue.put('%s/%s' % (container, obj))
object_queue = Queue(10000)
thread_manager.print_msg('%s/%s', container, obj)
def _delete_object((container, obj), conn):
try:
@@ -187,7 +117,7 @@ def st_delete(parser, args, print_queue, error_queue):
try:
headers = conn.head_object(container, obj)
old_manifest = headers.get('x-object-manifest')
if utils.config_true_value(
if config_true_value(
headers.get('x-static-large-object')):
query_string = 'multipart-manifest=delete'
except ClientException as err:
@@ -195,7 +125,10 @@ def st_delete(parser, args, print_queue, error_queue):
raise
conn.delete_object(container, obj, query_string=query_string)
if old_manifest:
segment_queue = Queue(10000)
segment_manager = thread_manager.queue_manager(
_delete_segment, options.object_threads,
connection_maker=create_connection)
segment_queue = segment_manager.queue
scontainer, sprefix = old_manifest.split('/', 1)
scontainer = unquote(scontainer)
sprefix = unquote(sprefix).rstrip('/') + '/'
@@ -203,32 +136,23 @@ def st_delete(parser, args, print_queue, error_queue):
prefix=sprefix)[1]:
segment_queue.put((scontainer, delobj['name']))
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(
segment_queue,
_delete_segment, create_connection()) for _junk in
xrange(options.object_threads)]
for thread in segment_threads:
thread.start()
shutdown_worker_threads(segment_queue, segment_threads)
put_errors_from_threads(segment_threads, error_queue)
with segment_manager:
pass
if options.verbose:
path = options.yes_all and join(container, obj) or obj
if path[:1] in ('/', '\\'):
path = path[1:]
if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' %
(path, conn.attempts))
thread_manager.print_msg('%s [after %d attempts]', path,
conn.attempts)
else:
print_queue.put(path)
thread_manager.print_msg(path)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Object %s not found' %
repr('%s/%s' % (container, obj)))
thread_manager.error("Object '%s/%s' not found", container, obj)
container_queue = Queue(10000)
def _delete_container(container, conn):
def _delete_container(container, conn, object_queue):
try:
marker = ''
while True:
@@ -256,29 +180,25 @@ def st_delete(parser, args, print_queue, error_queue):
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Container %s not found' % repr(container))
thread_manager.error('Container %r not found', container)
create_connection = lambda: get_conn(options)
object_threads = \
[QueueFunctionThread(object_queue, _delete_object, create_connection())
for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
container_threads = \
[QueueFunctionThread(container_queue, _delete_container,
create_connection())
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
try:
obj_manager = thread_manager.queue_manager(
_delete_object, options.object_threads,
connection_maker=create_connection)
with obj_manager as object_queue:
cont_manager = thread_manager.queue_manager(
_delete_container, options.container_threads, object_queue,
connection_maker=create_connection)
with cont_manager as container_queue:
if not args:
conn = create_connection()
try:
marker = ''
while True:
containers = [
c['name'] for c in conn.get_account(marker=marker)[1]]
c['name']
for c in conn.get_account(marker=marker)[1]]
if not containers:
break
for container in containers:
@@ -287,23 +207,16 @@ def st_delete(parser, args, print_queue, error_queue):
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
thread_manager.error('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might ' \
'have meant %r instead of %r.' % (
print >> stderr, 'WARNING: / in container name; you '
'might have meant %r instead of %r.' % (
args[0].replace('/', ' ', 1), args[0])
conn = create_connection()
_delete_container(args[0], conn)
container_queue.put(args[0])
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
finally:
shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
st_download_help = '''
@@ -318,7 +231,7 @@ download --all [options] OR download container [options] [object] [object] ...
just redirect to stdout.'''.strip('\n')
def st_download(parser, args, print_queue, error_queue):
def st_download(parser, args, thread_manager):
parser.add_option(
'-a', '--all', action='store_true', dest='yes_all',
default=False, help='Indicates that you really want to download '
@@ -350,12 +263,10 @@ def st_download(parser, args, print_queue, error_queue):
if options.out_file and len(args) != 2:
exit('-o option only allowed for single file downloads')
if (not args and not options.yes_all) or (args and options.yes_all):
error_queue.put('Usage: %s [options] %s' %
(basename(argv[0]), st_download_help))
thread_manager.error('Usage: %s [options] %s', basename(argv[0]),
st_download_help)
return
object_queue = Queue(10000)
def _download_object(queue_arg, conn):
if len(queue_arg) == 2:
container, obj = queue_arg
@@ -415,11 +326,12 @@ def st_download(parser, args, print_queue, error_queue):
if not options.no_download:
fp.close()
if md5sum and md5sum.hexdigest() != etag:
error_queue.put('%s: md5sum != etag, %s != %s' %
(path, md5sum.hexdigest(), etag))
thread_manager.error('%s: md5sum != etag, %s != %s',
path, md5sum.hexdigest(), etag)
if content_length is not None and read_length != content_length:
error_queue.put('%s: read_length != content_length, %d != %d' %
(path, read_length, content_length))
thread_manager.error(
'%s: read_length != content_length, %d != %d',
path, read_length, content_length)
if 'x-object-meta-mtime' in headers and not options.out_file \
and not options.no_download:
@@ -431,19 +343,23 @@ def st_download(parser, args, print_queue, error_queue):
header_receipt - start_time, finish_time - start_time,
float(read_length) / (finish_time - start_time) / 1000000)
if conn.attempts > 1:
print_queue.put('%s [%s after %d attempts]' %
(path, time_str, conn.attempts))
thread_manager.print_msg('%s [%s after %d attempts]', path,
time_str, conn.attempts)
else:
print_queue.put('%s [%s]' % (path, time_str))
thread_manager.print_msg('%s [%s]', path, time_str)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Object %s not found' %
repr('%s/%s' % (container, obj)))
thread_manager.error("Object '%s/%s' not found", container, obj)
container_queue = Queue(10000)
def _download_container(container, conn, prefix=None):
def _download_container(queue_arg, conn):
if len(queue_arg) == 2:
container, object_queue = queue_arg
prefix = None
elif len(queue_arg) == 3:
container, object_queue, prefix = queue_arg
else:
raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
try:
marker = options.marker
while True:
@@ -460,25 +376,17 @@ def st_download(parser, args, print_queue, error_queue):
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Container %s not found' % repr(container))
thread_manager.error('Container %r not found', container)
create_connection = lambda: get_conn(options)
object_threads = [QueueFunctionThread(
object_queue, _download_object,
create_connection()) for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
container_threads = [QueueFunctionThread(
container_queue,
_download_container, create_connection())
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
# We mustn't let the main thread die with an exception while non-daemonic
# threads exist or the process will hang and ignore Ctrl-C. So we catch
# anything and tidy up the threads in a finally block.
try:
obj_manager = thread_manager.queue_manager(
_download_object, options.object_threads,
connection_maker=create_connection)
with obj_manager as object_queue:
cont_manager = thread_manager.queue_manager(
_download_container, options.container_threads,
connection_maker=create_connection)
with cont_manager as container_queue:
if not args:
# --all case
conn = create_connection()
@@ -493,18 +401,17 @@ def st_download(parser, args, print_queue, error_queue):
marker = containers[-1]
shuffle(containers)
for container in containers:
container_queue.put(container)
container_queue.put((container, object_queue))
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
thread_manager.error('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, ('WARNING: / in container name; you might '
'have meant %r instead of %r.' % (
print >> stderr, ('WARNING: / in container name; you '
'might have meant %r instead of %r.' % (
args[0].replace('/', ' ', 1), args[0]))
_download_container(args[0], create_connection(),
options.prefix)
container_queue.put((args[0], object_queue, options.prefix))
else:
if len(args) == 2:
obj = args[1]
@@ -512,12 +419,6 @@ def st_download(parser, args, print_queue, error_queue):
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
finally:
shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
def prt_bytes(bytes, human_flag):
@@ -560,7 +461,7 @@ list [options] [container]
'''.strip('\n')
def st_list(parser, args, print_queue, error_queue):
def st_list(parser, args, thread_manager):
parser.add_option(
'-l', '--long', dest='long', help='Long listing '
'similar to ls -l command', action='store_true', default=False)
@@ -583,8 +484,8 @@ def st_list(parser, args, print_queue, error_queue):
if options.delimiter and not args:
exit('-d option only allowed for container listings')
if len(args) > 1 or len(args) == 1 and args[0].find('/') >= 0:
error_queue.put('Usage: %s [options] %s' %
(basename(argv[0]), st_list_help))
thread_manager.error('Usage: %s [options] %s', basename(argv[0]),
st_list_help)
return
conn = get_conn(options)
@@ -605,12 +506,13 @@ def st_list(parser, args, print_queue, error_queue):
item_name = item.get('name')
if not options.long and not options.human:
print_queue.put(item.get('name', item.get('subdir')))
thread_manager.print_msg(
item.get('name', item.get('subdir')))
else:
item_bytes = item.get('bytes')
total_bytes += item_bytes
if len(args) == 0: # listing containers
bytes = prt_bytes(item_bytes, options.human)
byte_str = prt_bytes(item_bytes, options.human)
count = item.get('count')
total_count += count
try:
@@ -620,41 +522,42 @@ def st_list(parser, args, print_queue, error_queue):
except ClientException:
datestamp = '????-??-?? ??:??:??'
if not options.totals:
print_queue.put("%5s %s %s %s" %
(count, bytes, datestamp,
item_name))
thread_manager.print_msg("%5s %s %s %s", count,
byte_str, datestamp,
item_name)
else: # list container contents
subdir = item.get('subdir')
if subdir is None:
bytes = prt_bytes(item_bytes, options.human)
byte_str = prt_bytes(item_bytes, options.human)
date, xtime = item.get('last_modified').split('T')
xtime = xtime.split('.')[0]
else:
bytes = prt_bytes(0, options.human)
byte_str = prt_bytes(0, options.human)
date = xtime = ''
item_name = subdir
if not options.totals:
print_queue.put("%s %10s %8s %s" %
(bytes, date, xtime, item_name))
thread_manager.print_msg("%s %10s %8s %s",
byte_str, date, xtime,
item_name)
marker = items[-1].get('name', items[-1].get('subdir'))
# report totals
if options.long or options.human:
if len(args) == 0:
print_queue.put("%5s %s" % (prt_bytes(total_count, True),
prt_bytes(total_bytes,
options.human)))
thread_manager.print_msg(
"%5s %s", prt_bytes(total_count, True),
prt_bytes(total_bytes, options.human))
else:
print_queue.put("%s" % (prt_bytes(total_bytes, options.human)))
thread_manager.print_msg(prt_bytes(total_bytes, options.human))
except ClientException as err:
if err.http_status != 404:
raise
if not args:
error_queue.put('Account not found')
thread_manager.error('Account not found')
else:
error_queue.put('Container %s not found' % repr(args[0]))
thread_manager.error('Container %r not found', args[0])
st_stat_help = '''
stat [container] [object]
@@ -663,7 +566,7 @@ stat [container] [object]
like 'list --lh' noting number of objs a multiple of 1024'''.strip('\n')
def st_stat(parser, args, print_queue, error_queue):
def st_stat(parser, args, thread_manager):
parser.add_option(
'--lh', dest='human', help="report totals like 'list --lh'",
action='store_true', default=False)
@@ -674,36 +577,36 @@ def st_stat(parser, args, print_queue, error_queue):
try:
headers = conn.head_account()
if options.verbose > 1:
print_queue.put('''
thread_manager.print_msg('''
StorageURL: %s
Auth Token: %s
'''.strip('\n') % (conn.url, conn.token))
'''.strip('\n'), conn.url, conn.token)
container_count = int(headers.get('x-account-container-count', 0))
object_count = prt_bytes(headers.get('x-account-object-count', 0),
options.human).lstrip()
bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0),
options.human).lstrip()
print_queue.put('''
thread_manager.print_msg('''
Account: %s
Containers: %d
Objects: %s
Bytes: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
object_count, bytes_used))
Bytes: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], container_count,
object_count, bytes_used)
for key, value in headers.items():
if key.startswith('x-account-meta-'):
print_queue.put(
'%10s: %s' % ('Meta %s' %
key[len('x-account-meta-'):].title(), value))
thread_manager.print_msg(
'%10s: %s',
'Meta %s' % key[len('x-account-meta-'):].title(),
value)
for key, value in headers.items():
if not key.startswith('x-account-meta-') and key not in (
'content-length', 'date', 'x-account-container-count',
'x-account-object-count', 'x-account-bytes-used'):
print_queue.put(
'%10s: %s' % (key.title(), value))
thread_manager.print_msg('%10s: %s', key.title(), value)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
thread_manager.error('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
@@ -716,7 +619,7 @@ Containers: %d
options.human).lstrip()
bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0),
options.human).lstrip()
print_queue.put('''
thread_manager.print_msg('''
Account: %s
Container: %s
Objects: %s
@@ -724,69 +627,68 @@ Container: %s
Read ACL: %s
Write ACL: %s
Sync To: %s
Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
Sync Key: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], args[0],
object_count, bytes_used,
headers.get('x-container-read', ''),
headers.get('x-container-write', ''),
headers.get('x-container-sync-to', ''),
headers.get('x-container-sync-key', '')))
headers.get('x-container-sync-key', ''))
for key, value in headers.items():
if key.startswith('x-container-meta-'):
print_queue.put(
'%9s: %s' % ('Meta %s' %
key[len('x-container-meta-'):].title(), value))
thread_manager.print_msg(
'%9s: %s',
'Meta %s' % key[len('x-container-meta-'):].title(),
value)
for key, value in headers.items():
if not key.startswith('x-container-meta-') and key not in (
'content-length', 'date', 'x-container-object-count',
'x-container-bytes-used', 'x-container-read',
'x-container-write', 'x-container-sync-to',
'x-container-sync-key'):
print_queue.put(
'%9s: %s' % (key.title(), value))
thread_manager.print_msg('%9s: %s', key.title(), value)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Container %s not found' % repr(args[0]))
thread_manager.error('Container %r not found', args[0])
elif len(args) == 2:
try:
headers = conn.head_object(args[0], args[1])
print_queue.put('''
thread_manager.print_msg('''
Account: %s
Container: %s
Object: %s
Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
args[1], headers.get('content-type')))
Content Type: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], args[0],
args[1], headers.get('content-type'))
if 'content-length' in headers:
print_queue.put('Content Length: %s' %
thread_manager.print_msg('Content Length: %s',
prt_bytes(headers['content-length'],
options.human).lstrip())
if 'last-modified' in headers:
print_queue.put(' Last Modified: %s' %
thread_manager.print_msg(' Last Modified: %s',
headers['last-modified'])
if 'etag' in headers:
print_queue.put(' ETag: %s' % headers['etag'])
thread_manager.print_msg(' ETag: %s', headers['etag'])
if 'x-object-manifest' in headers:
print_queue.put(' Manifest: %s' %
thread_manager.print_msg(' Manifest: %s',
headers['x-object-manifest'])
for key, value in headers.items():
if key.startswith('x-object-meta-'):
print_queue.put(
'%14s: %s' % ('Meta %s' %
key[len('x-object-meta-'):].title(), value))
thread_manager.print_msg(
'%14s: %s',
'Meta %s' % key[len('x-object-meta-'):].title(),
value)
for key, value in headers.items():
if not key.startswith('x-object-meta-') and key not in (
'content-type', 'content-length', 'last-modified',
'etag', 'date', 'x-object-manifest'):
print_queue.put(
'%14s: %s' % (key.title(), value))
thread_manager.print_msg('%14s: %s', key.title(), value)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Object %s not found' %
repr('%s/%s' % (args[0], args[1])))
thread_manager.error("Object %s/%s not found", args[0], args[1])
else:
error_queue.put('Usage: %s [options] %s' %
(basename(argv[0]), st_stat_help))
thread_manager.error('Usage: %s [options] %s', basename(argv[0]),
st_stat_help)
st_post_help = '''
@@ -800,7 +702,7 @@ post [options] [container] [object]
post -m Color:Blue -m Size:Large'''.strip('\n')
def st_post(parser, args, print_queue, error_queue):
def st_post(parser, args, thread_manager):
parser.add_option(
'-r', '--read-acl', dest='read_acl', help='Sets the '
'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
@@ -831,19 +733,21 @@ def st_post(parser, args, print_queue, error_queue):
exit('-r, -w, -t, and -k options only allowed for containers')
conn = get_conn(options)
if not args:
headers = split_headers(options.meta, 'X-Account-Meta-', error_queue)
headers = split_headers(
options.meta, 'X-Account-Meta-', thread_manager)
try:
conn.post_account(headers=headers)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
thread_manager.error('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
headers = split_headers(options.meta, 'X-Container-Meta-', error_queue)
headers = split_headers(options.meta, 'X-Container-Meta-',
thread_manager)
if options.read_acl is not None:
headers['X-Container-Read'] = options.read_acl
if options.write_acl is not None:
@@ -859,19 +763,18 @@ def st_post(parser, args, print_queue, error_queue):
raise
conn.put_container(args[0], headers=headers)
elif len(args) == 2:
headers = split_headers(options.meta, 'X-Object-Meta-', error_queue)
headers = split_headers(options.meta, 'X-Object-Meta-', thread_manager)
# add header options to the headers object for the request.
headers.update(split_headers(options.header, '', error_queue))
headers.update(split_headers(options.header, '', thread_manager))
try:
conn.post_object(args[0], args[1], headers=headers)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Object %s not found' %
repr('%s/%s' % (args[0], args[1])))
thread_manager.error("Object '%s/%s' not found", args[0], args[1])
else:
error_queue.put('Usage: %s [options] %s' %
(basename(argv[0]), st_post_help))
thread_manager.error('Usage: %s [options] %s', basename(argv[0]),
st_post_help)
st_upload_help = '''
@@ -885,7 +788,7 @@ upload [options] container file_or_directory [file_or_directory] [...]
'''.strip('\n')
def st_upload(parser, args, print_queue, error_queue):
def st_upload(parser, args, thread_manager):
parser.add_option(
'-c', '--changed', action='store_true', dest='changed',
default=False, help='Will only upload files that have changed since '
@@ -924,10 +827,9 @@ def st_upload(parser, args, print_queue, error_queue):
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
error_queue.put('Usage: %s [options] %s' %
(basename(argv[0]), st_upload_help))
thread_manager.error(
'Usage: %s [options] %s', basename(argv[0]), st_upload_help)
return
object_queue = Queue(10000)
def _segment_job(job, conn):
if job.get('delete', False):
@@ -945,10 +847,10 @@ def st_upload(parser, args, print_queue, error_queue):
job['segment_etag'] = etag
if options.verbose and 'log_line' in job:
if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' %
(job['log_line'], conn.attempts))
thread_manager.print_msg('%s [after %d attempts]',
job['log_line'], conn.attempts)
else:
print_queue.put(job['log_line'])
thread_manager.print_msg(job['log_line'])
return job
def _object_job(job, conn):
@@ -998,13 +900,13 @@ def st_upload(parser, args, print_queue, error_queue):
return
if not options.leave_segments:
old_manifest = headers.get('x-object-manifest')
if utils.config_true_value(
if config_true_value(
headers.get('x-static-large-object')):
headers, manifest_data = conn.get_object(
container, obj,
query_string='multipart-manifest=get')
for old_seg in json.loads(manifest_data):
seg_path = old_seg['name'].lstrip('/')
seg_path = old_seg['path'].lstrip('/')
if isinstance(seg_path, unicode):
seg_path = seg_path.encode('utf-8')
old_slo_manifest_paths.append(seg_path)
@@ -1013,7 +915,7 @@ def st_upload(parser, args, print_queue, error_queue):
raise
# Merge the command line header options to the put_headers
put_headers.update(split_headers(options.header, '',
error_queue))
thread_manager))
# Don't do segment job if object is not big enough
if options.segment_size and \
getsize(path) > int(options.segment_size):
@@ -1021,15 +923,15 @@ def st_upload(parser, args, print_queue, error_queue):
if options.segment_container:
seg_container = options.segment_container
full_size = getsize(path)
segment_queue = Queue(10000)
segment_threads = [
QueueFunctionThread(
segment_queue, _segment_job,
create_connection(), store_results=True)
for _junk in xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
try:
slo_segments = []
error_counter = [0]
segment_manager = thread_manager.queue_manager(
_segment_job, options.segment_threads,
store_results=slo_segments,
error_counter=error_counter,
connection_maker=create_connection)
with segment_manager as segment_queue:
segment = 0
segment_start = 0
while segment_start < full_size:
@@ -1052,18 +954,12 @@ def st_upload(parser, args, print_queue, error_queue):
'log_line': '%s segment %s' % (obj, segment)})
segment += 1
segment_start += segment_size
finally:
shutdown_worker_threads(segment_queue, segment_threads)
if put_errors_from_threads(segment_threads,
error_queue):
if error_counter[0]:
raise ClientException(
'Aborting manifest creation '
'because not all segments could be uploaded. '
'%s/%s' % (container, obj))
'because not all segments could be uploaded. %s/%s'
% (container, obj))
if options.use_slo:
slo_segments = []
for thread in segment_threads:
slo_segments += thread.results
slo_segments.sort(key=lambda d: d['segment_index'])
for seg in slo_segments:
seg_loc = seg['segment_location'].lstrip('/')
@@ -1097,7 +993,10 @@ def st_upload(parser, args, print_queue, error_queue):
container, obj, open(path, 'rb'),
content_length=getsize(path), headers=put_headers)
if old_manifest or old_slo_manifest_paths:
segment_queue = Queue(10000)
segment_manager = thread_manager.queue_manager(
_segment_job, options.segment_threads,
connection_maker=create_connection)
segment_queue = segment_manager.queue
if old_manifest:
scontainer, sprefix = old_manifest.split('/', 1)
scontainer = unquote(scontainer)
@@ -1118,27 +1017,20 @@ def st_upload(parser, args, print_queue, error_queue):
{'delete': True,
'container': scont, 'obj': sobj})
if not segment_queue.empty():
segment_threads = [
QueueFunctionThread(
segment_queue,
_segment_job, create_connection())
for _junk in xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
shutdown_worker_threads(segment_queue, segment_threads)
put_errors_from_threads(segment_threads, error_queue)
with segment_manager:
pass
if options.verbose:
if conn.attempts > 1:
print_queue.put(
'%s [after %d attempts]' % (obj, conn.attempts))
thread_manager.print_msg('%s [after %d attempts]', obj,
conn.attempts)
else:
print_queue.put(obj)
thread_manager.print_msg(obj)
except OSError as err:
if err.errno != ENOENT:
raise
error_queue.put('Local file %s not found' % repr(path))
thread_manager.error('Local file %r not found', path)
def _upload_dir(path):
def _upload_dir(path, object_queue):
names = listdir(path)
if not names:
object_queue.put({'path': path, 'dir_marker': True})
@@ -1146,17 +1038,13 @@ def st_upload(parser, args, print_queue, error_queue):
for name in listdir(path):
subpath = join(path, name)
if isdir(subpath):
_upload_dir(subpath)
_upload_dir(subpath, object_queue)
else:
object_queue.put({'path': subpath})
create_connection = lambda: get_conn(options)
object_threads = [
QueueFunctionThread(object_queue, _object_job, create_connection())
for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
conn = create_connection()
# Try to create the container, just in case it doesn't exist. If this
# fails, it might just be because the user doesn't have container PUT
# permissions, so we'll ignore any error. If there's really a problem,
@@ -1174,34 +1062,38 @@ def st_upload(parser, args, print_queue, error_queue):
if msg:
msg += ': '
msg += err.http_response_content[:60]
error_queue.put(
'Error trying to create container %r: %s' % (args[0], msg))
thread_manager.error(
'Error trying to create container %r: %s', args[0],
msg)
except Exception as err:
error_queue.put(
'Error trying to create container %r: %s' % (args[0], err))
thread_manager.error(
'Error trying to create container %r: %s', args[0],
err)
object_manager = thread_manager.queue_manager(
_object_job, options.object_threads,
connection_maker=create_connection)
with object_manager as object_queue:
try:
for arg in args[1:]:
if isdir(arg):
_upload_dir(arg)
_upload_dir(arg, object_queue)
else:
object_queue.put({'path': arg})
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
finally:
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
thread_manager.error('Account not found')
def split_headers(options, prefix='', error_queue=None):
def split_headers(options, prefix='', thread_manager=None):
"""
Splits 'Key: Value' strings and returns them as a dictionary.
:param options: An array of 'Key: Value' strings
:param prefix: String to prepend to all of the keys in the dictionary.
:param error_queue: Queue for thread safe error reporting.
:param thread_manager: MultiThreadingManager for thread safe error
reporting.
"""
headers = {}
for item in options:
@@ -1211,8 +1103,8 @@ def split_headers(options, prefix='', error_queue=None):
else:
error_string = "Metadata parameter %s must contain a ':'.\n%s" \
% (item, st_post_help)
if error_queue:
error_queue.put(error_string)
if thread_manager:
thread_manager.error(error_string)
else:
exit(error_string)
return headers
@@ -1391,7 +1283,7 @@ Examples:
help='Specify a CA bundle file to use in verifying a '
'TLS (https) server certificate. '
'Defaults to env[OS_CACERT]')
default_val = utils.config_true_value(environ.get('SWIFTCLIENT_INSECURE'))
default_val = config_true_value(environ.get('SWIFTCLIENT_INSECURE'))
parser.add_option('--insecure',
action="store_true", dest="insecure",
default=default_val,
@@ -1422,38 +1314,16 @@ Examples:
logger = logging.getLogger("swiftclient")
logging.basicConfig(level=logging.DEBUG)
print_queue = Queue(10000)
def _print(item):
if isinstance(item, unicode):
item = item.encode('utf8')
print item
print_thread = QueueFunctionThread(print_queue, _print)
print_thread.start()
error_count = 0
error_queue = Queue(10000)
def _error(item):
global error_count
error_count += 1
if isinstance(item, unicode):
item = item.encode('utf8')
print >> stderr, item
error_thread = QueueFunctionThread(error_queue, _error)
error_thread.start()
had_error = False
with MultiThreadingManager() as thread_manager:
parser.usage = globals()['st_%s_help' % args[0]]
try:
globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
error_queue)
globals()['st_%s' % args[0]](parser, argv[1:], thread_manager)
except (ClientException, HTTPException, socket.error) as err:
error_queue.put(str(err))
finally:
shutdown_worker_threads(print_queue, [print_thread])
shutdown_worker_threads(error_queue, [error_thread])
thread_manager.error(str(err))
if error_count:
had_error = thread_manager.error_count
if had_error:
exit(1)

@@ -15,8 +15,6 @@
import sys
import os
import swiftclient
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
@@ -36,6 +34,9 @@ sys.path.insert(0, ROOT)
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.todo',
'sphinx.ext.coverage']
autoclass_content = 'both'
autodoc_default_flags = ['members', 'undoc-members', 'show-inheritance']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
@@ -50,7 +51,7 @@ master_doc = 'index'
# General information about the project.
project = u'Swiftclient'
copyright = u'2012 OpenStack, LLC.'
copyright = u'2013 OpenStack, LLC.'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the

@@ -4,14 +4,18 @@ swiftclient
==============
.. automodule:: swiftclient
:members:
:undoc-members:
:show-inheritance:
swiftclient.client
==================
.. automodule:: swiftclient.client
:members:
:undoc-members:
:show-inheritance:
swiftclient.exceptions
======================
.. automodule:: swiftclient.exceptions
swiftclient.multithreading
==========================
.. automodule:: swiftclient.multithreading

@@ -28,6 +28,8 @@ from urlparse import urlparse, urlunparse
from httplib import HTTPException, HTTPConnection, HTTPSConnection
from time import sleep
from swiftclient.exceptions import ClientException, InvalidHeadersException
try:
from swiftclient.https_connection import HTTPSConnectionNoSSLComp
except ImportError:
@@ -102,64 +104,6 @@ except ImportError:
from json import loads as json_loads
class InvalidHeadersException(Exception):
pass
class ClientException(Exception):
def __init__(self, msg, http_scheme='', http_host='', http_port='',
http_path='', http_query='', http_status=0, http_reason='',
http_device='', http_response_content=''):
Exception.__init__(self, msg)
self.msg = msg
self.http_scheme = http_scheme
self.http_host = http_host
self.http_port = http_port
self.http_path = http_path
self.http_query = http_query
self.http_status = http_status
self.http_reason = http_reason
self.http_device = http_device
self.http_response_content = http_response_content
def __str__(self):
a = self.msg
b = ''
if self.http_scheme:
b += '%s://' % self.http_scheme
if self.http_host:
b += self.http_host
if self.http_port:
b += ':%s' % self.http_port
if self.http_path:
b += self.http_path
if self.http_query:
b += '?%s' % self.http_query
if self.http_status:
if b:
b = '%s %s' % (b, self.http_status)
else:
b = str(self.http_status)
if self.http_reason:
if b:
b = '%s %s' % (b, self.http_reason)
else:
b = '- %s' % self.http_reason
if self.http_device:
if b:
b = '%s: device %s' % (b, self.http_device)
else:
b = 'device %s' % self.http_device
if self.http_response_content:
if len(self.http_response_content) <= 60:
b += ' %s' % self.http_response_content
else:
b += ' [first 60 chars of response] %s' \
% self.http_response_content[:60]
return b and '%s: %s' % (a, b) or a
def http_connection(url, proxy=None, ssl_compression=True):
"""
Make an HTTPConnection or HTTPSConnection

swiftclient/exceptions.py (new file, 72 lines)

@@ -0,0 +1,72 @@
# Copyright (c) 2010-2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class ClientException(Exception):
def __init__(self, msg, http_scheme='', http_host='', http_port='',
http_path='', http_query='', http_status=0, http_reason='',
http_device='', http_response_content=''):
Exception.__init__(self, msg)
self.msg = msg
self.http_scheme = http_scheme
self.http_host = http_host
self.http_port = http_port
self.http_path = http_path
self.http_query = http_query
self.http_status = http_status
self.http_reason = http_reason
self.http_device = http_device
self.http_response_content = http_response_content
def __str__(self):
a = self.msg
b = ''
if self.http_scheme:
b += '%s://' % self.http_scheme
if self.http_host:
b += self.http_host
if self.http_port:
b += ':%s' % self.http_port
if self.http_path:
b += self.http_path
if self.http_query:
b += '?%s' % self.http_query
if self.http_status:
if b:
b = '%s %s' % (b, self.http_status)
else:
b = str(self.http_status)
if self.http_reason:
if b:
b = '%s %s' % (b, self.http_reason)
else:
b = '- %s' % self.http_reason
if self.http_device:
if b:
b = '%s: device %s' % (b, self.http_device)
else:
b = 'device %s' % self.http_device
if self.http_response_content:
if len(self.http_response_content) <= 60:
b += ' %s' % self.http_response_content
else:
b += ' [first 60 chars of response] %s' \
% self.http_response_content[:60]
return b and '%s: %s' % (a, b) or a
class InvalidHeadersException(Exception):
pass

@@ -0,0 +1,241 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from time import sleep
from Queue import Queue
from threading import Thread
from traceback import format_exception
from swiftclient.exceptions import ClientException
class StopWorkerThreadSignal(object):
pass
class QueueFunctionThread(Thread):
"""
Calls ``func`` for each item in ``queue``; ``func`` is called with a
de-queued item as the first arg followed by ``*args`` and ``**kwargs``.
Any exceptions raised by ``func`` are stored in :attr:`self.exc_infos`.
If the optional kwarg ``store_results`` is specified, it must be a list and
each result of invoking ``func`` will be appended to that list.
Putting a :class:`StopWorkerThreadSignal` instance into queue will cause
this thread to exit.
"""
def __init__(self, queue, func, *args, **kwargs):
"""
:param queue: A :class:`Queue` object from which work jobs will be
pulled.
:param func: A callable which will be invoked with a dequeued item
followed by ``*args`` and ``**kwargs``.
:param \*args: Optional positional arguments for ``func``.
:param \*\*kwargs: Optional kwargs for func. If the kwarg
``store_results`` is specified, its value must be a
list, and every result from invoking ``func`` will
be appended to the supplied list. The kwarg
``store_results`` will not be passed into ``func``.
"""
Thread.__init__(self)
self.queue = queue
self.func = func
self.args = args
self.kwargs = kwargs
self.exc_infos = []
self.store_results = kwargs.pop('store_results', None)
def run(self):
while True:
item = self.queue.get()
if isinstance(item, StopWorkerThreadSignal):
break
try:
result = self.func(item, *self.args, **self.kwargs)
if self.store_results is not None:
self.store_results.append(result)
except Exception:
self.exc_infos.append(sys.exc_info())
class QueueFunctionManager(object):
"""
A context manager to handle the life-cycle of a single :class:`Queue`
and a list of associated :class:`QueueFunctionThread` instances.
This class is not usually instantiated directly. Instead, call the
:meth:`MultiThreadingManager.queue_manager` object method,
which will return an instance of this class.
When entering the context, ``thread_count`` :class:`QueueFunctionThread`
instances are created and started. The input queue is returned. Inside
the context, any work item put into the queue will get worked on by one of
the :class:`QueueFunctionThread` instances.
When the context is exited, all threads are sent a
:class:`StopWorkerThreadSignal` instance and then all threads are waited
upon. Finally, any exceptions from any of the threads are reported on via
the supplied ``thread_manager``'s :meth:`error` method. If an
``error_counter`` list was supplied on instantiation, its first element is
incremented once for every exception which occurred.
"""
def __init__(self, func, thread_count, thread_manager, thread_args=None,
thread_kwargs=None, error_counter=None,
connection_maker=None):
"""
:param func: The worker function which will be passed into each
:class:`QueueFunctionThread`'s constructor.
:param thread_count: The number of worker threads to run.
:param thread_manager: An instance of :class:`MultiThreadingManager`.
:param thread_args: Optional positional arguments to be passed into
each invocation of ``func`` after the de-queued
work item.
:param thread_kwargs: Optional keyword arguments to be passed into each
invocation of ``func``. If a list is supplied as
the ``store_results`` keyword argument, it will
be filled with every result of invoking ``func``
in all threads.
:param error_counter: Optional list containing one integer. If
supplied, the list's first element will be
incremented once for each exception in any
thread. This happens only when exiting the
context.
:param connection_maker: Optional callable. If supplied, this callable
will be invoked once per created thread, and
the result will be passed into func after the
de-queued work item but before ``thread_args``
and ``thread_kwargs``. This is used to ensure
each thread has its own connection to Swift.
"""
self.func = func
self.thread_count = thread_count
self.thread_manager = thread_manager
self.error_counter = error_counter
self.connection_maker = connection_maker
self.queue = Queue(10000)
self.thread_list = []
self.thread_args = thread_args if thread_args else ()
self.thread_kwargs = thread_kwargs if thread_kwargs else {}
def __enter__(self):
for _junk in xrange(self.thread_count):
if self.connection_maker:
thread_args = (self.connection_maker(),) + self.thread_args
else:
thread_args = self.thread_args
qf_thread = QueueFunctionThread(self.queue, self.func,
*thread_args, **self.thread_kwargs)
qf_thread.start()
self.thread_list.append(qf_thread)
return self.queue
def __exit__(self, exc_type, exc_value, traceback):
for thread in [t for t in self.thread_list if t.isAlive()]:
self.queue.put(StopWorkerThreadSignal())
while any(map(QueueFunctionThread.is_alive, self.thread_list)):
sleep(0.05)
for thread in self.thread_list:
for info in thread.exc_infos:
if self.error_counter:
self.error_counter[0] += 1
if isinstance(info[1], ClientException):
self.thread_manager.error(str(info[1]))
else:
self.thread_manager.error(''.join(format_exception(*info)))
class MultiThreadingManager(object):
"""
One object to manage context for multi-threading. This should make
bin/swift less error-prone and allow us to test this code.
This object is a context manager and returns itself into the context. When
entering the context, two printing threads are created (see below) and they
are waited on and cleaned up when exiting the context.
A convenience method, :meth:`queue_manager`, is provided to create a
:class:`QueueFunctionManager` context manager (a thread-pool with an
associated input queue for work items).
Also, thread-safe printing to two streams is provided. The
:meth:`print_msg` method will print to the supplied ``print_stream``
(defaults to ``sys.stdout``) and the :meth:`error` method will print to the
supplied ``error_stream`` (defaults to ``sys.stderr``). Both of these
printing methods will format the given string with any supplied ``*args``
(a la printf) and encode the result to utf8 if necessary.
The attribute :attr:`self.error_count` is incremented once per error
message printed, so an application can tell if any worker threads
encountered exceptions or otherwise called :meth:`error` on this instance.
The swift command-line tool uses this to exit non-zero if any error strings
were printed.
"""
def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
"""
:param print_stream: The stream to which :meth:`print_msg` sends
formatted messages, encoded to utf8 if necessary.
:param error_stream: The stream to which :meth:`error` sends formatted
messages, encoded to utf8 if necessary.
"""
self.print_stream = print_stream
self.printer = QueueFunctionManager(self._print, 1, self)
self.error_stream = error_stream
self.error_printer = QueueFunctionManager(self._print_error, 1, self)
self.error_count = 0
def __enter__(self):
self.printer.__enter__()
self.error_printer.__enter__()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.error_printer.__exit__(exc_type, exc_value, traceback)
self.printer.__exit__(exc_type, exc_value, traceback)
def queue_manager(self, func, thread_count, *args, **kwargs):
connection_maker = kwargs.pop('connection_maker', None)
error_counter = kwargs.pop('error_counter', None)
return QueueFunctionManager(func, thread_count, self, thread_args=args,
thread_kwargs=kwargs,
connection_maker=connection_maker,
error_counter=error_counter)
def print_msg(self, msg, *fmt_args):
if fmt_args:
msg = msg % fmt_args
self.printer.queue.put(msg)
def error(self, msg, *fmt_args):
if fmt_args:
msg = msg % fmt_args
self.error_printer.queue.put(msg)
def _print(self, item, stream=None):
if stream is None:
stream = self.print_stream
if isinstance(item, unicode):
item = item.encode('utf8')
print >>stream, item
def _print_error(self, item):
self.error_count += 1
return self._print(item, stream=self.error_stream)

@@ -12,7 +12,6 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Miscellaneous utility functions for use with Swift."""
TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))

@@ -0,0 +1,334 @@
# Copyright (c) 2010-2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import time
import mock
import testtools
import threading
from cStringIO import StringIO
from Queue import Queue, Empty
from swiftclient import multithreading as mt
from swiftclient.exceptions import ClientException
class ThreadTestCase(testtools.TestCase):
def setUp(self):
super(ThreadTestCase, self).setUp()
self.got_args_kwargs = Queue()
self.starting_thread_count = threading.active_count()
def _func(self, q_item, *args, **kwargs):
self.got_items.put(q_item)
self.got_args_kwargs.put((args, kwargs))
if q_item == 'go boom':
raise Exception('I went boom!')
if q_item == 'c boom':
raise ClientException(
'Client Boom', http_scheme='http', http_host='192.168.22.1',
http_port=80, http_path='/booze', http_status=404,
http_reason='to much', http_response_content='no sir!')
return 'best result EVAR!'
def assertQueueContains(self, queue, expected_contents):
got_contents = []
try:
while True:
got_contents.append(queue.get(timeout=0.1))
except Empty:
pass
if isinstance(expected_contents, set):
got_contents = set(got_contents)
self.assertEqual(expected_contents, got_contents)
class TestQueueFunctionThread(ThreadTestCase):
def setUp(self):
super(TestQueueFunctionThread, self).setUp()
self.input_queue = Queue()
self.got_items = Queue()
self.stored_results = []
self.qft = mt.QueueFunctionThread(self.input_queue, self._func,
'one_arg', 'two_arg',
red_fish='blue_arg',
store_results=self.stored_results)
self.qft.start()
def tearDown(self):
if self.qft.is_alive():
self.finish_up_thread()
super(TestQueueFunctionThread, self).tearDown()
def finish_up_thread(self):
self.input_queue.put(mt.StopWorkerThreadSignal())
while self.qft.is_alive():
time.sleep(0.05)
def test_plumbing_and_store_results(self):
self.input_queue.put('abc')
self.input_queue.put(123)
self.finish_up_thread()
self.assertQueueContains(self.got_items, ['abc', 123])
self.assertQueueContains(self.got_args_kwargs, [
(('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}),
(('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})])
self.assertEqual(self.stored_results,
['best result EVAR!', 'best result EVAR!'])
def test_exception_handling(self):
self.input_queue.put('go boom')
self.input_queue.put('ok')
self.input_queue.put('go boom')
self.finish_up_thread()
self.assertQueueContains(self.got_items,
['go boom', 'ok', 'go boom'])
self.assertEqual(len(self.qft.exc_infos), 2)
self.assertEqual(Exception, self.qft.exc_infos[0][0])
self.assertEqual(Exception, self.qft.exc_infos[1][0])
self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args)
self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args)
class TestQueueFunctionManager(ThreadTestCase):
def setUp(self):
super(TestQueueFunctionManager, self).setUp()
self.thread_manager = mock.create_autospec(
mt.MultiThreadingManager, spec_set=True, instance=True)
self.thread_count = 4
self.error_counter = [0]
self.got_items = Queue()
self.stored_results = []
self.qfq = mt.QueueFunctionManager(
self._func, self.thread_count, self.thread_manager,
thread_args=('1arg', '2arg'),
thread_kwargs={'a': 'b', 'store_results': self.stored_results},
error_counter=self.error_counter,
connection_maker=self.connection_maker)
def connection_maker(self):
return 'yup, I made a connection'
def test_context_manager_without_error_counter(self):
self.qfq = mt.QueueFunctionManager(
self._func, self.thread_count, self.thread_manager,
thread_args=('1arg', '2arg'),
thread_kwargs={'a': 'b', 'store_results': self.stored_results},
connection_maker=self.connection_maker)
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
input_queue.put('go boom')
self.assertEqual(self.starting_thread_count, threading.active_count())
error_strs = map(str, self.thread_manager.error.call_args_list)
self.assertEqual(1, len(error_strs))
self.assertTrue('Exception: I went boom!' in error_strs[0])
def test_context_manager_without_conn_maker_or_error_counter(self):
self.qfq = mt.QueueFunctionManager(
self._func, self.thread_count, self.thread_manager,
thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'})
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
for i in xrange(20):
input_queue.put('slap%d' % i)
self.assertEqual(self.starting_thread_count, threading.active_count())
self.assertEqual([], self.thread_manager.error.call_args_list)
self.assertEqual(0, self.error_counter[0])
self.assertQueueContains(self.got_items,
set(['slap%d' % i for i in xrange(20)]))
self.assertQueueContains(
self.got_args_kwargs,
[(('1arg', '2arg'), {'a': 'b'})] * 20)
self.assertEqual(self.stored_results, [])
def test_context_manager_with_exceptions(self):
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
for i in xrange(20):
input_queue.put('item%d' % i if i % 2 == 0 else 'go boom')
self.assertEqual(self.starting_thread_count, threading.active_count())
error_strs = map(str, self.thread_manager.error.call_args_list)
self.assertEqual(10, len(error_strs))
self.assertTrue(all(['Exception: I went boom!' in s for s in
error_strs]))
self.assertEqual(10, self.error_counter[0])
expected_items = set(['go boom'] + ['item%d' % i for i in xrange(20)
if i % 2 == 0])
self.assertQueueContains(self.got_items, expected_items)
self.assertQueueContains(
self.got_args_kwargs,
[(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
def test_context_manager_with_client_exceptions(self):
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
for i in xrange(20):
input_queue.put('item%d' % i if i % 2 == 0 else 'c boom')
self.assertEqual(self.starting_thread_count, threading.active_count())
error_strs = map(str, self.thread_manager.error.call_args_list)
self.assertEqual(10, len(error_strs))
stringification = 'Client Boom: ' \
'http://192.168.22.1:80/booze 404 to much no sir!'
self.assertTrue(all([stringification in s for s in error_strs]))
self.assertEqual(10, self.error_counter[0])
expected_items = set(['c boom'] + ['item%d' % i for i in xrange(20)
if i % 2 == 0])
self.assertQueueContains(self.got_items, expected_items)
self.assertQueueContains(
self.got_args_kwargs,
[(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
def test_context_manager_with_connection_maker(self):
with self.qfq as input_queue:
self.assertEqual(self.starting_thread_count + self.thread_count,
threading.active_count())
for i in xrange(20):
input_queue.put('item%d' % i)
self.assertEqual(self.starting_thread_count, threading.active_count())
self.assertEqual([], self.thread_manager.error.call_args_list)
self.assertEqual(0, self.error_counter[0])
self.assertQueueContains(self.got_items,
set(['item%d' % i for i in xrange(20)]))
self.assertQueueContains(
self.got_args_kwargs,
[(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
self.assertEqual(self.stored_results, ['best result EVAR!'] * 20)
class TestMultiThreadingManager(ThreadTestCase):
@mock.patch('swiftclient.multithreading.QueueFunctionManager')
def test_instantiation(self, mock_qfq):
thread_manager = mt.MultiThreadingManager()
self.assertEqual([
mock.call(thread_manager._print, 1, thread_manager),
mock.call(thread_manager._print_error, 1, thread_manager),
], mock_qfq.call_args_list)
# These contexts don't get entered into until the
# MultiThreadingManager's context is entered.
self.assertEqual([], thread_manager.printer.__enter__.call_args_list)
self.assertEqual([],
thread_manager.error_printer.__enter__.call_args_list)
# Test default values for the streams.
self.assertEqual(sys.stdout, thread_manager.print_stream)
self.assertEqual(sys.stderr, thread_manager.error_stream)
@mock.patch('swiftclient.multithreading.QueueFunctionManager')
def test_queue_manager_no_args(self, mock_qfq):
thread_manager = mt.MultiThreadingManager()
mock_qfq.reset_mock()
mock_qfq.return_value = 'slap happy!'
self.assertEqual(
'slap happy!',
thread_manager.queue_manager(self._func, 88))
self.assertEqual([
mock.call(self._func, 88, thread_manager, thread_args=(),
thread_kwargs={}, connection_maker=None,
error_counter=None)
], mock_qfq.call_args_list)
@mock.patch('swiftclient.multithreading.QueueFunctionManager')
def test_queue_manager_with_args(self, mock_qfq):
thread_manager = mt.MultiThreadingManager()
mock_qfq.reset_mock()
mock_qfq.return_value = 'do run run'
self.assertEqual(
'do run run',
thread_manager.queue_manager(self._func, 88, 'fun', times='are',
connection_maker='abc', to='be had',
error_counter='def'))
self.assertEqual([
mock.call(self._func, 88, thread_manager, thread_args=('fun',),
thread_kwargs={'times': 'are', 'to': 'be had'},
connection_maker='abc', error_counter='def')
], mock_qfq.call_args_list)
def test_printers(self):
out_stream = StringIO()
err_stream = StringIO()
with mt.MultiThreadingManager(
print_stream=out_stream,
error_stream=err_stream) as thread_manager:
# Sanity-checking these gives power to the previous test which
# looked at the default values of thread_manager.print/error_stream
self.assertEqual(out_stream, thread_manager.print_stream)
self.assertEqual(err_stream, thread_manager.error_stream)
self.assertEqual(self.starting_thread_count + 2,
threading.active_count())
thread_manager.print_msg('one-argument')
thread_manager.print_msg('one %s, %d fish', 'fish', 88)
thread_manager.error('I have %d problems, but a %s is not one',
99, u'\u062A\u062A')
thread_manager.print_msg('some\n%s\nover the %r', 'where',
u'\u062A\u062A')
thread_manager.error('one-error-argument')
thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!',
3.14159)
self.assertEqual(self.starting_thread_count, threading.active_count())
out_stream.seek(0)
self.assertEqual([
'one-argument\n',
'one fish, 88 fish\n',
'some\n', 'where\n', "over the u'\\u062a\\u062a'\n",
], list(out_stream.readlines()))
err_stream.seek(0)
self.assertEqual([
u'I have 99 problems, but a \u062A\u062A is not one\n'.encode(
'utf8'),
'one-error-argument\n',
'Sometimes\n', '3.1% just\n', 'does not\n', 'work!\n',
], list(err_stream.readlines()))
self.assertEqual(3, thread_manager.error_count)
if __name__ == '__main__':
testtools.main()