Merge "Add writer method to DiskFile"
This commit is contained in:
@@ -1619,13 +1619,14 @@ class BaseDiskFileWriter(object):
|
||||
:param next_part_power: the next partition power to be used
|
||||
"""
|
||||
|
||||
def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile,
|
||||
def __init__(self, name, datadir, size, bytes_per_sync, diskfile,
|
||||
next_part_power):
|
||||
# Parameter tracking
|
||||
self._name = name
|
||||
self._datadir = datadir
|
||||
self._fd = fd
|
||||
self._tmppath = tmppath
|
||||
self._fd = None
|
||||
self._tmppath = None
|
||||
self._size = size
|
||||
self._bytes_per_sync = bytes_per_sync
|
||||
self._diskfile = diskfile
|
||||
self.next_part_power = next_part_power
|
||||
@@ -1641,8 +1642,71 @@ class BaseDiskFileWriter(object):
|
||||
return self._diskfile.manager
|
||||
|
||||
@property
|
||||
def put_succeeded(self):
|
||||
return self._put_succeeded
|
||||
def logger(self):
|
||||
try:
|
||||
return self._logger
|
||||
except AttributeError:
|
||||
self._logger = self.manager.logger
|
||||
return self._logger
|
||||
|
||||
def _get_tempfile(self):
|
||||
fallback_to_mkstemp = False
|
||||
tmppath = None
|
||||
if self.manager.use_linkat:
|
||||
self._dirs_created = makedirs_count(self._datadir)
|
||||
try:
|
||||
fd = os.open(self._datadir, O_TMPFILE | os.O_WRONLY)
|
||||
except OSError as err:
|
||||
if err.errno in (errno.EOPNOTSUPP, errno.EISDIR, errno.EINVAL):
|
||||
msg = 'open(%s, O_TMPFILE | O_WRONLY) failed: %s \
|
||||
Falling back to using mkstemp()' \
|
||||
% (self._datadir, os.strerror(err.errno))
|
||||
self.logger.warning(msg)
|
||||
fallback_to_mkstemp = True
|
||||
else:
|
||||
raise
|
||||
if not self.manager.use_linkat or fallback_to_mkstemp:
|
||||
tmpdir = join(self._diskfile._device_path,
|
||||
get_tmp_dir(self._diskfile.policy))
|
||||
if not exists(tmpdir):
|
||||
mkdirs(tmpdir)
|
||||
fd, tmppath = mkstemp(dir=tmpdir)
|
||||
return fd, tmppath
|
||||
|
||||
def open(self):
|
||||
try:
|
||||
self._fd, self._tmppath = self._get_tempfile()
|
||||
except OSError as err:
|
||||
if err.errno in (errno.ENOSPC, errno.EDQUOT):
|
||||
# No more inodes in filesystem
|
||||
raise DiskFileNoSpace()
|
||||
raise
|
||||
if self._size is not None and self._size > 0:
|
||||
try:
|
||||
fallocate(self._fd, self._size)
|
||||
except OSError as err:
|
||||
if err.errno in (errno.ENOSPC, errno.EDQUOT):
|
||||
raise DiskFileNoSpace()
|
||||
raise
|
||||
return self
|
||||
|
||||
def close(self):
|
||||
if self._fd:
|
||||
try:
|
||||
os.close(self._fd)
|
||||
except OSError:
|
||||
pass
|
||||
if self._tmppath and not self._put_succeeded:
|
||||
# Try removing the temp file only if put did NOT succeed.
|
||||
#
|
||||
# dfw.put_succeeded is set to True after renamer() succeeds in
|
||||
# DiskFileWriter._finalize_put()
|
||||
try:
|
||||
# when mkstemp() was used
|
||||
os.unlink(self._tmppath)
|
||||
except OSError:
|
||||
self.logger.exception('Error removing tempfile: %s' %
|
||||
self._tmppath)
|
||||
|
||||
def write(self, chunk):
|
||||
"""
|
||||
@@ -2148,7 +2212,7 @@ class BaseDiskFile(object):
|
||||
def __init__(self, mgr, device_path, partition,
|
||||
account=None, container=None, obj=None, _datadir=None,
|
||||
policy=None, use_splice=False, pipe_size=None,
|
||||
use_linkat=False, open_expired=False, next_part_power=None,
|
||||
open_expired=False, next_part_power=None,
|
||||
**kwargs):
|
||||
self._manager = mgr
|
||||
self._device_path = device_path
|
||||
@@ -2157,7 +2221,6 @@ class BaseDiskFile(object):
|
||||
self._bytes_per_sync = mgr.bytes_per_sync
|
||||
self._use_splice = use_splice
|
||||
self._pipe_size = pipe_size
|
||||
self._use_linkat = use_linkat
|
||||
self._open_expired = open_expired
|
||||
# This might look a lttle hacky i.e tracking number of newly created
|
||||
# dirs to fsync only those many later. If there is a better way,
|
||||
@@ -2687,27 +2750,10 @@ class BaseDiskFile(object):
|
||||
self._fp = None
|
||||
return dr
|
||||
|
||||
def _get_tempfile(self):
|
||||
fallback_to_mkstemp = False
|
||||
tmppath = None
|
||||
if self._use_linkat:
|
||||
self._dirs_created = makedirs_count(self._datadir)
|
||||
try:
|
||||
fd = os.open(self._datadir, O_TMPFILE | os.O_WRONLY)
|
||||
except OSError as err:
|
||||
if err.errno in (errno.EOPNOTSUPP, errno.EISDIR, errno.EINVAL):
|
||||
msg = 'open(%s, O_TMPFILE | O_WRONLY) failed: %s \
|
||||
Falling back to using mkstemp()' \
|
||||
% (self._datadir, os.strerror(err.errno))
|
||||
self._logger.warning(msg)
|
||||
fallback_to_mkstemp = True
|
||||
else:
|
||||
raise
|
||||
if not self._use_linkat or fallback_to_mkstemp:
|
||||
if not exists(self._tmpdir):
|
||||
mkdirs(self._tmpdir)
|
||||
fd, tmppath = mkstemp(dir=self._tmpdir)
|
||||
return fd, tmppath
|
||||
def writer(self, size=None):
|
||||
return self.writer_cls(self._name, self._datadir, size,
|
||||
self._bytes_per_sync, self,
|
||||
self.next_part_power)
|
||||
|
||||
@contextmanager
|
||||
def create(self, size=None):
|
||||
@@ -2725,44 +2771,11 @@ class BaseDiskFile(object):
|
||||
disk
|
||||
:raises DiskFileNoSpace: if a size is specified and allocation fails
|
||||
"""
|
||||
dfw = self.writer(size)
|
||||
try:
|
||||
fd, tmppath = self._get_tempfile()
|
||||
except OSError as err:
|
||||
if err.errno in (errno.ENOSPC, errno.EDQUOT):
|
||||
# No more inodes in filesystem
|
||||
raise DiskFileNoSpace()
|
||||
raise
|
||||
dfw = None
|
||||
try:
|
||||
if size is not None and size > 0:
|
||||
try:
|
||||
fallocate(fd, size)
|
||||
except OSError as err:
|
||||
if err.errno in (errno.ENOSPC, errno.EDQUOT):
|
||||
raise DiskFileNoSpace()
|
||||
raise
|
||||
dfw = self.writer_cls(self._name, self._datadir, fd, tmppath,
|
||||
bytes_per_sync=self._bytes_per_sync,
|
||||
diskfile=self,
|
||||
next_part_power=self.next_part_power)
|
||||
yield dfw
|
||||
yield dfw.open()
|
||||
finally:
|
||||
try:
|
||||
os.close(fd)
|
||||
except OSError:
|
||||
pass
|
||||
if (dfw is None) or (not dfw.put_succeeded):
|
||||
# Try removing the temp file only if put did NOT succeed.
|
||||
#
|
||||
# dfw.put_succeeded is set to True after renamer() succeeds in
|
||||
# DiskFileWriter._finalize_put()
|
||||
try:
|
||||
if tmppath:
|
||||
# when mkstemp() was used
|
||||
os.unlink(tmppath)
|
||||
except OSError:
|
||||
self._logger.exception('Error removing tempfile: %s' %
|
||||
tmppath)
|
||||
dfw.close()
|
||||
|
||||
def write_metadata(self, metadata):
|
||||
"""
|
||||
|
||||
@@ -99,12 +99,19 @@ class DiskFileWriter(object):
|
||||
:param name: standard object name
|
||||
:param fp: `StringIO` in-memory representation object
|
||||
"""
|
||||
def __init__(self, fs, name, fp):
|
||||
def __init__(self, fs, name):
|
||||
self._filesystem = fs
|
||||
self._name = name
|
||||
self._fp = fp
|
||||
self._fp = None
|
||||
self._upload_size = 0
|
||||
|
||||
def open(self):
|
||||
self._fp = moves.cStringIO()
|
||||
return self
|
||||
|
||||
def close(self):
|
||||
self._fp = None
|
||||
|
||||
def write(self, chunk):
|
||||
"""
|
||||
Write a chunk of data into the `StringIO` object.
|
||||
@@ -413,6 +420,9 @@ class DiskFile(object):
|
||||
self._fp = None
|
||||
return dr
|
||||
|
||||
def writer(self, size=None):
|
||||
return DiskFileWriter(self._filesystem, self._name)
|
||||
|
||||
@contextmanager
|
||||
def create(self, size=None):
|
||||
"""
|
||||
@@ -423,11 +433,11 @@ class DiskFile(object):
|
||||
disk
|
||||
:raises DiskFileNoSpace: if a size is specified and allocation fails
|
||||
"""
|
||||
fp = moves.cStringIO()
|
||||
writer = self.writer(size)
|
||||
try:
|
||||
yield DiskFileWriter(self._filesystem, self._name, fp)
|
||||
yield writer.open()
|
||||
finally:
|
||||
del fp
|
||||
writer.close()
|
||||
|
||||
def write_metadata(self, metadata):
|
||||
"""
|
||||
|
||||
@@ -3533,7 +3533,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
diskfile.get_tmp_dir(policy))
|
||||
os.rmdir(tmpdir)
|
||||
df = self._simple_get_diskfile(policy=policy)
|
||||
df._use_linkat = False
|
||||
df.manager.use_linkat = False
|
||||
with df.create():
|
||||
self.assertTrue(os.path.exists(tmpdir))
|
||||
|
||||
@@ -3902,7 +3902,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
def test_create_mkstemp_no_space(self):
|
||||
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123',
|
||||
'xyz', policy=POLICIES.legacy)
|
||||
df._use_linkat = False
|
||||
df.manager.use_linkat = False
|
||||
for e in (errno.ENOSPC, errno.EDQUOT):
|
||||
with mock.patch("swift.obj.diskfile.mkstemp",
|
||||
mock.MagicMock(side_effect=OSError(
|
||||
@@ -4958,7 +4958,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
# Test cleanup when DiskFileNoSpace() is raised.
|
||||
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123',
|
||||
'xyz', policy=POLICIES.legacy)
|
||||
df._use_linkat = False
|
||||
df.manager.use_linkat = False
|
||||
_m_fallocate = mock.MagicMock(side_effect=OSError(errno.ENOSPC,
|
||||
os.strerror(errno.ENOSPC)))
|
||||
_m_unlink = mock.Mock()
|
||||
@@ -4981,7 +4981,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
os.strerror(errno.ENOENT)))
|
||||
_m_unlink = mock.Mock()
|
||||
df = self._simple_get_diskfile()
|
||||
df._use_linkat = False
|
||||
df.manager.use_linkat = False
|
||||
data = '0' * 100
|
||||
metadata = {
|
||||
'ETag': md5(data).hexdigest(),
|
||||
@@ -4998,7 +4998,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
pass
|
||||
else:
|
||||
self.fail("Expected OSError exception")
|
||||
self.assertFalse(writer.put_succeeded)
|
||||
self.assertFalse(writer._put_succeeded)
|
||||
self.assertTrue(_m_renamer.called)
|
||||
self.assertTrue(_m_unlink.called)
|
||||
self.assertNotIn('error', self.logger.all_log_lines())
|
||||
@@ -5007,7 +5007,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
# Test logging of os.unlink() failures.
|
||||
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123',
|
||||
'xyz', policy=POLICIES.legacy)
|
||||
df._use_linkat = False
|
||||
df.manager.use_linkat = False
|
||||
_m_fallocate = mock.MagicMock(side_effect=OSError(errno.ENOSPC,
|
||||
os.strerror(errno.ENOSPC)))
|
||||
_m_unlink = mock.MagicMock(side_effect=OSError(errno.ENOENT,
|
||||
@@ -5030,14 +5030,15 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
@requires_o_tmpfile_support
|
||||
def test_get_tempfile_use_linkat_os_open_called(self):
|
||||
df = self._simple_get_diskfile()
|
||||
self.assertTrue(df._use_linkat)
|
||||
self.assertTrue(df.manager.use_linkat)
|
||||
_m_mkstemp = mock.MagicMock()
|
||||
_m_os_open = mock.Mock(return_value=12345)
|
||||
_m_mkc = mock.Mock()
|
||||
with mock.patch("swift.obj.diskfile.mkstemp", _m_mkstemp):
|
||||
with mock.patch("swift.obj.diskfile.os.open", _m_os_open):
|
||||
with mock.patch("swift.obj.diskfile.makedirs_count", _m_mkc):
|
||||
fd, tmppath = df._get_tempfile()
|
||||
writer = df.writer()
|
||||
fd, tmppath = writer._get_tempfile()
|
||||
self.assertTrue(_m_mkc.called)
|
||||
flags = O_TMPFILE | os.O_WRONLY
|
||||
_m_os_open.assert_called_once_with(df._datadir, flags)
|
||||
@@ -5049,7 +5050,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
def test_get_tempfile_fallback_to_mkstemp(self):
|
||||
df = self._simple_get_diskfile()
|
||||
df._logger = debug_logger()
|
||||
self.assertTrue(df._use_linkat)
|
||||
self.assertTrue(df.manager.use_linkat)
|
||||
for err in (errno.EOPNOTSUPP, errno.EISDIR, errno.EINVAL):
|
||||
_m_open = mock.Mock(side_effect=OSError(err, os.strerror(err)))
|
||||
_m_mkstemp = mock.MagicMock(return_value=(0, "blah"))
|
||||
@@ -5058,14 +5059,15 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
with mock.patch("swift.obj.diskfile.mkstemp", _m_mkstemp):
|
||||
with mock.patch("swift.obj.diskfile.makedirs_count",
|
||||
_m_mkc):
|
||||
fd, tmppath = df._get_tempfile()
|
||||
writer = df.writer()
|
||||
fd, tmppath = writer._get_tempfile()
|
||||
self.assertTrue(_m_mkc.called)
|
||||
# Fallback should succeed and mkstemp() should be called.
|
||||
self.assertTrue(_m_mkstemp.called)
|
||||
self.assertEqual(tmppath, "blah")
|
||||
# Despite fs not supporting O_TMPFILE, use_linkat should not change
|
||||
self.assertTrue(df._use_linkat)
|
||||
log = df._logger.get_lines_for_level('warning')
|
||||
self.assertTrue(df.manager.use_linkat)
|
||||
log = df.manager.logger.get_lines_for_level('warning')
|
||||
self.assertGreater(len(log), 0)
|
||||
self.assertTrue('O_TMPFILE' in log[-1])
|
||||
|
||||
@@ -5080,7 +5082,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
with mock.patch("swift.obj.diskfile.mkstemp", _m_mkstemp):
|
||||
with mock.patch("swift.obj.diskfile.makedirs_count", _m_mkc):
|
||||
try:
|
||||
fd, tmppath = df._get_tempfile()
|
||||
writer = df.writer()
|
||||
fd, tmppath = writer._get_tempfile()
|
||||
except OSError as err:
|
||||
self.assertEqual(err.errno, errno.ENOSPC)
|
||||
else:
|
||||
@@ -5103,7 +5106,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
||||
with df.create(size=100) as writer:
|
||||
writer.write(data)
|
||||
writer.put(metadata)
|
||||
self.assertTrue(writer.put_succeeded)
|
||||
self.assertTrue(writer._put_succeeded)
|
||||
|
||||
self.assertFalse(_m_renamer.called)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user