Merge "Static large object support."

This commit is contained in:
Jenkins 2013-04-04 23:08:25 +00:00 committed by Gerrit Code Review
commit 2d97609a52
3 changed files with 154 additions and 45 deletions

139
bin/swift
View File

@ -30,6 +30,11 @@ from time import sleep, time
from traceback import format_exception from traceback import format_exception
from urllib import quote, unquote from urllib import quote, unquote
try:
import simplejson as json
except ImportError:
import json
from swiftclient import Connection, ClientException, HTTPException, utils from swiftclient import Connection, ClientException, HTTPException, utils
from swiftclient.version import version_info from swiftclient.version import version_info
@ -111,6 +116,8 @@ class QueueFunctionThread(Thread):
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
self.exc_infos = [] self.exc_infos = []
self.results = []
self.store_results = kwargs.pop('store_results', False)
def run(self): def run(self):
while True: while True:
@ -123,7 +130,9 @@ class QueueFunctionThread(Thread):
else: else:
try: try:
if not self.abort: if not self.abort:
self.func(item, *self.args, **self.kwargs) res = self.func(item, *self.args, **self.kwargs)
if self.store_results:
self.results.append(res)
except Exception: except Exception:
self.exc_infos.append(exc_info()) self.exc_infos.append(exc_info())
finally: finally:
@ -171,19 +180,23 @@ def st_delete(parser, args, print_queue, error_queue):
def _delete_object((container, obj), conn): def _delete_object((container, obj), conn):
try: try:
old_manifest = None old_manifest = None
query_string = None
if not options.leave_segments: if not options.leave_segments:
try: try:
old_manifest = conn.head_object(container, obj).get( headers = conn.head_object(container, obj)
'x-object-manifest') old_manifest = headers.get('x-object-manifest')
if utils.config_true_value(
headers.get('x-static-large-object')):
query_string = 'multipart-manifest=delete'
except ClientException, err: except ClientException, err:
if err.http_status != 404: if err.http_status != 404:
raise raise
conn.delete_object(container, obj) conn.delete_object(container, obj, query_string=query_string)
if old_manifest: if old_manifest:
segment_queue = Queue(10000) segment_queue = Queue(10000)
scontainer, sprefix = old_manifest.split('/', 1) scontainer, sprefix = old_manifest.split('/', 1)
scontainer = unquote(scontainer) scontainer = unquote(scontainer)
sprefix = unquote(sprefix) sprefix = unquote(sprefix).rstrip('/') + '/'
for delobj in conn.get_container(scontainer, for delobj in conn.get_container(scontainer,
prefix=sprefix)[1]: prefix=sprefix)[1]:
segment_queue.put((scontainer, delobj['name'])) segment_queue.put((scontainer, delobj['name']))
@ -793,7 +806,10 @@ def st_upload(parser, args, print_queue, error_queue):
default=[], help='Set request headers with the syntax header:value. ' default=[], help='Set request headers with the syntax header:value. '
' This option may be repeated. Example -H content-type:text/plain ' ' This option may be repeated. Example -H content-type:text/plain '
'-H "Content-Length: 4000"') '-H "Content-Length: 4000"')
parser.add_option('', '--use-slo', action='store_true', default=False,
help='When used in conjuction with --segment-size will '
'create a Static Large Object instead of the default '
'Dynamic Large Object.')
(options, args) = parse_args(parser, args) (options, args) = parse_args(parser, args)
args = args[1:] args = args[1:]
if len(args) < 2: if len(args) < 2:
@ -811,14 +827,17 @@ def st_upload(parser, args, print_queue, error_queue):
seg_container = args[0] +'_segments' seg_container = args[0] +'_segments'
if options.segment_container: if options.segment_container:
seg_container = options.segment_container seg_container = options.segment_container
conn.put_object(job.get('container', seg_container), etag = conn.put_object(job.get('container', seg_container),
job['obj'], fp, content_length=job['segment_size']) job['obj'], fp, content_length=job['segment_size'])
job['segment_location'] = '/%s/%s' % (seg_container, job['obj'])
job['segment_etag'] = etag
if options.verbose and 'log_line' in job: if options.verbose and 'log_line' in job:
if conn.attempts > 1: if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' % print_queue.put('%s [after %d attempts]' %
(job['log_line'], conn.attempts)) (job['log_line'], conn.attempts))
else: else:
print_queue.put(job['log_line']) print_queue.put(job['log_line'])
return job
def _object_job(job, conn): def _object_job(job, conn):
path = job['path'] path = job['path']
@ -855,6 +874,8 @@ def st_upload(parser, args, print_queue, error_queue):
# manifest object and need to delete the old segments # manifest object and need to delete the old segments
# ourselves. # ourselves.
old_manifest = None old_manifest = None
old_slo_manifest_paths = []
new_slo_manifest_paths = set()
if options.changed or not options.leave_segments: if options.changed or not options.leave_segments:
try: try:
headers = conn.head_object(container, obj) headers = conn.head_object(container, obj)
@ -865,6 +886,16 @@ def st_upload(parser, args, print_queue, error_queue):
return return
if not options.leave_segments: if not options.leave_segments:
old_manifest = headers.get('x-object-manifest') old_manifest = headers.get('x-object-manifest')
if utils.config_true_value(
headers.get('x-static-large-object')):
headers, manifest_data = conn.get_object(
container, obj,
query_string='multipart-manifest=get')
for old_seg in json.loads(manifest_data):
seg_path = old_seg['name'].lstrip('/')
if isinstance(seg_path, unicode):
seg_path = seg_path.encode('utf-8')
old_slo_manifest_paths.append(seg_path)
except ClientException, err: except ClientException, err:
if err.http_status != 404: if err.http_status != 404:
raise raise
@ -879,9 +910,10 @@ def st_upload(parser, args, print_queue, error_queue):
seg_container = options.segment_container seg_container = options.segment_container
full_size = getsize(path) full_size = getsize(path)
segment_queue = Queue(10000) segment_queue = Queue(10000)
segment_threads = [QueueFunctionThread(segment_queue, segment_threads = [
_segment_job, create_connection()) for _junk in QueueFunctionThread(segment_queue,
xrange(options.segment_threads)] _segment_job, create_connection(), store_results=True)
for _junk in xrange(options.segment_threads)]
for thread in segment_threads: for thread in segment_threads:
thread.start() thread.start()
segment = 0 segment = 0
@ -890,13 +922,20 @@ def st_upload(parser, args, print_queue, error_queue):
segment_size = int(options.segment_size) segment_size = int(options.segment_size)
if segment_start + segment_size > full_size: if segment_start + segment_size > full_size:
segment_size = full_size - segment_start segment_size = full_size - segment_start
segment_queue.put({'path': path, if options.use_slo:
'obj': '%s/%s/%s/%s/%08d' % (obj, segment_name = '%s/slo/%s/%s/%s/%08d' % (
put_headers['x-object-meta-mtime'], full_size, obj, put_headers['x-object-meta-mtime'],
options.segment_size, segment), full_size, options.segment_size, segment)
'segment_start': segment_start, else:
'segment_size': segment_size, segment_name = '%s/%s/%s/%s/%08d' % (
'log_line': '%s segment %s' % (obj, segment)}) obj, put_headers['x-object-meta-mtime'],
full_size, options.segment_size, segment)
segment_queue.put(
{'path': path, 'obj': segment_name,
'segment_start': segment_start,
'segment_size': segment_size,
'segment_index': segment,
'log_line': '%s segment %s' % (obj, segment)})
segment += 1 segment += 1
segment_start += segment_size segment_start += segment_size
while not segment_queue.empty(): while not segment_queue.empty():
@ -909,27 +948,59 @@ def st_upload(parser, args, print_queue, error_queue):
raise ClientException('Aborting manifest creation ' raise ClientException('Aborting manifest creation '
'because not all segments could be uploaded. %s/%s' 'because not all segments could be uploaded. %s/%s'
% (container, obj)) % (container, obj))
new_object_manifest = '%s/%s/%s/%s/%s' % ( if options.use_slo:
quote(seg_container), quote(obj), slo_segments = []
put_headers['x-object-meta-mtime'], full_size, for thread in segment_threads:
options.segment_size) slo_segments += thread.results
if old_manifest == new_object_manifest: slo_segments.sort(key=lambda d: d['segment_index'])
old_manifest = None for seg in slo_segments:
put_headers['x-object-manifest'] = new_object_manifest seg_loc = seg['segment_location'].lstrip('/')
conn.put_object(container, obj, '', content_length=0, if isinstance(seg_loc, unicode):
headers=put_headers) seg_loc = seg_loc.encode('utf-8')
new_slo_manifest_paths.add(seg_loc)
manifest_data = json.dumps([
{'path': d['segment_location'],
'etag': d['segment_etag'],
'size_bytes': d['segment_size']}
for d in slo_segments])
put_headers['x-static-large-object'] = 'true'
conn.put_object(container, obj, manifest_data,
headers=put_headers,
query_string='multipart-manifest=put')
else:
new_object_manifest = '%s/%s/%s/%s/%s/' % (
quote(seg_container), quote(obj),
put_headers['x-object-meta-mtime'], full_size,
options.segment_size)
if old_manifest and old_manifest.rstrip('/') == \
new_object_manifest.rstrip('/'):
old_manifest = None
put_headers['x-object-manifest'] = new_object_manifest
conn.put_object(container, obj, '', content_length=0,
headers=put_headers)
else: else:
conn.put_object(container, obj, open(path, 'rb'), conn.put_object(container, obj, open(path, 'rb'),
content_length=getsize(path), headers=put_headers) content_length=getsize(path), headers=put_headers)
if old_manifest: if old_manifest or old_slo_manifest_paths:
segment_queue = Queue(10000) segment_queue = Queue(10000)
scontainer, sprefix = old_manifest.split('/', 1) if old_manifest:
scontainer = unquote(scontainer) scontainer, sprefix = old_manifest.split('/', 1)
sprefix = unquote(sprefix) scontainer = unquote(scontainer)
for delobj in conn.get_container(scontainer, sprefix = unquote(sprefix).rstrip('/') + '/'
prefix=sprefix)[1]: for delobj in conn.get_container(scontainer,
segment_queue.put({'delete': True, prefix=sprefix)[1]:
'container': scontainer, 'obj': delobj['name']}) segment_queue.put({'delete': True,
'container': scontainer, 'obj': delobj['name']})
if old_slo_manifest_paths:
for seg_to_delete in old_slo_manifest_paths:
if seg_to_delete in new_slo_manifest_paths:
continue
scont, sobj = \
seg_to_delete.split('/', 1)
segment_queue.put({'delete': True,
'container': scont, 'obj': sobj})
if not segment_queue.empty(): if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue, segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _junk in _segment_job, create_connection()) for _junk in

