Fix a race condition during cross-replication

When two nodes do not have the same version of the ring and each thinks
the other is the primary node for a partition, a race condition can
lead to the loss of some of the objects in that partition.

The following sequence leads to the loss of some of the objects:

  1. A gets and reloads the new ring
  2. A starts to replicate/revert the partition P to node B
  3. B (with the old ring) starts to replicate/revert the (partial)
     partition P to node A
     => replication should be fast as all objects are already on node A
  4. B finishes replication of the (partial) partition P to node A
  5. B removes the (partial) partition P after replication succeeded
  6. A finishes replication of partition P to node B
  7. A removes the partition P
  8. B gets and reloads the new ring

All data transferred between steps 2 and 5 is lost: it is no longer on
node B, and it has also been removed from node A.

This commit makes the replicator/reconstructor hold a replication_lock
on partition P so that the remote node cannot start an opposite
replication.
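
To make the fix concrete: whichever node starts working on a partition
first takes an exclusive lock inside that partition's directory, and the
other side gives up after a short timeout instead of syncing and then
deleting. The sketch below is illustrative only, assuming a simple
flock-based lock file; it is not Swift's lock_path implementation, and
the file name and helper are made up for the example:

    import errno
    import fcntl
    import os
    import time


    class PartitionLockTimeout(Exception):
        """Raised when the partition lock cannot be taken in time."""


    def acquire_partition_lock(part_path, timeout=0.2, interval=0.05):
        # Take an exclusive, non-blocking flock on a lock file inside the
        # partition directory, retrying until `timeout` expires. The caller
        # must flock(LOCK_UN) and close the returned fd when done.
        fd = os.open(os.path.join(part_path, '.replication_lock'),
                     os.O_CREAT | os.O_RDWR)
        deadline = time.time() + timeout
        while True:
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                return fd
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EACCES):
                    os.close(fd)
                    raise
                if time.time() >= deadline:
                    os.close(fd)
                    raise PartitionLockTimeout(part_path)
                time.sleep(interval)

In the sequence above, node A's revert job would hold such a lock from
step 2 through step 7, so the SSYNC that node B opens against node A at
step 3 cannot take the same lock; B's replication fails, B keeps its
partial copy of partition P, and no objects are lost.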

Change-Id: I29acc1302a75ed52c935f42485f775cd41648e4d
Closes-Bug: #1897177
Romain LE DISEZ
2020-09-24 20:36:36 -04:00
parent 004052dc65
commit 8c0a1abf74
8 changed files with 207 additions and 103 deletions

View File

@@ -215,6 +215,10 @@ class ReplicationLockTimeout(LockTimeout):
pass
class PartitionLockTimeout(LockTimeout):
pass
class MimeInvalid(SwiftException):
pass

View File

@@ -72,7 +72,7 @@ from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
ReplicationLockTimeout, DiskFileExpired, DiskFileXattrNotSupported, \
DiskFileBadMetadataChecksum
DiskFileBadMetadataChecksum, PartitionLockTimeout
from swift.common.swob import multi_range_iterator
from swift.common.storage_policy import (
get_policy_string, split_policy_string, PolicyError, POLICIES,
@@ -1333,8 +1333,8 @@ class BaseDiskFileManager(object):
@contextmanager
def replication_lock(self, device, policy, partition):
"""
A context manager that will lock on the device given, if
configured to do so.
A context manager that will lock on the partition and, if configured
to do so, on the device given.
:param device: name of target device
:param policy: policy targeted by the replication request
@@ -1342,24 +1342,36 @@ class BaseDiskFileManager(object):
:raises ReplicationLockTimeout: If the lock on the device
cannot be granted within the configured timeout.
"""
if self.replication_concurrency_per_device:
dev_path = self.get_dev_path(device)
part_path = os.path.join(dev_path, get_data_dir(policy),
str(partition))
limit_time = time.time() + self.replication_lock_timeout
with lock_path(
dev_path,
timeout=self.replication_lock_timeout,
timeout_class=ReplicationLockTimeout,
limit=self.replication_concurrency_per_device):
with lock_path(
part_path,
timeout=limit_time - time.time(),
timeout_class=ReplicationLockTimeout,
limit=1,
name='replication'):
limit_time = time.time() + self.replication_lock_timeout
with self.partition_lock(device, policy, partition, name='replication',
timeout=self.replication_lock_timeout):
if self.replication_concurrency_per_device:
with lock_path(self.get_dev_path(device),
timeout=limit_time - time.time(),
timeout_class=ReplicationLockTimeout,
limit=self.replication_concurrency_per_device):
yield True
else:
else:
yield True
@contextmanager
def partition_lock(self, device, policy, partition, name=None,
timeout=None):
"""
A context manager that will lock on the partition given.
:param device: device targeted by the lock request
:param policy: policy targeted by the lock request
:param partition: partition targeted by the lock request
:raises PartitionLockTimeout: If the lock on the partition
cannot be granted within the configured timeout.
"""
if timeout is None:
timeout = self.replication_lock_timeout
part_path = os.path.join(self.get_dev_path(device),
get_data_dir(policy), str(partition))
with lock_path(part_path, timeout=timeout,
timeout_class=PartitionLockTimeout, limit=1, name=name):
yield True
def pickle_async_update(self, device, account, container, obj, data,
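
For context, here is a condensed, illustrative sketch of the calling
pattern that the replicator and reconstructor hunks below adopt around
the new partition_lock() context manager. revert_handoff() and
sync_to_peers() are made-up names, not part of this change; the
partition_lock() arguments and the PartitionLockTimeout handling mirror
the diff:

    from swift.common.exceptions import PartitionLockTimeout

    def revert_handoff(daemon, job, sync_to_peers):
        # Illustrative only: `daemon` is a replicator/reconstructor-like
        # object and `sync_to_peers` is a callable standing in for the real
        # per-node sync loop.
        df_mgr = daemon._df_router[job['policy']]
        try:
            # Hold the partition for the whole sync-and-delete so an incoming
            # SSYNC for the same partition cannot run at the same time.
            with df_mgr.partition_lock(job['device'], job['policy'],
                                       job['partition'], name='replication',
                                       timeout=0.2):
                if sync_to_peers(job):
                    daemon.delete_partition(job['path'])
        except PartitionLockTimeout:
            # The other side of the cross-replication got there first; skip
            # this partition and let a later pass retry the revert.
            daemon.logger.increment('partition.lock-failure.count')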

View File

@@ -45,7 +45,7 @@ from swift.obj.diskfile import DiskFileRouter, get_data_dir, \
get_tmp_dir
from swift.common.storage_policy import POLICIES, EC_POLICY
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
SuffixSyncError
SuffixSyncError, PartitionLockTimeout
SYNC, REVERT = ('sync_only', 'sync_revert')
@@ -879,19 +879,34 @@ class ObjectReconstructor(Daemon):
'partition.delete.count.%s' % (job['local_dev']['device'],))
syncd_with = 0
reverted_objs = {}
for node in job['sync_to']:
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
success, in_sync_objs = ssync_sender(
self, node, job, job['suffixes'])()
if success:
self.rehash_remote(node, job, job['suffixes'])
syncd_with += 1
reverted_objs.update(in_sync_objs)
if syncd_with >= len(job['sync_to']):
self.delete_reverted_objs(
job, reverted_objs, job['frag_index'])
else:
try:
df_mgr = self._df_router[job['policy']]
# Only object-server can take this lock if an incoming SSYNC is
# running on the same partition. Taking the lock here ensure we
# won't enter a race condition where both nodes try to
# cross-replicate the same partition and both delete it.
with df_mgr.partition_lock(job['device'], job['policy'],
job['partition'], name='replication',
timeout=0.2):
for node in job['sync_to']:
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
success, in_sync_objs = ssync_sender(
self, node, job, job['suffixes'])()
if success:
self.rehash_remote(node, job, job['suffixes'])
syncd_with += 1
reverted_objs.update(in_sync_objs)
if syncd_with >= len(job['sync_to']):
self.delete_reverted_objs(
job, reverted_objs, job['frag_index'])
else:
self.handoffs_remaining += 1
except PartitionLockTimeout:
self.logger.info("Unable to lock handoff partition %d for revert "
"on device %s policy %d",
job['partition'], job['device'], job['policy'])
self.logger.increment('partition.lock-failure.count')
self.handoffs_remaining += 1
self.logger.timing_since('partition.delete.timing', begin)

View File

@@ -43,6 +43,7 @@ from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
from swift.obj import ssync_sender
from swift.obj.diskfile import get_data_dir, get_tmp_dir, DiskFileRouter
from swift.common.storage_policy import POLICIES, REPL_POLICY
from swift.common.exceptions import PartitionLockTimeout
DEFAULT_RSYNC_TIMEOUT = 900
@@ -498,75 +499,90 @@ class ObjectReplicator(Daemon):
begin = time.time()
handoff_partition_deleted = False
try:
responses = []
suffixes = tpool.execute(tpool_get_suffixes, job['path'])
synced_remote_regions = {}
delete_objs = None
if suffixes:
for node in job['nodes']:
stats.rsync += 1
kwargs = {}
if node['region'] in synced_remote_regions and \
self.conf.get('sync_method', 'rsync') == 'ssync':
kwargs['remote_check_objs'] = \
synced_remote_regions[node['region']]
# candidates is a dict(hash=>timestamp) of objects
# for deletion
success, candidates = self.sync(
node, job, suffixes, **kwargs)
if success:
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'],
node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes), headers=headers)
conn.getresponse().read()
if node['region'] != job['region']:
synced_remote_regions[node['region']] = viewkeys(
candidates)
else:
failure_devs_info.add((node['replication_ip'],
node['device']))
responses.append(success)
for cand_objs in synced_remote_regions.values():
if delete_objs is None:
delete_objs = cand_objs
else:
delete_objs = delete_objs & cand_objs
df_mgr = self._df_router[job['policy']]
# Only object-server can take this lock if an incoming SSYNC is
# running on the same partition. Taking the lock here ensure we
# won't enter a race condition where both nodes try to
# cross-replicate the same partition and both delete it.
with df_mgr.partition_lock(job['device'], job['policy'],
job['partition'], name='replication',
timeout=0.2):
responses = []
suffixes = tpool.execute(tpool_get_suffixes, job['path'])
synced_remote_regions = {}
delete_objs = None
if suffixes:
for node in job['nodes']:
stats.rsync += 1
kwargs = {}
if self.conf.get('sync_method', 'rsync') == 'ssync' \
and node['region'] in synced_remote_regions:
kwargs['remote_check_objs'] = \
synced_remote_regions[node['region']]
# candidates is a dict(hash=>timestamp) of objects
# for deletion
success, candidates = self.sync(
node, job, suffixes, **kwargs)
if success:
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'],
node['replication_port'],
node['device'], job['partition'],
'REPLICATE', '/' + '-'.join(suffixes),
headers=headers)
conn.getresponse().read()
if node['region'] != job['region']:
synced_remote_regions[node['region']] = \
viewkeys(candidates)
else:
failure_devs_info.add((node['replication_ip'],
node['device']))
responses.append(success)
for cand_objs in synced_remote_regions.values():
if delete_objs is None:
delete_objs = cand_objs
else:
delete_objs = delete_objs & cand_objs
if self.handoff_delete:
# delete handoff if we have had handoff_delete successes
delete_handoff = len([resp for resp in responses if resp]) >= \
self.handoff_delete
else:
# delete handoff if all syncs were successful
delete_handoff = len(responses) == len(job['nodes']) and \
all(responses)
if delete_handoff:
stats.remove += 1
if (self.conf.get('sync_method', 'rsync') == 'ssync' and
delete_objs is not None):
self.logger.info(_("Removing %s objects"),
len(delete_objs))
_junk, error_paths = self.delete_handoff_objs(
job, delete_objs)
# if replication works for a hand-off device and it failed,
# the remote devices which are target of the replication
# from the hand-off device will be marked. Because cleanup
# after replication failed means replicator needs to
# replicate again with the same info.
if error_paths:
failure_devs_info.update(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in job['nodes']])
if self.handoff_delete:
# delete handoff if we have had handoff_delete successes
successes_count = len([resp for resp in responses if resp])
delete_handoff = successes_count >= self.handoff_delete
else:
# delete handoff if all syncs were successful
delete_handoff = len(responses) == len(job['nodes']) and \
all(responses)
if delete_handoff:
stats.remove += 1
if (self.conf.get('sync_method', 'rsync') == 'ssync' and
delete_objs is not None):
self.logger.info(_("Removing %s objects"),
len(delete_objs))
_junk, error_paths = self.delete_handoff_objs(
job, delete_objs)
# if replication works for a hand-off device and it
# failed, the remote devices which are target of the
# replication from the hand-off device will be marked.
# Because cleanup after replication failed means
# replicator needs to replicate again with the same
# info.
if error_paths:
failure_devs_info.update(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in job['nodes']])
else:
self.delete_partition(job['path'])
handoff_partition_deleted = True
elif not suffixes:
self.delete_partition(job['path'])
handoff_partition_deleted = True
elif not suffixes:
self.delete_partition(job['path'])
handoff_partition_deleted = True
except PartitionLockTimeout:
self.logger.info("Unable to lock handoff partition %d for "
"replication on device %s policy %d",
job['partition'], job['device'], job['policy'])
self.logger.increment('partition.lock-failure.count')
except (Exception, Timeout):
self.logger.exception(_("Error syncing handoff partition"))
stats.add_failure_stats(failure_devs_info)

View File

@@ -173,7 +173,7 @@ class Receiver(object):
finally:
if self.app.replication_semaphore:
self.app.replication_semaphore.release()
except exceptions.ReplicationLockTimeout as err:
except exceptions.LockTimeout as err:
self.app.logger.debug(
'%s/%s/%s SSYNC LOCK TIMEOUT: %s' % (
self.request.remote_addr, self.device, self.partition,

View File

@@ -56,7 +56,8 @@ from swift.common.splice import splice
from swift.common.exceptions import DiskFileNotExist, DiskFileQuarantined, \
DiskFileDeviceUnavailable, DiskFileDeleted, DiskFileNotOpen, \
DiskFileError, ReplicationLockTimeout, DiskFileCollision, \
DiskFileExpired, SwiftException, DiskFileNoSpace, DiskFileXattrNotSupported
DiskFileExpired, SwiftException, DiskFileNoSpace, \
DiskFileXattrNotSupported, PartitionLockTimeout
from swift.common.storage_policy import (
POLICIES, get_policy_string, StoragePolicy, ECStoragePolicy, REPL_POLICY,
EC_POLICY, PolicyError)
@@ -1205,12 +1206,60 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
success = False
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
with self.assertRaises(ReplicationLockTimeout):
with self.assertRaises(PartitionLockTimeout):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
success = True
self.assertFalse(success)
def test_partition_lock_same_partition(self):
# Double check settings
self.df_mgr.replication_lock_timeout = 0.1
success = False
with self.df_mgr.partition_lock(self.existing_device,
POLICIES.legacy, '1', name='foo'):
with self.assertRaises(PartitionLockTimeout):
with self.df_mgr.partition_lock(self.existing_device,
POLICIES.legacy, '1',
name='foo'):
success = True
self.assertFalse(success)
def test_partition_lock_same_partition_different_name(self):
# Double check settings
self.df_mgr.replication_lock_timeout = 0.1
success = False
with self.df_mgr.partition_lock(self.existing_device,
POLICIES.legacy, '1', name='foo'):
with self.df_mgr.partition_lock(self.existing_device,
POLICIES.legacy, '1',
name='bar'):
success = True
self.assertTrue(success)
def test_partition_lock_and_replication_lock_same_partition(self):
# Double check settings
self.df_mgr.replication_lock_timeout = 0.1
success = False
with self.df_mgr.partition_lock(self.existing_device,
POLICIES.legacy, '1',
name='replication'):
with self.assertRaises(PartitionLockTimeout):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
success = True
self.assertFalse(success)
success = False
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
with self.assertRaises(PartitionLockTimeout):
with self.df_mgr.partition_lock(self.existing_device,
POLICIES.legacy, '1',
name='replication'):
success = True
self.assertFalse(success)
def test_missing_splice_warning(self):
with mock.patch('swift.common.splice.splice._c_splice', None):
self.conf['splice'] = 'yes'

View File

@@ -4326,6 +4326,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
os.makedirs(part_path)
job = {
'job_type': object_reconstructor.REVERT,
'frag_index': frag_index,
@@ -4336,6 +4337,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
'device': self.local_dev['device'],
}
ssync_calls = []
@@ -4375,6 +4377,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
os.makedirs(part_path)
job = {
'job_type': object_reconstructor.REVERT,
'frag_index': frag_index,
@@ -4385,6 +4388,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
'device': self.local_dev['device'],
}
non_local = {'called': 0}
@@ -4429,6 +4433,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
os.makedirs(part_path)
job = {
'job_type': object_reconstructor.REVERT,
'frag_index': frag_index,
@@ -4439,6 +4444,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': handoff_nodes[-1],
'device': self.local_dev['device'],
}
def ssync_response_callback(*args):
@@ -4507,6 +4513,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
'hashes': {},
'policy': self.policy,
'local_dev': self.local_dev,
'device': self.local_dev['device'],
}
def ssync_response_callback(*args):
@@ -4559,6 +4566,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
'hashes': {},
'policy': self.policy,
'local_dev': self.local_dev,
'device': self.local_dev['device'],
}
def ssync_response_callback(*args):

View File

@@ -491,7 +491,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertEqual([mock.call(os.path.join(
self.controller._diskfile_router[POLICIES.legacy].devices,
'device'))], mocks['ismount'].call_args_list)
'device'))] * 2, mocks['ismount'].call_args_list)
def test_SSYNC_Exception(self):