Rebuild frags for unmounted disks

Change the behavior of the EC reconstructor to perform a fragment
rebuild to a handoff node when a primary peer responds with 507 to the
REPLICATE request.

Each primary node in an EC ring will sync with exactly three primary
peers: in addition to the left and right partners, we now select a
third node from the far side of the ring.  If any of these partners
responds as unmounted, the reconstructor will rebuild its fragments to
a handoff node with the appropriate index.

To prevent ssync (which is uninterruptible) from receiving a 409
(Conflict) we must give the remote handoff node the correct
backend_index for the fragments it will receive.  In the common case we
use deterministically different handoffs for each fragment index to
prevent multiple unmounted primary disks from forcing a single handoff
node to hold more than one rebuilt fragment.
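
As a rough sketch of how a rebuild target is picked for one unmounted
partner (mirroring the new _iter_nodes_for_frag helper in the
reconstructor diff below; policy, partition and node stand for the
usual reconstructor job objects):

    # The primary itself is always the preferred target; its backend
    # (fragment) index is derived from its position in the ring.
    backend_index = policy.get_backend_index(node['index'])

    # If the primary is unmounted, walk the handoff chain and consider
    # only handoffs whose handoff_index maps onto the same backend index,
    # so the receiving ssync will not reject the fragments with a 409.
    for handoff in policy.object_ring.get_more_nodes(partition):
        if policy.get_backend_index(handoff['handoff_index']) == backend_index:
            handoff['backend_index'] = backend_index
            break  # rebuild and ssync this fragment index to this handoff

Because handoff_index is assigned deterministically by the ring,
different unmounted primaries resolve to different handoff devices in
the common case.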

Handoff nodes will continue to attempt to revert rebuilt handoff
fragments to the appropriate primary until the primary device is
remounted or the ring is rebalanced.  After a rebalance of EC rings
(potentially removing unmounted/failed devices), it is most I/O
efficient to run in handoffs_only mode to avoid unnecessary rebuilds.
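
For context, a sketch of why handoffs_only helps after a rebalance
(assuming the reconstructor's existing SYNC/REVERT job types; the exact
guard lives in the reconstructor's job loop):

    for job in jobs:
        if self.handoffs_only and job['job_type'] != REVERT:
            # Skip SYNC jobs entirely: no REPLICATE probing of partners
            # and no fragment rebuilds, only reverting previously rebuilt
            # handoff fragments back to their (now mounted) primaries.
            continue
        self.process_job(job)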

Closes-Bug: #1510342

Change-Id: Ief44ed39d97f65e4270bf73051da9a2dd0ddbaec
Author: Clay Gerrard, 2019-02-04 15:46:40 -06:00 (committed by Tim Burke)
parent 6c6bb80e40
commit ea8e545a27
7 changed files with 721 additions and 360 deletions


@@ -352,6 +352,15 @@ use = egg:swift#recon
# honored as a synonym, but may be ignored in a future release.
# handoffs_only = False
#
# The default strategy for unmounted drives will stage rebuilt data on a
# handoff node until updated rings are deployed. Because fragments are rebuilt
# on offset handoffs based on fragment index and the proxy limits how deep it
# will search for EC frags we restrict how many nodes we'll try. Setting to 0
# will disable rebuilds to handoffs and only rebuild fragments for unmounted
# devices to mounted primaries after a ring change.
# Setting to -1 means "no limit".
# rebuild_handoff_node_count = 2
#
# You can set scheduling priority of processes. Niceness values range from -20
# (most favorable to the process) to 19 (least favorable to the process).
# nice_priority =
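
A sketch of how the reconstructor interprets this value (the loop
mirrors the rebuild_handoff_node_count check added below; conf is the
reconstructor's parsed config, while candidate_handoffs and
rebuild_fragment_to are illustrative placeholders):

    limit = int(conf.get('rebuild_handoff_node_count', 2))

    tried = 0
    for handoff in candidate_handoffs:   # offset handoffs for this frag index
        if limit >= 0 and tried >= limit:
            # 0 disables rebuilding to handoffs altogether; a negative
            # value such as -1 means keep trying handoffs without limit.
            break
        rebuild_fragment_to(handoff)     # hypothetical helper
        tried += 1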


@@ -24,7 +24,7 @@ from time import time
import os
from io import BufferedReader
from hashlib import md5
from itertools import chain
from itertools import chain, count
from tempfile import NamedTemporaryFile
import sys
@@ -237,35 +237,36 @@ class Ring(object):
self._replica2part2dev_id = ring_data._replica2part2dev_id
self._part_shift = ring_data._part_shift
self._rebuild_tier_data()
# Do this now, when we know the data has changed, rather than
# doing it on every call to get_more_nodes().
#
# Since this is to speed up the finding of handoffs, we only
# consider devices with at least one partition assigned. This
# way, a region, zone, or server with no partitions assigned
# does not count toward our totals, thereby keeping the early
# bailouts in get_more_nodes() working.
dev_ids_with_parts = set()
for part2dev_id in self._replica2part2dev_id:
for dev_id in part2dev_id:
dev_ids_with_parts.add(dev_id)
regions = set()
zones = set()
ips = set()
self._num_devs = 0
for dev in self._devs:
if dev and dev['id'] in dev_ids_with_parts:
regions.add(dev['region'])
zones.add((dev['region'], dev['zone']))
ips.add((dev['region'], dev['zone'], dev['ip']))
self._num_devs += 1
self._num_regions = len(regions)
self._num_zones = len(zones)
self._num_ips = len(ips)
self._update_bookkeeping()
self._next_part_power = ring_data.next_part_power
def _update_bookkeeping(self):
# Do this now, when we know the data has changed, rather than
# doing it on every call to get_more_nodes().
#
# Since this is to speed up the finding of handoffs, we only
# consider devices with at least one partition assigned. This
# way, a region, zone, or server with no partitions assigned
# does not count toward our totals, thereby keeping the early
# bailouts in get_more_nodes() working.
dev_ids_with_parts = set()
for part2dev_id in self._replica2part2dev_id:
for dev_id in part2dev_id:
dev_ids_with_parts.add(dev_id)
regions = set()
zones = set()
ips = set()
self._num_devs = 0
for dev in self._devs:
if dev and dev['id'] in dev_ids_with_parts:
regions.add(dev['region'])
zones.add((dev['region'], dev['zone']))
ips.add((dev['region'], dev['zone'], dev['ip']))
self._num_devs += 1
self._num_regions = len(regions)
self._num_zones = len(zones)
self._num_ips = len(ips)
@property
def next_part_power(self):
return self._next_part_power
@@ -407,8 +408,8 @@ class Ring(object):
if time() > self._rtime:
self._reload()
primary_nodes = self._get_part_nodes(part)
used = set(d['id'] for d in primary_nodes)
index = count()
same_regions = set(d['region'] for d in primary_nodes)
same_zones = set((d['region'], d['zone']) for d in primary_nodes)
same_ips = set(
@@ -434,7 +435,7 @@
dev = self._devs[dev_id]
region = dev['region']
if dev_id not in used and region not in same_regions:
yield dev
yield dict(dev, handoff_index=next(index))
used.add(dev_id)
same_regions.add(region)
zone = dev['zone']
@@ -459,7 +460,7 @@
dev = self._devs[dev_id]
zone = (dev['region'], dev['zone'])
if dev_id not in used and zone not in same_zones:
yield dev
yield dict(dev, handoff_index=next(index))
used.add(dev_id)
same_zones.add(zone)
ip = zone + (dev['ip'],)
@@ -482,7 +483,7 @@
dev = self._devs[dev_id]
ip = (dev['region'], dev['zone'], dev['ip'])
if dev_id not in used and ip not in same_ips:
yield dev
yield dict(dev, handoff_index=next(index))
used.add(dev_id)
same_ips.add(ip)
if len(same_ips) == self._num_ips:
@@ -501,7 +502,8 @@
if handoff_part < len(part2dev_id):
dev_id = part2dev_id[handoff_part]
if dev_id not in used:
yield self._devs[dev_id]
dev = self._devs[dev_id]
yield dict(dev, handoff_index=next(index))
used.add(dev_id)
if len(used) == self._num_devs:
hit_all_devs = True
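
A minimal sketch of what a consumer of the annotated handoffs now sees
(the ring path and partition are illustrative):

    from swift.common.ring import Ring

    ring = Ring('/etc/swift/object-1.ring.gz')   # any object ring
    part = 0                                     # any partition
    for handoff in ring.get_more_nodes(part):
        # handoff_index counts 0, 1, 2, ... in the order handoffs are
        # yielded, so callers can map each handoff onto a fragment index.
        print(handoff['handoff_index'], handoff['id'], handoff['device'])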


@@ -19,7 +19,6 @@ import os
from os.path import join
import random
import time
import itertools
from collections import defaultdict
import six
import six.moves.cPickle as pickle
@@ -51,18 +50,22 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
SYNC, REVERT = ('sync_only', 'sync_revert')
def _get_partners(frag_index, part_nodes):
def _get_partners(node_index, part_nodes):
"""
Returns the left and right partners of the node whose index is
equal to the given frag_index.
Returns the left, right and far partners of the node whose index is equal
to the given node_index.
:param frag_index: a fragment index
:param node_index: the primary index
:param part_nodes: a list of primary nodes
:returns: [<node-to-left>, <node-to-right>]
:returns: [<node-to-left>, <node-to-right>, <node-opposite>]
"""
num_nodes = len(part_nodes)
return [
part_nodes[(frag_index - 1) % len(part_nodes)],
part_nodes[(frag_index + 1) % len(part_nodes)],
part_nodes[(node_index - 1) % num_nodes],
part_nodes[(node_index + 1) % num_nodes],
part_nodes[(
node_index + (num_nodes // 2)
) % num_nodes],
]
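
For example, a rough doctest-style sketch (node dicts reduced to their
indexes): with ten primaries, the partners of node 0 are its immediate
neighbours plus the node half-way around the ring.

    >>> part_nodes = [{'index': i} for i in range(10)]
    >>> [n['index'] for n in _get_partners(0, part_nodes)]
    [9, 1, 5]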
@@ -203,6 +206,8 @@ class ObjectReconstructor(Daemon):
elif default_handoffs_only:
self.logger.warning('Ignored handoffs_first option in favor '
'of handoffs_only.')
self.rebuild_handoff_node_count = int(conf.get(
'rebuild_handoff_node_count', 2))
self._df_router = DiskFileRouter(conf, self.logger)
self.all_local_devices = self.get_local_devices()
@@ -667,6 +672,33 @@
_("Trying to sync suffixes with %s") % _full_path(
node, job['partition'], '', job['policy']))
def _iter_nodes_for_frag(self, policy, partition, node):
"""
Generate a priority list of nodes that can sync to the given node.
The primary node is always the highest priority, after that we'll use
handoffs.
To avoid conflicts placing frags we'll skip through the handoffs and
only yield back those that are offset equal to the given primary
node index.
Nodes returned from this iterator will have 'backend_index' set.
"""
node['backend_index'] = policy.get_backend_index(node['index'])
yield node
count = 0
for handoff_node in policy.object_ring.get_more_nodes(partition):
handoff_backend_index = policy.get_backend_index(
handoff_node['handoff_index'])
if handoff_backend_index == node['backend_index']:
if (self.rebuild_handoff_node_count >= 0 and
count >= self.rebuild_handoff_node_count):
break
handoff_node['backend_index'] = handoff_backend_index
yield handoff_node
count += 1
def _get_suffixes_to_sync(self, job, node):
"""
For SYNC jobs we need to make a remote REPLICATE request to get
@@ -677,48 +709,56 @@
:param: the job dict, with the keys defined in ``_get_part_jobs``
:param node: the remote node dict
:returns: a (possibly empty) list of strings, the suffixes to be
synced with the remote node.
synced and the remote node.
"""
# get hashes from the remote node
remote_suffixes = None
attempts_remaining = 1
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
try:
with Timeout(self.http_timeout):
resp = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'', headers=headers).getresponse()
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.logger.error(
_('%s responded as unmounted'),
_full_path(node, job['partition'], '',
job['policy']))
elif resp.status != HTTP_OK:
full_path = _full_path(node, job['partition'], '',
job['policy'])
self.logger.error(
_("Invalid response %(resp)s from %(full_path)s"),
{'resp': resp.status, 'full_path': full_path})
else:
remote_suffixes = pickle.loads(resp.read())
except (Exception, Timeout):
# all exceptions are logged here so that our caller can
# safely catch our exception and continue to the next node
# without logging
self.logger.exception('Unable to get remote suffix hashes '
'from %r' % _full_path(
node, job['partition'], '',
job['policy']))
possible_nodes = self._iter_nodes_for_frag(
job['policy'], job['partition'], node)
while remote_suffixes is None and attempts_remaining:
try:
node = next(possible_nodes)
except StopIteration:
break
attempts_remaining -= 1
try:
with Timeout(self.http_timeout):
resp = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'', headers=headers).getresponse()
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.logger.error(
_('%s responded as unmounted'),
_full_path(node, job['partition'], '',
job['policy']))
attempts_remaining += 1
elif resp.status != HTTP_OK:
full_path = _full_path(node, job['partition'], '',
job['policy'])
self.logger.error(
_("Invalid response %(resp)s from %(full_path)s"),
{'resp': resp.status, 'full_path': full_path})
else:
remote_suffixes = pickle.loads(resp.read())
except (Exception, Timeout):
# all exceptions are logged here so that our caller can
# safely catch our exception and continue to the next node
# without logging
self.logger.exception('Unable to get remote suffix hashes '
'from %r' % _full_path(
node, job['partition'], '',
job['policy']))
if remote_suffixes is None:
raise SuffixSyncError('Unable to get remote suffix hashes')
suffixes = self.get_suffix_delta(job['hashes'],
job['frag_index'],
remote_suffixes,
job['policy'].get_backend_index(
node['index']))
node['backend_index'])
# now recalculate local hashes for suffixes that don't
# match so we're comparing the latest
local_suff = self._get_hashes(job['local_dev']['device'],
@@ -728,11 +768,10 @@
suffixes = self.get_suffix_delta(local_suff,
job['frag_index'],
remote_suffixes,
job['policy'].get_backend_index(
node['index']))
node['backend_index'])
self.suffix_count += len(suffixes)
return suffixes
return suffixes, node
def delete_reverted_objs(self, job, objects, frag_index):
"""
@@ -798,38 +837,15 @@
"""
self.logger.increment(
'partition.update.count.%s' % (job['local_dev']['device'],))
# after our left and right partners, if there's some sort of
# failure we'll continue onto the remaining primary nodes and
# make sure they're in sync - or potentially rebuild missing
# fragments we find
dest_nodes = itertools.chain(
job['sync_to'],
# I think we could order these based on our index to better
# protect against a broken chain
[
n for n in
job['policy'].object_ring.get_part_nodes(job['partition'])
if n['id'] != job['local_dev']['id'] and
n['id'] not in (m['id'] for m in job['sync_to'])
],
)
syncd_with = 0
for node in dest_nodes:
if syncd_with >= len(job['sync_to']):
# success!
break
for node in job['sync_to']:
try:
suffixes = self._get_suffixes_to_sync(job, node)
suffixes, node = self._get_suffixes_to_sync(job, node)
except SuffixSyncError:
continue
if not suffixes:
syncd_with += 1
continue
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
# ssync any out-of-sync suffixes with the remote node
success, _ = ssync_sender(
self, node, job, suffixes)()
@@ -838,8 +854,6 @@
# update stats for this attempt
self.suffix_sync += len(suffixes)
self.logger.update_stats('suffix.syncs', len(suffixes))
if success:
syncd_with += 1
self.logger.timing_since('partition.update.timing', begin)
def _revert(self, job, begin):
@@ -951,6 +965,8 @@
try:
suffixes = data_fi_to_suffixes.pop(frag_index)
except KeyError:
# N.B. If this function ever returns an empty list of jobs
# the entire partition will be deleted.
suffixes = []
sync_job = build_job(
job_type=SYNC,


@@ -22,7 +22,6 @@ import unittest
import uuid
import shutil
import random
from collections import defaultdict
import os
import time
@@ -32,7 +31,6 @@ from test.probe.common import ECProbeTest
from swift.common import direct_client
from swift.common.storage_policy import EC_POLICY
from swift.common.manager import Manager
from swift.obj.reconstructor import _get_partners
from swiftclient import client, ClientException
@@ -300,46 +298,46 @@ class TestReconstructorRebuild(ECProbeTest):
self._test_rebuild_scenario(failed, non_durable, 3)
def test_rebuild_partner_down(self):
# find a primary server that only has one of it's devices in the
# primary node list
group_nodes_by_config = defaultdict(list)
for n in self.onodes:
group_nodes_by_config[self.config_number(n)].append(n)
for config_number, node_list in group_nodes_by_config.items():
if len(node_list) == 1:
break
else:
self.fail('ring balancing did not use all available nodes')
primary_node = node_list[0]
# we have to pick a lower index because we have few handoffs
nodes = self.onodes[:2]
random.shuffle(nodes) # left or right is fine
primary_node, partner_node = nodes
# pick one it's partners to fail randomly
partner_node = random.choice(_get_partners(
primary_node['index'], self.onodes))
# capture fragment etag from partner
failed_partner_meta, failed_partner_etag = self.direct_get(
partner_node, self.opart)
# 507 the partner device
# and 507 the failed partner device
device_path = self.device_dir('object', partner_node)
self.kill_drive(device_path)
# select another primary sync_to node to fail
failed_primary = [n for n in self.onodes if n['id'] not in
(primary_node['id'], partner_node['id'])][0]
# ... capture it's fragment etag
failed_primary_meta, failed_primary_etag = self.direct_get(
failed_primary, self.opart)
# ... and delete it
part_dir = self.storage_dir('object', failed_primary, part=self.opart)
shutil.rmtree(part_dir, True)
# reconstruct from the primary, while one of it's partners is 507'd
self.reconstructor.once(number=self.config_number(primary_node))
# the other failed primary will get it's fragment rebuilt instead
failed_primary_meta_new, failed_primary_etag_new = self.direct_get(
failed_primary, self.opart)
del failed_primary_meta['Date']
del failed_primary_meta_new['Date']
self.assertEqual(failed_primary_etag, failed_primary_etag_new)
self.assertEqual(failed_primary_meta, failed_primary_meta_new)
# a handoff will pickup the rebuild
hnodes = list(self.object_ring.get_more_nodes(self.opart))
for node in hnodes:
try:
found_meta, found_etag = self.direct_get(
node, self.opart)
except DirectClientException as e:
if e.http_status != 404:
raise
else:
break
else:
self.fail('Unable to fetch rebuilt frag from handoffs %r '
'given primary nodes %r with %s unmounted '
'trying to rebuild from %s' % (
[h['device'] for h in hnodes],
[n['device'] for n in self.onodes],
partner_node['device'],
primary_node['device'],
))
self.assertEqual(failed_partner_etag, found_etag)
del failed_partner_meta['Date']
del found_meta['Date']
self.assertEqual(failed_partner_meta, found_meta)
# just to be nice
self.revive_drive(device_path)


@@ -274,6 +274,7 @@ class FakeRing(Ring):
return [dict(node, index=i) for i, node in enumerate(list(self._devs))]
def get_more_nodes(self, part):
index_counter = itertools.count()
for x in range(self.replicas, (self.replicas + self.max_more_nodes)):
yield {'ip': '10.0.0.%s' % x,
'replication_ip': '10.0.0.%s' % x,
@@ -282,7 +283,8 @@
'device': 'sda',
'zone': x % 3,
'region': x % 2,
'id': x}
'id': x,
'handoff_index': next(index_counter)}
def write_fake_ring(path, *devs):
@@ -346,6 +348,9 @@ class FabricatedRing(Ring):
self._part_shift = 32 - part_power
self._reload()
def has_changed(self):
return False
def _reload(self, *args, **kwargs):
self._rtime = time.time() * 2
if hasattr(self, '_replica2part2dev_id'):
@@ -370,6 +375,7 @@
for p in range(2 ** self.part_power):
for r in range(self.replicas):
self._replica2part2dev_id[r][p] = next(dev_ids)
self._update_bookkeeping()
class FakeMemcache(object):


@@ -568,6 +568,10 @@ class TestRing(TestRingBase):
self.assertEqual(len(devs), len(exp_handoffs))
dev_ids = [d['id'] for d in devs]
self.assertEqual(dev_ids, exp_handoffs)
# We mark handoffs so code consuming extra nodes can reason about how
# far they've gone
for i, d in enumerate(devs):
self.assertEqual(d['handoff_index'], i)
# The first 6 replicas plus the 3 primary nodes should cover all 9
# zones in this test

File diff suppressed because it is too large.