Remove keep_data_fp argument from DiskFile constructor

All access to the data_file fp for DiskFile is moved after the new "open"
method.  This prepares to move some additional smarts into DiskFile and reduce
the surface area of the abstraction and the exposure of the underlying
implementation in the object-server.

Future work:

 * Consolidate put_metadata to DiskWriter
 * Add public "update_metadata" method to DiskFile
 * Create DiskReader class to gather all access of methods under "open"

Change-Id: I4de2f265bf099a810c5f1c14b5278d89bd0b382d
This commit is contained in:
Clay Gerrard 2013-08-30 18:08:24 -07:00
parent 21c322c35d
commit d51e873423
6 changed files with 157 additions and 87 deletions

View File

@ -38,6 +38,10 @@ class DiskFileError(SwiftException):
pass
class DiskFileNotOpenError(DiskFileError):
pass
class DiskFileCollision(DiskFileError):
pass

View File

@ -166,8 +166,8 @@ class AuditorWorker(object):
raise AuditException('Error when reading metadata: %s' % exc)
_junk, account, container, obj = name.split('/', 3)
df = diskfile.DiskFile(self.devices, device, partition,
account, container, obj, self.logger,
keep_data_fp=True)
account, container, obj, self.logger)
df.open()
try:
try:
obj_size = df.get_data_file_size()

View File

