Merge "reconciler: PPI aware reconciler"
This commit is contained in:
commit
a5fc6a8211
@ -1,6 +1,7 @@
|
||||
[DEFAULT]
|
||||
# swift_dir = /etc/swift
|
||||
# user = swift
|
||||
# ring_check_interval = 15.0
|
||||
# You can specify default log routing here if you want:
|
||||
# log_name = swift
|
||||
# log_facility = LOG_LOCAL0
|
||||
|
@ -37,6 +37,9 @@ from swift.common.utils import hash_path, validate_configuration, md5
|
||||
from swift.common.ring.utils import tiers_for_dev
|
||||
|
||||
|
||||
DEFAULT_RELOAD_TIME = 15
|
||||
|
||||
|
||||
def calc_replica_count(replica2part2dev_id):
|
||||
if not replica2part2dev_id:
|
||||
return 0
|
||||
@ -272,7 +275,7 @@ class Ring(object):
|
||||
:raises RingLoadError: if the loaded ring data violates its constraint
|
||||
"""
|
||||
|
||||
def __init__(self, serialized_path, reload_time=15, ring_name=None,
|
||||
def __init__(self, serialized_path, reload_time=None, ring_name=None,
|
||||
validation_hook=lambda ring_data: None):
|
||||
# can't use the ring unless HASH_PATH_SUFFIX is set
|
||||
validate_configuration()
|
||||
@ -281,7 +284,8 @@ class Ring(object):
|
||||
ring_name + '.ring.gz')
|
||||
else:
|
||||
self.serialized_path = os.path.join(serialized_path)
|
||||
self.reload_time = reload_time
|
||||
self.reload_time = (DEFAULT_RELOAD_TIME if reload_time is None
|
||||
else reload_time)
|
||||
self._validation_hook = validation_hook
|
||||
self._reload(force=True)
|
||||
|
||||
@ -362,6 +366,8 @@ class Ring(object):
|
||||
|
||||
@property
|
||||
def next_part_power(self):
|
||||
if time() > self._rtime:
|
||||
self._reload()
|
||||
return self._next_part_power
|
||||
|
||||
@property
|
||||
|
@ -366,15 +366,26 @@ class BaseStoragePolicy(object):
|
||||
self._validate_policy_name(name)
|
||||
self.alias_list.insert(0, name)
|
||||
|
||||
def load_ring(self, swift_dir):
|
||||
def validate_ring_data(self, ring_data):
|
||||
"""
|
||||
Validation hook used when loading the ring; currently only used for EC
|
||||
"""
|
||||
|
||||
def load_ring(self, swift_dir, reload_time=None):
|
||||
"""
|
||||
Load the ring for this policy immediately.
|
||||
|
||||
:param swift_dir: path to rings
|
||||
:param reload_time: time interval in seconds to check for a ring change
|
||||
"""
|
||||
if self.object_ring:
|
||||
if reload_time is not None:
|
||||
self.object_ring.reload_time = reload_time
|
||||
return
|
||||
self.object_ring = Ring(swift_dir, ring_name=self.ring_name)
|
||||
|
||||
self.object_ring = Ring(
|
||||
swift_dir, ring_name=self.ring_name,
|
||||
validation_hook=self.validate_ring_data, reload_time=reload_time)
|
||||
|
||||
@property
|
||||
def quorum(self):
|
||||
@ -643,38 +654,25 @@ class ECStoragePolicy(BaseStoragePolicy):
|
||||
"""
|
||||
return self._ec_quorum_size * self.ec_duplication_factor
|
||||
|
||||
def load_ring(self, swift_dir):
|
||||
def validate_ring_data(self, ring_data):
|
||||
"""
|
||||
Load the ring for this policy immediately.
|
||||
EC specific validation
|
||||
|
||||
:param swift_dir: path to rings
|
||||
Replica count check - we need _at_least_ (#data + #parity) replicas
|
||||
configured. Also if the replica count is larger than exactly that
|
||||
number there's a non-zero risk of error for code that is
|
||||
considering the number of nodes in the primary list from the ring.
|
||||
"""
|
||||
if self.object_ring:
|
||||
return
|
||||
|
||||
def validate_ring_data(ring_data):
|
||||
"""
|
||||
EC specific validation
|
||||
|
||||
Replica count check - we need _at_least_ (#data + #parity) replicas
|
||||
configured. Also if the replica count is larger than exactly that
|
||||
number there's a non-zero risk of error for code that is
|
||||
considering the number of nodes in the primary list from the ring.
|
||||
"""
|
||||
|
||||
configured_fragment_count = ring_data.replica_count
|
||||
required_fragment_count = \
|
||||
(self.ec_n_unique_fragments) * self.ec_duplication_factor
|
||||
if configured_fragment_count != required_fragment_count:
|
||||
raise RingLoadError(
|
||||
'EC ring for policy %s needs to be configured with '
|
||||
'exactly %d replicas. Got %s.' % (
|
||||
self.name, required_fragment_count,
|
||||
configured_fragment_count))
|
||||
|
||||
self.object_ring = Ring(
|
||||
swift_dir, ring_name=self.ring_name,
|
||||
validation_hook=validate_ring_data)
|
||||
configured_fragment_count = ring_data.replica_count
|
||||
required_fragment_count = \
|
||||
(self.ec_n_unique_fragments) * self.ec_duplication_factor
|
||||
if configured_fragment_count != required_fragment_count:
|
||||
raise RingLoadError(
|
||||
'EC ring for policy %s needs to be configured with '
|
||||
'exactly %d replicas. Got %s.' % (
|
||||
self.name, required_fragment_count,
|
||||
configured_fragment_count))
|
||||
|
||||
def get_backend_index(self, node_index):
|
||||
"""
|
||||
|
@ -32,6 +32,7 @@ from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \
|
||||
from swift.common.utils import get_logger, split_path, majority_size, \
|
||||
FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
|
||||
LRUCache, decode_timestamps
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour
|
||||
CONTAINER_POLICY_TTL = 30
|
||||
@ -372,8 +373,10 @@ class ContainerReconciler(Daemon):
|
||||
'Swift Container Reconciler',
|
||||
request_tries,
|
||||
use_replication_network=True)
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.stats = defaultdict(int)
|
||||
self.last_stat_time = time.time()
|
||||
self.ring_check_interval = float(conf.get('ring_check_interval', 15))
|
||||
|
||||
def stats_log(self, metric, msg, *args, **kwargs):
|
||||
"""
|
||||
@ -417,6 +420,13 @@ class ContainerReconciler(Daemon):
|
||||
self.swift.container_ring, MISPLACED_OBJECTS_ACCOUNT,
|
||||
container, obj, headers=headers)
|
||||
|
||||
def can_reconcile_policy(self, policy_index):
|
||||
pol = POLICIES.get_by_index(policy_index)
|
||||
if pol:
|
||||
pol.load_ring(self.swift_dir, reload_time=self.ring_check_interval)
|
||||
return pol.object_ring.next_part_power is None
|
||||
return False
|
||||
|
||||
def throw_tombstones(self, account, container, obj, timestamp,
|
||||
policy_index, path):
|
||||
"""
|
||||
@ -483,6 +493,18 @@ class ContainerReconciler(Daemon):
|
||||
container_policy_index, q_policy_index)
|
||||
return True
|
||||
|
||||
# don't reconcile if the source or container policy_index is in the
|
||||
# middle of a PPI
|
||||
if not self.can_reconcile_policy(q_policy_index):
|
||||
self.stats_log('ppi_skip', 'Source policy (%r) in the middle of '
|
||||
'a part power increase (PPI)', q_policy_index)
|
||||
return False
|
||||
if not self.can_reconcile_policy(container_policy_index):
|
||||
self.stats_log('ppi_skip', 'Container policy (%r) in the middle '
|
||||
'of a part power increase (PPI)',
|
||||
container_policy_index)
|
||||
return False
|
||||
|
||||
# check if object exists in the destination already
|
||||
self.logger.debug('checking for %r (%f) in destination '
|
||||
'policy_index %s', path, q_ts,
|
||||
|
@ -215,7 +215,8 @@ class PatchPolicies(object):
|
||||
class FakeRing(Ring):
|
||||
|
||||
def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
|
||||
base_port=1000, separate_replication=False):
|
||||
base_port=1000, separate_replication=False,
|
||||
next_part_power=None, reload_time=15):
|
||||
self.serialized_path = '/foo/bar/object.ring.gz'
|
||||
self._base_port = base_port
|
||||
self.max_more_nodes = max_more_nodes
|
||||
@ -224,8 +225,9 @@ class FakeRing(Ring):
|
||||
self.separate_replication = separate_replication
|
||||
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
|
||||
# this is set higher, or R^2 for R replicas
|
||||
self.reload_time = reload_time
|
||||
self.set_replicas(replicas)
|
||||
self._next_part_power = None
|
||||
self._next_part_power = next_part_power
|
||||
self._reload()
|
||||
|
||||
def has_changed(self):
|
||||
|
@ -1127,7 +1127,8 @@ class TestStoragePolicies(unittest.TestCase):
|
||||
|
||||
class NamedFakeRing(FakeRing):
|
||||
|
||||
def __init__(self, swift_dir, ring_name=None):
|
||||
def __init__(self, swift_dir, reload_time=15, ring_name=None,
|
||||
validation_hook=None):
|
||||
self.ring_name = ring_name
|
||||
super(NamedFakeRing, self).__init__()
|
||||
|
||||
|
@ -28,6 +28,8 @@ from collections import defaultdict
|
||||
from datetime import datetime
|
||||
import six
|
||||
from six.moves import urllib
|
||||
from swift.common.storage_policy import StoragePolicy, ECStoragePolicy
|
||||
|
||||
from swift.container import reconciler
|
||||
from swift.container.server import gen_resp_headers
|
||||
from swift.common.direct_client import ClientException
|
||||
@ -36,7 +38,8 @@ from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.utils import split_path, Timestamp, encode_timestamps
|
||||
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit import FakeRing, fake_http_connect
|
||||
from test.unit import FakeRing, fake_http_connect, patch_policies, \
|
||||
DEFAULT_TEST_EC_TYPE
|
||||
from test.unit.common.middleware import helpers
|
||||
|
||||
|
||||
@ -720,6 +723,11 @@ def listing_qs(marker):
|
||||
urllib.parse.quote(marker.encode('utf-8')))
|
||||
|
||||
|
||||
@patch_policies(
|
||||
[StoragePolicy(0, 'zero', is_default=True),
|
||||
ECStoragePolicy(1, 'one', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=6, ec_nparity=2), ],
|
||||
fake_ring_args=[{}, {'replicas': 8}])
|
||||
class TestReconciler(unittest.TestCase):
|
||||
|
||||
maxDiff = None
|
||||
@ -885,6 +893,46 @@ class TestReconciler(unittest.TestCase):
|
||||
self.assertFalse(deleted_container_entries)
|
||||
self.assertEqual(self.reconciler.stats['retry'], 1)
|
||||
|
||||
@patch_policies(
|
||||
[StoragePolicy(0, 'zero', is_default=True),
|
||||
StoragePolicy(1, 'one'),
|
||||
ECStoragePolicy(2, 'two', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=6, ec_nparity=2)],
|
||||
fake_ring_args=[
|
||||
{'next_part_power': 1}, {}, {'next_part_power': 1}])
|
||||
def test_can_reconcile_policy(self):
|
||||
for policy_index, expected in ((0, False), (1, True), (2, False),
|
||||
(3, False), ('apple', False),
|
||||
(None, False)):
|
||||
self.assertEqual(
|
||||
self.reconciler.can_reconcile_policy(policy_index), expected)
|
||||
|
||||
@patch_policies(
|
||||
[StoragePolicy(0, 'zero', is_default=True),
|
||||
ECStoragePolicy(1, 'one', ec_type=DEFAULT_TEST_EC_TYPE,
|
||||
ec_ndata=6, ec_nparity=2), ],
|
||||
fake_ring_args=[{'next_part_power': 1}, {}])
|
||||
def test_fail_to_move_if_ppi(self):
|
||||
self._mock_listing({
|
||||
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
|
||||
(1, "/AUTH_bob/c/o1"): 3618.84187,
|
||||
})
|
||||
self._mock_oldest_spi({'c': 0})
|
||||
deleted_container_entries = self._run_once()
|
||||
|
||||
# skipped sending because policy_index 0 is in the middle of a PPI
|
||||
self.assertFalse(deleted_container_entries)
|
||||
self.assertEqual(
|
||||
self.fake_swift.calls,
|
||||
[('GET', self.current_container_path),
|
||||
('GET', '/v1/.misplaced_objects' + listing_qs('')),
|
||||
('GET', '/v1/.misplaced_objects' + listing_qs('3600')),
|
||||
('GET', '/v1/.misplaced_objects/3600' + listing_qs('')),
|
||||
('GET', '/v1/.misplaced_objects/3600' +
|
||||
listing_qs('1:/AUTH_bob/c/o1'))])
|
||||
self.assertEqual(self.reconciler.stats['ppi_skip'], 1)
|
||||
self.assertEqual(self.reconciler.stats['retry'], 1)
|
||||
|
||||
def test_object_move(self):
|
||||
self._mock_listing({
|
||||
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
|
||||
|
Loading…
Reference in New Issue
Block a user