reconciler: PPI aware reconciler
This patch makes the reconciler PPI aware. It does this by adding a helper method `can_reconcile_policy` that is used to check that the policies used for the source and destination aren't in the middle of a PPI (their ring doesn't have next_part_power set). In order to accomplish this the reconciler has had to include the POLICIES singleton and grown swift_dir and ring_check_interval config options. Closes-Bug: #1934314 Change-Id: I78a94dd1be90913a7a75d90850ec5ef4a85be4db
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
[DEFAULT]
|
[DEFAULT]
|
||||||
# swift_dir = /etc/swift
|
# swift_dir = /etc/swift
|
||||||
# user = swift
|
# user = swift
|
||||||
|
# ring_check_interval = 15.0
|
||||||
# You can specify default log routing here if you want:
|
# You can specify default log routing here if you want:
|
||||||
# log_name = swift
|
# log_name = swift
|
||||||
# log_facility = LOG_LOCAL0
|
# log_facility = LOG_LOCAL0
|
||||||
|
|||||||
@@ -37,6 +37,9 @@ from swift.common.utils import hash_path, validate_configuration, md5
|
|||||||
from swift.common.ring.utils import tiers_for_dev
|
from swift.common.ring.utils import tiers_for_dev
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_RELOAD_TIME = 15
|
||||||
|
|
||||||
|
|
||||||
def calc_replica_count(replica2part2dev_id):
|
def calc_replica_count(replica2part2dev_id):
|
||||||
if not replica2part2dev_id:
|
if not replica2part2dev_id:
|
||||||
return 0
|
return 0
|
||||||
@@ -272,7 +275,7 @@ class Ring(object):
|
|||||||
:raises RingLoadError: if the loaded ring data violates its constraint
|
:raises RingLoadError: if the loaded ring data violates its constraint
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, serialized_path, reload_time=15, ring_name=None,
|
def __init__(self, serialized_path, reload_time=None, ring_name=None,
|
||||||
validation_hook=lambda ring_data: None):
|
validation_hook=lambda ring_data: None):
|
||||||
# can't use the ring unless HASH_PATH_SUFFIX is set
|
# can't use the ring unless HASH_PATH_SUFFIX is set
|
||||||
validate_configuration()
|
validate_configuration()
|
||||||
@@ -281,7 +284,8 @@ class Ring(object):
|
|||||||
ring_name + '.ring.gz')
|
ring_name + '.ring.gz')
|
||||||
else:
|
else:
|
||||||
self.serialized_path = os.path.join(serialized_path)
|
self.serialized_path = os.path.join(serialized_path)
|
||||||
self.reload_time = reload_time
|
self.reload_time = (DEFAULT_RELOAD_TIME if reload_time is None
|
||||||
|
else reload_time)
|
||||||
self._validation_hook = validation_hook
|
self._validation_hook = validation_hook
|
||||||
self._reload(force=True)
|
self._reload(force=True)
|
||||||
|
|
||||||
@@ -362,6 +366,8 @@ class Ring(object):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def next_part_power(self):
|
def next_part_power(self):
|
||||||
|
if time() > self._rtime:
|
||||||
|
self._reload()
|
||||||
return self._next_part_power
|
return self._next_part_power
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|||||||
@@ -366,15 +366,26 @@ class BaseStoragePolicy(object):
|
|||||||
self._validate_policy_name(name)
|
self._validate_policy_name(name)
|
||||||
self.alias_list.insert(0, name)
|
self.alias_list.insert(0, name)
|
||||||
|
|
||||||
def load_ring(self, swift_dir):
|
def validate_ring_data(self, ring_data):
|
||||||
|
"""
|
||||||
|
Validation hook used when loading the ring; currently only used for EC
|
||||||
|
"""
|
||||||
|
|
||||||
|
def load_ring(self, swift_dir, reload_time=None):
|
||||||
"""
|
"""
|
||||||
Load the ring for this policy immediately.
|
Load the ring for this policy immediately.
|
||||||
|
|
||||||
:param swift_dir: path to rings
|
:param swift_dir: path to rings
|
||||||
|
:param reload_time: time interval in seconds to check for a ring change
|
||||||
"""
|
"""
|
||||||
if self.object_ring:
|
if self.object_ring:
|
||||||
|
if reload_time is not None:
|
||||||
|
self.object_ring.reload_time = reload_time
|
||||||
return
|
return
|
||||||
self.object_ring = Ring(swift_dir, ring_name=self.ring_name)
|
|
||||||
|
self.object_ring = Ring(
|
||||||
|
swift_dir, ring_name=self.ring_name,
|
||||||
|
validation_hook=self.validate_ring_data, reload_time=reload_time)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def quorum(self):
|
def quorum(self):
|
||||||
@@ -643,38 +654,25 @@ class ECStoragePolicy(BaseStoragePolicy):
|
|||||||
"""
|
"""
|
||||||
return self._ec_quorum_size * self.ec_duplication_factor
|
return self._ec_quorum_size * self.ec_duplication_factor
|
||||||
|
|
||||||
def load_ring(self, swift_dir):
|
def validate_ring_data(self, ring_data):
|
||||||
"""
|
"""
|
||||||
Load the ring for this policy immediately.
|
EC specific validation
|
||||||
|
|
||||||
:param swift_dir: path to rings
|
Replica count check - we need _at_least_ (#data + #parity) replicas
|
||||||
|
configured. Also if the replica count is larger than exactly that
|
||||||
|
number there's a non-zero risk of error for code that is
|
||||||
|
considering the number of nodes in the primary list from the ring.
|
||||||
"""
|
"""
|
||||||
if self.object_ring:
|
|
||||||
return
|
|
||||||
|
|
||||||
def validate_ring_data(ring_data):
|
configured_fragment_count = ring_data.replica_count
|
||||||
"""
|
required_fragment_count = \
|
||||||
EC specific validation
|
(self.ec_n_unique_fragments) * self.ec_duplication_factor
|
||||||
|
if configured_fragment_count != required_fragment_count:
|
||||||
Replica count check - we need _at_least_ (#data + #parity) replicas
|
raise RingLoadError(
|
||||||
configured. Also if the replica count is larger than exactly that
|
'EC ring for policy %s needs to be configured with '
|
||||||
number there's a non-zero risk of error for code that is
|
'exactly %d replicas. Got %s.' % (
|
||||||
considering the number of nodes in the primary list from the ring.
|
self.name, required_fragment_count,
|
||||||
"""
|
configured_fragment_count))
|
||||||
|
|
||||||
configured_fragment_count = ring_data.replica_count
|
|
||||||
required_fragment_count = \
|
|
||||||
(self.ec_n_unique_fragments) * self.ec_duplication_factor
|
|
||||||
if configured_fragment_count != required_fragment_count:
|
|
||||||
raise RingLoadError(
|
|
||||||
'EC ring for policy %s needs to be configured with '
|
|
||||||
'exactly %d replicas. Got %s.' % (
|
|
||||||
self.name, required_fragment_count,
|
|
||||||
configured_fragment_count))
|
|
||||||
|
|
||||||
self.object_ring = Ring(
|
|
||||||
swift_dir, ring_name=self.ring_name,
|
|
||||||
validation_hook=validate_ring_data)
|
|
||||||
|
|
||||||
def get_backend_index(self, node_index):
|
def get_backend_index(self, node_index):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \
|
|||||||
from swift.common.utils import get_logger, split_path, majority_size, \
|
from swift.common.utils import get_logger, split_path, majority_size, \
|
||||||
FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
|
FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
|
||||||
LRUCache, decode_timestamps
|
LRUCache, decode_timestamps
|
||||||
|
from swift.common.storage_policy import POLICIES
|
||||||
|
|
||||||
MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour
|
MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour
|
||||||
CONTAINER_POLICY_TTL = 30
|
CONTAINER_POLICY_TTL = 30
|
||||||
@@ -372,8 +373,10 @@ class ContainerReconciler(Daemon):
|
|||||||
'Swift Container Reconciler',
|
'Swift Container Reconciler',
|
||||||
request_tries,
|
request_tries,
|
||||||
use_replication_network=True)
|
use_replication_network=True)
|
||||||
|
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||||
self.stats = defaultdict(int)
|
self.stats = defaultdict(int)
|
||||||
self.last_stat_time = time.time()
|
self.last_stat_time = time.time()
|
||||||
|
self.ring_check_interval = float(conf.get('ring_check_interval', 15))
|
||||||
|
|
||||||
def stats_log(self, metric, msg, *args, **kwargs):
|
def stats_log(self, metric, msg, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
@@ -417,6 +420,13 @@ class ContainerReconciler(Daemon):
|
|||||||
self.swift.container_ring, MISPLACED_OBJECTS_ACCOUNT,
|
self.swift.container_ring, MISPLACED_OBJECTS_ACCOUNT,
|
||||||
container, obj, headers=headers)
|
container, obj, headers=headers)
|
||||||
|
|
||||||
|
def can_reconcile_policy(self, policy_index):
|
||||||
|
pol = POLICIES.get_by_index(policy_index)
|
||||||
|
if pol:
|
||||||
|
pol.load_ring(self.swift_dir, reload_time=self.ring_check_interval)
|
||||||
|
return pol.object_ring.next_part_power is None
|
||||||
|
return False
|
||||||
|
|
||||||
def throw_tombstones(self, account, container, obj, timestamp,
|
def throw_tombstones(self, account, container, obj, timestamp,
|
||||||
policy_index, path):
|
policy_index, path):
|
||||||
"""
|
"""
|
||||||
@@ -483,6 +493,18 @@ class ContainerReconciler(Daemon):
|
|||||||
container_policy_index, q_policy_index)
|
container_policy_index, q_policy_index)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
# don't reconcile if the source or container policy_index is in the
|
||||||
|
# middle of a PPI
|
||||||
|
if not self.can_reconcile_policy(q_policy_index):
|
||||||
|
self.stats_log('ppi_skip', 'Source policy (%r) in the middle of '
|
||||||
|
'a part power increase (PPI)', q_policy_index)
|
||||||
|
return False
|
||||||
|
if not self.can_reconcile_policy(container_policy_index):
|
||||||
|
self.stats_log('ppi_skip', 'Container policy (%r) in the middle '
|
||||||
|
'of a part power increase (PPI)',
|
||||||
|
container_policy_index)
|
||||||
|
return False
|
||||||
|
|
||||||
# check if object exists in the destination already
|
# check if object exists in the destination already
|
||||||
self.logger.debug('checking for %r (%f) in destination '
|
self.logger.debug('checking for %r (%f) in destination '
|
||||||
'policy_index %s', path, q_ts,
|
'policy_index %s', path, q_ts,
|
||||||
|
|||||||
@@ -215,7 +215,8 @@ class PatchPolicies(object):
|
|||||||
class FakeRing(Ring):
|
class FakeRing(Ring):
|
||||||
|
|
||||||
def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
|
def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
|
||||||
base_port=1000, separate_replication=False):
|
base_port=1000, separate_replication=False,
|
||||||
|
next_part_power=None, reload_time=15):
|
||||||
self.serialized_path = '/foo/bar/object.ring.gz'
|
self.serialized_path = '/foo/bar/object.ring.gz'
|
||||||
self._base_port = base_port
|
self._base_port = base_port
|
||||||
self.max_more_nodes = max_more_nodes
|
self.max_more_nodes = max_more_nodes
|
||||||
@@ -224,8 +225,9 @@ class FakeRing(Ring):
|
|||||||
self.separate_replication = separate_replication
|
self.separate_replication = separate_replication
|
||||||
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
|
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
|
||||||
# this is set higher, or R^2 for R replicas
|
# this is set higher, or R^2 for R replicas
|
||||||
|
self.reload_time = reload_time
|
||||||
self.set_replicas(replicas)
|
self.set_replicas(replicas)
|
||||||
self._next_part_power = None
|
self._next_part_power = next_part_power
|
||||||
self._reload()
|
self._reload()
|
||||||
|
|
||||||
def has_changed(self):
|
def has_changed(self):
|
||||||
|
|||||||
@@ -1127,7 +1127,8 @@ class TestStoragePolicies(unittest.TestCase):
|
|||||||
|
|
||||||
class NamedFakeRing(FakeRing):
|
class NamedFakeRing(FakeRing):
|
||||||
|
|
||||||
def __init__(self, swift_dir, ring_name=None):
|
def __init__(self, swift_dir, reload_time=15, ring_name=None,
|
||||||
|
validation_hook=None):
|
||||||
self.ring_name = ring_name
|
self.ring_name = ring_name
|
||||||
super(NamedFakeRing, self).__init__()
|
super(NamedFakeRing, self).__init__()
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ from collections import defaultdict
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import six
|
import six
|
||||||
from six.moves import urllib
|
from six.moves import urllib
|
||||||
|
from swift.common.storage_policy import StoragePolicy, ECStoragePolicy
|
||||||
|
|
||||||
from swift.container import reconciler
|
from swift.container import reconciler
|
||||||
from swift.container.server import gen_resp_headers
|
from swift.container.server import gen_resp_headers
|
||||||
from swift.common.direct_client import ClientException
|
from swift.common.direct_client import ClientException
|
||||||
@@ -36,7 +38,8 @@ from swift.common.header_key_dict import HeaderKeyDict
|
|||||||
from swift.common.utils import split_path, Timestamp, encode_timestamps
|
from swift.common.utils import split_path, Timestamp, encode_timestamps
|
||||||
|
|
||||||
from test.debug_logger import debug_logger
|
from test.debug_logger import debug_logger
|
||||||
from test.unit import FakeRing, fake_http_connect
|
from test.unit import FakeRing, fake_http_connect, patch_policies, \
|
||||||
|
DEFAULT_TEST_EC_TYPE
|
||||||
from test.unit.common.middleware import helpers
|
from test.unit.common.middleware import helpers
|
||||||
|
|
||||||
|
|
||||||
@@ -720,6 +723,11 @@ def listing_qs(marker):
|
|||||||
urllib.parse.quote(marker.encode('utf-8')))
|
urllib.parse.quote(marker.encode('utf-8')))
|
||||||
|
|
||||||
|
|
||||||
|
@patch_policies(
|
||||||
|
[StoragePolicy(0, 'zero', is_default=True),
|
||||||
|
ECStoragePolicy(1, 'one', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||||
|
ec_ndata=6, ec_nparity=2), ],
|
||||||
|
fake_ring_args=[{}, {'replicas': 8}])
|
||||||
class TestReconciler(unittest.TestCase):
|
class TestReconciler(unittest.TestCase):
|
||||||
|
|
||||||
maxDiff = None
|
maxDiff = None
|
||||||
@@ -885,6 +893,46 @@ class TestReconciler(unittest.TestCase):
|
|||||||
self.assertFalse(deleted_container_entries)
|
self.assertFalse(deleted_container_entries)
|
||||||
self.assertEqual(self.reconciler.stats['retry'], 1)
|
self.assertEqual(self.reconciler.stats['retry'], 1)
|
||||||
|
|
||||||
|
@patch_policies(
|
||||||
|
[StoragePolicy(0, 'zero', is_default=True),
|
||||||
|
StoragePolicy(1, 'one'),
|
||||||
|
ECStoragePolicy(2, 'two', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||||
|
ec_ndata=6, ec_nparity=2)],
|
||||||
|
fake_ring_args=[
|
||||||
|
{'next_part_power': 1}, {}, {'next_part_power': 1}])
|
||||||
|
def test_can_reconcile_policy(self):
|
||||||
|
for policy_index, expected in ((0, False), (1, True), (2, False),
|
||||||
|
(3, False), ('apple', False),
|
||||||
|
(None, False)):
|
||||||
|
self.assertEqual(
|
||||||
|
self.reconciler.can_reconcile_policy(policy_index), expected)
|
||||||
|
|
||||||
|
@patch_policies(
|
||||||
|
[StoragePolicy(0, 'zero', is_default=True),
|
||||||
|
ECStoragePolicy(1, 'one', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||||
|
ec_ndata=6, ec_nparity=2), ],
|
||||||
|
fake_ring_args=[{'next_part_power': 1}, {}])
|
||||||
|
def test_fail_to_move_if_ppi(self):
|
||||||
|
self._mock_listing({
|
||||||
|
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
|
||||||
|
(1, "/AUTH_bob/c/o1"): 3618.84187,
|
||||||
|
})
|
||||||
|
self._mock_oldest_spi({'c': 0})
|
||||||
|
deleted_container_entries = self._run_once()
|
||||||
|
|
||||||
|
# skipped sending because policy_index 0 is in the middle of a PPI
|
||||||
|
self.assertFalse(deleted_container_entries)
|
||||||
|
self.assertEqual(
|
||||||
|
self.fake_swift.calls,
|
||||||
|
[('GET', self.current_container_path),
|
||||||
|
('GET', '/v1/.misplaced_objects' + listing_qs('')),
|
||||||
|
('GET', '/v1/.misplaced_objects' + listing_qs('3600')),
|
||||||
|
('GET', '/v1/.misplaced_objects/3600' + listing_qs('')),
|
||||||
|
('GET', '/v1/.misplaced_objects/3600' +
|
||||||
|
listing_qs('1:/AUTH_bob/c/o1'))])
|
||||||
|
self.assertEqual(self.reconciler.stats['ppi_skip'], 1)
|
||||||
|
self.assertEqual(self.reconciler.stats['retry'], 1)
|
||||||
|
|
||||||
def test_object_move(self):
|
def test_object_move(self):
|
||||||
self._mock_listing({
|
self._mock_listing({
|
||||||
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
|
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
|
||||||
|
|||||||
Reference in New Issue
Block a user