relinker: Remove replication locks for empty parts
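Otherwise, the rmdir of the emptied partition directory will fail because the .lock-replication file is still inside it. Drive-by: pull the 10s default timeout used by the lock helpers (lock_path, lock_file, lock_parent_directory) up to a module-level DEFAULT_LOCK_TIMEOUT that can be monkey-patched. Change-Id: I559f40d04bf30a74b507d9428c532111a7152b28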
This commit is contained in:
@@ -24,10 +24,11 @@ import os
|
|||||||
import time
|
import time
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from swift.common.exceptions import LockTimeout
|
||||||
from swift.common.storage_policy import POLICIES
|
from swift.common.storage_policy import POLICIES
|
||||||
from swift.common.utils import replace_partition_in_path, config_true_value, \
|
from swift.common.utils import replace_partition_in_path, config_true_value, \
|
||||||
audit_location_generator, get_logger, readconf, drop_privileges, \
|
audit_location_generator, get_logger, readconf, drop_privileges, \
|
||||||
RateLimitedIterator, lock_path, PrefixLoggerAdapter, distribute_evenly, \
|
RateLimitedIterator, PrefixLoggerAdapter, distribute_evenly, \
|
||||||
non_negative_float, non_negative_int, config_auto_int_value, \
|
non_negative_float, non_negative_int, config_auto_int_value, \
|
||||||
dump_recon_cache, get_partition_from_path
|
dump_recon_cache, get_partition_from_path
|
||||||
from swift.obj import diskfile
|
from swift.obj import diskfile
|
||||||
@@ -325,8 +326,11 @@ class Relinker(object):
|
|||||||
device, dirty_partition, [], self.policy)
|
device, dirty_partition, [], self.policy)
|
||||||
|
|
||||||
if self.do_cleanup:
|
if self.do_cleanup:
|
||||||
hashes = self.diskfile_mgr.get_hashes(
|
try:
|
||||||
device, int(partition), [], self.policy)
|
hashes = self.diskfile_mgr.get_hashes(
|
||||||
|
device, int(partition), [], self.policy)
|
||||||
|
except LockTimeout:
|
||||||
|
hashes = 1 # truthy, but invalid
|
||||||
# In any reasonably-large cluster, we'd expect all old
|
# In any reasonably-large cluster, we'd expect all old
|
||||||
# partitions P to be empty after cleanup (i.e., it's unlikely
|
# partitions P to be empty after cleanup (i.e., it's unlikely
|
||||||
# that there's another partition Q := P//2 that also has data
|
# that there's another partition Q := P//2 that also has data
|
||||||
@@ -338,21 +342,24 @@ class Relinker(object):
|
|||||||
# starts and little data is written to handoffs during the
|
# starts and little data is written to handoffs during the
|
||||||
# increase).
|
# increase).
|
||||||
if not hashes:
|
if not hashes:
|
||||||
with lock_path(partition_path):
|
try:
|
||||||
# Same lock used by invalidate_hashes, consolidate_hashes,
|
with self.diskfile_mgr.replication_lock(
|
||||||
# get_hashes
|
device, self.policy, partition), \
|
||||||
try:
|
self.diskfile_mgr.partition_lock(
|
||||||
for f in ('hashes.pkl', 'hashes.invalid', '.lock'):
|
device, self.policy, partition):
|
||||||
|
# Order here is somewhat important for crash-tolerance
|
||||||
|
for f in ('hashes.pkl', 'hashes.invalid', '.lock',
|
||||||
|
'.lock-replication'):
|
||||||
try:
|
try:
|
||||||
os.unlink(os.path.join(partition_path, f))
|
os.unlink(os.path.join(partition_path, f))
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
if e.errno != errno.ENOENT:
|
if e.errno != errno.ENOENT:
|
||||||
raise
|
raise
|
||||||
except OSError:
|
# Note that as soon as we've deleted the lock files, some
|
||||||
pass
|
# other process could come along and make new ones -- so
|
||||||
try:
|
# this may well complain that the directory is not empty
|
||||||
os.rmdir(partition_path)
|
os.rmdir(partition_path)
|
||||||
except OSError:
|
except (OSError, LockTimeout):
|
||||||
# Most likely, some data landed in here or we hit an error
|
# Most likely, some data landed in here or we hit an error
|
||||||
# above. Let the replicator deal with things; it was worth
|
# above. Let the replicator deal with things; it was worth
|
||||||
# a shot.
|
# a shot.
|
||||||
@@ -512,7 +519,17 @@ class Relinker(object):
|
|||||||
if created_links:
|
if created_links:
|
||||||
self.linked_into_partitions.add(get_partition_from_path(
|
self.linked_into_partitions.add(get_partition_from_path(
|
||||||
self.conf['devices'], new_hash_path))
|
self.conf['devices'], new_hash_path))
|
||||||
diskfile.invalidate_hash(os.path.dirname(new_hash_path))
|
try:
|
||||||
|
diskfile.invalidate_hash(os.path.dirname(new_hash_path))
|
||||||
|
except (Exception, LockTimeout) as exc:
|
||||||
|
# at this point, the link's created. even if we counted it as
|
||||||
|
# an error, a subsequent run wouldn't find any work to do. so,
|
||||||
|
# don't bother; instead, wait for replication to be re-enabled
|
||||||
|
# so post-replication rehashing or periodic rehashing can
|
||||||
|
# eventually pick up the change
|
||||||
|
self.logger.warning(
|
||||||
|
'Error invalidating suffix for %s: %r',
|
||||||
|
new_hash_path, exc)
|
||||||
|
|
||||||
if self.do_cleanup and not missing_links:
|
if self.do_cleanup and not missing_links:
|
||||||
# use the sorted list to help unit testing
|
# use the sorted list to help unit testing
|
||||||
@@ -539,7 +556,7 @@ class Relinker(object):
|
|||||||
# relinking into the new part-power space
|
# relinking into the new part-power space
|
||||||
try:
|
try:
|
||||||
diskfile.invalidate_hash(os.path.dirname(hash_path))
|
diskfile.invalidate_hash(os.path.dirname(hash_path))
|
||||||
except Exception as exc:
|
except (Exception, LockTimeout) as exc:
|
||||||
# note: not counted as an error
|
# note: not counted as an error
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Error invalidating suffix for %s: %r',
|
'Error invalidating suffix for %s: %r',
|
||||||
|
@@ -205,6 +205,7 @@ LOG_LINE_DEFAULT_FORMAT = '{remote_addr} - - [{time.d}/{time.b}/{time.Y}' \
|
|||||||
'"{referer}" "{txn_id}" "{user_agent}" ' \
|
'"{referer}" "{txn_id}" "{user_agent}" ' \
|
||||||
'{trans_time:.4f} "{additional_info}" {pid} ' \
|
'{trans_time:.4f} "{additional_info}" {pid} ' \
|
||||||
'{policy_index}'
|
'{policy_index}'
|
||||||
|
DEFAULT_LOCK_TIMEOUT = 10
|
||||||
|
|
||||||
|
|
||||||
class InvalidHashPathConfigError(ValueError):
|
class InvalidHashPathConfigError(ValueError):
|
||||||
@@ -2849,7 +2850,8 @@ def _get_any_lock(fds):
|
|||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def lock_path(directory, timeout=10, timeout_class=None, limit=1, name=None):
|
def lock_path(directory, timeout=None, timeout_class=None,
|
||||||
|
limit=1, name=None):
|
||||||
"""
|
"""
|
||||||
Context manager that acquires a lock on a directory. This will block until
|
Context manager that acquires a lock on a directory. This will block until
|
||||||
the lock can be acquired, or the timeout time has expired (whichever occurs
|
the lock can be acquired, or the timeout time has expired (whichever occurs
|
||||||
@@ -2860,7 +2862,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1, name=None):
|
|||||||
workaround by locking a hidden file in the directory.
|
workaround by locking a hidden file in the directory.
|
||||||
|
|
||||||
:param directory: directory to be locked
|
:param directory: directory to be locked
|
||||||
:param timeout: timeout (in seconds)
|
:param timeout: timeout (in seconds). If None, defaults to
|
||||||
|
DEFAULT_LOCK_TIMEOUT
|
||||||
:param timeout_class: The class of the exception to raise if the
|
:param timeout_class: The class of the exception to raise if the
|
||||||
lock cannot be granted within the timeout. Will be
|
lock cannot be granted within the timeout. Will be
|
||||||
constructed as timeout_class(timeout, lockpath). Default:
|
constructed as timeout_class(timeout, lockpath). Default:
|
||||||
@@ -2874,10 +2877,12 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1, name=None):
|
|||||||
:raises TypeError: if limit is not an int.
|
:raises TypeError: if limit is not an int.
|
||||||
:raises ValueError: if limit is less than 1.
|
:raises ValueError: if limit is less than 1.
|
||||||
"""
|
"""
|
||||||
if limit < 1:
|
if timeout is None:
|
||||||
raise ValueError('limit must be greater than or equal to 1')
|
timeout = DEFAULT_LOCK_TIMEOUT
|
||||||
if timeout_class is None:
|
if timeout_class is None:
|
||||||
timeout_class = swift.common.exceptions.LockTimeout
|
timeout_class = swift.common.exceptions.LockTimeout
|
||||||
|
if limit < 1:
|
||||||
|
raise ValueError('limit must be greater than or equal to 1')
|
||||||
mkdirs(directory)
|
mkdirs(directory)
|
||||||
lockpath = '%s/.lock' % directory
|
lockpath = '%s/.lock' % directory
|
||||||
if name:
|
if name:
|
||||||
@@ -2905,17 +2910,20 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1, name=None):
|
|||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def lock_file(filename, timeout=10, append=False, unlink=True):
|
def lock_file(filename, timeout=None, append=False, unlink=True):
|
||||||
"""
|
"""
|
||||||
Context manager that acquires a lock on a file. This will block until
|
Context manager that acquires a lock on a file. This will block until
|
||||||
the lock can be acquired, or the timeout time has expired (whichever occurs
|
the lock can be acquired, or the timeout time has expired (whichever occurs
|
||||||
first).
|
first).
|
||||||
|
|
||||||
:param filename: file to be locked
|
:param filename: file to be locked
|
||||||
:param timeout: timeout (in seconds)
|
:param timeout: timeout (in seconds). If None, defaults to
|
||||||
|
DEFAULT_LOCK_TIMEOUT
|
||||||
:param append: True if file should be opened in append mode
|
:param append: True if file should be opened in append mode
|
||||||
:param unlink: True if the file should be unlinked at the end
|
:param unlink: True if the file should be unlinked at the end
|
||||||
"""
|
"""
|
||||||
|
if timeout is None:
|
||||||
|
timeout = DEFAULT_LOCK_TIMEOUT
|
||||||
flags = os.O_CREAT | os.O_RDWR
|
flags = os.O_CREAT | os.O_RDWR
|
||||||
if append:
|
if append:
|
||||||
flags |= os.O_APPEND
|
flags |= os.O_APPEND
|
||||||
@@ -2950,14 +2958,15 @@ def lock_file(filename, timeout=10, append=False, unlink=True):
|
|||||||
file_obj.close()
|
file_obj.close()
|
||||||
|
|
||||||
|
|
||||||
def lock_parent_directory(filename, timeout=10):
|
def lock_parent_directory(filename, timeout=None):
|
||||||
"""
|
"""
|
||||||
Context manager that acquires a lock on the parent directory of the given
|
Context manager that acquires a lock on the parent directory of the given
|
||||||
file path. This will block until the lock can be acquired, or the timeout
|
file path. This will block until the lock can be acquired, or the timeout
|
||||||
time has expired (whichever occurs first).
|
time has expired (whichever occurs first).
|
||||||
|
|
||||||
:param filename: file path of the parent directory to be locked
|
:param filename: file path of the parent directory to be locked
|
||||||
:param timeout: timeout (in seconds)
|
:param timeout: timeout (in seconds). If None, defaults to
|
||||||
|
DEFAULT_LOCK_TIMEOUT
|
||||||
"""
|
"""
|
||||||
return lock_path(os.path.dirname(filename), timeout=timeout)
|
return lock_path(os.path.dirname(filename), timeout=timeout)
|
||||||
|
|
||||||
|
@@ -38,7 +38,7 @@ from swift.common.storage_policy import (
|
|||||||
get_policy_string)
|
get_policy_string)
|
||||||
|
|
||||||
from swift.obj.diskfile import write_metadata, DiskFileRouter, \
|
from swift.obj.diskfile import write_metadata, DiskFileRouter, \
|
||||||
DiskFileManager, relink_paths
|
DiskFileManager, relink_paths, BaseDiskFileManager
|
||||||
|
|
||||||
from test.debug_logger import debug_logger
|
from test.debug_logger import debug_logger
|
||||||
from test.unit import skip_if_no_xattrs, DEFAULT_TEST_EC_TYPE, \
|
from test.unit import skip_if_no_xattrs, DEFAULT_TEST_EC_TYPE, \
|
||||||
@@ -206,10 +206,12 @@ class TestRelinker(unittest.TestCase):
|
|||||||
@contextmanager
|
@contextmanager
|
||||||
def _mock_relinker(self):
|
def _mock_relinker(self):
|
||||||
with mock.patch.object(relinker.logging, 'getLogger',
|
with mock.patch.object(relinker.logging, 'getLogger',
|
||||||
return_value=self.logger):
|
return_value=self.logger), \
|
||||||
with mock.patch('swift.cli.relinker.DEFAULT_RECON_CACHE_PATH',
|
mock.patch.object(relinker, 'get_logger',
|
||||||
self.recon_cache_path):
|
return_value=self.logger), \
|
||||||
yield
|
mock.patch('swift.cli.relinker.DEFAULT_RECON_CACHE_PATH',
|
||||||
|
self.recon_cache_path):
|
||||||
|
yield
|
||||||
|
|
||||||
def test_workers_parent(self):
|
def test_workers_parent(self):
|
||||||
os.mkdir(os.path.join(self.devices, 'sda2'))
|
os.mkdir(os.path.join(self.devices, 'sda2'))
|
||||||
@@ -3394,7 +3396,7 @@ class TestRelinker(unittest.TestCase):
|
|||||||
'(0 files, 0 linked, 1 removed, 0 errors)', info_lines)
|
'(0 files, 0 linked, 1 removed, 0 errors)', info_lines)
|
||||||
self.assertEqual([], self.logger.get_lines_for_level('error'))
|
self.assertEqual([], self.logger.get_lines_for_level('error'))
|
||||||
|
|
||||||
def test_cleanup_old_part_careful(self):
|
def test_cleanup_old_part_careful_file(self):
|
||||||
self._common_test_cleanup()
|
self._common_test_cleanup()
|
||||||
# make some extra junk file in the part
|
# make some extra junk file in the part
|
||||||
extra_file = os.path.join(self.part_dir, 'extra')
|
extra_file = os.path.join(self.part_dir, 'extra')
|
||||||
@@ -3411,6 +3413,124 @@ class TestRelinker(unittest.TestCase):
|
|||||||
self.assertTrue(os.path.exists(self.part_dir))
|
self.assertTrue(os.path.exists(self.part_dir))
|
||||||
self.assertEqual([], self.logger.get_lines_for_level('error'))
|
self.assertEqual([], self.logger.get_lines_for_level('error'))
|
||||||
|
|
||||||
|
def test_cleanup_old_part_careful_dir(self):
|
||||||
|
self._common_test_cleanup()
|
||||||
|
# make some extra junk directory in the part
|
||||||
|
extra_dir = os.path.join(self.part_dir, 'extra')
|
||||||
|
os.mkdir(extra_dir)
|
||||||
|
self.assertEqual(0, relinker.main([
|
||||||
|
'cleanup',
|
||||||
|
'--swift-dir', self.testdir,
|
||||||
|
'--devices', self.devices,
|
||||||
|
'--skip-mount',
|
||||||
|
]))
|
||||||
|
# old partition can't be cleaned up
|
||||||
|
self.assertTrue(os.path.exists(self.part_dir))
|
||||||
|
self.assertTrue(os.path.exists(extra_dir))
|
||||||
|
|
||||||
|
def test_cleanup_old_part_replication_lock_taken(self):
|
||||||
|
# verify that relinker must take the replication lock before deleting
|
||||||
|
# it, and handles the LockTimeout when unable to take it
|
||||||
|
self._common_test_cleanup()
|
||||||
|
|
||||||
|
config = """
|
||||||
|
[DEFAULT]
|
||||||
|
swift_dir = %s
|
||||||
|
devices = %s
|
||||||
|
mount_check = false
|
||||||
|
replication_lock_timeout = 1
|
||||||
|
|
||||||
|
[object-relinker]
|
||||||
|
""" % (self.testdir, self.devices)
|
||||||
|
conf_file = os.path.join(self.testdir, 'relinker.conf')
|
||||||
|
with open(conf_file, 'w') as f:
|
||||||
|
f.write(dedent(config))
|
||||||
|
|
||||||
|
with utils.lock_path(self.part_dir, name='replication'):
|
||||||
|
# lock taken so relinker should be unable to remove the lock file
|
||||||
|
with self._mock_relinker():
|
||||||
|
self.assertEqual(0, relinker.main(['cleanup', conf_file]))
|
||||||
|
# old partition can't be cleaned up
|
||||||
|
self.assertTrue(os.path.exists(self.part_dir))
|
||||||
|
self.assertTrue(os.path.exists(
|
||||||
|
os.path.join(self.part_dir, '.lock-replication')))
|
||||||
|
self.assertEqual([], self.logger.get_lines_for_level('error'))
|
||||||
|
|
||||||
|
def test_cleanup_old_part_partition_lock_taken_during_get_hashes(self):
|
||||||
|
# verify that relinker handles LockTimeouts when rehashing
|
||||||
|
self._common_test_cleanup()
|
||||||
|
|
||||||
|
config = """
|
||||||
|
[DEFAULT]
|
||||||
|
swift_dir = %s
|
||||||
|
devices = %s
|
||||||
|
mount_check = false
|
||||||
|
replication_lock_timeout = 1
|
||||||
|
|
||||||
|
[object-relinker]
|
||||||
|
""" % (self.testdir, self.devices)
|
||||||
|
conf_file = os.path.join(self.testdir, 'relinker.conf')
|
||||||
|
with open(conf_file, 'w') as f:
|
||||||
|
f.write(dedent(config))
|
||||||
|
|
||||||
|
orig_get_hashes = BaseDiskFileManager.get_hashes
|
||||||
|
|
||||||
|
def new_get_hashes(*args, **kwargs):
|
||||||
|
# lock taken so relinker should be unable to rehash
|
||||||
|
with utils.lock_path(self.part_dir):
|
||||||
|
return orig_get_hashes(*args, **kwargs)
|
||||||
|
|
||||||
|
with self._mock_relinker(), \
|
||||||
|
mock.patch('swift.common.utils.DEFAULT_LOCK_TIMEOUT', 0.1), \
|
||||||
|
mock.patch.object(BaseDiskFileManager,
|
||||||
|
'get_hashes', new_get_hashes):
|
||||||
|
self.assertEqual(0, relinker.main(['cleanup', conf_file]))
|
||||||
|
# old partition can't be cleaned up
|
||||||
|
self.assertTrue(os.path.exists(self.part_dir))
|
||||||
|
self.assertTrue(os.path.exists(
|
||||||
|
os.path.join(self.part_dir, '.lock')))
|
||||||
|
self.assertEqual([], self.logger.get_lines_for_level('error'))
|
||||||
|
self.assertEqual([], self.logger.get_lines_for_level('warning'))
|
||||||
|
|
||||||
|
def test_cleanup_old_part_lock_taken_between_get_hashes_and_rm(self):
|
||||||
|
# verify that relinker must take the partition lock before deleting
|
||||||
|
# it, and handles the LockTimeout when unable to take it
|
||||||
|
self._common_test_cleanup()
|
||||||
|
|
||||||
|
config = """
|
||||||
|
[DEFAULT]
|
||||||
|
swift_dir = %s
|
||||||
|
devices = %s
|
||||||
|
mount_check = false
|
||||||
|
replication_lock_timeout = 1
|
||||||
|
|
||||||
|
[object-relinker]
|
||||||
|
""" % (self.testdir, self.devices)
|
||||||
|
conf_file = os.path.join(self.testdir, 'relinker.conf')
|
||||||
|
with open(conf_file, 'w') as f:
|
||||||
|
f.write(dedent(config))
|
||||||
|
|
||||||
|
orig_replication_lock = BaseDiskFileManager.replication_lock
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def new_lock(*args, **kwargs):
|
||||||
|
# partition lock taken so relinker should be unable to acquire it and remove the part
|
||||||
|
with utils.lock_path(self.part_dir):
|
||||||
|
with orig_replication_lock(*args, **kwargs) as cm:
|
||||||
|
yield cm
|
||||||
|
|
||||||
|
with self._mock_relinker(), \
|
||||||
|
mock.patch('swift.common.utils.DEFAULT_LOCK_TIMEOUT', 0.1), \
|
||||||
|
mock.patch.object(BaseDiskFileManager,
|
||||||
|
'replication_lock', new_lock):
|
||||||
|
self.assertEqual(0, relinker.main(['cleanup', conf_file]))
|
||||||
|
# old partition can't be cleaned up
|
||||||
|
self.assertTrue(os.path.exists(self.part_dir))
|
||||||
|
self.assertTrue(os.path.exists(
|
||||||
|
os.path.join(self.part_dir, '.lock')))
|
||||||
|
self.assertEqual([], self.logger.get_lines_for_level('error'))
|
||||||
|
self.assertEqual([], self.logger.get_lines_for_level('warning'))
|
||||||
|
|
||||||
def test_cleanup_old_part_robust(self):
|
def test_cleanup_old_part_robust(self):
|
||||||
self._common_test_cleanup()
|
self._common_test_cleanup()
|
||||||
|
|
||||||
@@ -3425,6 +3545,10 @@ class TestRelinker(unittest.TestCase):
|
|||||||
set(os.listdir(self.part_dir)))
|
set(os.listdir(self.part_dir)))
|
||||||
# unlink a random file, should be empty
|
# unlink a random file, should be empty
|
||||||
os.unlink(os.path.join(self.part_dir, 'hashes.pkl'))
|
os.unlink(os.path.join(self.part_dir, 'hashes.pkl'))
|
||||||
|
# create an ssync replication lock, too
|
||||||
|
with open(os.path.join(self.part_dir,
|
||||||
|
'.lock-replication'), 'w'):
|
||||||
|
pass
|
||||||
calls.append(True)
|
calls.append(True)
|
||||||
elif part == self.next_part:
|
elif part == self.next_part:
|
||||||
# sometimes our random obj needs to rehash the next part too
|
# sometimes our random obj needs to rehash the next part too
|
||||||
|
Reference in New Issue
Block a user