View File

@ -672,7 +672,7 @@ def delete_container(url, token, container, http_conn=None):
def get_object(url, token, container, name, http_conn=None, def get_object(url, token, container, name, http_conn=None,
resp_chunk_size=None): resp_chunk_size=None, query_string=None):
""" """
Get an object Get an object
@ -686,6 +686,7 @@ def get_object(url, token, container, name, http_conn=None,
you specify a resp_chunk_size you must fully read you specify a resp_chunk_size you must fully read
the object's contents before making another the object's contents before making another
request. request.
:param query_string: if set will be appended with '?' to generated path
:returns: a tuple of (response headers, the object's contents) The response :returns: a tuple of (response headers, the object's contents) The response
headers will be a dict and all header names will be lowercase. headers will be a dict and all header names will be lowercase.
:raises ClientException: HTTP GET request failed :raises ClientException: HTTP GET request failed
@ -695,6 +696,8 @@ def get_object(url, token, container, name, http_conn=None,
else: else:
parsed, conn = http_connection(url) parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
if query_string:
path += '?' + query_string
method = 'GET' method = 'GET'
headers = {'X-Auth-Token': token} headers = {'X-Auth-Token': token}
conn.request(method, path, '', headers) conn.request(method, path, '', headers)
@ -766,7 +769,8 @@ def head_object(url, token, container, name, http_conn=None):
def put_object(url, token=None, container=None, name=None, contents=None, def put_object(url, token=None, container=None, name=None, contents=None,
content_length=None, etag=None, chunk_size=None, content_length=None, etag=None, chunk_size=None,
content_type=None, headers=None, http_conn=None, proxy=None): content_type=None, headers=None, http_conn=None, proxy=None,
query_string=None):
""" """
Put an object Put an object
@ -794,6 +798,7 @@ def put_object(url, token=None, container=None, name=None, contents=None,
conn object) conn object)
:param proxy: proxy to connect through, if any; None by default; str of the :param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one format 'http://127.0.0.1:8888' to set one
:param query_string: if set will be appended with '?' to generated path
:returns: etag from server response :returns: etag from server response
:raises ClientException: HTTP PUT request failed :raises ClientException: HTTP PUT request failed
""" """
@ -806,6 +811,8 @@ def put_object(url, token=None, container=None, name=None, contents=None,
path = '%s/%s' % (path.rstrip('/'), quote(container)) path = '%s/%s' % (path.rstrip('/'), quote(container))
if name: if name:
path = '%s/%s' % (path.rstrip('/'), quote(name)) path = '%s/%s' % (path.rstrip('/'), quote(name))
if query_string:
path += '?' + query_string
if headers: if headers:
headers = dict(headers) headers = dict(headers)
else: else:
@ -901,7 +908,7 @@ def post_object(url, token, container, name, headers, http_conn=None):
def delete_object(url, token=None, container=None, name=None, http_conn=None, def delete_object(url, token=None, container=None, name=None, http_conn=None,
headers=None, proxy=None): headers=None, proxy=None, query_string=None):
""" """
Delete object Delete object
@ -916,6 +923,7 @@ def delete_object(url, token=None, container=None, name=None, http_conn=None,
:param headers: additional headers to include in the request :param headers: additional headers to include in the request
:param proxy: proxy to connect through, if any; None by default; str of the :param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one format 'http://127.0.0.1:8888' to set one
:param query_string: if set will be appended with '?' to generated path
:raises ClientException: HTTP DELETE request failed :raises ClientException: HTTP DELETE request failed
""" """
if http_conn: if http_conn:
@ -927,6 +935,8 @@ def delete_object(url, token=None, container=None, name=None, http_conn=None,
path = '%s/%s' % (path.rstrip('/'), quote(container)) path = '%s/%s' % (path.rstrip('/'), quote(container))
if name: if name:
path = '%s/%s' % (path.rstrip('/'), quote(name)) path = '%s/%s' % (path.rstrip('/'), quote(name))
if query_string:
path += '?' + query_string
if headers: if headers:
headers = dict(headers) headers = dict(headers)
else: else:
@ -1085,14 +1095,16 @@ class Connection(object):
"""Wrapper for :func:`head_object`""" """Wrapper for :func:`head_object`"""
return self._retry(None, head_object, container, obj) return self._retry(None, head_object, container, obj)
def get_object(self, container, obj, resp_chunk_size=None): def get_object(self, container, obj, resp_chunk_size=None,
query_string=None):
"""Wrapper for :func:`get_object`""" """Wrapper for :func:`get_object`"""
return self._retry(None, get_object, container, obj, return self._retry(None, get_object, container, obj,
resp_chunk_size=resp_chunk_size) resp_chunk_size=resp_chunk_size,
query_string=query_string)
def put_object(self, container, obj, contents, content_length=None, def put_object(self, container, obj, contents, content_length=None,
etag=None, chunk_size=None, content_type=None, etag=None, chunk_size=None, content_type=None,
headers=None): headers=None, query_string=None):
"""Wrapper for :func:`put_object`""" """Wrapper for :func:`put_object`"""
def _default_reset(*args, **kwargs): def _default_reset(*args, **kwargs):
@ -1100,7 +1112,11 @@ class Connection(object):
'ability to reset contents for reupload.' 'ability to reset contents for reupload.'
% (container, obj)) % (container, obj))
reset_func = _default_reset if isinstance(contents, str):
# if its a str then you can retry as much as you want
reset_func = None
else:
reset_func = _default_reset
tell = getattr(contents, 'tell', None) tell = getattr(contents, 'tell', None)
seek = getattr(contents, 'seek', None) seek = getattr(contents, 'seek', None)
if tell and seek: if tell and seek:
@ -1112,12 +1128,13 @@ class Connection(object):
return self._retry(reset_func, put_object, container, obj, contents, return self._retry(reset_func, put_object, container, obj, contents,
content_length=content_length, etag=etag, content_length=content_length, etag=etag,
chunk_size=chunk_size, content_type=content_type, chunk_size=chunk_size, content_type=content_type,
headers=headers) headers=headers, query_string=query_string)
def post_object(self, container, obj, headers): def post_object(self, container, obj, headers):
"""Wrapper for :func:`post_object`""" """Wrapper for :func:`post_object`"""
return self._retry(None, post_object, container, obj, headers) return self._retry(None, post_object, container, obj, headers)
def delete_object(self, container, obj): def delete_object(self, container, obj, query_string=None):
"""Wrapper for :func:`delete_object`""" """Wrapper for :func:`delete_object`"""
return self._retry(None, delete_object, container, obj) return self._retry(None, delete_object, container, obj,
query_string=query_string)

View File

@ -121,12 +121,15 @@ class MockHttpTest(testtools.TestCase):
def fake_http_connection(*args, **kwargs): def fake_http_connection(*args, **kwargs):
_orig_http_connection = c.http_connection _orig_http_connection = c.http_connection
return_read = kwargs.get('return_read') return_read = kwargs.get('return_read')
query_string = kwargs.get('query_string')
def wrapper(url, proxy=None): def wrapper(url, proxy=None):
parsed, _conn = _orig_http_connection(url, proxy=proxy) parsed, _conn = _orig_http_connection(url, proxy=proxy)
conn = fake_http_connect(*args, **kwargs)() conn = fake_http_connect(*args, **kwargs)()
def request(*args, **kwargs): def request(method, url, *args, **kwargs):
if query_string:
self.assert_(url.endswith('?' + query_string))
return return
conn.request = request conn.request = request
@ -430,6 +433,12 @@ class TestGetObject(MockHttpTest):
self.assertRaises(c.ClientException, c.get_object, self.assertRaises(c.ClientException, c.get_object,
'http://www.test.com', 'asdf', 'asdf', 'asdf') 'http://www.test.com', 'asdf', 'asdf', 'asdf')
def test_query_string(self):
c.http_connection = self.fake_http_connection(200,
query_string="hello=20")
c.get_object('http://www.test.com', 'asdf', 'asdf', 'asdf',
query_string="hello=20")
class TestHeadObject(MockHttpTest): class TestHeadObject(MockHttpTest):
@ -496,6 +505,12 @@ class TestPutObject(MockHttpTest):
except c.ClientException as e: except c.ClientException as e:
self.assertEquals(e.http_response_content, body) self.assertEquals(e.http_response_content, body)
def test_query_string(self):
c.http_connection = self.fake_http_connection(200,
query_string="hello=20")
c.put_object('http://www.test.com', 'asdf', 'asdf', 'asdf',
query_string="hello=20")
class TestPostObject(MockHttpTest): class TestPostObject(MockHttpTest):
@ -543,6 +558,12 @@ class TestDeleteObject(MockHttpTest):
self.assertRaises(c.ClientException, c.delete_object, self.assertRaises(c.ClientException, c.delete_object,
'http://www.test.com', 'asdf', 'asdf', 'asdf') 'http://www.test.com', 'asdf', 'asdf', 'asdf')
def test_query_string(self):
c.http_connection = self.fake_http_connection(200,
query_string="hello=20")
c.delete_object('http://www.test.com', 'asdf', 'asdf', 'asdf',
query_string="hello=20")
class TestConnection(MockHttpTest): class TestConnection(MockHttpTest):