Merge "Refactor DiskFile to hide temp file names and exts"

This commit is contained in:
Jenkins 2012-11-16 22:58:39 +00:00 committed by Gerrit Code Review
commit d13869e64b
3 changed files with 66 additions and 35 deletions

@ -118,6 +118,7 @@ class DiskFile(object):
path, device, storage_directory(DATADIR, partition, name_hash)) path, device, storage_directory(DATADIR, partition, name_hash))
self.device_path = os.path.join(path, device) self.device_path = os.path.join(path, device)
self.tmpdir = os.path.join(path, device, 'tmp') self.tmpdir = os.path.join(path, device, 'tmp')
self.tmppath = None
self.logger = logger self.logger = logger
self.metadata = {} self.metadata = {}
self.meta_file = None self.meta_file = None
@ -278,30 +279,31 @@ class DiskFile(object):
"""Contextmanager to make a temporary file.""" """Contextmanager to make a temporary file."""
if not os.path.exists(self.tmpdir): if not os.path.exists(self.tmpdir):
mkdirs(self.tmpdir) mkdirs(self.tmpdir)
fd, tmppath = mkstemp(dir=self.tmpdir) fd, self.tmppath = mkstemp(dir=self.tmpdir)
try: try:
yield fd, tmppath yield fd
finally: finally:
try: try:
os.close(fd) os.close(fd)
except OSError: except OSError:
pass pass
tmppath, self.tmppath = self.tmppath, None
try: try:
os.unlink(tmppath) os.unlink(tmppath)
except OSError: except OSError:
pass pass
def put(self, fd, tmppath, metadata, extension='.data'): def put(self, fd, metadata, extension='.data'):
""" """
Finalize writing the file on disk, and renames it from the temp file to Finalize writing the file on disk, and renames it from the temp file to
the real location. This should be called after the data has been the real location. This should be called after the data has been
written to the temp file. written to the temp file.
:params fd: file descriptor of the temp file :param fd: file descriptor of the temp file
:param tmppath: path to the temporary file being used
:param metadata: dictionary of metadata to be written :param metadata: dictionary of metadata to be written
:param extension: extension to be used when making the file :param extension: extension to be used when making the file
""" """
assert self.tmppath is not None
metadata['name'] = self.name metadata['name'] = self.name
timestamp = normalize_timestamp(metadata['X-Timestamp']) timestamp = normalize_timestamp(metadata['X-Timestamp'])
write_metadata(fd, metadata) write_metadata(fd, metadata)
@ -309,9 +311,21 @@ class DiskFile(object):
self.drop_cache(fd, 0, int(metadata['Content-Length'])) self.drop_cache(fd, 0, int(metadata['Content-Length']))
tpool.execute(fsync, fd) tpool.execute(fsync, fd)
invalidate_hash(os.path.dirname(self.datadir)) invalidate_hash(os.path.dirname(self.datadir))
renamer(tmppath, os.path.join(self.datadir, timestamp + extension)) renamer(self.tmppath,
os.path.join(self.datadir, timestamp + extension))
self.metadata = metadata self.metadata = metadata
def put_metadata(self, metadata, tombstone=False):
"""
Short hand for putting metadata to .meta and .ts files.
:param metadata: dictionary of metadata to be written
:param tombstone: whether or not we are writing a tombstone
"""
extension = '.ts' if tombstone else '.meta'
with self.mkstemp() as fd:
self.put(fd, metadata, extension=extension)
def unlinkold(self, timestamp): def unlinkold(self, timestamp):
""" """
Remove any older versions of the object file. Any file that has an Remove any older versions of the object file. Any file that has an
@ -565,8 +579,7 @@ class ObjectController(object):
if old_delete_at: if old_delete_at:
self.delete_at_update('DELETE', old_delete_at, account, self.delete_at_update('DELETE', old_delete_at, account,
container, obj, request.headers, device) container, obj, request.headers, device)
with file.mkstemp() as (fd, tmppath): file.put_metadata(metadata)
file.put(fd, tmppath, metadata, extension='.meta')
return HTTPAccepted(request=request) return HTTPAccepted(request=request)
@public @public
@ -600,7 +613,7 @@ class ObjectController(object):
etag = md5() etag = md5()
upload_size = 0 upload_size = 0
last_sync = 0 last_sync = 0
with file.mkstemp() as (fd, tmppath): with file.mkstemp() as fd:
if 'content-length' in request.headers: if 'content-length' in request.headers:
try: try:
fallocate(fd, int(request.headers['content-length'])) fallocate(fd, int(request.headers['content-length']))
@ -654,7 +667,7 @@ class ObjectController(object):
self.delete_at_update( self.delete_at_update(
'DELETE', old_delete_at, account, container, obj, 'DELETE', old_delete_at, account, container, obj,
request.headers, device) request.headers, device)
file.put(fd, tmppath, metadata) file.put(fd, metadata)
file.unlinkold(metadata['X-Timestamp']) file.unlinkold(metadata['X-Timestamp'])
if not orig_timestamp or \ if not orig_timestamp or \
orig_timestamp < request.headers['x-timestamp']: orig_timestamp < request.headers['x-timestamp']:
@ -821,12 +834,11 @@ class ObjectController(object):
metadata = { metadata = {
'X-Timestamp': request.headers['X-Timestamp'], 'deleted': True, 'X-Timestamp': request.headers['X-Timestamp'], 'deleted': True,
} }
with file.mkstemp() as (fd, tmppath):
old_delete_at = int(file.metadata.get('X-Delete-At') or 0) old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
if old_delete_at: if old_delete_at:
self.delete_at_update('DELETE', old_delete_at, account, self.delete_at_update('DELETE', old_delete_at, account,
container, obj, request.headers, device) container, obj, request.headers, device)
file.put(fd, tmppath, metadata, extension='.ts') file.put_metadata(metadata, tombstone=True)
file.unlinkold(metadata['X-Timestamp']) file.unlinkold(metadata['X-Timestamp'])
if not orig_timestamp or \ if not orig_timestamp or \
orig_timestamp < request.headers['x-timestamp']: orig_timestamp < request.headers['x-timestamp']:

@ -64,7 +64,7 @@ class TestAuditor(unittest.TestCase):
self.auditor = auditor.AuditorWorker(self.conf) self.auditor = auditor.AuditorWorker(self.conf)
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as fd:
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -74,7 +74,7 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
self.disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, metadata)
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
self.auditor.object_audit( self.auditor.object_audit(
@ -93,7 +93,7 @@ class TestAuditor(unittest.TestCase):
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
with self.disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as fd:
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -102,7 +102,7 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
self.disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, metadata)
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
# remake so it will have metadata # remake so it will have metadata
self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', self.disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
@ -154,7 +154,7 @@ class TestAuditor(unittest.TestCase):
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as fd:
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -163,7 +163,7 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
self.disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, metadata)
self.disk_file.close() self.disk_file.close()
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.assertEquals(self.auditor.quarantines, pre_quarantines) self.assertEquals(self.auditor.quarantines, pre_quarantines)
@ -174,7 +174,7 @@ class TestAuditor(unittest.TestCase):
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as fd:
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -183,7 +183,7 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
self.disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, metadata)
self.disk_file.close() self.disk_file.close()
os.write(fd, 'extra_data') os.write(fd, 'extra_data')
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
@ -195,7 +195,7 @@ class TestAuditor(unittest.TestCase):
pre_quarantines = self.auditor.quarantines pre_quarantines = self.auditor.quarantines
data = '0' * 10 data = '0' * 10
etag = md5() etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as fd:
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -204,14 +204,14 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
self.disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, metadata)
self.disk_file.close() self.disk_file.close()
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c', self.disk_file = DiskFile(self.devices, 'sdb', '0', 'a', 'c',
'ob', self.logger) 'ob', self.logger)
data = '1' * 10 data = '1' * 10
etag = md5() etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as fd:
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -220,7 +220,7 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
self.disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, metadata)
self.disk_file.close() self.disk_file.close()
os.write(fd, 'extra_data') os.write(fd, 'extra_data')
self.auditor.audit_all_objects() self.auditor.audit_all_objects()
@ -231,7 +231,7 @@ class TestAuditor(unittest.TestCase):
self.auditor.log_time = 0 self.auditor.log_time = 0
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as fd:
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -240,7 +240,7 @@ class TestAuditor(unittest.TestCase):
'X-Timestamp': str(normalize_timestamp(time.time())), 'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
self.disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, metadata)
etag = md5() etag = md5()
etag.update('1' + '0' * 1023) etag.update('1' + '0' * 1023)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -270,14 +270,14 @@ class TestAuditor(unittest.TestCase):
fp.close() fp.close()
etag = md5() etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath): with self.disk_file.mkstemp() as fd:
etag = etag.hexdigest() etag = etag.hexdigest()
metadata = { metadata = {
'ETag': etag, 'ETag': etag,
'X-Timestamp': str(normalize_timestamp(time.time())), 'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': 10, 'Content-Length': 10,
} }
self.disk_file.put(fd, tmppath, metadata) self.disk_file.put(fd, metadata)
etag = md5() etag = md5()
etag = etag.hexdigest() etag = etag.hexdigest()
metadata['ETag'] = etag metadata['ETag'] = etag

