Make object creation more atomic in Linux

Linux 3.11 introduced O_TMPFILE as a flag to open() sys call. This would
enable users to get a fd to an unnamed temporary file. As it's unnamed,
it does not require the caller to devise unique names. It is also not
accessible through any path. Hence, file creation is race-free.

This file is initially unreachable. It is then populated with data(write),
metadata(fsetxattr) and fsync'd before being atomically linked into the
filesystem in a fully formed state using linkat() sys call. Only after a
successful linkat() will the object file will be available for reference.

Caveats
* Unlike os.rename(), linkat() cannot overwrite destination path if it
  already exists. If path exists, we unlink and try again.
* XFS support for O_TMPFILE was only added in Linux 3.15.
* If client disconnects during object upload, although there is no
  incomplete/stale file on disk, the object directory would persist
  and is not cleaned up immediately.

Change-Id: I8402439fab3aba5d7af449b5e465f89332f606ec
Signed-off-by: Prashanth Pai <ppai@redhat.com>
This commit is contained in:
Prashanth Pai 2015-03-05 18:18:25 +05:30
parent 075b081cb1
commit 773edb4a5d
8 changed files with 478 additions and 19 deletions

76
swift/common/linkat.py Normal file
View File

@ -0,0 +1,76 @@
# Copyright (c) 2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import ctypes
from ctypes.util import find_library
__all__ = ['linkat']
class Linkat(object):
# From include/uapi/linux/fcntl.h
AT_FDCWD = -100
AT_SYMLINK_FOLLOW = 0x400
__slots__ = '_c_linkat'
def __init__(self):
libc = ctypes.CDLL(find_library('c'), use_errno=True)
try:
c_linkat = libc.linkat
except AttributeError:
self._c_linkat = None
return
c_linkat.argtypes = [ctypes.c_int, ctypes.c_char_p,
ctypes.c_int, ctypes.c_char_p,
ctypes.c_int]
c_linkat.restype = ctypes.c_int
def errcheck(result, func, arguments):
if result == -1:
errno = ctypes.set_errno(0)
raise IOError(errno, 'linkat: %s' % os.strerror(errno))
else:
return result
c_linkat.errcheck = errcheck
self._c_linkat = c_linkat
@property
def available(self):
return self._c_linkat is not None
def __call__(self, olddirfd, oldpath, newdirfd, newpath, flags):
"""
linkat() creates a new link (also known as a hard link)
to an existing file.
See `man 2 linkat` for more info.
"""
if not self.available:
raise EnvironmentError('linkat not available')
if not isinstance(olddirfd, int) or not isinstance(newdirfd, int):
raise TypeError("fd must be an integer.")
return self._c_linkat(olddirfd, oldpath, newdirfd, newpath, flags)
linkat = Linkat()
del Linkat

View File

