diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
index 219b03de75..ea7be528c9 100644
--- a/swift/common/exceptions.py
+++ b/swift/common/exceptions.py
@@ -50,6 +50,10 @@ class DiskFileNotExist(SwiftException):
     pass
 
 
+class DiskFileNoSpace(SwiftException):
+    pass
+
+
 class PathNotDir(OSError):
     pass
 
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 58a1fb756f..6f08079f9f 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -38,7 +38,7 @@ from swift.common.bufferedhttp import http_connect
 from swift.common.constraints import check_object_creation, check_mount, \
     check_float, check_utf8
 from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
-    DiskFileNotExist, DiskFileCollision
+    DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
 from swift.obj.replicator import tpool_reraise, invalidate_hash, \
     quarantine_renamer, get_hashes
 from swift.common.http import is_success
@@ -284,12 +284,22 @@ class DiskFile(object):
                 int(self.metadata['X-Delete-At']) <= time.time())
 
     @contextmanager
-    def mkstemp(self):
-        """Contextmanager to make a temporary file."""
+    def mkstemp(self, size=None):
+        """
+        Contextmanager to make a temporary file.
+
+        :param size: optional initial size of file to allocate on disk
+        :raises DiskFileNoSpace: if a size is specified and fallocate fails
+        """
         if not os.path.exists(self.tmpdir):
             mkdirs(self.tmpdir)
         fd, self.tmppath = mkstemp(dir=self.tmpdir)
         try:
+            if size is not None and size > 0:
+                try:
+                    fallocate(fd, size)
+                except OSError:
+                    raise DiskFileNoSpace()
             yield fd
         finally:
             try:
@@ -302,13 +312,14 @@ class DiskFile(object):
             except OSError:
                 pass
 
-    def put(self, fd, metadata, extension='.data'):
+    def put(self, fd, fsize, metadata, extension='.data'):
         """
         Finalize writing the file on disk, and renames it from the temp file
         to the real location. This should be called after the data has been
         written to the temp file.
 
         :param fd: file descriptor of the temp file
+        :param fsize: final on-disk size of the created file
         :param metadata: dictionary of metadata to be written
         :param extension: extension to be used when making the file
         """
@@ -322,11 +333,10 @@ class DiskFile(object):
         # redundant work the drop cache code will perform on the pages (now
         # that after fsync the pages will be all clean).
         tpool.execute(fsync, fd)
-        if 'Content-Length' in metadata:
-            # From the Department of the Redundancy Department, make sure we
-            # call drop_cache() after fsync() to avoid redundant work (pages
-            # all clean).
-            self.drop_cache(fd, 0, int(metadata['Content-Length']))
+        # From the Department of the Redundancy Department, make sure we
+        # call drop_cache() after fsync() to avoid redundant work (pages
+        # all clean).
+        self.drop_cache(fd, 0, fsize)
         invalidate_hash(os.path.dirname(self.datadir))
         # After the rename completes, this object will be available for other
         # requests to reference.
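Note: a minimal sketch of the reworked DiskFile contract above, for review. The caller now passes the expected size to mkstemp() (so allocation failures surface as DiskFileNoSpace) and the final size to put() (so drop_cache() no longer depends on Content-Length metadata). The disk_file, body, and metadata names are illustrative, not part of this patch:

    import os
    from swift.common.exceptions import DiskFileNoSpace

    # Illustrative only: disk_file is an existing DiskFile; body and
    # metadata are assumed. mkstemp(size=...) preallocates up front;
    # put() takes the final on-disk size.
    size = len(body)
    try:
        with disk_file.mkstemp(size=size) as fd:
            while body:
                body = body[os.write(fd, body):]
            disk_file.put(fd, size, metadata)
    except DiskFileNoSpace:
        pass  # the object server maps this to 507 Insufficient Storage
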
@@ -343,7 +353,7 @@ class DiskFile(object):
         """
         extension = '.ts' if tombstone else '.meta'
         with self.mkstemp() as fd:
-            self.put(fd, metadata, extension=extension)
+            self.put(fd, 0, metadata, extension=extension)
 
     def unlinkold(self, timestamp):
         """
@@ -660,68 +670,70 @@ class ObjectController(object):
         orig_timestamp = file.metadata.get('X-Timestamp')
         upload_expiration = time.time() + self.max_upload_time
         etag = md5()
+        fsize = request.headers.get('content-length', None)
+        if fsize is not None:
+            fsize = int(fsize)
         upload_size = 0
         last_sync = 0
         elapsed_time = 0
-        with file.mkstemp() as fd:
-            try:
-                fallocate(fd, int(request.headers.get('content-length', 0)))
-            except OSError:
-                return HTTPInsufficientStorage(drive=device, request=request)
-            reader = request.environ['wsgi.input'].read
-            for chunk in iter(lambda: reader(self.network_chunk_size), ''):
-                start_time = time.time()
-                upload_size += len(chunk)
-                if time.time() > upload_expiration:
-                    self.logger.increment('PUT.timeouts')
-                    return HTTPRequestTimeout(request=request)
-                etag.update(chunk)
-                while chunk:
-                    written = os.write(fd, chunk)
-                    chunk = chunk[written:]
-                # For large files sync every 512MB (by default) written
-                if upload_size - last_sync >= self.bytes_per_sync:
-                    tpool.execute(fdatasync, fd)
-                    drop_buffer_cache(fd, last_sync, upload_size - last_sync)
-                    last_sync = upload_size
-                sleep()
-                elapsed_time += time.time() - start_time
+        try:
+            with file.mkstemp(size=fsize) as fd:
+                reader = request.environ['wsgi.input'].read
+                for chunk in iter(lambda: reader(self.network_chunk_size), ''):
+                    start_time = time.time()
+                    upload_size += len(chunk)
+                    if time.time() > upload_expiration:
+                        self.logger.increment('PUT.timeouts')
+                        return HTTPRequestTimeout(request=request)
+                    etag.update(chunk)
+                    while chunk:
+                        written = os.write(fd, chunk)
+                        chunk = chunk[written:]
+                    # For large files sync every 512MB (by default) written
+                    if upload_size - last_sync >= self.bytes_per_sync:
+                        tpool.execute(fdatasync, fd)
+                        drop_buffer_cache(fd, last_sync,
+                                          upload_size - last_sync)
+                        last_sync = upload_size
+                    sleep()
+                    elapsed_time += time.time() - start_time
 
-            if upload_size:
-                self.logger.transfer_rate(
-                    'PUT.' + device + '.timing', elapsed_time, upload_size)
+                if upload_size:
+                    self.logger.transfer_rate(
+                        'PUT.' + device + '.timing', elapsed_time, upload_size)
 
-            if 'content-length' in request.headers and \
-                    int(request.headers['content-length']) != upload_size:
-                return HTTPClientDisconnect(request=request)
-            etag = etag.hexdigest()
-            if 'etag' in request.headers and \
-                    request.headers['etag'].lower() != etag:
-                return HTTPUnprocessableEntity(request=request)
-            metadata = {
-                'X-Timestamp': request.headers['x-timestamp'],
-                'Content-Type': request.headers['content-type'],
-                'ETag': etag,
-                'Content-Length': str(upload_size),
-            }
-            metadata.update(val for val in request.headers.iteritems()
-                            if val[0].lower().startswith('x-object-meta-') and
-                            len(val[0]) > 14)
-            for header_key in self.allowed_headers:
-                if header_key in request.headers:
-                    header_caps = header_key.title()
-                    metadata[header_caps] = request.headers[header_key]
-            old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
-            if old_delete_at != new_delete_at:
-                if new_delete_at:
-                    self.delete_at_update(
-                        'PUT', new_delete_at, account, container, obj,
-                        request.headers, device)
-                if old_delete_at:
-                    self.delete_at_update(
-                        'DELETE', old_delete_at, account, container, obj,
-                        request.headers, device)
-            file.put(fd, metadata)
+                if fsize is not None and fsize != upload_size:
+                    return HTTPClientDisconnect(request=request)
+                etag = etag.hexdigest()
+                if 'etag' in request.headers and \
+                        request.headers['etag'].lower() != etag:
+                    return HTTPUnprocessableEntity(request=request)
+                metadata = {
+                    'X-Timestamp': request.headers['x-timestamp'],
+                    'Content-Type': request.headers['content-type'],
+                    'ETag': etag,
+                    'Content-Length': str(upload_size),
+                }
+                metadata.update(val for val in request.headers.iteritems()
+                                if val[0].lower().startswith('x-object-meta-')
+                                and len(val[0]) > 14)
+                for header_key in self.allowed_headers:
+                    if header_key in request.headers:
+                        header_caps = header_key.title()
+                        metadata[header_caps] = request.headers[header_key]
+                old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
+                if old_delete_at != new_delete_at:
+                    if new_delete_at:
+                        self.delete_at_update(
+                            'PUT', new_delete_at, account, container, obj,
+                            request.headers, device)
+                    if old_delete_at:
+                        self.delete_at_update(
+                            'DELETE', old_delete_at, account, container, obj,
+                            request.headers, device)
+                file.put(fd, upload_size, metadata)
+        except DiskFileNoSpace:
+            return HTTPInsufficientStorage(drive=device, request=request)
         file.unlinkold(metadata['X-Timestamp'])
         if not orig_timestamp or \
                 orig_timestamp < request.headers['x-timestamp']:
diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py
index 250cdf3db9..db80063007 100644
--- a/test/unit/obj/test_auditor.py
+++ b/test/unit/obj/test_auditor.py
@@ -72,7 +72,7 @@ class TestAuditor(unittest.TestCase):
             'X-Timestamp': timestamp,
             'Content-Length': str(os.fstat(fd).st_size),
         }
-        self.disk_file.put(fd, metadata)
+        self.disk_file.put(fd, 1024, metadata)
         pre_quarantines = self.auditor.quarantines
 
         self.auditor.object_audit(
@@ -100,7 +100,7 @@ class TestAuditor(unittest.TestCase):
             'X-Timestamp': timestamp,
             'Content-Length': str(os.fstat(fd).st_size),
         }
-        self.disk_file.put(fd, metadata)
+        self.disk_file.put(fd, 1024, metadata)
         pre_quarantines = self.auditor.quarantines
         # remake so it will have metadata
         self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
@@ -161,7 +161,7 @@ class TestAuditor(unittest.TestCase):
             'X-Timestamp': timestamp,
             'Content-Length': str(os.fstat(fd).st_size),
         }
-        self.disk_file.put(fd, metadata)
+        self.disk_file.put(fd, 1024, metadata)
         self.disk_file.close()
         self.auditor.audit_all_objects()
         self.assertEquals(self.auditor.quarantines, pre_quarantines)
@@ -181,7 +181,7 @@ class TestAuditor(unittest.TestCase):
             'X-Timestamp': timestamp,
             'Content-Length': str(os.fstat(fd).st_size),
         }
-        self.disk_file.put(fd, metadata)
+        self.disk_file.put(fd, 1024, metadata)
         self.disk_file.close()
         os.write(fd, 'extra_data')
         self.auditor.audit_all_objects()
@@ -202,7 +202,7 @@ class TestAuditor(unittest.TestCase):
             'X-Timestamp': timestamp,
             'Content-Length': str(os.fstat(fd).st_size),
         }
-        self.disk_file.put(fd, metadata)
+        self.disk_file.put(fd, 10, metadata)
         self.disk_file.close()
         self.auditor.audit_all_objects()
         self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c',
@@ -218,7 +218,7 @@ class TestAuditor(unittest.TestCase):
             'X-Timestamp': timestamp,
             'Content-Length': str(os.fstat(fd).st_size),
         }
-        self.disk_file.put(fd, metadata)
+        self.disk_file.put(fd, 10, metadata)
         self.disk_file.close()
         os.write(fd, 'extra_data')
         self.auditor.audit_all_objects()
@@ -238,7 +238,7 @@ class TestAuditor(unittest.TestCase):
             'X-Timestamp': str(normalize_timestamp(time.time())),
             'Content-Length': str(os.fstat(fd).st_size),
         }
-        self.disk_file.put(fd, metadata)
+        self.disk_file.put(fd, 1024, metadata)
         etag = md5()
         etag.update('1' + '0' * 1023)
         etag = etag.hexdigest()
@@ -275,7 +275,7 @@ class TestAuditor(unittest.TestCase):
             'X-Timestamp': str(normalize_timestamp(time.time())),
             'Content-Length': 10,
         }
-        self.disk_file.put(fd, metadata)
+        self.disk_file.put(fd, 10, metadata)
         etag = md5()
         etag = etag.hexdigest()
         metadata['ETag'] = etag
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index 7c924198cd..ac70ec5ac8 100755
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -224,7 +224,7 @@ class TestDiskFile(unittest.TestCase):
             'X-Timestamp': timestamp,
             'Content-Length': str(os.fstat(fd).st_size),
         }
-        df.put(fd, metadata, extension=extension)
+        df.put(fd, fsize, metadata, extension=extension)
         if invalid_type == 'ETag':
             etag = md5()
             etag.update('1' + '0' * (fsize - 1))
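Note: one way the new failure path could be exercised in a unit test. This is a sketch under assumptions (mock-based fault injection, the errno value, and the disk_file name), not part of this patch:

    import errno
    from mock import patch
    from swift.common.exceptions import DiskFileNoSpace

    # Sketch only: simulate ENOSPC from fallocate and check that
    # DiskFile.mkstemp(size=...) converts it to DiskFileNoSpace.
    # disk_file is assumed to be an existing DiskFile instance.
    def fake_fallocate(fd, size):
        raise OSError(errno.ENOSPC, 'No space left on device')

    with patch('swift.obj.server.fallocate', fake_fallocate):
        try:
            with disk_file.mkstemp(size=1024):
                raise AssertionError('expected DiskFileNoSpace')
        except DiskFileNoSpace:
            pass  # the PUT handler returns HTTPInsufficientStorage (507)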