@ -216,7 +216,7 @@ class TestDiskFile(unittest.TestCase):
timestamp = ts timestamp = ts
else: else:
timestamp = str(normalize_timestamp(time())) timestamp = str(normalize_timestamp(time()))
with df.mkstemp() as (fd, tmppath): with df.mkstemp() as fd:
os.write(fd, data) os.write(fd, data)
etag.update(data) etag.update(data)
etag = etag.hexdigest() etag = etag.hexdigest()
@ -225,7 +225,7 @@ class TestDiskFile(unittest.TestCase):
'X-Timestamp': timestamp, 'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(fd).st_size), 'Content-Length': str(os.fstat(fd).st_size),
} }
df.put(fd, tmppath, metadata, extension=extension) df.put(fd, metadata, extension=extension)
if invalid_type == 'ETag': if invalid_type == 'ETag':
etag = md5() etag = md5()
etag.update('1' + '0' * (fsize - 1)) etag.update('1' + '0' * (fsize - 1))
@ -316,7 +316,6 @@ class TestDiskFile(unittest.TestCase):
extension='.data') extension='.data')
df.close() df.close()
self.assertTrue(df.quarantined_dir) self.assertTrue(df.quarantined_dir)
df = self._get_data_file(invalid_type='Content-Length', df = self._get_data_file(invalid_type='Content-Length',
extension='.ts') extension='.ts')
df.close() df.close()
@ -325,6 +324,26 @@ class TestDiskFile(unittest.TestCase):
extension='.ts') extension='.ts')
self.assertRaises(DiskFileNotExist, df.get_data_file_size) self.assertRaises(DiskFileNotExist, df.get_data_file_size)
def test_put_metadata(self):
df = self._get_data_file()
ts = time()
metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
df.put_metadata(metadata)
exp_name = '%s.meta' % str(normalize_timestamp(ts))
dl = os.listdir(df.datadir)
self.assertEquals(len(dl), 2)
self.assertTrue(exp_name in set(dl))
def test_put_metadata_ts(self):
df = self._get_data_file()
ts = time()
metadata = { 'X-Timestamp': ts, 'X-Object-Meta-test': 'data' }
df.put_metadata(metadata, tombstone=True)
exp_name = '%s.ts' % str(normalize_timestamp(ts))
dl = os.listdir(df.datadir)
self.assertEquals(len(dl), 2)
self.assertTrue(exp_name in set(dl))
def test_unlinkold(self): def test_unlinkold(self):
df1 = self._get_data_file() df1 = self._get_data_file()
future_time = str(normalize_timestamp(time() + 100)) future_time = str(normalize_timestamp(time() + 100))