@ -33,6 +33,7 @@ import uuid
import functools import functools
import platform import platform
import email.parser import email.parser
from distutils.version import LooseVersion
from hashlib import md5, sha1 from hashlib import md5, sha1
from random import random, shuffle from random import random, shuffle
from contextlib import contextmanager, closing from contextlib import contextmanager, closing
@ -69,6 +70,7 @@ import swift.common.exceptions
from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND, \ from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND, \
HTTP_PRECONDITION_FAILED, HTTP_REQUESTED_RANGE_NOT_SATISFIABLE HTTP_PRECONDITION_FAILED, HTTP_REQUESTED_RANGE_NOT_SATISFIABLE
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.linkat import linkat
if six.PY3: if six.PY3:
stdlib_queue = eventlet.patcher.original('queue') stdlib_queue = eventlet.patcher.original('queue')
@ -163,9 +165,10 @@ SWIFT_CONF_FILE = '/etc/swift/swift.conf'
# These constants are Linux-specific, and Python doesn't seem to know # These constants are Linux-specific, and Python doesn't seem to know
# about them. We ask anyway just in case that ever gets fixed. # about them. We ask anyway just in case that ever gets fixed.
# #
# The values were copied from the Linux 3.0 kernel headers. # The values were copied from the Linux 3.x kernel headers.
AF_ALG = getattr(socket, 'AF_ALG', 38) AF_ALG = getattr(socket, 'AF_ALG', 38)
F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031) F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031)
O_TMPFILE = getattr(os, 'O_TMPFILE', 0o20000000 | os.O_DIRECTORY)
# Used by the parse_socket_string() function to validate IPv6 addresses # Used by the parse_socket_string() function to validate IPv6 addresses
IPV6_RE = re.compile("^\[(?P<address>.*)\](:(?P<port>[0-9]+))?$") IPV6_RE = re.compile("^\[(?P<address>.*)\](:(?P<port>[0-9]+))?$")
@ -1196,6 +1199,47 @@ def renamer(old, new, fsync=True):
dirpath = os.path.dirname(dirpath) dirpath = os.path.dirname(dirpath)
def link_fd_to_path(fd, target_path, dirs_created=0, retries=2, fsync=True):
"""
Creates a link to file descriptor at target_path specified. This method
does not close the fd for you. Unlike rename, as linkat() cannot
overwrite target_path if it exists, we unlink and try again.
Attempts to fix / hide race conditions like empty object directories
being removed by backend processes during uploads, by retrying.
:param fd: File descriptor to be linked
:param target_path: Path in filesystem where fd is to be linked
:param dirs_created: Number of newly created directories that needs to
be fsync'd.
:param retries: number of retries to make
:param fsync: fsync on containing directory of target_path and also all
the newly created directories.
"""
dirpath = os.path.dirname(target_path)
for _junk in range(0, retries):
try:
linkat(linkat.AT_FDCWD, "/proc/self/fd/%d" % (fd),
linkat.AT_FDCWD, target_path, linkat.AT_SYMLINK_FOLLOW)
break
except IOError as err:
if err.errno == errno.ENOENT:
dirs_created = makedirs_count(dirpath)
elif err.errno == errno.EEXIST:
try:
os.unlink(target_path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
else:
raise
if fsync:
for i in range(0, dirs_created + 1):
fsync_dir(dirpath)
dirpath = os.path.dirname(dirpath)
def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False): def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
""" """
Validate and split the given HTTP request path. Validate and split the given HTTP request path.
@ -2404,6 +2448,7 @@ def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
""" """
if tmp is None: if tmp is None:
tmp = os.path.dirname(dest) tmp = os.path.dirname(dest)
mkdirs(tmp)
fd, tmppath = mkstemp(dir=tmp, suffix='.tmp') fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
with os.fdopen(fd, 'wb') as fo: with os.fdopen(fd, 'wb') as fo:
pickle.dump(obj, fo, pickle_protocol) pickle.dump(obj, fo, pickle_protocol)
@ -4027,3 +4072,19 @@ def modify_priority(conf, logger):
return return
io_priority = conf.get("ionice_priority", 0) io_priority = conf.get("ionice_priority", 0)
_ioprio_set(io_class, io_priority) _ioprio_set(io_class, io_priority)
def o_tmpfile_supported():
"""
Returns True if O_TMPFILE flag is supported.
O_TMPFILE was introduced in Linux 3.11 but it also requires support from
underlying filesystem being used. Some common filesystems and linux
versions in which those filesystems added support for O_TMPFILE:
xfs (3.15)
ext4 (3.11)
btrfs (3.16)
"""
return all([linkat.available,
platform.system() == 'Linux',
LooseVersion(platform.release()) >= LooseVersion('3.16')])

View File