@ -38,7 +38,7 @@ from swift.common.utils import mkdirs, normalize_timestamp, \
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle
from swift.common.exceptions import DiskFileError, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
PathNotDir
PathNotDir, DiskFileNotOpenError
from swift.common.swob import multi_range_iterator
@ -342,7 +342,6 @@ class DiskWriter(object):
self.threadpool.force_run_in_thread(
self._finalize_put, metadata, target_path)
self.disk_file.metadata = metadata
class DiskFile(object):
@ -355,19 +354,17 @@ class DiskFile(object):
:param account: account name for the object
:param container: container name for the object
:param obj: object name for the object
:param keep_data_fp: if True, don't close the fp, otherwise close it
:param disk_chunk_size: size of chunks on file reads
:param bytes_per_sync: number of bytes between fdatasync calls
:param iter_hook: called when __iter__ returns a chunk
:param threadpool: thread pool in which to do blocking operations
:raises DiskFileCollision: on md5 collision
"""
def __init__(self, path, device, partition, account, container, obj,
logger, keep_data_fp=False, disk_chunk_size=65536,
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
threadpool=None, obj_dir='objects', mount_check=False):
logger, disk_chunk_size=65536,
bytes_per_sync=(512 * 1024 * 1024),
iter_hook=None, threadpool=None, obj_dir='objects',
mount_check=False):
if mount_check and not check_mount(path, device):
raise DiskFileDeviceUnavailable()
self.disk_chunk_size = disk_chunk_size
@ -380,27 +377,50 @@ class DiskFile(object):
self.device_path = join(path, device)
self.tmpdir = join(path, device, 'tmp')
self.logger = logger
self._metadata = {}
self._metadata = None
self.data_file = None
self._data_file_size = None
self.fp = None
self.iter_etag = None
self.started_at_0 = False
self.read_to_eof = False
self.quarantined_dir = None
self.keep_cache = False
self.suppress_file_closing = False
self._verify_close = False
self.threadpool = threadpool or ThreadPool(nthreads=0)
# FIXME(clayg): this attribute is set after open and affects the
# behavior of the class (i.e. public interface)
self.keep_cache = False
def open(self, verify_close=False):
"""
Open the file and read the metadata.
This method must populate the _metadata attribute.
:param verify_close: force implicit close to verify_file, no effect on
explicit close.
:raises DiskFileCollision: on md5 collision
"""
data_file, meta_file, ts_file = self._get_ondisk_file()
if not data_file:
if ts_file:
self._construct_from_ts_file(ts_file)
else:
fp = self._construct_from_data_file(data_file, meta_file)
if keep_data_fp:
self.fp = fp
else:
fp.close()
self.fp = self._construct_from_data_file(data_file, meta_file)
self._verify_close = verify_close
self._metadata = self._metadata or {}
return self
def __enter__(self):
if self._metadata is None:
raise DiskFileNotOpenError()
return self
def __exit__(self, t, v, tb):
self.close(verify_file=self._verify_close)
def _get_ondisk_file(self):
"""
@ -508,6 +528,8 @@ class DiskFile(object):
def __iter__(self):
"""Returns an iterator over the data file."""
if self.fp is None:
raise DiskFileNotOpenError()
try:
dropped_cache = 0
read = 0
@ -610,6 +632,9 @@ class DiskFile(object):
finally:
self.fp.close()
self.fp = None
self._metadata = None
self._data_file_size = None
self._verify_close = False
def get_metadata(self):
"""
@ -617,6 +642,8 @@ class DiskFile(object):
:returns: object's metadata dictionary
"""
if self._metadata is None:
raise DiskFileNotOpenError()
return self._metadata
def is_deleted(self):
@ -716,13 +743,20 @@ class DiskFile(object):
:raises DiskFileError: on file size mismatch.
:raises DiskFileNotExist: on file not existing (including deleted)
"""
if self._data_file_size is None:
self._data_file_size = self._get_data_file_size()
return self._data_file_size
def _get_data_file_size(self):
# ensure file is opened
metadata = self.get_metadata()
try:
file_size = 0
if self.data_file:
file_size = self.threadpool.run_in_thread(
getsize, self.data_file)
if 'Content-Length' in self._metadata:
metadata_size = int(self._metadata['Content-Length'])
if 'Content-Length' in metadata:
metadata_size = int(metadata['Content-Length'])
if file_size != metadata_size:
raise DiskFileError(
'Content-Length of %s does not match file size '

View File

@ -298,14 +298,15 @@ class ObjectController(object):
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
orig_metadata = disk_file.get_metadata()
with disk_file.open():
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
orig_metadata = disk_file.get_metadata()
orig_timestamp = orig_metadata.get('X-Timestamp', '0')
if orig_timestamp >= request.headers['x-timestamp']:
return HTTPConflict(request=request)
@ -355,7 +356,8 @@ class ObjectController(object):
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
orig_metadata = disk_file.get_metadata()
with disk_file.open():
orig_metadata = disk_file.get_metadata()
old_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
orig_timestamp = orig_metadata.get('X-Timestamp')
if orig_timestamp and orig_timestamp >= request.headers['x-timestamp']:
@ -432,9 +434,10 @@ class ObjectController(object):
split_and_validate_path(request, 5, 5, True)
try:
disk_file = self._diskfile(device, partition, account, container,
obj, keep_data_fp=True, iter_hook=sleep)
obj, iter_hook=sleep)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
disk_file.open()
if disk_file.is_deleted() or disk_file.is_expired():
if request.headers.get('if-match') == '*':
return HTTPPreconditionFailed(request=request)
@ -510,15 +513,16 @@ class ObjectController(object):
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
file_size = disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
with disk_file.open():
if disk_file.is_deleted() or disk_file.is_expired():
return HTTPNotFound(request=request)
try:
file_size = disk_file.get_data_file_size()
except (DiskFileError, DiskFileNotExist):
disk_file.quarantine()
return HTTPNotFound(request=request)
metadata = disk_file.get_metadata()
response = Response(request=request, conditional_response=True)
metadata = disk_file.get_metadata()
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
@ -549,7 +553,10 @@ class ObjectController(object):
obj)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
orig_metadata = disk_file.get_metadata()
with disk_file.open():
orig_metadata = disk_file.get_metadata()
is_deleted = disk_file.is_deleted()
is_expired = disk_file.is_expired()
if 'x-if-delete-at' in request.headers and \
int(request.headers['x-if-delete-at']) != \
int(orig_metadata.get('X-Delete-At') or 0):
@ -562,7 +569,7 @@ class ObjectController(object):
container, obj, request, device)
orig_timestamp = orig_metadata.get('X-Timestamp', 0)
req_timestamp = request.headers['X-Timestamp']
if disk_file.is_deleted() or disk_file.is_expired():
if is_deleted or is_expired:
response_class = HTTPNotFound
else:
if orig_timestamp < req_timestamp:

View File

@ -377,14 +377,15 @@ class TestDiskFile(unittest.TestCase):
setxattr(f.fileno(), diskfile.METADATA_KEY,
pickle.dumps(md, diskfile.PICKLE_PROTOCOL))
def _create_test_file(self, data, keep_data_fp=True, timestamp=None):
def _create_test_file(self, data, timestamp=None):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
if timestamp is None:
timestamp = time()
self._create_ondisk_file(df, data, timestamp)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=keep_data_fp)
FakeLogger())
df.open()
return df
def test_get_metadata(self):
@ -397,33 +398,37 @@ class TestDiskFile(unittest.TestCase):
orig_metadata = {'X-Object-Meta-Key1': 'Value1',
'Content-Type': 'text/garbage'}
df = self._get_disk_file(ts=41, extra_metadata=orig_metadata)
self.assertEquals('1024', df._metadata['Content-Length'])
with df.open():
self.assertEquals('1024', df._metadata['Content-Length'])
# write some new metadata (fast POST, don't send orig meta, ts 42)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
df.put_metadata({'X-Timestamp': '42', 'X-Object-Meta-Key2': 'Value2'})
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
# non-fast-post updateable keys are preserved
self.assertEquals('text/garbage', df._metadata['Content-Type'])
# original fast-post updateable keys are removed
self.assert_('X-Object-Meta-Key1' not in df._metadata)
# new fast-post updateable keys are added
self.assertEquals('Value2', df._metadata['X-Object-Meta-Key2'])
with df.open():
# non-fast-post updateable keys are preserved
self.assertEquals('text/garbage', df._metadata['Content-Type'])
# original fast-post updateable keys are removed
self.assert_('X-Object-Meta-Key1' not in df._metadata)
# new fast-post updateable keys are added
self.assertEquals('Value2', df._metadata['X-Object-Meta-Key2'])
def test_disk_file_app_iter_corners(self):
df = self._create_test_file('1234567890')
self.assertEquals(''.join(df.app_iter_range(0, None)), '1234567890')
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
self.assertEqual(''.join(df.app_iter_range(5, None)), '67890')
FakeLogger())
with df.open():
self.assertEqual(''.join(df.app_iter_range(5, None)), '67890')
def test_disk_file_app_iter_partial_closes(self):
df = self._create_test_file('1234567890')
it = df.app_iter_range(0, 5)
self.assertEqual(''.join(it), '12345')
self.assertEqual(df.fp, None)
with df.open():
it = df.app_iter_range(0, 5)
self.assertEqual(''.join(it), '12345')
self.assertEqual(df.fp, None)
def test_disk_file_app_iter_ranges(self):
df = self._create_test_file('012345678911234567892123456789')
@ -482,10 +487,11 @@ class TestDiskFile(unittest.TestCase):
self.assertEqual(''.join(it), '')
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
it = df.app_iter_ranges(None, 'app/something',
'\r\n--someheader\r\n', 150)
self.assertEqual(''.join(it), '')
FakeLogger())
with df.open():
it = df.app_iter_ranges(None, 'app/something',
'\r\n--someheader\r\n', 150)
self.assertEqual(''.join(it), '')
def test_disk_file_mkstemp_creates_dir(self):
tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
@ -501,8 +507,9 @@ class TestDiskFile(unittest.TestCase):
hook_call_count[0] += 1
df = self._get_disk_file(fsize=65, csize=8, iter_hook=hook)
for _ in df:
pass
with df.open():
for _ in df:
pass
self.assertEquals(hook_call_count[0], 9)
@ -525,7 +532,8 @@ class TestDiskFile(unittest.TestCase):
# have to remake the datadir and file
self._create_ondisk_file(df, '', time()) # still empty
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
df.open()
double_uuid_path = df.quarantine()
self.assert_(os.path.isdir(double_uuid_path))
self.assert_('-' in os.path.basename(double_uuid_path))
@ -573,10 +581,10 @@ class TestDiskFile(unittest.TestCase):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
obj_name, FakeLogger(),
keep_data_fp=True, disk_chunk_size=csize,
disk_chunk_size=csize,
iter_hook=iter_hook, mount_check=mount_check)
df.open()
if invalid_type == 'Zero-Byte':
os.remove(df.data_file)
fp = open(df.data_file, 'w')
fp.close()
df.unit_test_len = fsize
@ -694,8 +702,9 @@ class TestDiskFile(unittest.TestCase):
df = self._get_disk_file(fsize=1024 * 2)
df._handle_close_quarantine = err
for chunk in df:
pass
with df.open():
for chunk in df:
pass
# close is called at the end of the iterator
self.assertEquals(df.fp, None)
self.assertEquals(len(df.logger.log_dict['error']), 1)
@ -726,10 +735,12 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(10))
self.assertTrue('deleted' in df._metadata)
self.assertTrue(df._metadata['deleted'])
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(10))
self.assertTrue('deleted' in df._metadata)
self.assertTrue(df._metadata['deleted'])
def test_ondisk_search_loop_meta_ts_data(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -742,9 +753,11 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(8))
self.assertTrue('deleted' in df._metadata)
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(8))
self.assertTrue('deleted' in df._metadata)
def test_ondisk_search_loop_meta_data_ts(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -757,9 +770,11 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, '', ext='.ts', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
def test_ondisk_search_loop_data_meta_ts(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -772,9 +787,11 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, '', ext='.meta', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
def test_ondisk_search_loop_wayward_files_ignored(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -788,9 +805,11 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, '', ext='.meta', timestamp=5)
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'], normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
with df.open():
self.assertTrue('X-Timestamp' in df._metadata)
self.assertEquals(df._metadata['X-Timestamp'],
normalize_timestamp(10))
self.assertTrue('deleted' not in df._metadata)
def test_ondisk_search_loop_listdir_error(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
@ -807,8 +826,9 @@ class TestDiskFile(unittest.TestCase):
self._create_ondisk_file(df, '', ext='.ts', timestamp=7)
self._create_ondisk_file(df, '', ext='.meta', timestamp=6)
self._create_ondisk_file(df, '', ext='.meta', timestamp=5)
self.assertRaises(OSError, diskfile.DiskFile, self.testdir, 'sda1',
'0', 'a', 'c', 'o', FakeLogger())
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
self.assertRaises(OSError, df.open)
def test_exception_in_handle_close_quarantine(self):
df = self._get_disk_file()

View File

@ -361,7 +361,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
with open(objfile.data_file) as fp:
@ -718,7 +719,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
with open(objfile.data_file) as fp:
@ -1016,7 +1018,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
etag = md5()
etag.update('VERIF')
@ -1048,7 +1051,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
with open(objfile.data_file) as fp:
metadata = diskfile.read_metadata(fp)
@ -1076,7 +1080,8 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = diskfile.DiskFile(self.testdir, 'sda1', 'p', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
FakeLogger())
objfile.open()
file_name = os.path.basename(objfile.data_file)
etag = md5()
etag.update('VERIF')