remove busy-wait so that swift client won't use up all CPU cycles

The current implementation uses busy-wait and checking flags to
communicate between threads. It wastes a lot of CPU powers. With
python 2.6 is required for Swift, the communication between
threads should now be using queue and signal mechanisms. This
patch removed the busy-wait loops and use queue and queue join
for threads coordination which will not consume CPU cycles if
a thread is blocked.

Change-Id: I648cd637a92a159d5c13baa83f357cee2dfe7937
This commit is contained in:
Tong Li 2013-05-23 16:39:31 -04:00 committed by Gerrit Code Review
parent 7e84b69291
commit 874e0e4427

176
bin/swift

@ -22,10 +22,10 @@ from hashlib import md5
from optparse import OptionParser, SUPPRESS_HELP from optparse import OptionParser, SUPPRESS_HELP
from os import environ, listdir, makedirs, utime, _exit as os_exit from os import environ, listdir, makedirs, utime, _exit as os_exit
from os.path import basename, dirname, getmtime, getsize, isdir, join from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Empty, Queue from Queue import Queue
from random import shuffle from random import shuffle
from sys import argv, exc_info, exit, stderr, stdout from sys import argv, exc_info, exit, stderr, stdout
from threading import current_thread, enumerate as threading_enumerate, Thread from threading import enumerate as threading_enumerate, Thread
from time import sleep, time from time import sleep, time
from traceback import format_exception from traceback import format_exception
from urllib import quote, unquote from urllib import quote, unquote
@ -79,38 +79,29 @@ def put_errors_from_threads(threads, error_queue):
return was_error return was_error
def attempt_graceful_exit(signum, frame): class StopWorkerThreadSignal(object):
""" pass
Try to gracefully shut down. Sets abort=True on all non-main threads.
More importantly, installs the immediate_exit handler on the
signal that triggered this handler. If this function is installed def shutdown_worker_threads(queue, thread_list):
as a signal handler for SIGINT, then pressing Ctrl-C once will for thread in [t for t in thread_list if t.isAlive()]:
cause this program to finish operations in progress, then exit. queue.put(StopWorkerThreadSignal())
Pressing it again will cause an immediate exit; no cleanup
handlers will get called.
"""
stderr.write("Attempting graceful exit. "
"Press Ctrl-C again to exit immediately.\n")
main_thread = current_thread()
for thread in [t for t in threading_enumerate() if t is not main_thread]:
thread.abort = True
signal.signal(signum, immediate_exit)
def immediate_exit(signum, frame): def immediate_exit(signum, frame):
stderr.write(" Aborted\n")
os_exit(2) os_exit(2)
class QueueFunctionThread(Thread): class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs): def __init__(self, queue, func, *args, **kwargs):
""" Calls func for each item in queue; func is called with a queued """
item as the first arg followed by *args and **kwargs. Use the abort Calls func for each item in queue; func is called with a queued
attribute to have the thread empty the queue (without processing) item as the first arg followed by *args and **kwargs. Use the
and exit. """ PriorityQueue for sending quit signal when Ctrl-C is pressed.
"""
Thread.__init__(self) Thread.__init__(self)
self.abort = False
self.queue = queue self.queue = queue
self.func = func self.func = func
self.args = args self.args = args
@ -122,21 +113,19 @@ class QueueFunctionThread(Thread):
def run(self): def run(self):
while True: while True:
try: try:
item = self.queue.get_nowait() item = self.queue.get()
except Empty: if isinstance(item, StopWorkerThreadSignal):
if self.abort:
break break
sleep(0.01) except:
# This catch is important and it may occur when ctrl-C is
# pressed, in this case simply quit the thread
break
else: else:
try: try:
if not self.abort: self.func(item, *self.args, **self.kwargs)
res = self.func(item, *self.args, **self.kwargs)
if self.store_results:
self.results.append(res)
except Exception: except Exception:
self.exc_infos.append(exc_info()) self.exc_infos.append(exc_info())
finally:
self.queue.task_done()
st_delete_help = ''' st_delete_help = '''
delete [options] --all OR delete container [options] [object] [object] ... delete [options] --all OR delete container [options] [object] [object] ...
@ -210,12 +199,7 @@ def st_delete(parser, args, print_queue, error_queue):
xrange(options.object_threads)] xrange(options.object_threads)]
for thread in segment_threads: for thread in segment_threads:
thread.start() thread.start()
while not segment_queue.empty(): shutdown_worker_threads(segment_queue, segment_threads)
sleep(0.01)
for thread in segment_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(segment_threads, error_queue) put_errors_from_threads(segment_threads, error_queue)
if options.verbose: if options.verbose:
path = options.yes_all and join(container, obj) or obj path = options.yes_all and join(container, obj) or obj
@ -237,20 +221,16 @@ def st_delete(parser, args, print_queue, error_queue):
def _delete_container(container, conn): def _delete_container(container, conn):
try: try:
marker = '' marker = ''
had_objects = False
while True: while True:
objects = [o['name'] for o in objects = [o['name'] for o in
conn.get_container(container, marker=marker)[1]] conn.get_container(container, marker=marker)[1]]
if not objects: if not objects:
break break
had_objects = True
for obj in objects: for obj in objects:
object_queue.put((container, obj)) object_queue.put((container, obj))
marker = objects[-1] marker = objects[-1]
if had_objects: while not object_queue.empty():
# By using join() instead of empty() we should avoid most sleep(0.05)
# occurrences of 409 below.
object_queue.join()
attempts = 1 attempts = 1
while True: while True:
try: try:
@ -292,10 +272,6 @@ def st_delete(parser, args, print_queue, error_queue):
for container in containers: for container in containers:
container_queue.put(container) container_queue.put(container)
marker = containers[-1] marker = containers[-1]
while not container_queue.empty():
sleep(0.01)
while not object_queue.empty():
sleep(0.01)
except ClientException as err: except ClientException as err:
if err.http_status != 404: if err.http_status != 404:
raise raise
@ -310,19 +286,11 @@ def st_delete(parser, args, print_queue, error_queue):
else: else:
for obj in args[1:]: for obj in args[1:]:
object_queue.put((args[0], obj)) object_queue.put((args[0], obj))
while not container_queue.empty():
sleep(0.01) shutdown_worker_threads(container_queue, container_threads)
for thread in container_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(container_threads, error_queue) put_errors_from_threads(container_threads, error_queue)
while not object_queue.empty():
sleep(0.01) shutdown_worker_threads(object_queue, object_threads)
for thread in object_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(object_threads, error_queue) put_errors_from_threads(object_threads, error_queue)
@ -516,19 +484,11 @@ def st_download(parser, args, print_queue, error_queue):
else: else:
for obj in args[1:]: for obj in args[1:]:
object_queue.put((args[0], obj)) object_queue.put((args[0], obj))
while not container_queue.empty():
sleep(0.01) shutdown_worker_threads(container_queue, container_threads)
for thread in container_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(container_threads, error_queue) put_errors_from_threads(container_threads, error_queue)
while not object_queue.empty():
sleep(0.01) shutdown_worker_threads(object_queue, object_threads)
for thread in object_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(object_threads, error_queue) put_errors_from_threads(object_threads, error_queue)
@ -969,12 +929,7 @@ def st_upload(parser, args, print_queue, error_queue):
'log_line': '%s segment %s' % (obj, segment)}) 'log_line': '%s segment %s' % (obj, segment)})
segment += 1 segment += 1
segment_start += segment_size segment_start += segment_size
while not segment_queue.empty(): shutdown_worker_threads(segment_queue, segment_threads)
sleep(0.01)
for thread in segment_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
if put_errors_from_threads(segment_threads, error_queue): if put_errors_from_threads(segment_threads, error_queue):
raise ClientException( raise ClientException(
'Aborting manifest creation ' 'Aborting manifest creation '
@ -1045,12 +1000,7 @@ def st_upload(parser, args, print_queue, error_queue):
for _junk in xrange(options.segment_threads)] for _junk in xrange(options.segment_threads)]
for thread in segment_threads: for thread in segment_threads:
thread.start() thread.start()
while not segment_queue.empty(): shutdown_worker_threads(segment_queue, segment_threads)
sleep(0.01)
for thread in segment_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(segment_threads, error_queue) put_errors_from_threads(segment_threads, error_queue)
if options.verbose: if options.verbose:
if conn.attempts > 1: if conn.attempts > 1:
@ -1110,12 +1060,8 @@ def st_upload(parser, args, print_queue, error_queue):
_upload_dir(arg) _upload_dir(arg)
else: else:
object_queue.put({'path': arg}) object_queue.put({'path': arg})
while not object_queue.empty():
sleep(0.01) shutdown_worker_threads(object_queue, object_threads)
for thread in object_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(object_threads, error_queue) put_errors_from_threads(object_threads, error_queue)
except ClientException as err: except ClientException as err:
if err.http_status != 404: if err.http_status != 404:
@ -1333,7 +1279,7 @@ Examples:
exit('no such command: %s' % args[0]) exit('no such command: %s' % args[0])
exit() exit()
signal.signal(signal.SIGINT, attempt_graceful_exit) signal.signal(signal.SIGINT, immediate_exit)
if options.debug: if options.debug:
logger = logging.getLogger("swiftclient") logger = logging.getLogger("swiftclient")
@ -1347,7 +1293,7 @@ Examples:
print item print item
print_thread = QueueFunctionThread(print_queue, _print) print_thread = QueueFunctionThread(print_queue, _print)
print_thread.start() print_thread.setDaemon(True)
error_count = 0 error_count = 0
error_queue = Queue(10000) error_queue = Queue(10000)
@ -1360,28 +1306,26 @@ Examples:
print >> stderr, item print >> stderr, item
error_thread = QueueFunctionThread(error_queue, _error) error_thread = QueueFunctionThread(error_queue, _error)
error_thread.setDaemon(True)
parser.usage = globals()['st_%s_help' % args[0]]
try:
globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
error_queue)
except (ClientException, HTTPException, socket.error) as err:
error_queue.put(str(err))
# Let other threads start working, now start print and error thread,
# this is to prevent the main thread shutdown two thread prematurely
print_thread.start()
error_thread.start() error_thread.start()
try: # If not all the worker threads have finished, then the main thread
parser.usage = globals()['st_%s_help' % args[0]] # has to wait. Only when there are main, error and print thread left
try: # the main thread can proceed to finish up.
globals()['st_%s' % args[0]](parser, argv[1:], print_queue, while (len(threading_enumerate()) > 3 or not error_queue.empty() or
error_queue) not print_queue.empty()):
except (ClientException, HTTPException, socket.error) as err: sleep(0.5)
error_queue.put(str(err))
while not print_queue.empty(): if error_count:
sleep(0.01) exit(1)
print_thread.abort = True
while print_thread.isAlive():
print_thread.join(0.01)
while not error_queue.empty():
sleep(0.01)
error_thread.abort = True
while error_thread.isAlive():
error_thread.join(0.01)
if error_count:
exit(1)
except (SystemExit, Exception):
for thread in threading_enumerate():
thread.abort = True
raise