Updated st's copy of client.py; st no longer aborts everything on one error; st prints when it had to retry; st prints ClientExceptions without the full stack trace; st aborts manifest creation if segments couldn't be uploaded; client.py will retry on 408s; client.py will treat empty contents as resettable

gholt 2011-05-14 02:31:47 +00:00
parent 00e81e256c
commit 6a1327cad0
2 changed files with 121 additions and 37 deletions
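
For orientation, here is a minimal usage sketch (not part of the commit) of the reworked retry behaviour in client.py as a caller such as st sees it. The auth URL, credentials, and object names are made up; retries, the new starting_backoff argument, and the attempts counter are the pieces shown in the diffs below.

# Hedged sketch; assumes the swift.common.client module from this commit.
from swift.common.client import ClientException, Connection

conn = Connection('https://auth.example.com/v1.0', 'account:user', 'secretkey',
                  retries=5,           # how many times _retry() re-issues a request
                  starting_backoff=1)  # new: initial sleep in seconds, doubled per retry
try:
    # put_object() now carries a reset_func, so a seekable file can be
    # rewound and re-sent after a 408 or 5xx response.
    fp = open('photo.jpg', 'rb')
    try:
        conn.put_object('photos', 'photo.jpg', fp)
    finally:
        fp.close()
except ClientException, err:
    # st now reports ClientExceptions as one-line messages, not full tracebacks.
    print(str(err))
else:
    if conn.attempts > 1:
        # mirrors st's new '[after N attempts]' verbose output
        print('photos/photo.jpg [after %d attempts]' % conn.attempts)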

bin/st (154 changed lines)

@@ -20,9 +20,10 @@ from optparse import OptionParser
from os import environ, listdir, makedirs, utime
from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Empty, Queue
from sys import argv, exit, stderr, stdout
from sys import argv, exc_info, exit, stderr, stdout
from threading import enumerate as threading_enumerate, Thread
from time import sleep
from traceback import format_exception
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@@ -30,22 +31,29 @@ from time import sleep
import socket
from cStringIO import StringIO
from httplib import HTTPException, HTTPSConnection
from re import compile, DOTALL
from tokenize import generate_tokens, STRING, NAME, OP
from urllib import quote as _quote, unquote
from urlparse import urlparse, urlunparse
try:
from eventlet.green.httplib import HTTPException, HTTPSConnection
except ImportError:
from httplib import HTTPException, HTTPSConnection
try:
from eventlet import sleep
except Exception:
except ImportError:
from time import sleep
try:
from swift.common.bufferedhttp \
import BufferedHTTPConnection as HTTPConnection
except Exception:
from httplib import HTTPConnection
except ImportError:
try:
from eventlet.green.httplib import HTTPConnection
except ImportError:
from httplib import HTTPConnection
def quote(value, safe='/'):
@@ -226,7 +234,7 @@ def get_account(url, token, marker=None, limit=None, prefix=None,
listing = \
get_account(url, token, marker, limit, prefix, http_conn)[1]
if listing:
rv.extend(listing)
rv[1].extend(listing)
return rv
parsed, conn = http_conn
qs = 'format=json'
@@ -700,7 +708,7 @@ class Connection(object):
"""Convenience class to make requests that will also retry the request"""
def __init__(self, authurl, user, key, retries=5, preauthurl=None,
preauthtoken=None, snet=False):
preauthtoken=None, snet=False, starting_backoff=1):
"""
:param authurl: authentication URL
:param user: user name to authenticate as
@@ -720,6 +728,7 @@ class Connection(object):
self.token = preauthtoken
self.attempts = 0
self.snet = snet
self.starting_backoff = starting_backoff
def get_auth(self):
return get_auth(self.authurl, self.user, self.key, snet=self.snet)
@@ -727,9 +736,9 @@ class Connection(object):
def http_connection(self):
return http_connection(self.url)
def _retry(self, func, *args, **kwargs):
def _retry(self, reset_func, func, *args, **kwargs):
self.attempts = 0
backoff = 1
backoff = self.starting_backoff
while self.attempts <= self.retries:
self.attempts += 1
try:
@@ -752,16 +761,20 @@ class Connection(object):
self.url = self.token = None
if self.attempts > 1:
raise
elif err.http_status == 408:
self.http_conn = None
elif 500 <= err.http_status <= 599:
pass
else:
raise
sleep(backoff)
backoff *= 2
if reset_func:
reset_func(func, *args, **kwargs)
def head_account(self):
"""Wrapper for :func:`head_account`"""
return self._retry(head_account)
return self._retry(None, head_account)
def get_account(self, marker=None, limit=None, prefix=None,
full_listing=False):
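
Putting the _retry() pieces above together, the control flow is roughly the following. This is a simplified standalone sketch, not the real implementation: re-authentication on 401, connection rebuilding, and socket errors are omitted, and FakeClientException stands in for the real ClientException.

from time import sleep

class FakeClientException(Exception):
    # stand-in for swift.common.client.ClientException
    def __init__(self, msg, http_status=0):
        Exception.__init__(self, msg)
        self.http_status = http_status

def retry(func, reset_func=None, retries=5, starting_backoff=1):
    attempts = 0
    backoff = starting_backoff
    while attempts <= retries:
        attempts += 1
        try:
            return func()
        except FakeClientException, err:
            if attempts > retries:
                raise
            if err.http_status == 408:
                # new: request timeouts are retryable; the real code also drops
                # self.http_conn so the connection gets rebuilt first
                pass
            elif 500 <= err.http_status <= 599:
                pass   # server errors stay retryable, as before this commit
            else:
                raise  # any other status is fatal immediately
        sleep(backoff)
        backoff *= 2       # exponential backoff, seeded by starting_backoff
        if reset_func:
            reset_func()   # new: e.g. rewind the request body before re-sending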
@@ -769,16 +782,16 @@ class Connection(object):
# TODO(unknown): With full_listing=True this will restart the entire
# listing with each retry. Need to make a better version that just
# retries where it left off.
return self._retry(get_account, marker=marker, limit=limit,
return self._retry(None, get_account, marker=marker, limit=limit,
prefix=prefix, full_listing=full_listing)
def post_account(self, headers):
"""Wrapper for :func:`post_account`"""
return self._retry(post_account, headers)
return self._retry(None, post_account, headers)
def head_container(self, container):
"""Wrapper for :func:`head_container`"""
return self._retry(head_container, container)
return self._retry(None, head_container, container)
def get_container(self, container, marker=None, limit=None, prefix=None,
delimiter=None, full_listing=False):
@@ -786,46 +799,60 @@ class Connection(object):
# TODO(unknown): With full_listing=True this will restart the entire
# listing with each retry. Need to make a better version that just
# retries where it left off.
return self._retry(get_container, container, marker=marker,
return self._retry(None, get_container, container, marker=marker,
limit=limit, prefix=prefix, delimiter=delimiter,
full_listing=full_listing)
def put_container(self, container, headers=None):
"""Wrapper for :func:`put_container`"""
return self._retry(put_container, container, headers=headers)
return self._retry(None, put_container, container, headers=headers)
def post_container(self, container, headers):
"""Wrapper for :func:`post_container`"""
return self._retry(post_container, container, headers)
return self._retry(None, post_container, container, headers)
def delete_container(self, container):
"""Wrapper for :func:`delete_container`"""
return self._retry(delete_container, container)
return self._retry(None, delete_container, container)
def head_object(self, container, obj):
"""Wrapper for :func:`head_object`"""
return self._retry(head_object, container, obj)
return self._retry(None, head_object, container, obj)
def get_object(self, container, obj, resp_chunk_size=None):
"""Wrapper for :func:`get_object`"""
return self._retry(get_object, container, obj,
return self._retry(None, get_object, container, obj,
resp_chunk_size=resp_chunk_size)
def put_object(self, container, obj, contents, content_length=None,
etag=None, chunk_size=65536, content_type=None,
headers=None):
"""Wrapper for :func:`put_object`"""
return self._retry(put_object, container, obj, contents,
def _default_reset(*args, **kwargs):
raise ClientException('put_object(%r, %r, ...) failure and no '
'ability to reset contents for reupload.' % (container, obj))
reset_func = _default_reset
tell = getattr(contents, 'tell', None)
seek = getattr(contents, 'seek', None)
if tell and seek:
orig_pos = tell()
reset_func = lambda *a, **k: seek(orig_pos)
elif not contents:
reset_func = lambda *a, **k: None
return self._retry(reset_func, put_object, container, obj, contents,
content_length=content_length, etag=etag, chunk_size=chunk_size,
content_type=content_type, headers=headers)
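
The reset_func chosen here determines whether a failed put_object() can be retried at all. A hedged illustration, with conn as in the sketch near the top and made-up container and object names:

# 1. Seekable body: tell()/seek() exist, so a retry rewinds to the original
#    position and re-sends the same bytes.
fp = open('db.dump', 'rb')
conn.put_object('backups', 'db.dump', fp)
fp.close()

# 2. Empty body (e.g. a zero-byte marker object): nothing to rewind, so this
#    commit now treats it as resettable instead of refusing to retry.
conn.put_object('backups', 'db.dump.done', '')

# 3. Body with read() but no tell()/seek(), such as a wrapped pipe: if the
#    first attempt fails, the _default_reset above raises the 'no ability to
#    reset contents for reupload' ClientException rather than re-sending a
#    partially consumed stream.
class StreamBody(object):
    def __init__(self, fp):
        self.fp = fp
    def read(self, size=-1):
        return self.fp.read(size)

import sys
conn.put_object('backups', 'streamed', StreamBody(sys.stdin),
                content_length=1024)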
def post_object(self, container, obj, headers):
"""Wrapper for :func:`post_object`"""
return self._retry(post_object, container, obj, headers)
return self._retry(None, post_object, container, obj, headers)
def delete_object(self, container, obj):
"""Wrapper for :func:`delete_object`"""
return self._retry(delete_object, container, obj)
return self._retry(None, delete_object, container, obj)
# End inclusion of swift.common.client
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@@ -839,6 +866,24 @@ def mkdirs(path):
raise
def put_errors_from_threads(threads, error_queue):
"""
Places any errors from the threads into error_queue.
:param threads: A list of QueueFunctionThread instances.
:param error_queue: A queue to put error strings into.
:returns: True if any errors were found.
"""
was_error = False
for thread in threads:
for info in thread.exc_infos:
was_error = True
if isinstance(info[1], ClientException):
error_queue.put(str(info[1]))
else:
error_queue.put(''.join(format_exception(*info)))
return was_error
class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs):
@@ -852,18 +897,22 @@ class QueueFunctionThread(Thread):
self.func = func
self.args = args
self.kwargs = kwargs
self.exc_infos = []
def run(self):
while True:
try:
item = self.queue.get_nowait()
if not self.abort:
self.func(item, *self.args, **self.kwargs)
self.queue.task_done()
except Empty:
if self.abort:
break
sleep(0.01)
try:
while True:
try:
item = self.queue.get_nowait()
if not self.abort:
self.func(item, *self.args, **self.kwargs)
self.queue.task_done()
except Empty:
if self.abort:
break
sleep(0.01)
except Exception:
self.exc_infos.append(exc_info())
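
With exc_infos in place, a failing worker no longer takes down the whole run; st drains the queue, aborts the workers, and then reports whatever they collected. A sketch of that pattern, assuming the QueueFunctionThread, put_errors_from_threads, and ClientException definitions in this file; the processing function and object names are hypothetical:

from Queue import Queue
from time import sleep

object_queue = Queue(10000)
error_queue = Queue(10000)

def _process_object(item):
    if item == 'bad-object':
        raise ClientException('Object DELETE failed', http_status=503)

object_threads = [QueueFunctionThread(object_queue, _process_object)
                  for _ in xrange(10)]
for thread in object_threads:
    thread.start()
for item in ('ok-object', 'bad-object', 'other-ok-object'):
    object_queue.put(item)
while not object_queue.empty():
    sleep(0.01)
for thread in object_threads:
    thread.abort = True
    while thread.isAlive():
        thread.join(0.01)
# ClientExceptions come back as one-line messages, anything else as a
# formatted traceback string.
if put_errors_from_threads(object_threads, error_queue):
    while not error_queue.empty():
        print(error_queue.get())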
st_delete_help = '''
@@ -891,7 +940,11 @@ def st_delete(parser, args, print_queue, error_queue):
def _delete_segment((container, obj), conn):
conn.delete_object(container, obj)
if options.verbose:
print_queue.put('%s/%s' % (container, obj))
if conn.attempts > 2:
print_queue.put('%s/%s [after %d attempts]' %
(container, obj, conn.attempts))
else:
print_queue.put('%s/%s' % (container, obj))
object_queue = Queue(10000)
@@ -924,11 +977,16 @@ def st_delete(parser, args, print_queue, error_queue):
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(segment_threads, error_queue)
if options.verbose:
path = options.yes_all and join(container, obj) or obj
if path[:1] in ('/', '\\'):
path = path[1:]
print_queue.put(path)
if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' %
(path, conn.attempts))
else:
print_queue.put(path)
except ClientException, err:
if err.http_status != 404:
raise
@@ -1015,12 +1073,14 @@ def st_delete(parser, args, print_queue, error_queue):
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(container_threads, error_queue)
while not object_queue.empty():
sleep(0.01)
for thread in object_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(object_threads, error_queue)
st_download_help = '''
@@ -1112,7 +1172,11 @@ def st_download(options, args, print_queue, error_queue):
mtime = float(headers['x-object-meta-mtime'])
utime(path, (mtime, mtime))
if options.verbose:
print_queue.put(path)
if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' %
(path, conn.attempts))
else:
print_queue.put(path)
except ClientException, err:
if err.http_status != 404:
raise
@@ -1184,12 +1248,14 @@ def st_download(options, args, print_queue, error_queue):
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(container_threads, error_queue)
while not object_queue.empty():
sleep(0.01)
for thread in object_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(object_threads, error_queue)
st_list_help = '''
@@ -1472,7 +1538,11 @@ def st_upload(options, args, print_queue, error_queue):
conn.put_object(job.get('container', args[0] + '_segments'),
job['obj'], fp, content_length=job['segment_size'])
if options.verbose and 'log_line' in job:
print_queue.put(job['log_line'])
if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' %
(job['log_line'], conn.attempts))
else:
print_queue.put(job['log_line'])
def _object_job(job, conn):
path = job['path']
@@ -1550,6 +1620,10 @@ def st_upload(options, args, print_queue, error_queue):
thread.abort = True
while thread.isAlive():
thread.join(0.01)
if put_errors_from_threads(segment_threads, error_queue):
raise ClientException('Aborting manifest creation '
'because not all segments could be uploaded. %s/%s'
% (container, obj))
new_object_manifest = '%s_segments/%s/%s/%s/' % (
container, obj, put_headers['x-object-meta-mtime'],
full_size)
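
When all segments made it, the manifest is a zero-byte object whose X-Object-Manifest header carries the prefix built above, roughly as follows (all values illustrative):

# Illustrative values: a 100 MB file uploaded in 10 MB segments.
container, obj = 'backups', 'db.dump'
put_headers = {'x-object-meta-mtime': '1305337907.25'}
full_size = 104857600

new_object_manifest = '%s_segments/%s/%s/%s/' % (
    container, obj, put_headers['x-object-meta-mtime'], full_size)
# -> 'backups_segments/db.dump/1305337907.25/104857600/'

# The segments were uploaded under that prefix (.../00000000, .../00000001,
# and so on); the named object itself is an empty PUT pointing at them:
put_headers['x-object-manifest'] = new_object_manifest
conn.put_object(container, obj, '', content_length=0, headers=put_headers)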
@@ -1580,8 +1654,13 @@ def st_upload(options, args, print_queue, error_queue):
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(segment_threads, error_queue)
if options.verbose:
print_queue.put(obj)
if conn.attempts > 1:
print_queue.put(
'%s [after %d attempts]' % (obj, conn.attempts))
else:
print_queue.put(obj)
except OSError, err:
if err.errno != ENOENT:
raise
@@ -1630,6 +1709,7 @@ def st_upload(options, args, print_queue, error_queue):
thread.abort = True
while thread.isAlive():
thread.join(0.01)
put_errors_from_threads(object_threads, error_queue)
except ClientException, err:
if err.http_status != 404:
raise

swift/common/client.py

@@ -748,6 +748,8 @@ class Connection(object):
self.url = self.token = None
if self.attempts > 1:
raise
elif err.http_status == 408:
self.http_conn = None
elif 500 <= err.http_status <= 599:
pass
else:
@@ -824,6 +826,8 @@ class Connection(object):
if tell and seek:
orig_pos = tell()
reset_func = lambda *a, **k: seek(orig_pos)
elif not contents:
reset_func = lambda *a, **k: None
return self._retry(reset_func, put_object, container, obj, contents,
content_length=content_length, etag=etag, chunk_size=chunk_size,