@ -59,7 +59,8 @@ from swift.common.utils import mkdirs, Timestamp, \
fsync_dir, drop_buffer_cache, lock_path, write_pickle, \ fsync_dir, drop_buffer_cache, lock_path, write_pickle, \
config_true_value, listdir, split_path, ismount, remove_file, \ config_true_value, listdir, split_path, ismount, remove_file, \
get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \ get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \
tpool_reraise, MD5_OF_EMPTY_STRING tpool_reraise, MD5_OF_EMPTY_STRING, link_fd_to_path, o_tmpfile_supported, \
O_TMPFILE, makedirs_count
from swift.common.splice import splice, tee from swift.common.splice import splice, tee
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
@ -573,6 +574,7 @@ class BaseDiskFileManager(object):
with open('/proc/sys/fs/pipe-max-size') as f: with open('/proc/sys/fs/pipe-max-size') as f:
max_pipe_size = int(f.read()) max_pipe_size = int(f.read())
self.pipe_size = min(max_pipe_size, self.disk_chunk_size) self.pipe_size = min(max_pipe_size, self.disk_chunk_size)
self.use_linkat = o_tmpfile_supported()
def make_on_disk_filename(self, timestamp, ext=None, def make_on_disk_filename(self, timestamp, ext=None,
ctype_timestamp=None, *a, **kw): ctype_timestamp=None, *a, **kw):
@ -1150,7 +1152,8 @@ class BaseDiskFileManager(object):
return self.diskfile_cls(self, dev_path, return self.diskfile_cls(self, dev_path,
partition, account, container, obj, partition, account, container, obj,
policy=policy, use_splice=self.use_splice, policy=policy, use_splice=self.use_splice,
pipe_size=self.pipe_size, **kwargs) pipe_size=self.pipe_size,
use_linkat=self.use_linkat, **kwargs)
def object_audit_location_generator(self, device_dirs=None, def object_audit_location_generator(self, device_dirs=None,
auditor_type="ALL"): auditor_type="ALL"):
@ -1441,9 +1444,15 @@ class BaseDiskFileWriter(object):
# clean). # clean).
drop_buffer_cache(self._fd, 0, self._upload_size) drop_buffer_cache(self._fd, 0, self._upload_size)
self.manager.invalidate_hash(dirname(self._datadir)) self.manager.invalidate_hash(dirname(self._datadir))
# After the rename completes, this object will be available for other # After the rename/linkat completes, this object will be available for
# requests to reference. # requests to reference.
renamer(self._tmppath, target_path) if self._tmppath:
# It was a named temp file created by mkstemp()
renamer(self._tmppath, target_path)
else:
# It was an unnamed temp file created by open() with O_TMPFILE
link_fd_to_path(self._fd, target_path,
self._diskfile._dirs_created)
# If rename is successful, flag put as succeeded. This is done to avoid # If rename is successful, flag put as succeeded. This is done to avoid
# unnecessary os.unlink() of tempfile later. As renamer() has # unnecessary os.unlink() of tempfile later. As renamer() has
# succeeded, the tempfile would no longer exist at its original path. # succeeded, the tempfile would no longer exist at its original path.
@ -1829,13 +1838,15 @@ class BaseDiskFile(object):
:param policy: the StoragePolicy instance :param policy: the StoragePolicy instance
:param use_splice: if true, use zero-copy splice() to send data :param use_splice: if true, use zero-copy splice() to send data
:param pipe_size: size of pipe buffer used in zero-copy operations :param pipe_size: size of pipe buffer used in zero-copy operations
:param use_linkat: if True, use open() with linkat() to create obj file
""" """
reader_cls = None # must be set by subclasses reader_cls = None # must be set by subclasses
writer_cls = None # must be set by subclasses writer_cls = None # must be set by subclasses
def __init__(self, mgr, device_path, partition, def __init__(self, mgr, device_path, partition,
account=None, container=None, obj=None, _datadir=None, account=None, container=None, obj=None, _datadir=None,
policy=None, use_splice=False, pipe_size=None, **kwargs): policy=None, use_splice=False, pipe_size=None,
use_linkat=False, **kwargs):
self._manager = mgr self._manager = mgr
self._device_path = device_path self._device_path = device_path
self._logger = mgr.logger self._logger = mgr.logger
@ -1843,6 +1854,14 @@ class BaseDiskFile(object):
self._bytes_per_sync = mgr.bytes_per_sync self._bytes_per_sync = mgr.bytes_per_sync
self._use_splice = use_splice self._use_splice = use_splice
self._pipe_size = pipe_size self._pipe_size = pipe_size
self._use_linkat = use_linkat
# This might look a lttle hacky i.e tracking number of newly created
# dirs to fsync only those many later. If there is a better way,
# please suggest.
# Or one could consider getting rid of doing fsyncs on dirs altogether
# and mounting XFS with the 'dirsync' mount option which should result
# in all entry fops being carried out synchronously.
self._dirs_created = 0
self.policy = policy self.policy = policy
if account and container and obj: if account and container and obj:
self._name = '/' + '/'.join((account, container, obj)) self._name = '/' + '/'.join((account, container, obj))
@ -2337,6 +2356,28 @@ class BaseDiskFile(object):
self._fp = None self._fp = None
return dr return dr
def _get_tempfile(self):
fallback_to_mkstemp = False
tmppath = None
if self._use_linkat:
self._dirs_created = makedirs_count(self._datadir)
try:
fd = os.open(self._datadir, O_TMPFILE | os.O_WRONLY)
except OSError as err:
if err.errno in (errno.EOPNOTSUPP, errno.EISDIR, errno.EINVAL):
msg = 'open(%s, O_TMPFILE | O_WRONLY) failed: %s \
Falling back to using mkstemp()' \
% (self._datadir, os.strerror(err.errno))
self._logger.warning(msg)
fallback_to_mkstemp = True
else:
raise
if not self._use_linkat or fallback_to_mkstemp:
if not exists(self._tmpdir):
mkdirs(self._tmpdir)
fd, tmppath = mkstemp(dir=self._tmpdir)
return fd, tmppath
@contextmanager @contextmanager
def create(self, size=None): def create(self, size=None):
""" """
@ -2353,10 +2394,8 @@ class BaseDiskFile(object):
disk disk
:raises DiskFileNoSpace: if a size is specified and allocation fails :raises DiskFileNoSpace: if a size is specified and allocation fails
""" """
if not exists(self._tmpdir):
mkdirs(self._tmpdir)
try: try:
fd, tmppath = mkstemp(dir=self._tmpdir) fd, tmppath = self._get_tempfile()
except OSError as err: except OSError as err:
if err.errno in (errno.ENOSPC, errno.EDQUOT): if err.errno in (errno.ENOSPC, errno.EDQUOT):
# No more inodes in filesystem # No more inodes in filesystem
@ -2386,7 +2425,9 @@ class BaseDiskFile(object):
# dfw.put_succeeded is set to True after renamer() succeeds in # dfw.put_succeeded is set to True after renamer() succeeds in
# DiskFileWriter._finalize_put() # DiskFileWriter._finalize_put()
try: try:
os.unlink(tmppath) if tmppath:
# when mkstemp() was used
os.unlink(tmppath)
except OSError: except OSError:
self._logger.exception('Error removing tempfile: %s' % self._logger.exception('Error removing tempfile: %s' %
tmppath) tmppath)

