Static large object support.
Also fixes a bug in the handling of existing (dynamic) large objects related to segment listing prefixes, and allows object PUTs to be retried when possible. Change-Id: I0edff127fd5d5c53da33aa7cb76a4f4dc85bf6e6
This commit is contained in:
		
							
								
								
									
										107
									
								
								bin/swift
									
									
									
									
									
								
							
							
						
						
									
										107
									
								
								bin/swift
									
									
									
									
									
								
							| @@ -30,6 +30,11 @@ from time import sleep, time | ||||
| from traceback import format_exception | ||||
| from urllib import quote, unquote | ||||
|  | ||||
| try: | ||||
|     import simplejson as json | ||||
| except ImportError: | ||||
|     import json | ||||
|  | ||||
| from swiftclient import Connection, ClientException, HTTPException, utils | ||||
| from swiftclient.version import version_info | ||||
|  | ||||
| @@ -111,6 +116,8 @@ class QueueFunctionThread(Thread): | ||||
|         self.args = args | ||||
|         self.kwargs = kwargs | ||||
|         self.exc_infos = [] | ||||
|         self.results = [] | ||||
|         self.store_results = kwargs.pop('store_results', False) | ||||
|  | ||||
|     def run(self): | ||||
|         while True: | ||||
| @@ -123,7 +130,9 @@ class QueueFunctionThread(Thread): | ||||
|             else: | ||||
|                 try: | ||||
|                     if not self.abort: | ||||
|                         self.func(item, *self.args, **self.kwargs) | ||||
|                         res = self.func(item, *self.args, **self.kwargs) | ||||
|                         if self.store_results: | ||||
|                             self.results.append(res) | ||||
|                 except Exception: | ||||
|                     self.exc_infos.append(exc_info()) | ||||
|                 finally: | ||||
| @@ -171,19 +180,23 @@ def st_delete(parser, args, print_queue, error_queue): | ||||
|     def _delete_object((container, obj), conn): | ||||
|         try: | ||||
|             old_manifest = None | ||||
|             query_string = None | ||||
|             if not options.leave_segments: | ||||
|                 try: | ||||
|                     old_manifest = conn.head_object(container, obj).get( | ||||
|                         'x-object-manifest') | ||||
|                     headers = conn.head_object(container, obj) | ||||
|                     old_manifest = headers.get('x-object-manifest') | ||||
|                     if utils.config_true_value( | ||||
|                             headers.get('x-static-large-object')): | ||||
|                         query_string = 'multipart-manifest=delete' | ||||
|                 except ClientException, err: | ||||
|                     if err.http_status != 404: | ||||
|                         raise | ||||
|             conn.delete_object(container, obj) | ||||
|             conn.delete_object(container, obj, query_string=query_string) | ||||
|             if old_manifest: | ||||
|                 segment_queue = Queue(10000) | ||||
|                 scontainer, sprefix = old_manifest.split('/', 1) | ||||
|                 scontainer = unquote(scontainer) | ||||
|                 sprefix = unquote(sprefix) | ||||
|                 sprefix = unquote(sprefix).rstrip('/') + '/' | ||||
|                 for delobj in conn.get_container(scontainer, | ||||
|                                                  prefix=sprefix)[1]: | ||||
|                     segment_queue.put((scontainer, delobj['name'])) | ||||
| @@ -793,7 +806,10 @@ def st_upload(parser, args, print_queue, error_queue): | ||||
|         default=[], help='Set request headers with the syntax header:value. ' | ||||
|         ' This option may be repeated. Example -H content-type:text/plain ' | ||||
|         '-H "Content-Length: 4000"') | ||||
|  | ||||
|     parser.add_option('', '--use-slo', action='store_true', default=False, | ||||
|                       help='When used in conjuction with --segment-size will ' | ||||
|                       'create a Static Large Object instead of the default ' | ||||
|                       'Dynamic Large Object.') | ||||
|     (options, args) = parse_args(parser, args) | ||||
|     args = args[1:] | ||||
|     if len(args) < 2: | ||||
| @@ -811,14 +827,17 @@ def st_upload(parser, args, print_queue, error_queue): | ||||
|             seg_container = args[0] +'_segments' | ||||
|             if options.segment_container: | ||||
|                seg_container = options.segment_container | ||||
|             conn.put_object(job.get('container', seg_container), | ||||
|             etag = conn.put_object(job.get('container', seg_container), | ||||
|                 job['obj'], fp, content_length=job['segment_size']) | ||||
|             job['segment_location'] = '/%s/%s' % (seg_container, job['obj']) | ||||
|             job['segment_etag'] = etag | ||||
|         if options.verbose and 'log_line' in job: | ||||
|             if conn.attempts > 1: | ||||
|                 print_queue.put('%s [after %d attempts]' % | ||||
|                                 (job['log_line'], conn.attempts)) | ||||
|             else: | ||||
|                 print_queue.put(job['log_line']) | ||||
|         return job | ||||
|  | ||||
|     def _object_job(job, conn): | ||||
|         path = job['path'] | ||||
| @@ -855,6 +874,8 @@ def st_upload(parser, args, print_queue, error_queue): | ||||
|                 # manifest object and need to delete the old segments | ||||
|                 # ourselves. | ||||
|                 old_manifest = None | ||||
|                 old_slo_manifest_paths = [] | ||||
|                 new_slo_manifest_paths = set() | ||||
|                 if options.changed or not options.leave_segments: | ||||
|                     try: | ||||
|                         headers = conn.head_object(container, obj) | ||||
| @@ -865,6 +886,16 @@ def st_upload(parser, args, print_queue, error_queue): | ||||
|                             return | ||||
|                         if not options.leave_segments: | ||||
|                             old_manifest = headers.get('x-object-manifest') | ||||
|                             if utils.config_true_value( | ||||
|                                     headers.get('x-static-large-object')): | ||||
|                                 headers, manifest_data = conn.get_object( | ||||
|                                     container, obj, | ||||
|                                     query_string='multipart-manifest=get') | ||||
|                                 for old_seg in json.loads(manifest_data): | ||||
|                                     seg_path = old_seg['name'].lstrip('/') | ||||
|                                     if isinstance(seg_path, unicode): | ||||
|                                         seg_path = seg_path.encode('utf-8') | ||||
|                                     old_slo_manifest_paths.append(seg_path) | ||||
|                     except ClientException, err: | ||||
|                         if err.http_status != 404: | ||||
|                             raise | ||||
| @@ -879,9 +910,10 @@ def st_upload(parser, args, print_queue, error_queue): | ||||
|                         seg_container = options.segment_container | ||||
|                     full_size = getsize(path) | ||||
|                     segment_queue = Queue(10000) | ||||
|                     segment_threads = [QueueFunctionThread(segment_queue, | ||||
|                         _segment_job, create_connection()) for _junk in | ||||
|                         xrange(options.segment_threads)] | ||||
|                     segment_threads = [ | ||||
|                         QueueFunctionThread(segment_queue, | ||||
|                         _segment_job, create_connection(), store_results=True) | ||||
|                         for _junk in xrange(options.segment_threads)] | ||||
|                     for thread in segment_threads: | ||||
|                         thread.start() | ||||
|                     segment = 0 | ||||
| @@ -890,12 +922,19 @@ def st_upload(parser, args, print_queue, error_queue): | ||||
|                         segment_size = int(options.segment_size) | ||||
|                         if segment_start + segment_size > full_size: | ||||
|                             segment_size = full_size - segment_start | ||||
|                         segment_queue.put({'path': path, | ||||
|                             'obj': '%s/%s/%s/%s/%08d' % (obj, | ||||
|                                 put_headers['x-object-meta-mtime'], full_size, | ||||
|                                 options.segment_size, segment), | ||||
|                         if options.use_slo: | ||||
|                             segment_name = '%s/slo/%s/%s/%s/%08d' % ( | ||||
|                                 obj, put_headers['x-object-meta-mtime'], | ||||
|                                 full_size, options.segment_size, segment) | ||||
|                         else: | ||||
|                             segment_name = '%s/%s/%s/%s/%08d' % ( | ||||
|                                 obj, put_headers['x-object-meta-mtime'], | ||||
|                                 full_size, options.segment_size, segment) | ||||
|                         segment_queue.put( | ||||
|                             {'path': path, 'obj': segment_name, | ||||
|                              'segment_start': segment_start, | ||||
|                              'segment_size': segment_size, | ||||
|                              'segment_index': segment, | ||||
|                              'log_line': '%s segment %s' % (obj, segment)}) | ||||
|                         segment += 1 | ||||
|                         segment_start += segment_size | ||||
| @@ -909,11 +948,34 @@ def st_upload(parser, args, print_queue, error_queue): | ||||
|                         raise ClientException('Aborting manifest creation ' | ||||
|                             'because not all segments could be uploaded. %s/%s' | ||||
|                             % (container, obj)) | ||||
|                     new_object_manifest = '%s/%s/%s/%s/%s' % ( | ||||
|                     if options.use_slo: | ||||
|                         slo_segments = [] | ||||
|                         for thread in segment_threads: | ||||
|                             slo_segments += thread.results | ||||
|                         slo_segments.sort(key=lambda d: d['segment_index']) | ||||
|                         for seg in slo_segments: | ||||
|                             seg_loc = seg['segment_location'].lstrip('/') | ||||
|                             if isinstance(seg_loc, unicode): | ||||
|                                 seg_loc = seg_loc.encode('utf-8') | ||||
|                             new_slo_manifest_paths.add(seg_loc) | ||||
|  | ||||
|                         manifest_data = json.dumps([ | ||||
|                             {'path': d['segment_location'], | ||||
|                              'etag': d['segment_etag'], | ||||
|                              'size_bytes': d['segment_size']} | ||||
|                             for d in slo_segments]) | ||||
|  | ||||
|                         put_headers['x-static-large-object'] = 'true' | ||||
|                         conn.put_object(container, obj, manifest_data, | ||||
|                                         headers=put_headers, | ||||
|                                         query_string='multipart-manifest=put') | ||||
|                     else: | ||||
|                         new_object_manifest = '%s/%s/%s/%s/%s/' % ( | ||||
|                             quote(seg_container), quote(obj), | ||||
|                             put_headers['x-object-meta-mtime'], full_size, | ||||
|                                 options.segment_size) | ||||
|                     if old_manifest == new_object_manifest: | ||||
|                         if old_manifest and old_manifest.rstrip('/') == \ | ||||
|                                 new_object_manifest.rstrip('/'): | ||||
|                             old_manifest = None | ||||
|                         put_headers['x-object-manifest'] = new_object_manifest | ||||
|                         conn.put_object(container, obj, '', content_length=0, | ||||
| @@ -921,15 +983,24 @@ def st_upload(parser, args, print_queue, error_queue): | ||||
|                 else: | ||||
|                     conn.put_object(container, obj, open(path, 'rb'), | ||||
|                         content_length=getsize(path), headers=put_headers) | ||||
|                 if old_manifest: | ||||
|                 if old_manifest or old_slo_manifest_paths: | ||||
|                     segment_queue = Queue(10000) | ||||
|                     if old_manifest: | ||||
|                         scontainer, sprefix = old_manifest.split('/', 1) | ||||
|                         scontainer = unquote(scontainer) | ||||
|                     sprefix = unquote(sprefix) | ||||
|                         sprefix = unquote(sprefix).rstrip('/') + '/' | ||||
|                         for delobj in conn.get_container(scontainer, | ||||
|                                                          prefix=sprefix)[1]: | ||||
|                             segment_queue.put({'delete': True, | ||||
|                                 'container': scontainer, 'obj': delobj['name']}) | ||||
|                     if old_slo_manifest_paths: | ||||
|                         for seg_to_delete in old_slo_manifest_paths: | ||||
|                             if seg_to_delete in new_slo_manifest_paths: | ||||
|                                 continue | ||||
|                             scont, sobj = \ | ||||
|                                 seg_to_delete.split('/', 1) | ||||
|                             segment_queue.put({'delete': True, | ||||
|                                 'container': scont, 'obj': sobj}) | ||||
|                     if not segment_queue.empty(): | ||||
|                         segment_threads = [QueueFunctionThread(segment_queue, | ||||
|                             _segment_job, create_connection()) for _junk in | ||||
|   | ||||
| @@ -672,7 +672,7 @@ def delete_container(url, token, container, http_conn=None): | ||||
|  | ||||
|  | ||||
| def get_object(url, token, container, name, http_conn=None, | ||||
|                resp_chunk_size=None): | ||||
|                resp_chunk_size=None, query_string=None): | ||||
|     """ | ||||
|     Get an object | ||||
|  | ||||
| @@ -686,6 +686,7 @@ def get_object(url, token, container, name, http_conn=None, | ||||
|                             you specify a resp_chunk_size you must fully read | ||||
|                             the object's contents before making another | ||||
|                             request. | ||||
|     :param query_string: if set will be appended with '?' to generated path | ||||
|     :returns: a tuple of (response headers, the object's contents) The response | ||||
|               headers will be a dict and all header names will be lowercase. | ||||
|     :raises ClientException: HTTP GET request failed | ||||
| @@ -695,6 +696,8 @@ def get_object(url, token, container, name, http_conn=None, | ||||
|     else: | ||||
|         parsed, conn = http_connection(url) | ||||
|     path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) | ||||
|     if query_string: | ||||
|         path += '?' + query_string | ||||
|     method = 'GET' | ||||
|     headers = {'X-Auth-Token': token} | ||||
|     conn.request(method, path, '', headers) | ||||
| @@ -766,7 +769,8 @@ def head_object(url, token, container, name, http_conn=None): | ||||
|  | ||||
| def put_object(url, token=None, container=None, name=None, contents=None, | ||||
|                content_length=None, etag=None, chunk_size=None, | ||||
|                content_type=None, headers=None, http_conn=None, proxy=None): | ||||
|                content_type=None, headers=None, http_conn=None, proxy=None, | ||||
|                query_string=None): | ||||
|     """ | ||||
|     Put an object | ||||
|  | ||||
| @@ -794,6 +798,7 @@ def put_object(url, token=None, container=None, name=None, contents=None, | ||||
|                       conn object) | ||||
|     :param proxy: proxy to connect through, if any; None by default; str of the | ||||
|                   format 'http://127.0.0.1:8888' to set one | ||||
|     :param query_string: if set will be appended with '?' to generated path | ||||
|     :returns: etag from server response | ||||
|     :raises ClientException: HTTP PUT request failed | ||||
|     """ | ||||
| @@ -806,6 +811,8 @@ def put_object(url, token=None, container=None, name=None, contents=None, | ||||
|         path = '%s/%s' % (path.rstrip('/'), quote(container)) | ||||
|     if name: | ||||
|         path = '%s/%s' % (path.rstrip('/'), quote(name)) | ||||
|     if query_string: | ||||
|         path += '?' + query_string | ||||
|     if headers: | ||||
|         headers = dict(headers) | ||||
|     else: | ||||
| @@ -901,7 +908,7 @@ def post_object(url, token, container, name, headers, http_conn=None): | ||||
|  | ||||
|  | ||||
| def delete_object(url, token=None, container=None, name=None, http_conn=None, | ||||
|                   headers=None, proxy=None): | ||||
|                   headers=None, proxy=None, query_string=None): | ||||
|     """ | ||||
|     Delete object | ||||
|  | ||||
| @@ -916,6 +923,7 @@ def delete_object(url, token=None, container=None, name=None, http_conn=None, | ||||
|     :param headers: additional headers to include in the request | ||||
|     :param proxy: proxy to connect through, if any; None by default; str of the | ||||
|                   format 'http://127.0.0.1:8888' to set one | ||||
|     :param query_string: if set will be appended with '?' to generated path | ||||
|     :raises ClientException: HTTP DELETE request failed | ||||
|     """ | ||||
|     if http_conn: | ||||
| @@ -927,6 +935,8 @@ def delete_object(url, token=None, container=None, name=None, http_conn=None, | ||||
|         path = '%s/%s' % (path.rstrip('/'), quote(container)) | ||||
|     if name: | ||||
|         path = '%s/%s' % (path.rstrip('/'), quote(name)) | ||||
|     if query_string: | ||||
|         path += '?' + query_string | ||||
|     if headers: | ||||
|         headers = dict(headers) | ||||
|     else: | ||||
| @@ -1085,14 +1095,16 @@ class Connection(object): | ||||
|         """Wrapper for :func:`head_object`""" | ||||
|         return self._retry(None, head_object, container, obj) | ||||
|  | ||||
|     def get_object(self, container, obj, resp_chunk_size=None): | ||||
|     def get_object(self, container, obj, resp_chunk_size=None, | ||||
|                    query_string=None): | ||||
|         """Wrapper for :func:`get_object`""" | ||||
|         return self._retry(None, get_object, container, obj, | ||||
|                            resp_chunk_size=resp_chunk_size) | ||||
|                            resp_chunk_size=resp_chunk_size, | ||||
|                            query_string=query_string) | ||||
|  | ||||
|     def put_object(self, container, obj, contents, content_length=None, | ||||
|                    etag=None, chunk_size=None, content_type=None, | ||||
|                    headers=None): | ||||
|                    headers=None, query_string=None): | ||||
|         """Wrapper for :func:`put_object`""" | ||||
|  | ||||
|         def _default_reset(*args, **kwargs): | ||||
| @@ -1100,6 +1112,10 @@ class Connection(object): | ||||
|                                   'ability to reset contents for reupload.' | ||||
|                                   % (container, obj)) | ||||
|  | ||||
|         if isinstance(contents, str): | ||||
|             # if it's a str then you can retry as much as you want | ||||
|             reset_func = None | ||||
|         else: | ||||
|             reset_func = _default_reset | ||||
|         tell = getattr(contents, 'tell', None) | ||||
|         seek = getattr(contents, 'seek', None) | ||||
| @@ -1112,12 +1128,13 @@ class Connection(object): | ||||
|         return self._retry(reset_func, put_object, container, obj, contents, | ||||
|                            content_length=content_length, etag=etag, | ||||
|                            chunk_size=chunk_size, content_type=content_type, | ||||
|                            headers=headers) | ||||
|                            headers=headers, query_string=query_string) | ||||
|  | ||||
|     def post_object(self, container, obj, headers): | ||||
|         """Wrapper for :func:`post_object`""" | ||||
|         return self._retry(None, post_object, container, obj, headers) | ||||
|  | ||||
|     def delete_object(self, container, obj): | ||||
|     def delete_object(self, container, obj, query_string=None): | ||||
|         """Wrapper for :func:`delete_object`""" | ||||
|         return self._retry(None, delete_object, container, obj) | ||||
|         return self._retry(None, delete_object, container, obj, | ||||
|                            query_string=query_string) | ||||
|   | ||||
| @@ -123,12 +123,15 @@ class MockHttpTest(testtools.TestCase): | ||||
|         def fake_http_connection(*args, **kwargs): | ||||
|             _orig_http_connection = c.http_connection | ||||
|             return_read = kwargs.get('return_read') | ||||
|             query_string = kwargs.get('query_string') | ||||
|  | ||||
|             def wrapper(url, proxy=None): | ||||
|                 parsed, _conn = _orig_http_connection(url, proxy=proxy) | ||||
|                 conn = fake_http_connect(*args, **kwargs)() | ||||
|  | ||||
|                 def request(*args, **kwargs): | ||||
|                 def request(method, url, *args, **kwargs): | ||||
|                     if query_string: | ||||
|                         self.assert_(url.endswith('?' + query_string)) | ||||
|                     return | ||||
|                 conn.request = request | ||||
|  | ||||
| @@ -436,6 +439,12 @@ class TestGetObject(MockHttpTest): | ||||
|         self.assertRaises(c.ClientException, c.get_object, | ||||
|                           'http://www.test.com', 'asdf', 'asdf', 'asdf') | ||||
|  | ||||
|     def test_query_string(self): | ||||
|         c.http_connection = self.fake_http_connection(200, | ||||
|                                                       query_string="hello=20") | ||||
|         c.get_object('http://www.test.com', 'asdf', 'asdf', 'asdf', | ||||
|                      query_string="hello=20") | ||||
|  | ||||
|  | ||||
| class TestHeadObject(MockHttpTest): | ||||
|  | ||||
| @@ -492,7 +501,6 @@ class TestPutObject(MockHttpTest): | ||||
|             self.assertEquals(len(w), 1) | ||||
|             self.assertTrue(issubclass(w[-1].category, UserWarning)) | ||||
|  | ||||
|  | ||||
|     def test_server_error(self): | ||||
|         body = 'c' * 60 | ||||
|         c.http_connection = self.fake_http_connection(500, body=body) | ||||
| @@ -503,6 +511,12 @@ class TestPutObject(MockHttpTest): | ||||
|         except c.ClientException as e: | ||||
|             self.assertEquals(e.http_response_content, body) | ||||
|  | ||||
|     def test_query_string(self): | ||||
|         c.http_connection = self.fake_http_connection(200, | ||||
|                                                       query_string="hello=20") | ||||
|         c.put_object('http://www.test.com', 'asdf', 'asdf', 'asdf', | ||||
|                      query_string="hello=20") | ||||
|  | ||||
|  | ||||
| class TestPostObject(MockHttpTest): | ||||
|  | ||||
| @@ -550,6 +564,12 @@ class TestDeleteObject(MockHttpTest): | ||||
|         self.assertRaises(c.ClientException, c.delete_object, | ||||
|                           'http://www.test.com', 'asdf', 'asdf', 'asdf') | ||||
|  | ||||
|     def test_query_string(self): | ||||
|         c.http_connection = self.fake_http_connection(200, | ||||
|                                                       query_string="hello=20") | ||||
|         c.delete_object('http://www.test.com', 'asdf', 'asdf', 'asdf', | ||||
|                         query_string="hello=20") | ||||
|  | ||||
|  | ||||
| class TestConnection(MockHttpTest): | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 David Goetz
					David Goetz