View File

@ -53,6 +53,7 @@ import six.moves.cPickle as pickle
from gzip import GzipFile from gzip import GzipFile
import mock as mocklib import mock as mocklib
import inspect import inspect
from nose import SkipTest
EMPTY_ETAG = md5().hexdigest() EMPTY_ETAG = md5().hexdigest()
@ -1079,3 +1080,12 @@ class Timeout(object):
class TimeoutException(Exception): class TimeoutException(Exception):
pass pass
raise TimeoutException raise TimeoutException
def requires_o_tmpfile_support(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not utils.o_tmpfile_supported():
raise SkipTest('Requires O_TMPFILE support')
return func(*args, **kwargs)
return wrapper

View File

@ -0,0 +1,103 @@
# Copyright (c) 2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''Tests for `swift.common.linkat`'''
import ctypes
import unittest
import os
import mock
from uuid import uuid4
from swift.common.linkat import linkat
from swift.common.utils import O_TMPFILE
from test.unit import requires_o_tmpfile_support
class TestLinkat(unittest.TestCase):
def test_flags(self):
self.assertTrue(hasattr(linkat, 'AT_FDCWD'))
self.assertTrue(hasattr(linkat, 'AT_SYMLINK_FOLLOW'))
@mock.patch('swift.common.linkat.linkat._c_linkat', None)
def test_available(self):
self.assertFalse(linkat.available)
@requires_o_tmpfile_support
def test_errno(self):
with open('/dev/null', 'r') as fd:
self.assertRaises(IOError, linkat,
linkat.AT_FDCWD, "/proc/self/fd/%s" % (fd),
linkat.AT_FDCWD, "/tmp/testlinkat",
linkat.AT_SYMLINK_FOLLOW)
self.assertEqual(ctypes.get_errno(), 0)
@mock.patch('swift.common.linkat.linkat._c_linkat', None)
def test_unavailable(self):
self.assertRaises(EnvironmentError, linkat, 0, None, 0, None, 0)
def test_unavailable_in_libc(self):
class LibC(object):
def __init__(self):
self.linkat_retrieved = False
@property
def linkat(self):
self.linkat_retrieved = True
raise AttributeError
libc = LibC()
mock_cdll = mock.Mock(return_value=libc)
with mock.patch('ctypes.CDLL', new=mock_cdll):
# Force re-construction of a `Linkat` instance
# Something you're not supposed to do in actual code
new_linkat = type(linkat)()
self.assertFalse(new_linkat.available)
libc_name = ctypes.util.find_library('c')
mock_cdll.assert_called_once_with(libc_name, use_errno=True)
self.assertTrue(libc.linkat_retrieved)
@requires_o_tmpfile_support
def test_linkat_success(self):
fd = None
path = None
ret = -1
try:
fd = os.open('/tmp', O_TMPFILE | os.O_WRONLY)
path = os.path.join('/tmp', uuid4().hex)
ret = linkat(linkat.AT_FDCWD, "/proc/self/fd/%d" % (fd),
linkat.AT_FDCWD, path, linkat.AT_SYMLINK_FOLLOW)
self.assertEqual(ret, 0)
self.assertTrue(os.path.exists(path))
finally:
if fd:
os.close(fd)
if path and ret == 0:
# if linkat succeeded, remove file
os.unlink(path)
@mock.patch('swift.common.linkat.linkat._c_linkat')
def test_linkat_fd_not_integer(self, _mock_linkat):
self.assertRaises(TypeError, linkat,
"not_int", None, "not_int", None, 0)
self.assertFalse(_mock_linkat.called)

View File

@ -53,6 +53,7 @@ from tempfile import TemporaryFile, NamedTemporaryFile, mkdtemp
from netifaces import AF_INET6 from netifaces import AF_INET6
from mock import MagicMock, patch from mock import MagicMock, patch
from six.moves.configparser import NoSectionError, NoOptionError from six.moves.configparser import NoSectionError, NoOptionError
from uuid import uuid4
from swift.common.exceptions import Timeout, MessageTimeout, \ from swift.common.exceptions import Timeout, MessageTimeout, \
ConnectionTimeout, LockTimeout, ReplicationLockTimeout, \ ConnectionTimeout, LockTimeout, ReplicationLockTimeout, \
@ -62,7 +63,7 @@ from swift.common.utils import is_valid_ip, is_valid_ipv4, is_valid_ipv6
from swift.common.container_sync_realms import ContainerSyncRealms from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.swob import Request, Response from swift.common.swob import Request, Response
from test.unit import FakeLogger from test.unit import FakeLogger, requires_o_tmpfile_support
threading = eventlet.patcher.original('threading') threading = eventlet.patcher.original('threading')
@ -3590,6 +3591,79 @@ cluster_dfw1 = http://dfw1.host/v1/
patch('platform.architecture', return_value=('64bit', '')): patch('platform.architecture', return_value=('64bit', '')):
self.assertRaises(OSError, utils.NR_ioprio_set) self.assertRaises(OSError, utils.NR_ioprio_set)
@requires_o_tmpfile_support
def test_link_fd_to_path_linkat_success(self):
tempdir = mkdtemp(dir='/tmp')
fd = os.open(tempdir, utils.O_TMPFILE | os.O_WRONLY)
data = "I'm whatever Gotham needs me to be"
_m_fsync_dir = mock.Mock()
try:
os.write(fd, data)
# fd is O_WRONLY
self.assertRaises(OSError, os.read, fd, 1)
file_path = os.path.join(tempdir, uuid4().hex)
with mock.patch('swift.common.utils.fsync_dir', _m_fsync_dir):
utils.link_fd_to_path(fd, file_path, 1)
with open(file_path, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual(_m_fsync_dir.call_count, 2)
finally:
os.close(fd)
shutil.rmtree(tempdir)
@requires_o_tmpfile_support
def test_link_fd_to_path_target_exists(self):
tempdir = mkdtemp(dir='/tmp')
# Create and write to a file
fd, path = tempfile.mkstemp(dir=tempdir)
os.write(fd, "hello world")
os.fsync(fd)
os.close(fd)
self.assertTrue(os.path.exists(path))
fd = os.open(tempdir, utils.O_TMPFILE | os.O_WRONLY)
try:
os.write(fd, "bye world")
os.fsync(fd)
utils.link_fd_to_path(fd, path, 0, fsync=False)
# Original file now should have been over-written
with open(path, 'r') as f:
self.assertEqual(f.read(), "bye world")
finally:
os.close(fd)
shutil.rmtree(tempdir)
@requires_o_tmpfile_support
def test_link_fd_to_path_errno_not_EEXIST_or_ENOENT(self):
_m_linkat = mock.Mock(
side_effect=IOError(errno.EACCES, os.strerror(errno.EACCES)))
with mock.patch('swift.common.utils.linkat', _m_linkat):
try:
utils.link_fd_to_path(0, '/path', 1)
except IOError as err:
self.assertEqual(err.errno, errno.EACCES)
else:
self.fail("Expecting IOError exception")
self.assertTrue(_m_linkat.called)
@requires_o_tmpfile_support
def test_linkat_race_dir_not_exists(self):
tempdir = mkdtemp(dir='/tmp')
target_dir = os.path.join(tempdir, uuid4().hex)
target_path = os.path.join(target_dir, uuid4().hex)
os.mkdir(target_dir)
fd = os.open(target_dir, utils.O_TMPFILE | os.O_WRONLY)
# Simulating directory deletion by other backend process
os.rmdir(target_dir)
self.assertFalse(os.path.exists(target_dir))
try:
utils.link_fd_to_path(fd, target_path, 1)
self.assertTrue(os.path.exists(target_dir))
self.assertTrue(os.path.exists(target_path))
finally:
os.close(fd)
shutil.rmtree(tempdir)
class ResellerConfReader(unittest.TestCase): class ResellerConfReader(unittest.TestCase):

View File

@ -41,12 +41,13 @@ from eventlet import hubs, timeout, tpool
from swift.obj.diskfile import MD5_OF_EMPTY_STRING from swift.obj.diskfile import MD5_OF_EMPTY_STRING
from test.unit import (FakeLogger, mock as unit_mock, temptree, from test.unit import (FakeLogger, mock as unit_mock, temptree,
patch_policies, debug_logger, EMPTY_ETAG, patch_policies, debug_logger, EMPTY_ETAG,
make_timestamp_iter, DEFAULT_TEST_EC_TYPE) make_timestamp_iter, DEFAULT_TEST_EC_TYPE,
requires_o_tmpfile_support)
from nose import SkipTest from nose import SkipTest
from swift.obj import diskfile from swift.obj import diskfile
from swift.common import utils from swift.common import utils
from swift.common.utils import hash_path, mkdirs, Timestamp, encode_timestamps from swift.common.utils import hash_path, mkdirs, Timestamp, \
encode_timestamps, O_TMPFILE
from swift.common import ring from swift.common import ring
from swift.common.splice import splice from swift.common.splice import splice
from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \ from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \
@ -246,6 +247,7 @@ class TestDiskFileModuleMethods(unittest.TestCase):
self.assertFalse(os.path.isdir(tmp_path)) self.assertFalse(os.path.isdir(tmp_path))
pickle_args = (self.existing_device, 'a', 'c', 'o', pickle_args = (self.existing_device, 'a', 'c', 'o',
'data', 0.0, policy) 'data', 0.0, policy)
os.makedirs(tmp_path)
# now create a async update # now create a async update
self.df_mgr.pickle_async_update(*pickle_args) self.df_mgr.pickle_async_update(*pickle_args)
# check tempdir # check tempdir
@ -2560,6 +2562,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
diskfile.get_tmp_dir(policy)) diskfile.get_tmp_dir(policy))
os.rmdir(tmpdir) os.rmdir(tmpdir)
df = self._simple_get_diskfile(policy=policy) df = self._simple_get_diskfile(policy=policy)
df._use_linkat = False
with df.create(): with df.create():
self.assertTrue(os.path.exists(tmpdir)) self.assertTrue(os.path.exists(tmpdir))
@ -2925,6 +2928,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_create_mkstemp_no_space(self): def test_create_mkstemp_no_space(self):
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123', df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123',
'xyz', policy=POLICIES.legacy) 'xyz', policy=POLICIES.legacy)
df._use_linkat = False
for e in (errno.ENOSPC, errno.EDQUOT): for e in (errno.ENOSPC, errno.EDQUOT):
with mock.patch("swift.obj.diskfile.mkstemp", with mock.patch("swift.obj.diskfile.mkstemp",
mock.MagicMock(side_effect=OSError( mock.MagicMock(side_effect=OSError(
@ -3844,6 +3848,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
# Test cleanup when DiskFileNoSpace() is raised. # Test cleanup when DiskFileNoSpace() is raised.
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123', df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123',
'xyz', policy=POLICIES.legacy) 'xyz', policy=POLICIES.legacy)
df._use_linkat = False
_m_fallocate = mock.MagicMock(side_effect=OSError(errno.ENOSPC, _m_fallocate = mock.MagicMock(side_effect=OSError(errno.ENOSPC,
os.strerror(errno.ENOSPC))) os.strerror(errno.ENOSPC)))
_m_unlink = mock.Mock() _m_unlink = mock.Mock()
@ -3866,6 +3871,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
os.strerror(errno.ENOENT))) os.strerror(errno.ENOENT)))
_m_unlink = mock.Mock() _m_unlink = mock.Mock()
df = self._simple_get_diskfile() df = self._simple_get_diskfile()
df._use_linkat = False
data = '0' * 100 data = '0' * 100
metadata = { metadata = {
'ETag': md5(data).hexdigest(), 'ETag': md5(data).hexdigest(),
@ -3891,6 +3897,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
# Test logging of os.unlink() failures. # Test logging of os.unlink() failures.
df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123', df = self.df_mgr.get_diskfile(self.existing_device, '0', 'abc', '123',
'xyz', policy=POLICIES.legacy) 'xyz', policy=POLICIES.legacy)
df._use_linkat = False
_m_fallocate = mock.MagicMock(side_effect=OSError(errno.ENOSPC, _m_fallocate = mock.MagicMock(side_effect=OSError(errno.ENOSPC,
os.strerror(errno.ENOSPC))) os.strerror(errno.ENOSPC)))
_m_unlink = mock.MagicMock(side_effect=OSError(errno.ENOENT, _m_unlink = mock.MagicMock(side_effect=OSError(errno.ENOENT,
@ -3910,6 +3917,86 @@ class DiskFileMixin(BaseDiskFileTestMixin):
for line in error_lines: for line in error_lines:
self.assertTrue(line.startswith("Error removing tempfile:")) self.assertTrue(line.startswith("Error removing tempfile:"))
@requires_o_tmpfile_support
def test_get_tempfile_use_linkat_os_open_called(self):
df = self._simple_get_diskfile()
self.assertTrue(df._use_linkat)
_m_mkstemp = mock.MagicMock()
_m_os_open = mock.Mock(return_value=12345)
_m_mkc = mock.Mock()
with mock.patch("swift.obj.diskfile.mkstemp", _m_mkstemp):
with mock.patch("swift.obj.diskfile.os.open", _m_os_open):
with mock.patch("swift.obj.diskfile.makedirs_count", _m_mkc):
fd, tmppath = df._get_tempfile()
self.assertTrue(_m_mkc.called)
flags = O_TMPFILE | os.O_WRONLY
_m_os_open.assert_called_once_with(df._datadir, flags)
self.assertEqual(tmppath, None)
self.assertEqual(fd, 12345)
self.assertFalse(_m_mkstemp.called)
@requires_o_tmpfile_support
def test_get_tempfile_fallback_to_mkstemp(self):
df = self._simple_get_diskfile()
df._logger = debug_logger()
self.assertTrue(df._use_linkat)
for err in (errno.EOPNOTSUPP, errno.EISDIR, errno.EINVAL):
_m_open = mock.Mock(side_effect=OSError(err, os.strerror(err)))
_m_mkstemp = mock.MagicMock(return_value=(0, "blah"))
_m_mkc = mock.Mock()
with mock.patch("swift.obj.diskfile.os.open", _m_open):
with mock.patch("swift.obj.diskfile.mkstemp", _m_mkstemp):
with mock.patch("swift.obj.diskfile.makedirs_count",
_m_mkc):
fd, tmppath = df._get_tempfile()
self.assertTrue(_m_mkc.called)
# Fallback should succeed and mkstemp() should be called.
self.assertTrue(_m_mkstemp.called)
self.assertEqual(tmppath, "blah")
# Despite fs not supporting O_TMPFILE, use_linkat should not change
self.assertTrue(df._use_linkat)
log = df._logger.get_lines_for_level('warning')
self.assertTrue(len(log) > 0)
self.assertTrue('O_TMPFILE' in log[-1])
@requires_o_tmpfile_support
def test_get_tmpfile_os_open_other_exceptions_are_raised(self):
df = self._simple_get_diskfile()
_m_open = mock.Mock(side_effect=OSError(errno.ENOSPC,
os.strerror(errno.ENOSPC)))
_m_mkstemp = mock.MagicMock()
_m_mkc = mock.Mock()
with mock.patch("swift.obj.diskfile.os.open", _m_open):
with mock.patch("swift.obj.diskfile.mkstemp", _m_mkstemp):
with mock.patch("swift.obj.diskfile.makedirs_count", _m_mkc):
try:
fd, tmppath = df._get_tempfile()
except OSError as err:
self.assertEqual(err.errno, errno.ENOSPC)
else:
self.fail("Expecting ENOSPC")
self.assertTrue(_m_mkc.called)
# mkstemp() should not be invoked.
self.assertFalse(_m_mkstemp.called)
@requires_o_tmpfile_support
def test_create_use_linkat_renamer_not_called(self):
df = self._simple_get_diskfile()
data = '0' * 100
metadata = {
'ETag': md5(data).hexdigest(),
'X-Timestamp': Timestamp(time()).internal,
'Content-Length': str(100),
}
_m_renamer = mock.Mock()
with mock.patch("swift.obj.diskfile.renamer", _m_renamer):
with df.create(size=100) as writer:
writer.write(data)
writer.put(metadata)
self.assertTrue(writer.put_succeeded)
self.assertFalse(_m_renamer.called)
@patch_policies(test_policies) @patch_policies(test_policies)
class TestDiskFile(DiskFileMixin, unittest.TestCase): class TestDiskFile(DiskFileMixin, unittest.TestCase):

View File

@ -2037,9 +2037,14 @@ class TestObjectController(unittest.TestCase):
# diskfile open won't succeed because no durable was written, # diskfile open won't succeed because no durable was written,
# so look under the hood for data files. # so look under the hood for data files.
files = os.listdir(df._datadir) files = os.listdir(df._datadir)
num_data_files = len([f for f in files if f.endswith('.data')]) if len(files) > 0:
self.assertEqual(1, num_data_files) # Although the third fragment archive hasn't landed on
found += 1 # disk, the directory df._datadir is pre-maturely created
# and is empty when we use O_TMPFILE + linkat()
num_data_files = \
len([f for f in files if f.endswith('.data')])
self.assertEqual(1, num_data_files)
found += 1
except OSError: except OSError:
pass pass
self.assertEqual(found, 2) self.assertEqual(found, 2)
@ -2096,7 +2101,8 @@ class TestObjectController(unittest.TestCase):
df = df_mgr.get_diskfile(node['device'], partition, df = df_mgr.get_diskfile(node['device'], partition,
'a', 'ec-con', 'quorum', 'a', 'ec-con', 'quorum',
policy=POLICIES[3]) policy=POLICIES[3])
self.assertFalse(os.path.exists(df._datadir)) if os.path.exists(df._datadir):
self.assertFalse(os.listdir(df._datadir)) # should be empty
@unpatch_policies @unpatch_policies
def test_PUT_ec_fragment_quorum_bad_request(self): def test_PUT_ec_fragment_quorum_bad_request(self):
@ -2154,7 +2160,8 @@ class TestObjectController(unittest.TestCase):
df = df_mgr.get_diskfile(node['device'], partition, df = df_mgr.get_diskfile(node['device'], partition,
'a', 'ec-con', 'quorum', 'a', 'ec-con', 'quorum',
policy=POLICIES[3]) policy=POLICIES[3])
self.assertFalse(os.path.exists(df._datadir)) if os.path.exists(df._datadir):
self.assertFalse(os.listdir(df._datadir)) # should be empty
@unpatch_policies @unpatch_policies
def test_PUT_ec_if_none_match(self): def test_PUT_ec_if_none_match(self):