Multiprocess object replicator

Add a multiprocess mode to the object replicator. Setting the
"replicator_workers" option to a positive value N causes the
replicator to use up to N worker processes to perform replication
tasks.

At most one worker per disk will be spawned, so one can set
replicator_workers=99999999 to always get one worker per disk
regardless of the number of disks in each node. This matches the
behavior of the object reconstructor.

Worker process logs will have a bit of information prepended so
operators can tell which messages came from which worker. It looks
like this:

  [worker 1/2 pid=16529] 154/154 (100.00%) partitions replicated in 1.02s (150.87/sec, 0s remaining)

The prefix is "[worker M/N pid=P] ", where M is the worker's index, N
is the total number of workers, and P is the process ID. Every message
from the replicator's logger will have the prefix; this includes
messages from down in diskfile, but does not include things printed to
stdout or stderr.
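
Under the hood this is done with a logger adapter. A minimal sketch of
the idea (modeled on the PrefixLoggerAdapter added by this change, but
simplified and purely illustrative):

  import logging
  import os

  class WorkerPrefixAdapter(logging.LoggerAdapter):
      # prepend the prefix (if one has been set) to every message
      def process(self, msg, kwargs):
          return self.extra.get('prefix', '') + msg, kwargs

  logging.basicConfig(level=logging.INFO)
  log = WorkerPrefixAdapter(logging.getLogger('object-replicator'), {})
  log.extra['prefix'] = '[worker %d/%d pid=%d] ' % (1, 2, os.getpid())
  log.info('%d/%d (%.2f%%) partitions replicated', 154, 154, 100.0)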

Drive-by fix: don't dump recon stats when replicating only certain
policies. When running the object replicator with replicator_workers >
0 and "--policies=X,Y,Z", the replicator would update recon stats
after running. Since it only ran on a subset of objects, it should not
update recon, much like it doesn't update recon when run with
--devices or --partitions.
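
Concretely, a scoped run-once pass like the first command below now
leaves recon alone, while an unrestricted pass still updates it
(invocations shown for illustration):

  # subset of policies/devices: recon is not updated
  swift-object-replicator /etc/swift/object-server.conf once \
      --policies 1 --devices sda,sdb

  # full pass: recon is updated as usual
  swift-object-replicator /etc/swift/object-server.conf once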

Change-Id: I6802a9ad9f1f9b9dafb99d8b095af0fdbf174dc5
Author: Samuel Merritt, 2018-03-22 17:08:48 -07:00 (committed by Tim Burke)
Parent: 0a6f0d615c
Commit: c28004deb0
11 changed files with 1144 additions and 211 deletions

View File

@ -692,8 +692,14 @@ daemonize yes Whether or not to run rep
                                 as a daemon
interval             30          Time in seconds to wait between
                                 replication passes
concurrency          1           Number of replication workers to
                                 spawn
concurrency          1           Number of replication jobs to
                                 run per worker process
replicator_workers   0           Number of worker processes to use.
                                 No matter how big this number is,
                                 at most one worker per disk will
                                 be used. The default value of 0
                                 means no forking; all work is done
                                 in the main process.
sync_method          rsync       The sync method to use; default
                                 is rsync but you can use ssync to
                                 try the EXPERIMENTAL

View File

@ -225,7 +225,16 @@ use = egg:swift#recon
# run_pause is deprecated, use interval instead
# run_pause = 30
#
# Number of concurrent replication jobs to run. This is per-process,
# so replicator_workers=W and concurrency=C will result in W*C
# replication jobs running at once.
# concurrency = 1
#
# Number of worker processes to use. No matter how big this number is,
# at most one worker per disk will be used. 0 means no forking; all work
# is done in the main process.
# replicator_workers = 0
#
# stats_interval = 300
#
# default is rsync, alternative is ssync

View File

@ -64,6 +64,16 @@ class Daemon(object):
else:
self.run_forever(**kwargs)
def post_multiprocess_run(self):
"""
Override this to do something after running using multiple worker
processes. This method is called in the parent process.
This is probably only useful for run-once mode since there is no
"after running" in run-forever mode.
"""
pass
def get_worker_args(self, once=False, **kwargs):
"""
For each worker yield a (possibly empty) dict of kwargs to pass along
@ -229,6 +239,7 @@ class DaemonStrategy(object):
self.logger.notice('Finished %s', os.getpid())
break
time.sleep(0.1)
self.daemon.post_multiprocess_run()
return 0
def cleanup(self):

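From a daemon author's point of view, the new hooks fit together
roughly like this (a sketch in the spirit of the MyWorkerDaemon test
helper further down; `pass_number` is just an illustrative per-worker
kwarg, not part of the API):

  from swift.common.daemon import Daemon

  class TwoWorkerDaemon(Daemon):
      def get_worker_args(self, once=False, **kwargs):
          # called in the parent; one kwargs dict per worker process
          return [{'pass_number': i} for i in range(2)]

      def is_healthy(self):
          # polled by the parent; returning False restarts the workers
          return True

      def run_once(self, pass_number=None, **kwargs):
          # each child runs this with the kwargs yielded above
          self.logger.info('worker %s finished a pass', pass_number)

      def post_multiprocess_run(self):
          # runs in the parent after all run-once workers have exited
          self.logger.info('all workers finished')
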
View File

@ -33,7 +33,7 @@ from swift.common.direct_client import quote
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, config_true_value, \
unlink_older_than, dump_recon_cache, rsync_module_interpolation, \
json, Timestamp, parse_overrides, round_robin_iter
json, Timestamp, parse_override_options, round_robin_iter, Everything
from swift.common import ring
from swift.common.ring.utils import is_local_device
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
@ -655,8 +655,10 @@ class Replicator(Daemon):
def run_once(self, *args, **kwargs):
"""Run a replication pass once."""
devices_to_replicate, partitions_to_replicate = parse_overrides(
**kwargs)
override_options = parse_override_options(once=True, **kwargs)
devices_to_replicate = override_options.devices or Everything()
partitions_to_replicate = override_options.partitions or Everything()
self._zero_stats()
dirs = []

View File

@ -19,6 +19,7 @@ from __future__ import print_function
import base64
import binascii
import collections
import errno
import fcntl
import grp
@ -1697,6 +1698,51 @@ def timing_stats(**dec_kwargs):
return decorating_func
class SwiftLoggerAdapter(logging.LoggerAdapter):
"""
A logging.LoggerAdapter subclass that also passes through StatsD method
calls.
Like logging.LoggerAdapter, you have to subclass this and override the
process() method to accomplish anything useful.
"""
def update_stats(self, *a, **kw):
return self.logger.update_stats(*a, **kw)
def increment(self, *a, **kw):
return self.logger.increment(*a, **kw)
def decrement(self, *a, **kw):
return self.logger.decrement(*a, **kw)
def timing(self, *a, **kw):
return self.logger.timing(*a, **kw)
def timing_since(self, *a, **kw):
return self.logger.timing_since(*a, **kw)
def transfer_rate(self, *a, **kw):
return self.logger.transfer_rate(*a, **kw)
class PrefixLoggerAdapter(SwiftLoggerAdapter):
"""
Adds an optional prefix to all its log messages. When the prefix has not
been set, messages are unchanged.
"""
def set_prefix(self, prefix):
self.extra['prefix'] = prefix
def exception(self, *a, **kw):
self.logger.exception(*a, **kw)
def process(self, msg, kwargs):
msg, kwargs = super(PrefixLoggerAdapter, self).process(msg, kwargs)
if 'prefix' in self.extra:
msg = self.extra['prefix'] + msg
return (msg, kwargs)
# double inheritance to support property with setter
class LogAdapter(logging.LoggerAdapter, object):
"""
@ -3262,8 +3308,24 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2,
except OSError as err:
if err.errno != errno.ENOENT:
raise
except (Exception, Timeout):
logger.exception(_('Exception dumping recon cache'))
except (Exception, Timeout) as err:
logger.exception('Exception dumping recon cache: %s' % err)
def load_recon_cache(cache_file):
"""
Load a recon cache file. Treats missing file as empty.
"""
try:
with open(cache_file) as fh:
return json.load(fh)
except IOError as e:
if e.errno == errno.ENOENT:
return {}
else:
raise
except ValueError: # invalid JSON
return {}
def listdir(path):
@ -3436,27 +3498,6 @@ def list_from_csv(comma_separated_str):
return []
def parse_overrides(devices='', partitions='', **kwargs):
"""
Given daemon kwargs parse out device and partition overrides or Everything.
:returns: a tuple of (devices, partitions) which are used like containers to
check if a given partition (integer) or device (string) is "in"
the collection on which we should act.
"""
devices = list_from_csv(devices)
if not devices:
devices = Everything()
partitions = [
int(part) for part in
list_from_csv(partitions)]
if not partitions:
partitions = Everything()
return devices, partitions
def csv_append(csv_string, item):
"""
Appends an item to a comma-separated string.
@ -4647,3 +4688,55 @@ def round_robin_iter(its):
yield next(it)
except StopIteration:
its.remove(it)
OverrideOptions = collections.namedtuple(
'OverrideOptions', ['devices', 'partitions', 'policies'])
def parse_override_options(**kwargs):
"""
Figure out which policies, devices, and partitions we should operate on,
based on kwargs.
If 'override_policies' is already present in kwargs, then return that
value. This happens when using multiple worker processes; the parent
process supplies override_policies=X to each child process.
Otherwise, in run-once mode, look at the 'policies' keyword argument.
This is the value of the "--policies" command-line option. In
run-forever mode or if no --policies option was provided, an empty list
will be returned.
The procedures for devices and partitions are similar.
:returns: a named tuple with fields "devices", "partitions", and
"policies".
"""
run_once = kwargs.get('once', False)
if 'override_policies' in kwargs:
policies = kwargs['override_policies']
elif run_once:
policies = [
int(p) for p in list_from_csv(kwargs.get('policies'))]
else:
policies = []
if 'override_devices' in kwargs:
devices = kwargs['override_devices']
elif run_once:
devices = list_from_csv(kwargs.get('devices'))
else:
devices = []
if 'override_partitions' in kwargs:
partitions = kwargs['override_partitions']
elif run_once:
partitions = [
int(p) for p in list_from_csv(kwargs.get('partitions'))]
else:
partitions = []
return OverrideOptions(devices=devices, partitions=partitions,
policies=policies)
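
To make the parsing rules concrete (mirroring the new unit tests
further down), some hypothetical calls:

  from swift.common.utils import parse_override_options

  # run-once: command-line style strings are parsed into lists
  parse_override_options(once=True, devices='sda,sdb',
                         partitions='100,200', policies='1')
  # -> OverrideOptions(devices=['sda', 'sdb'], partitions=[100, 200],
  #                    policies=[1])

  # run-forever: the same keyword arguments are ignored
  parse_override_options(devices='sda,sdb', partitions='100,200')
  # -> OverrideOptions(devices=[], partitions=[], policies=[])

  # override_* kwargs (what the parent hands each worker) always win
  parse_override_options(once=True, override_devices=['sdc'],
                         devices='sda,sdb')
  # -> .devices == ['sdc']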

View File

@ -32,8 +32,9 @@ from eventlet.support.greenlets import GreenletExit
from swift import gettext_ as _
from swift.common.utils import (
whataremyips, unlink_older_than, compute_eta, get_logger,
dump_recon_cache, mkdirs, config_true_value, list_from_csv,
tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
dump_recon_cache, mkdirs, config_true_value,
tpool_reraise, GreenAsyncPile, Timestamp, remove_file,
load_recon_cache, parse_override_options)
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
@ -90,30 +91,6 @@ def _full_path(node, part, relative_path, policy):
}
def parse_override_options(**kwargs):
"""
Return a dict with keys `override_devices` and `override_partitions` whose
values have been parsed from `kwargs`. If either key is found in `kwargs`
then copy its value from kwargs. Otherwise, if `once` is set in `kwargs`
then parse `devices` and `partitions` keys for the value of
`override_devices` and `override_partitions` respectively.
:return: a dict with keys `override_devices` and `override_partitions`
"""
if kwargs.get('once', False):
devices = list_from_csv(kwargs.get('devices'))
partitions = [
int(p) for p in list_from_csv(kwargs.get('partitions'))]
else:
devices = []
partitions = []
return {
'override_devices': kwargs.get('override_devices', devices),
'override_partitions': kwargs.get('override_partitions', partitions),
}
class RebuildingECDiskFileStream(object):
"""
This class wraps the reconstructed fragment archive data and
@ -236,19 +213,20 @@ class ObjectReconstructor(Daemon):
"""
if self.reconstructor_workers < 1:
return
override_options = parse_override_options(once=once, **kwargs)
override_opts = parse_override_options(once=once, **kwargs)
# Note that this gets re-used when dumping stats and in is_healthy
self.all_local_devices = self.get_local_devices()
if override_options['override_devices']:
devices = [d for d in override_options['override_devices']
if override_opts.devices:
devices = [d for d in override_opts.devices
if d in self.all_local_devices]
else:
devices = list(self.all_local_devices)
if not devices:
# we only need a single worker to do nothing until a ring change
yield dict(override_options)
yield dict(override_devices=override_opts.devices,
override_partitions=override_opts.partitions)
return
# for somewhat uniform load per worker use same max_devices_per_worker
# when handling all devices or just override devices...
@ -257,8 +235,9 @@ class ObjectReconstructor(Daemon):
# ...but only use enough workers for the actual devices being handled
n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker))
override_devices_per_worker = [devices[i::n] for i in range(n)]
for override_devices in override_devices_per_worker:
yield dict(override_options, override_devices=override_devices)
for override_devices_pw in override_devices_per_worker:
yield dict(override_devices=override_devices_pw,
override_partitions=override_opts.partitions)
def is_healthy(self):
"""
@ -276,14 +255,7 @@ class ObjectReconstructor(Daemon):
"""
Aggregate per-disk rcache updates from child workers.
"""
try:
with open(self.rcache) as f:
existing_data = json.load(f)
except IOError as e:
if e.errno != errno.ENOENT:
raise
# dump_recon_cache will create new file and dirs
existing_data = {}
existing_data = load_recon_cache(self.rcache)
first_start = time.time()
last_finish = 0
all_devices_reporting = True
@ -1247,18 +1219,20 @@ class ObjectReconstructor(Daemon):
def run_once(self, *args, **kwargs):
start = time.time()
self.logger.info(_("Running object reconstructor in script mode."))
override_options = parse_override_options(once=True, **kwargs)
self.reconstruct(**override_options)
override_opts = parse_override_options(once=True, **kwargs)
self.reconstruct(override_devices=override_opts.devices,
override_partitions=override_opts.partitions)
total = (time.time() - start) / 60
self.logger.info(
_("Object reconstruction complete (once). (%.02f minutes)"), total)
# Only dump stats if they would actually be meaningful -- i.e. we're
# collecting per-disk stats and covering all partitions, or we're
# covering all partitions, all disks.
if not override_options['override_partitions'] and (
self.reconstructor_workers > 0 or
not override_options['override_devices']):
self.final_recon_dump(total, **override_options)
if not override_opts.partitions and (
self.reconstructor_workers > 0 or not override_opts.devices):
self.final_recon_dump(
total, override_devices=override_opts.devices,
override_partitions=override_opts.partitions)
def run_forever(self, *args, **kwargs):
self.logger.info(_("Starting object reconstructor in daemon mode."))
@ -1266,13 +1240,16 @@ class ObjectReconstructor(Daemon):
while True:
start = time.time()
self.logger.info(_("Starting object reconstruction pass."))
override_options = parse_override_options(**kwargs)
override_opts = parse_override_options(**kwargs)
# Run the reconstructor
self.reconstruct(**override_options)
self.reconstruct(override_devices=override_opts.devices,
override_partitions=override_opts.partitions)
total = (time.time() - start) / 60
self.logger.info(
_("Object reconstruction complete. (%.02f minutes)"), total)
self.final_recon_dump(total, **override_options)
self.final_recon_dump(
total, override_devices=override_opts.devices,
override_partitions=override_opts.partitions)
self.logger.debug('reconstruction sleeping for %s seconds.',
self.interval)
sleep(self.interval)

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import os
import errno
from os.path import isdir, isfile, join, dirname
@ -32,8 +33,9 @@ from swift.common.constraints import check_drive
from swift.common.ring.utils import is_local_device
from swift.common.utils import whataremyips, unlink_older_than, \
compute_eta, get_logger, dump_recon_cache, \
rsync_module_interpolation, mkdirs, config_true_value, list_from_csv, \
tpool_reraise, config_auto_int_value, storage_directory
rsync_module_interpolation, mkdirs, config_true_value, \
tpool_reraise, config_auto_int_value, storage_directory, \
load_recon_cache, PrefixLoggerAdapter, parse_override_options
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@ -48,6 +50,68 @@ def _do_listdir(partition, replication_cycle):
return (((partition + replication_cycle) % 10) == 0)
class Stats(object):
fields = ['attempted', 'failure', 'hashmatch', 'remove', 'rsync',
'success', 'suffix_count', 'suffix_hash', 'suffix_sync',
'failure_nodes']
@classmethod
def from_recon(cls, dct):
return cls(**{k: v for k, v in dct.items() if k in cls.fields})
def to_recon(self):
return {k: getattr(self, k) for k in self.fields}
def __init__(self, attempted=0, failure=0, hashmatch=0, remove=0, rsync=0,
success=0, suffix_count=0, suffix_hash=0,
suffix_sync=0, failure_nodes=None):
self.attempted = attempted
self.failure = failure
self.hashmatch = hashmatch
self.remove = remove
self.rsync = rsync
self.success = success
self.suffix_count = suffix_count
self.suffix_hash = suffix_hash
self.suffix_sync = suffix_sync
self.failure_nodes = defaultdict(lambda: defaultdict(int),
(failure_nodes or {}))
def __add__(self, other):
total = type(self)()
total.attempted = self.attempted + other.attempted
total.failure = self.failure + other.failure
total.hashmatch = self.hashmatch + other.hashmatch
total.remove = self.remove + other.remove
total.rsync = self.rsync + other.rsync
total.success = self.success + other.success
total.suffix_count = self.suffix_count + other.suffix_count
total.suffix_hash = self.suffix_hash + other.suffix_hash
total.suffix_sync = self.suffix_sync + other.suffix_sync
all_failed_ips = (set(self.failure_nodes.keys() +
other.failure_nodes.keys()))
for ip in all_failed_ips:
self_devs = self.failure_nodes.get(ip, {})
other_devs = other.failure_nodes.get(ip, {})
this_ip_failures = {}
for dev in set(self_devs.keys() + other_devs.keys()):
this_ip_failures[dev] = (
self_devs.get(dev, 0) + other_devs.get(dev, 0))
total.failure_nodes[ip] = this_ip_failures
return total
def add_failure_stats(self, failures):
"""
Note the failure of one or more devices.
:param failures: a list of (ip, device-name) pairs that failed
"""
self.failure += len(failures)
for ip, device in failures:
self.failure_nodes[ip][device] += 1
class ObjectReplicator(Daemon):
"""
Replicate objects.
@ -63,7 +127,8 @@ class ObjectReplicator(Daemon):
:param logger: logging object
"""
self.conf = conf
self.logger = logger or get_logger(conf, log_route='object-replicator')
self.logger = PrefixLoggerAdapter(
logger or get_logger(conf, log_route='object-replicator'), {})
self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.swift_dir = conf.get('swift_dir', '/etc/swift')
@ -72,6 +137,7 @@ class ObjectReplicator(Daemon):
self.port = None if self.servers_per_port else \
int(conf.get('bind_port', 6200))
self.concurrency = int(conf.get('concurrency', 1))
self.replicator_workers = int(conf.get('replicator_workers', 0))
self.stats_interval = int(conf.get('stats_interval', '300'))
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
self.next_check = time.time() + self.ring_check_interval
@ -92,6 +158,7 @@ class ObjectReplicator(Daemon):
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
self._next_rcache_update = time.time() + self.stats_interval
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.node_timeout = float(conf.get('node_timeout', 10))
self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
@ -110,21 +177,22 @@ class ObjectReplicator(Daemon):
'operation, please disable handoffs_first and '
'handoff_delete before the next '
'normal rebalance')
self.is_multiprocess_worker = None
self._df_router = DiskFileRouter(conf, self.logger)
self._child_process_reaper_queue = queue.LightQueue()
def _zero_stats(self):
"""Zero out the stats."""
self.stats = {'attempted': 0, 'success': 0, 'failure': 0,
'hashmatch': 0, 'rsync': 0, 'remove': 0,
'start': time.time(), 'failure_nodes': {}}
self.stats_for_dev = defaultdict(Stats)
def _add_failure_stats(self, failure_devs_info):
for node, dev in failure_devs_info:
self.stats['failure'] += 1
failure_devs = self.stats['failure_nodes'].setdefault(node, {})
failure_devs.setdefault(dev, 0)
failure_devs[dev] += 1
@property
def total_stats(self):
return sum(self.stats_for_dev.values(), Stats())
def _emplace_log_prefix(self, worker_index):
self.logger.set_prefix("[worker %d/%d pid=%d] " % (
worker_index + 1, # use 1-based indexing for more readable logs
self.replicator_workers,
os.getpid()))
def _get_my_replication_ips(self):
my_replication_ips = set()
@ -167,6 +235,87 @@ class ObjectReplicator(Daemon):
pass
procs -= reaped_procs
def get_worker_args(self, once=False, **kwargs):
if self.replicator_workers < 1:
return []
override_opts = parse_override_options(once=once, **kwargs)
have_overrides = bool(override_opts.devices or override_opts.partitions
or override_opts.policies)
# save this off for ring-change detection later in is_healthy()
self.all_local_devices = self.get_local_devices()
if override_opts.devices:
devices_to_replicate = [
d for d in override_opts.devices
if d in self.all_local_devices]
else:
# The sort isn't strictly necessary since we're just trying to
# spread devices around evenly, but it makes testing easier.
devices_to_replicate = sorted(self.all_local_devices)
# Distribute devices among workers as evenly as possible
self.replicator_workers = min(self.replicator_workers,
len(devices_to_replicate))
worker_args = [
{
'override_devices': [],
'override_partitions': override_opts.partitions,
'override_policies': override_opts.policies,
'have_overrides': have_overrides,
'multiprocess_worker_index': i,
}
for i in range(self.replicator_workers)]
for index, device in enumerate(devices_to_replicate):
idx = index % self.replicator_workers
worker_args[idx]['override_devices'].append(device)
return worker_args
def is_healthy(self):
"""
Check whether our set of local devices remains the same.
If devices have been added or removed, then we return False here so
that we can kill off any worker processes and then distribute the
new set of local devices across a new set of workers so that all
devices are, once again, being worked on.
This function may also cause recon stats to be updated.
:returns: False if any local devices have been added or removed,
True otherwise
"""
# We update recon here because this is the only function we have in
# a multiprocess replicator that gets called periodically in the
# parent process.
if time.time() >= self._next_rcache_update:
update = self.aggregate_recon_update()
dump_recon_cache(update, self.rcache, self.logger)
return self.get_local_devices() == self.all_local_devices
def get_local_devices(self):
"""
Returns a set of all local devices in all replication-type storage
policies.
This is the device names, e.g. "sdq" or "d1234" or something, not
the full ring entries.
"""
ips = whataremyips(self.bind_ip)
local_devices = set()
for policy in POLICIES:
if policy.policy_type != REPL_POLICY:
continue
self.load_object_ring(policy)
for device in policy.object_ring.devs:
if device and is_local_device(
ips, self.port,
device['replication_ip'],
device['replication_port']):
local_devices.add(device['device'])
return local_devices
# Just exists for doc anchor point
def sync(self, node, job, suffixes, *args, **kwargs):
"""
@ -319,7 +468,9 @@ class ObjectReplicator(Daemon):
def tpool_get_suffixes(path):
return [suff for suff in os.listdir(path)
if len(suff) == 3 and isdir(join(path, suff))]
self.replication_count += 1
stats = self.stats_for_dev[job['device']]
stats.attempted += 1
self.logger.increment('partition.delete.count.%s' % (job['device'],))
headers = dict(self.default_headers)
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
@ -333,7 +484,7 @@ class ObjectReplicator(Daemon):
delete_objs = None
if suffixes:
for node in job['nodes']:
self.stats['rsync'] += 1
stats.rsync += 1
kwargs = {}
if node['region'] in synced_remote_regions and \
self.conf.get('sync_method', 'rsync') == 'ssync':
@ -373,7 +524,7 @@ class ObjectReplicator(Daemon):
delete_handoff = len(responses) == len(job['nodes']) and \
all(responses)
if delete_handoff:
self.stats['remove'] += 1
stats.remove += 1
if (self.conf.get('sync_method', 'rsync') == 'ssync' and
delete_objs is not None):
self.logger.info(_("Removing %s objects"),
@ -398,12 +549,12 @@ class ObjectReplicator(Daemon):
handoff_partition_deleted = True
except (Exception, Timeout):
self.logger.exception(_("Error syncing handoff partition"))
self._add_failure_stats(failure_devs_info)
stats.add_failure_stats(failure_devs_info)
finally:
target_devs_info = set([(target_dev['replication_ip'],
target_dev['device'])
for target_dev in job['nodes']])
self.stats['success'] += len(target_devs_info - failure_devs_info)
stats.success += len(target_devs_info - failure_devs_info)
if not handoff_partition_deleted:
self.handoffs_remaining += 1
self.partition_times.append(time.time() - begin)
@ -438,7 +589,8 @@ class ObjectReplicator(Daemon):
:param job: a dict containing info about the partition to be replicated
"""
self.replication_count += 1
stats = self.stats_for_dev[job['device']]
stats.attempted += 1
self.logger.increment('partition.update.count.%s' % (job['device'],))
headers = dict(self.default_headers)
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
@ -453,7 +605,7 @@ class ObjectReplicator(Daemon):
do_listdir=_do_listdir(
int(job['partition']),
self.replication_cycle))
self.suffix_hash += hashed
stats.suffix_hash += hashed
self.logger.update_stats('suffix.hashes', hashed)
attempts_left = len(job['nodes'])
synced_remote_regions = set()
@ -499,7 +651,7 @@ class ObjectReplicator(Daemon):
local_hash[suffix] !=
remote_hash.get(suffix, -1)]
if not suffixes:
self.stats['hashmatch'] += 1
stats.hashmatch += 1
continue
hashed, recalc_hash = tpool_reraise(
df_mgr._get_hashes,
@ -510,7 +662,7 @@ class ObjectReplicator(Daemon):
suffixes = [suffix for suffix in local_hash if
local_hash[suffix] !=
remote_hash.get(suffix, -1)]
self.stats['rsync'] += 1
stats.rsync += 1
success, _junk = self.sync(node, job, suffixes)
with Timeout(self.http_timeout):
conn = http_connect(
@ -525,14 +677,14 @@ class ObjectReplicator(Daemon):
# add only remote region when replicate succeeded
if success and node['region'] != job['region']:
synced_remote_regions.add(node['region'])
self.suffix_sync += len(suffixes)
stats.suffix_sync += len(suffixes)
self.logger.update_stats('suffix.syncs', len(suffixes))
except (Exception, Timeout):
failure_devs_info.add((node['replication_ip'],
node['device']))
self.logger.exception(_("Error syncing with node: %s") %
node)
self.suffix_count += len(local_hash)
stats.suffix_count += len(local_hash)
except StopIteration:
self.logger.error('Ran out of handoffs while replicating '
'partition %s of policy %d',
@ -541,8 +693,8 @@ class ObjectReplicator(Daemon):
failure_devs_info.update(target_devs_info)
self.logger.exception(_("Error syncing partition"))
finally:
self._add_failure_stats(failure_devs_info)
self.stats['success'] += len(target_devs_info - failure_devs_info)
stats.add_failure_stats(failure_devs_info)
stats.success += len(target_devs_info - failure_devs_info)
self.partition_times.append(time.time() - begin)
self.logger.timing_since('partition.update.timing', begin)
@ -550,29 +702,35 @@ class ObjectReplicator(Daemon):
"""
Logs various stats for the currently running replication pass.
"""
if self.replication_count:
stats = self.total_stats
replication_count = stats.attempted
if replication_count > self.last_replication_count:
self.last_replication_count = replication_count
elapsed = (time.time() - self.start) or 0.000001
rate = self.replication_count / elapsed
rate = replication_count / elapsed
self.logger.info(
_("%(replicated)d/%(total)d (%(percentage).2f%%)"
" partitions replicated in %(time).2fs (%(rate).2f/sec, "
"%(remaining)s remaining)"),
{'replicated': self.replication_count, 'total': self.job_count,
'percentage': self.replication_count * 100.0 / self.job_count,
{'replicated': replication_count, 'total': self.job_count,
'percentage': replication_count * 100.0 / self.job_count,
'time': time.time() - self.start, 'rate': rate,
'remaining': '%d%s' % compute_eta(self.start,
self.replication_count,
replication_count,
self.job_count)})
self.logger.info(_('%(success)s successes, %(failure)s failures')
% self.stats)
% dict(success=stats.success,
failure=stats.failure))
if self.suffix_count:
if stats.suffix_count:
self.logger.info(
_("%(checked)d suffixes checked - "
"%(hashed).2f%% hashed, %(synced).2f%% synced"),
{'checked': self.suffix_count,
'hashed': (self.suffix_hash * 100.0) / self.suffix_count,
'synced': (self.suffix_sync * 100.0) / self.suffix_count})
{'checked': stats.suffix_count,
'hashed':
(stats.suffix_hash * 100.0) / stats.suffix_count,
'synced':
(stats.suffix_sync * 100.0) / stats.suffix_count})
self.partition_times.sort()
self.logger.info(
_("Partition times: max %(max).4fs, "
@ -619,8 +777,9 @@ class ObjectReplicator(Daemon):
found_local = True
dev_path = check_drive(self.devices_dir, local_dev['device'],
self.mount_check)
local_dev_stats = self.stats_for_dev[local_dev['device']]
if not dev_path:
self._add_failure_stats(
local_dev_stats.add_failure_stats(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in policy.object_ring.devs
@ -666,12 +825,12 @@ class ObjectReplicator(Daemon):
region=local_dev['region']))
except ValueError:
if part_nodes:
self._add_failure_stats(
local_dev_stats.add_failure_stats(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in nodes])
else:
self._add_failure_stats(
local_dev_stats.add_failure_stats(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in policy.object_ring.devs
@ -715,7 +874,7 @@ class ObjectReplicator(Daemon):
if policy.policy_type == REPL_POLICY:
if (override_policies is not None and
str(policy.idx) not in override_policies):
policy.idx not in override_policies):
continue
# ensure rings are loaded for policy
self.load_object_ring(policy)
@ -730,14 +889,12 @@ class ObjectReplicator(Daemon):
return jobs
def replicate(self, override_devices=None, override_partitions=None,
override_policies=None):
override_policies=None, start_time=None):
"""Run a replication pass"""
self.start = time.time()
self.suffix_count = 0
self.suffix_sync = 0
self.suffix_hash = 0
self.replication_count = 0
self.last_replication_count = -1
if start_time is None:
start_time = time.time()
self.start = start_time
self.last_replication_count = 0
self.replication_cycle = (self.replication_cycle + 1) % 10
self.partition_times = []
self.my_replication_ips = self._get_my_replication_ips()
@ -748,19 +905,23 @@ class ObjectReplicator(Daemon):
eventlet.sleep() # Give spawns a cycle
current_nodes = None
dev_stats = None
num_jobs = 0
try:
self.run_pool = GreenPool(size=self.concurrency)
jobs = self.collect_jobs(override_devices=override_devices,
override_partitions=override_partitions,
override_policies=override_policies)
for job in jobs:
dev_stats = self.stats_for_dev[job['device']]
num_jobs += 1
current_nodes = job['nodes']
dev_path = check_drive(self.devices_dir, job['device'],
self.mount_check)
if not dev_path:
self._add_failure_stats([(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in job['nodes']])
dev_stats.add_failure_stats([
(failure_dev['replication_ip'], failure_dev['device'])
for failure_dev in job['nodes']])
self.logger.warning(_('%s is not mounted'), job['device'])
continue
if self.handoffs_first and not job['delete']:
@ -794,57 +955,126 @@ class ObjectReplicator(Daemon):
self.run_pool.spawn(self.update, job)
current_nodes = None
self.run_pool.waitall()
except (Exception, Timeout):
if current_nodes:
self._add_failure_stats([(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in current_nodes])
else:
self._add_failure_stats(self.all_devs_info)
self.logger.exception(_("Exception in top-level replication loop"))
except (Exception, Timeout) as err:
if dev_stats:
if current_nodes:
dev_stats.add_failure_stats(
[(failure_dev['replication_ip'],
failure_dev['device'])
for failure_dev in current_nodes])
else:
dev_stats.add_failure_stats(self.all_devs_info)
self.logger.exception(
_("Exception in top-level replication loop: %s"), err)
finally:
stats.kill()
self.stats_line()
self.stats['attempted'] = self.replication_count
def run_once(self, *args, **kwargs):
def update_recon(self, total, end_time, override_devices):
# Called at the end of a replication pass to update recon stats.
if self.is_multiprocess_worker:
# If it weren't for the failure_nodes field, we could do this as
# a bunch of shared memory using multiprocessing.Value, which
# would be nice because it'd avoid dealing with existing data
# during an upgrade.
update = {
'object_replication_per_disk': {
od: {'replication_stats':
self.stats_for_dev[od].to_recon(),
'replication_time': total,
'replication_last': end_time,
'object_replication_time': total,
'object_replication_last': end_time}
for od in override_devices}}
else:
update = {'replication_stats': self.total_stats.to_recon(),
'replication_time': total,
'replication_last': end_time,
'object_replication_time': total,
'object_replication_last': end_time}
dump_recon_cache(update, self.rcache, self.logger)
def aggregate_recon_update(self):
per_disk_stats = load_recon_cache(self.rcache).get(
'object_replication_per_disk', {})
recon_update = {}
min_repl_last = float('inf')
min_repl_time = float('inf')
# If every child has reported some stats, then aggregate things.
if all(ld in per_disk_stats for ld in self.all_local_devices):
aggregated = Stats()
for device_name, data in per_disk_stats.items():
aggregated += Stats.from_recon(data['replication_stats'])
min_repl_time = min(
min_repl_time, data['object_replication_time'])
min_repl_last = min(
min_repl_last, data['object_replication_last'])
recon_update['replication_stats'] = aggregated.to_recon()
recon_update['replication_last'] = min_repl_last
recon_update['replication_time'] = min_repl_time
recon_update['object_replication_last'] = min_repl_last
recon_update['object_replication_time'] = min_repl_time
# Clear out entries for old local devices that we no longer have
devices_to_remove = set(per_disk_stats) - set(self.all_local_devices)
if devices_to_remove:
recon_update['object_replication_per_disk'] = {
dtr: {} for dtr in devices_to_remove}
return recon_update
def run_once(self, multiprocess_worker_index=None,
have_overrides=False, *args, **kwargs):
if multiprocess_worker_index is not None:
self.is_multiprocess_worker = True
self._emplace_log_prefix(multiprocess_worker_index)
rsync_reaper = eventlet.spawn(self._child_process_reaper)
self._zero_stats()
self.logger.info(_("Running object replicator in script mode."))
override_devices = list_from_csv(kwargs.get('devices'))
override_partitions = list_from_csv(kwargs.get('partitions'))
override_policies = list_from_csv(kwargs.get('policies'))
if not override_devices:
override_devices = None
if not override_partitions:
override_partitions = None
if not override_policies:
override_policies = None
override_opts = parse_override_options(once=True, **kwargs)
devices = override_opts.devices or None
partitions = override_opts.partitions or None
policies = override_opts.policies or None
start_time = time.time()
self.replicate(
override_devices=override_devices,
override_partitions=override_partitions,
override_policies=override_policies)
total = (time.time() - self.stats['start']) / 60
override_devices=devices,
override_partitions=partitions,
override_policies=policies,
start_time=start_time)
end_time = time.time()
total = (end_time - start_time) / 60
self.logger.info(
_("Object replication complete (once). (%.02f minutes)"), total)
if not (override_partitions or override_devices):
replication_last = time.time()
dump_recon_cache({'replication_stats': self.stats,
'replication_time': total,
'replication_last': replication_last,
'object_replication_time': total,
'object_replication_last': replication_last},
self.rcache, self.logger)
# If we've been manually run on a subset of
# policies/devices/partitions, then our recon stats are not
# representative of how replication is doing, so we don't publish
# them.
if self.is_multiprocess_worker:
# The main process checked for overrides and determined that
# there were none
should_update_recon = not have_overrides
else:
# We are single-process, so update recon only if we worked on
# everything
should_update_recon = not (partitions or devices or policies)
if should_update_recon:
self.update_recon(total, end_time, devices)
# Give rsync processes one last chance to exit, then bail out and
# let them be init's problem
self._child_process_reaper_queue.put(None)
rsync_reaper.wait()
def run_forever(self, *args, **kwargs):
def run_forever(self, multiprocess_worker_index=None,
override_devices=None, *args, **kwargs):
if multiprocess_worker_index is not None:
self.is_multiprocess_worker = True
self._emplace_log_prefix(multiprocess_worker_index)
self.logger.info(_("Starting object replicator in daemon mode."))
eventlet.spawn_n(self._child_process_reaper)
# Run the replicator continually
@ -852,17 +1082,18 @@ class ObjectReplicator(Daemon):
self._zero_stats()
self.logger.info(_("Starting object replication pass."))
# Run the replicator
self.replicate()
total = (time.time() - self.stats['start']) / 60
start = time.time()
self.replicate(override_devices=override_devices)
end = time.time()
total = (end - start) / 60
self.logger.info(
_("Object replication complete. (%.02f minutes)"), total)
replication_last = time.time()
dump_recon_cache({'replication_stats': self.stats,
'replication_time': total,
'replication_last': replication_last,
'object_replication_time': total,
'object_replication_last': replication_last},
self.rcache, self.logger)
self.update_recon(total, end, override_devices)
self.logger.debug('Replication sleeping for %s seconds.',
self.interval)
sleep(self.interval)
def post_multiprocess_run(self):
# This method is called after run_once using multiple workers.
update = self.aggregate_recon_update()
dump_recon_cache(update, self.rcache, self.logger)
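
The per-worker stats come back together as follows: each worker keeps
a Stats object per disk, dumps them under
'object_replication_per_disk', and the parent's
aggregate_recon_update() folds them back into the legacy top-level
recon keys. The folding itself is just Stats addition; an
interpreter-style sketch with illustrative values:

  from swift.obj.replicator import Stats

  sda = Stats(attempted=1, success=10,
              failure_nodes={'10.1.1.1': {'d11': 1}})
  sdb = Stats(attempted=2, success=20,
              failure_nodes={'10.2.2.2': {'d22': 2}})

  total = sda + sdb
  total.attempted      # 3
  total.success        # 30
  total.failure_nodes  # per ip/device counts merged:
                       # {'10.1.1.1': {'d11': 1}, '10.2.2.2': {'d22': 2}}
  total.to_recon()     # plain dict suitable for dump_recon_cache()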

View File

@ -67,6 +67,10 @@ class TestDaemon(unittest.TestCase):
class MyWorkerDaemon(MyDaemon):
def __init__(self, *a, **kw):
super(MyWorkerDaemon, self).__init__(*a, **kw)
MyWorkerDaemon.post_multiprocess_run_called = False
def get_worker_args(self, once=False, **kwargs):
return [kwargs for i in range(int(self.conf.get('workers', 0)))]
@ -76,6 +80,9 @@ class MyWorkerDaemon(MyDaemon):
except IndexError:
return True
def post_multiprocess_run(self):
MyWorkerDaemon.post_multiprocess_run_called = True
class TestWorkerDaemon(unittest.TestCase):
@ -231,6 +238,7 @@ class TestRunDaemon(unittest.TestCase):
})
self.assertEqual([], self.mock_kill.call_args_list)
self.assertIn('Finished', d.logger.get_lines_for_level('notice')[-1])
self.assertTrue(MyWorkerDaemon.post_multiprocess_run_called)
def test_forked_worker(self):
d = MyWorkerDaemon({'workers': 3})

View File

@ -1533,6 +1533,25 @@ class TestUtils(unittest.TestCase):
finally:
rmtree(testdir_base)
def test_load_recon_cache(self):
stub_data = {'test': 'foo'}
with NamedTemporaryFile() as f:
f.write(json.dumps(stub_data).encode("utf-8"))
f.flush()
self.assertEqual(stub_data, utils.load_recon_cache(f.name))
# missing files are treated as empty
self.assertFalse(os.path.exists(f.name)) # sanity
self.assertEqual({}, utils.load_recon_cache(f.name))
# Corrupt files are treated as empty. We could crash and make an
# operator fix the corrupt file, but they'll "fix" it with "rm -f
# /var/cache/swift/*.recon", so let's just do it for them.
with NamedTemporaryFile() as f:
f.write(b"{not [valid (json")
f.flush()
self.assertEqual({}, utils.load_recon_cache(f.name))
def test_get_logger(self):
sio = StringIO()
logger = logging.getLogger('server')
@ -3557,21 +3576,51 @@ cluster_dfw1 = http://dfw1.host/v1/
utils.get_hmac('GET', '/path', 1, 'abc'),
'b17f6ff8da0e251737aa9e3ee69a881e3e092e2f')
def test_parse_overrides(self):
devices, partitions = utils.parse_overrides(devices='sdb1,sdb2')
self.assertIn('sdb1', devices)
self.assertIn('sdb2', devices)
self.assertNotIn('sdb3', devices)
self.assertIn(1, partitions)
self.assertIn('1', partitions) # matches because of Everything
self.assertIn(None, partitions) # matches because of Everything
devices, partitions = utils.parse_overrides(partitions='1,2,3')
self.assertIn('sdb1', devices)
self.assertIn('1', devices) # matches because of Everything
self.assertIn(None, devices) # matches because of Everything
self.assertIn(1, partitions)
self.assertNotIn('1', partitions)
self.assertNotIn(None, partitions)
def test_parse_override_options(self):
# When override_<thing> is passed in, it takes precedence.
opts = utils.parse_override_options(
override_policies=[0, 1],
override_devices=['sda', 'sdb'],
override_partitions=[100, 200],
policies='0,1,2,3',
devices='sda,sdb,sdc,sdd',
partitions='100,200,300,400')
self.assertEqual(opts.policies, [0, 1])
self.assertEqual(opts.devices, ['sda', 'sdb'])
self.assertEqual(opts.partitions, [100, 200])
# When override_<thing> is passed in, it applies even in run-once
# mode.
opts = utils.parse_override_options(
once=True,
override_policies=[0, 1],
override_devices=['sda', 'sdb'],
override_partitions=[100, 200],
policies='0,1,2,3',
devices='sda,sdb,sdc,sdd',
partitions='100,200,300,400')
self.assertEqual(opts.policies, [0, 1])
self.assertEqual(opts.devices, ['sda', 'sdb'])
self.assertEqual(opts.partitions, [100, 200])
# In run-once mode, we honor the passed-in overrides.
opts = utils.parse_override_options(
once=True,
policies='0,1,2,3',
devices='sda,sdb,sdc,sdd',
partitions='100,200,300,400')
self.assertEqual(opts.policies, [0, 1, 2, 3])
self.assertEqual(opts.devices, ['sda', 'sdb', 'sdc', 'sdd'])
self.assertEqual(opts.partitions, [100, 200, 300, 400])
# In run-forever mode, we ignore the passed-in overrides.
opts = utils.parse_override_options(
policies='0,1,2,3',
devices='sda,sdb,sdc,sdd',
partitions='100,200,300,400')
self.assertEqual(opts.policies, [])
self.assertEqual(opts.devices, [])
self.assertEqual(opts.partitions, [])
def test_get_policy_index(self):
# Account has no information about a policy

View File

@ -234,7 +234,8 @@ class TestObjectReplicator(unittest.TestCase):
self.conf = dict(
bind_ip=_ips()[0], bind_port=6200,
swift_dir=self.testdir, devices=self.devices, mount_check='false',
timeout='300', stats_interval='1', sync_method='rsync')
timeout='300', stats_interval='1', sync_method='rsync',
recon_cache_path=self.recon_cache)
self._create_replicator()
self.ts = make_timestamp_iter()
@ -370,7 +371,6 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEqual((start + 1 + cycle) % 10,
replicator.replication_cycle)
self.assertEqual(0, replicator.stats['start'])
recon_fname = os.path.join(self.recon_cache, "object.recon")
with open(recon_fname) as cachefile:
recon = json.loads(cachefile.read())
@ -527,7 +527,7 @@ class TestObjectReplicator(unittest.TestCase):
_create_test_rings(self.testdir, devs)
self.replicator.collect_jobs()
self.assertEqual(self.replicator.stats['failure'], 0)
self.assertEqual(self.replicator.total_stats.failure, 0)
@mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l)
def test_collect_jobs_multi_disk(self, mock_shuffle):
@ -876,7 +876,7 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.replicate()
# all jobs processed!
self.assertEqual(self.replicator.job_count,
self.replicator.replication_count)
self.replicator.total_stats.attempted)
self.assertFalse(self.replicator.handoffs_remaining)
# sanity, all the handoffs suffixes we filled in were rsync'd
@ -942,7 +942,7 @@ class TestObjectReplicator(unittest.TestCase):
# jobs may have been spawned into the pool before the failed
# update_deleted job incremented handoffs_remaining and caused the
# handoffs_first check to abort the current pass
self.assertLessEqual(self.replicator.replication_count,
self.assertLessEqual(self.replicator.total_stats.attempted,
2 + self.replicator.concurrency)
# sanity, all the handoffs suffixes we filled in were rsync'd
@ -979,10 +979,11 @@ class TestObjectReplicator(unittest.TestCase):
side_effect=_ips), \
mocked_http_conn(*[200] * 14, body=stub_body) as conn_log:
self.replicator.handoff_delete = 2
self.replicator._zero_stats()
self.replicator.replicate()
# all jobs processed!
self.assertEqual(self.replicator.job_count,
self.replicator.replication_count)
self.replicator.total_stats.attempted)
self.assertFalse(self.replicator.handoffs_remaining)
# sanity, all parts got replicated
found_replicate_calls = defaultdict(int)
@ -1418,6 +1419,11 @@ class TestObjectReplicator(unittest.TestCase):
self.assertFalse(os.access(pol1_part_path, os.F_OK))
self.assertTrue(os.access(pol0_part_path, os.F_OK))
# since we weren't operating on everything, but only a subset of
# storage policies, we didn't dump any recon stats.
self.assertFalse(os.path.exists(
os.path.join(self.recon_cache, 'object.recon')))
def test_delete_partition_ssync(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
@ -1735,7 +1741,7 @@ class TestObjectReplicator(unittest.TestCase):
with mock.patch.object(replicator, 'sync', fake_sync):
replicator.run_once()
log_lines = replicator.logger.get_lines_for_level('error')
log_lines = replicator.logger.logger.get_lines_for_level('error')
self.assertIn("Error syncing with node:", log_lines[0])
self.assertFalse(log_lines[1:])
# setup creates 4 partitions; partition 1 does not map to local dev id
@ -1770,7 +1776,7 @@ class TestObjectReplicator(unittest.TestCase):
# attempt to 16 times but succeeded only 15 times due to Timeout
suffix_hashes = sum(
count for (metric, count), _junk in
replicator.logger.log_dict['update_stats']
replicator.logger.logger.log_dict['update_stats']
if metric == 'suffix.hashes')
self.assertEqual(15, suffix_hashes)
@ -1801,7 +1807,8 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.suffix_count = 0
self.replicator.suffix_sync = 0
self.replicator.suffix_hash = 0
self.replicator.replication_count = 0
self.replicator.last_replication_count = 0
self.replicator._zero_stats()
self.replicator.partition_times = []
self.headers = {'Content-Length': '0',
@ -1926,10 +1933,11 @@ class TestObjectReplicator(unittest.TestCase):
reqs.append(mock.call(node, local_job, ['a83']))
fake_func.assert_has_calls(reqs, any_order=True)
self.assertEqual(fake_func.call_count, 2)
self.assertEqual(self.replicator.replication_count, 1)
self.assertEqual(self.replicator.suffix_sync, 2)
self.assertEqual(self.replicator.suffix_hash, 1)
self.assertEqual(self.replicator.suffix_count, 1)
stats = self.replicator.total_stats
self.assertEqual(stats.attempted, 1)
self.assertEqual(stats.suffix_sync, 2)
self.assertEqual(stats.suffix_hash, 1)
self.assertEqual(stats.suffix_count, 1)
# Efficient Replication Case
set_default(self)
@ -1945,10 +1953,11 @@ class TestObjectReplicator(unittest.TestCase):
# belong to another region
self.replicator.update(job)
self.assertEqual(fake_func.call_count, 1)
self.assertEqual(self.replicator.replication_count, 1)
self.assertEqual(self.replicator.suffix_sync, 1)
self.assertEqual(self.replicator.suffix_hash, 1)
self.assertEqual(self.replicator.suffix_count, 1)
stats = self.replicator.total_stats
self.assertEqual(stats.attempted, 1)
self.assertEqual(stats.suffix_sync, 1)
self.assertEqual(stats.suffix_hash, 1)
self.assertEqual(stats.suffix_count, 1)
mock_http.reset_mock()
self.logger.clear()
@ -2038,7 +2047,7 @@ class TestObjectReplicator(unittest.TestCase):
_create_test_rings(self.testdir, next_part_power=4)
self.replicator.replicate()
self.assertEqual(0, self.replicator.job_count)
self.assertEqual(0, self.replicator.replication_count)
self.assertEqual(0, self.replicator.total_stats.attempted)
warnings = self.logger.get_lines_for_level('warning')
self.assertIn(
"next_part_power set in policy 'one'. Skipping", warnings)
@ -2107,5 +2116,542 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEqual(len(mock_procs), 2)
@patch_policies([StoragePolicy(0, 'zero', False),
StoragePolicy(1, 'one', True)])
class TestMultiProcessReplicator(unittest.TestCase):
def setUp(self):
# recon cache path
self.recon_cache = tempfile.mkdtemp()
rmtree(self.recon_cache, ignore_errors=1)
os.mkdir(self.recon_cache)
self.recon_file = os.path.join(self.recon_cache, 'object.recon')
bind_port = 6200
# Set up some rings
self.testdir = tempfile.mkdtemp()
_create_test_rings(self.testdir, devs=[
{'id': 0, 'device': 'sda', 'zone': 0,
'region': 1, 'ip': '127.0.0.1', 'port': bind_port},
{'id': 1, 'device': 'sdb', 'zone': 0,
'region': 1, 'ip': '127.0.0.1', 'port': bind_port},
{'id': 2, 'device': 'sdc', 'zone': 0,
'region': 1, 'ip': '127.0.0.1', 'port': bind_port},
{'id': 3, 'device': 'sdd', 'zone': 0,
'region': 1, 'ip': '127.0.0.1', 'port': bind_port},
{'id': 4, 'device': 'sde', 'zone': 0,
'region': 1, 'ip': '127.0.0.1', 'port': bind_port},
{'id': 100, 'device': 'notme0', 'zone': 0,
'region': 1, 'ip': '127.99.99.99', 'port': bind_port}])
self.logger = debug_logger('test-replicator')
self.conf = dict(
bind_ip='127.0.0.1', bind_port=bind_port,
swift_dir=self.testdir,
mount_check='false', recon_cache_path=self.recon_cache,
timeout='300', stats_interval='1', sync_method='rsync')
self.replicator = object_replicator.ObjectReplicator(
self.conf, logger=self.logger)
def tearDown(self):
self.assertFalse(process_errors)
rmtree(self.testdir, ignore_errors=1)
rmtree(self.recon_cache, ignore_errors=1)
def fake_replicate(self, override_devices, **kw):
# Faked-out replicate() method. Just updates the stats, but doesn't
# do any work.
for device in override_devices:
stats = self.replicator.stats_for_dev[device]
if device == 'sda':
stats.attempted = 1
stats.success = 10
stats.failure = 100
stats.hashmatch = 1000
stats.rsync = 10000
stats.remove = 100000
stats.suffix_count = 1000000
stats.suffix_hash = 10000000
stats.suffix_sync = 100000000
stats.failure_nodes = {
'10.1.1.1': {'d11': 1}}
elif device == 'sdb':
stats.attempted = 2
stats.success = 20
stats.failure = 200
stats.hashmatch = 2000
stats.rsync = 20000
stats.remove = 200000
stats.suffix_count = 2000000
stats.suffix_hash = 20000000
stats.suffix_sync = 200000000
stats.failure_nodes = {
'10.2.2.2': {'d22': 2}}
elif device == 'sdc':
stats.attempted = 3
stats.success = 30
stats.failure = 300
stats.hashmatch = 3000
stats.rsync = 30000
stats.remove = 300000
stats.suffix_count = 3000000
stats.suffix_hash = 30000000
stats.suffix_sync = 300000000
stats.failure_nodes = {
'10.3.3.3': {'d33': 3}}
elif device == 'sdd':
stats.attempted = 4
stats.success = 40
stats.failure = 400
stats.hashmatch = 4000
stats.rsync = 40000
stats.remove = 400000
stats.suffix_count = 4000000
stats.suffix_hash = 40000000
stats.suffix_sync = 400000000
stats.failure_nodes = {
'10.4.4.4': {'d44': 4}}
elif device == 'sde':
stats.attempted = 5
stats.success = 50
stats.failure = 500
stats.hashmatch = 5000
stats.rsync = 50000
stats.remove = 500000
stats.suffix_count = 5000000
stats.suffix_hash = 50000000
stats.suffix_sync = 500000000
stats.failure_nodes = {
'10.5.5.5': {'d55': 5}}
else:
raise Exception("mock can't handle %r" % device)
def test_no_multiprocessing(self):
self.replicator.replicator_workers = 0
self.assertEqual(self.replicator.get_worker_args(), [])
def test_device_distribution(self):
self.replicator.replicator_workers = 2
self.assertEqual(self.replicator.get_worker_args(), [{
'override_devices': ['sda', 'sdc', 'sde'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 0,
}, {
'override_devices': ['sdb', 'sdd'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 1,
}])
def test_override_policies(self):
self.replicator.replicator_workers = 2
args = self.replicator.get_worker_args(policies="3,5,7", once=True)
self.assertEqual(args, [{
'override_devices': ['sda', 'sdc', 'sde'],
'override_partitions': [],
'override_policies': [3, 5, 7],
'have_overrides': True,
'multiprocess_worker_index': 0,
}, {
'override_devices': ['sdb', 'sdd'],
'override_partitions': [],
'override_policies': [3, 5, 7],
'have_overrides': True,
'multiprocess_worker_index': 1,
}])
# override policies don't apply in run-forever mode
args = self.replicator.get_worker_args(policies="3,5,7", once=False)
self.assertEqual(args, [{
'override_devices': ['sda', 'sdc', 'sde'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 0,
}, {
'override_devices': ['sdb', 'sdd'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 1,
}])
def test_more_workers_than_disks(self):
self.replicator.replicator_workers = 999
self.assertEqual(self.replicator.get_worker_args(), [{
'override_devices': ['sda'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 0,
}, {
'override_devices': ['sdb'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 1,
}, {
'override_devices': ['sdc'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 2,
}, {
'override_devices': ['sdd'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 3,
}, {
'override_devices': ['sde'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 4,
}])
# Remember how many workers we actually have so that the log-line
# prefixes are reasonable. Otherwise, we'd have five workers, each
# logging lines starting with things like "[worker X/999 pid=P]"
# despite there being only five.
self.assertEqual(self.replicator.replicator_workers, 5)
def test_command_line_overrides(self):
self.replicator.replicator_workers = 2
args = self.replicator.get_worker_args(
devices="sda,sdc,sdd", partitions="12,34,56", once=True)
self.assertEqual(args, [{
'override_devices': ['sda', 'sdd'],
'override_partitions': [12, 34, 56],
'override_policies': [],
'have_overrides': True,
'multiprocess_worker_index': 0,
}, {
'override_devices': ['sdc'],
'override_partitions': [12, 34, 56],
'override_policies': [],
'have_overrides': True,
'multiprocess_worker_index': 1,
}])
args = self.replicator.get_worker_args(
devices="sda,sdc,sdd", once=True)
self.assertEqual(args, [{
'override_devices': ['sda', 'sdd'],
'override_partitions': [],
'override_policies': [],
'have_overrides': True,
'multiprocess_worker_index': 0,
}, {
'override_devices': ['sdc'],
'override_partitions': [],
'override_policies': [],
'have_overrides': True,
'multiprocess_worker_index': 1,
}])
# no overrides apply in run-forever mode
args = self.replicator.get_worker_args(
devices="sda,sdc,sdd", partitions="12,34,56", once=False)
self.assertEqual(args, [{
'override_devices': ['sda', 'sdc', 'sde'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 0,
}, {
'override_devices': ['sdb', 'sdd'],
'override_partitions': [],
'override_policies': [],
'have_overrides': False,
'multiprocess_worker_index': 1,
}])
def test_worker_logging(self):
self.replicator.replicator_workers = 3
def log_some_stuff(*a, **kw):
self.replicator.logger.debug("debug message")
self.replicator.logger.info("info message")
self.replicator.logger.warning("warning message")
self.replicator.logger.error("error message")
with mock.patch.object(self.replicator, 'replicate', log_some_stuff), \
mock.patch("os.getpid", lambda: 8804):
self.replicator.get_worker_args()
self.replicator.run_once(multiprocess_worker_index=0,
override_devices=['sda', 'sdb'])
prefix = "[worker 1/3 pid=8804] "
for level, lines in self.logger.logger.all_log_lines().items():
for line in lines:
self.assertTrue(
line.startswith(prefix),
"%r doesn't start with %r (level %s)" % (
line, prefix, level))
def test_recon_run_once(self):
self.replicator.replicator_workers = 3
the_time = [1521680000]
def mock_time():
rv = the_time[0]
the_time[0] += 120
return rv
# Simulate a couple child processes
with mock.patch.object(self.replicator, 'replicate',
self.fake_replicate), \
mock.patch('time.time', mock_time):
self.replicator.get_worker_args()
self.replicator.run_once(multiprocess_worker_index=0,
override_devices=['sda', 'sdb'])
self.replicator.run_once(multiprocess_worker_index=1,
override_devices=['sdc'])
self.replicator.run_once(multiprocess_worker_index=2,
override_devices=['sdd', 'sde'])
with open(self.recon_file) as fh:
recon_data = json.load(fh)
self.assertIn('object_replication_per_disk', recon_data)
self.assertIn('sda', recon_data['object_replication_per_disk'])
self.assertIn('sdb', recon_data['object_replication_per_disk'])
self.assertIn('sdc', recon_data['object_replication_per_disk'])
self.assertIn('sdd', recon_data['object_replication_per_disk'])
self.assertIn('sde', recon_data['object_replication_per_disk'])
sda = recon_data['object_replication_per_disk']['sda']
# Spot-check a couple of fields
self.assertEqual(sda['replication_stats']['attempted'], 1)
self.assertEqual(sda['replication_stats']['success'], 10)
self.assertEqual(sda['object_replication_time'], 2) # minutes
self.assertEqual(sda['object_replication_last'], 1521680120)
# Aggregate the workers' recon updates
self.replicator.post_multiprocess_run()
with open(self.recon_file) as fh:
recon_data = json.load(fh)
self.assertEqual(recon_data['replication_stats']['attempted'], 15)
self.assertEqual(recon_data['replication_stats']['failure'], 1500)
self.assertEqual(recon_data['replication_stats']['hashmatch'], 15000)
self.assertEqual(recon_data['replication_stats']['remove'], 1500000)
self.assertEqual(recon_data['replication_stats']['rsync'], 150000)
self.assertEqual(recon_data['replication_stats']['success'], 150)
self.assertEqual(recon_data['replication_stats']['suffix_count'],
15000000)
self.assertEqual(recon_data['replication_stats']['suffix_hash'],
150000000)
self.assertEqual(recon_data['replication_stats']['suffix_sync'],
1500000000)
self.assertEqual(recon_data['replication_stats']['failure_nodes'], {
'10.1.1.1': {'d11': 1},
'10.2.2.2': {'d22': 2},
'10.3.3.3': {'d33': 3},
'10.4.4.4': {'d44': 4},
'10.5.5.5': {'d55': 5},
})
self.assertEqual(recon_data['object_replication_time'], 2) # minutes
self.assertEqual(recon_data['object_replication_last'], 1521680120)
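
The aggregation verified above amounts to summing the per-disk Stats and
dumping the totals; a rough sketch, assuming per_disk_recon is the
'object_replication_per_disk' dict and relying only on the Stats behaviour
exercised later in this file (zero-arg constructor, +, from_recon, to_recon):

    def aggregate_per_disk(per_disk_recon):
        # Sum every disk's replication_stats into one Stats object and
        # return the combined dict, ready to be written to recon.
        total = object_replicator.Stats()
        for disk_entry in per_disk_recon.values():
            total += object_replicator.Stats.from_recon(
                disk_entry['replication_stats'])
        return total.to_recon()
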
def test_recon_skipped_with_overrides(self):
self.replicator.replicator_workers = 3
the_time = [1521680000]
def mock_time():
rv = the_time[0]
the_time[0] += 120
return rv
with mock.patch.object(self.replicator, 'replicate',
self.fake_replicate), \
mock.patch('time.time', mock_time):
self.replicator.get_worker_args()
self.replicator.run_once(multiprocess_worker_index=0,
have_overrides=True,
override_devices=['sda', 'sdb'])
self.assertFalse(os.path.exists(self.recon_file))
# have_overrides=False makes us get recon stats
with mock.patch.object(self.replicator, 'replicate',
self.fake_replicate), \
mock.patch('time.time', mock_time):
self.replicator.get_worker_args()
self.replicator.run_once(multiprocess_worker_index=0,
have_overrides=False,
override_devices=['sda', 'sdb'])
with open(self.recon_file) as fh:
recon_data = json.load(fh)
self.assertIn('sda', recon_data['object_replication_per_disk'])
def test_recon_run_forever(self):
the_time = [1521521521.52152]
def mock_time():
rv = the_time[0]
the_time[0] += 120
return rv
self.replicator.replicator_workers = 2
self.replicator._next_rcache_update = the_time[0]
# One worker has finished a pass, the other hasn't.
with mock.patch.object(self.replicator, 'replicate',
self.fake_replicate), \
mock.patch('time.time', mock_time):
self.replicator.get_worker_args()
# Yes, this says run_once, but this is only to populate
# object.recon with some stats. The real test is for the
# aggregation.
self.replicator.run_once(multiprocess_worker_index=0,
override_devices=['sda', 'sdb', 'sdc'])
# This will not produce aggregate stats since not every device has
# finished a pass.
the_time[0] += self.replicator.stats_interval
with mock.patch('time.time', mock_time):
rv = self.replicator.is_healthy()
self.assertTrue(rv)
with open(self.recon_file) as fh:
recon_data = json.load(fh)
self.assertNotIn('replication_stats', recon_data)
# Now all the local devices have completed a replication pass, so we
# will produce aggregate stats.
with mock.patch.object(self.replicator, 'replicate',
self.fake_replicate), \
mock.patch('time.time', mock_time):
self.replicator.get_worker_args()
self.replicator.run_once(multiprocess_worker_index=1,
override_devices=['sdd', 'sde'])
the_time[0] += self.replicator.stats_interval
with mock.patch('time.time', mock_time):
rv = self.replicator.is_healthy()
self.assertTrue(rv)
with open(self.recon_file) as fh:
recon_data = json.load(fh)
self.assertIn('replication_stats', recon_data)
# no need to exhaustively check every sum
self.assertEqual(recon_data['replication_stats']['attempted'], 15)
self.assertEqual(recon_data['replication_stats']['success'], 150)
self.assertEqual(
recon_data['replication_last'],
min(pd['replication_last']
for pd in recon_data['object_replication_per_disk'].values()))
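
The gating exercised in this test, publishing aggregate stats only once every
local device has reported a completed pass, reduces to a membership check; the
helper and argument names below are assumptions for illustration only:

    def ready_to_aggregate(local_devices, per_disk_recon):
        # Only aggregate once every local device has a per-disk entry.
        return all(device in per_disk_recon for device in local_devices)

    # ready_to_aggregate(['sda', 'sdb', 'sdc', 'sdd', 'sde'],
    #                    {'sda': {}, 'sdb': {}, 'sdc': {}})  # -> False
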
class TestReplicatorStats(unittest.TestCase):
def test_to_recon(self):
st = object_replicator.Stats(
attempted=1, failure=2, hashmatch=3, remove=4,
rsync=5, success=7,
suffix_count=8, suffix_hash=9, suffix_sync=10,
failure_nodes={'10.1.2.3': {'sda': 100, 'sdb': 200}})
# This is what appears in the recon dump
self.assertEqual(st.to_recon(), {
'attempted': 1,
'failure': 2,
'hashmatch': 3,
'remove': 4,
'rsync': 5,
'success': 7,
'suffix_count': 8,
'suffix_hash': 9,
'suffix_sync': 10,
'failure_nodes': {'10.1.2.3': {'sda': 100, 'sdb': 200}},
})
def test_recon_roundtrip(self):
before = object_replicator.Stats(
attempted=1, failure=2, hashmatch=3, remove=4,
rsync=5, success=7,
suffix_count=8, suffix_hash=9, suffix_sync=10,
failure_nodes={'10.1.2.3': {'sda': 100, 'sdb': 200}})
after = object_replicator.Stats.from_recon(before.to_recon())
self.assertEqual(after.attempted, before.attempted)
self.assertEqual(after.failure, before.failure)
self.assertEqual(after.hashmatch, before.hashmatch)
self.assertEqual(after.remove, before.remove)
self.assertEqual(after.rsync, before.rsync)
self.assertEqual(after.success, before.success)
self.assertEqual(after.suffix_count, before.suffix_count)
self.assertEqual(after.suffix_hash, before.suffix_hash)
self.assertEqual(after.suffix_sync, before.suffix_sync)
self.assertEqual(after.failure_nodes, before.failure_nodes)
def test_from_recon_skips_extra_fields(self):
# If another attribute ever sneaks its way in, we should ignore it.
# This will make aborted upgrades a little less painful for
# operators.
recon_dict = {'attempted': 1, 'failure': 2, 'hashmatch': 3,
'spices': 5, 'treasures': 8}
stats = object_replicator.Stats.from_recon(recon_dict)
self.assertEqual(stats.attempted, 1)
self.assertEqual(stats.failure, 2)
self.assertEqual(stats.hashmatch, 3)
# We don't gain attributes just because they're in object.recon.
self.assertFalse(hasattr(stats, 'spices'))
self.assertFalse(hasattr(stats, 'treasures'))
def test_add_failure_stats(self):
st = object_replicator.Stats()
st.add_failure_stats([('10.1.1.1', 'd10'), ('10.1.1.1', 'd11')])
st.add_failure_stats([('10.1.1.1', 'd10')])
st.add_failure_stats([('10.1.1.1', 'd12'), ('10.2.2.2', 'd20'),
('10.2.2.2', 'd21'), ('10.2.2.2', 'd21'),
('10.2.2.2', 'd21')])
self.assertEqual(st.failure, 8)
as_dict = st.to_recon()
self.assertEqual(as_dict['failure_nodes'], {
'10.1.1.1': {
'd10': 2,
'd11': 1,
'd12': 1,
},
'10.2.2.2': {
'd20': 1,
'd21': 3,
},
})
def test_add(self):
st1 = object_replicator.Stats(
attempted=1, failure=2, hashmatch=3, remove=4, rsync=5,
success=6, suffix_count=7, suffix_hash=8, suffix_sync=9,
failure_nodes={
'10.1.1.1': {'sda': 10, 'sdb': 20},
'10.1.1.2': {'sda': 10, 'sdb': 20}})
st2 = object_replicator.Stats(
attempted=2, failure=4, hashmatch=6, remove=8, rsync=10,
success=12, suffix_count=14, suffix_hash=16, suffix_sync=18,
failure_nodes={
'10.1.1.2': {'sda': 10, 'sdb': 20},
'10.1.1.3': {'sda': 10, 'sdb': 20}})
total = st1 + st2
self.assertEqual(total.attempted, 3)
self.assertEqual(total.failure, 6)
self.assertEqual(total.hashmatch, 9)
self.assertEqual(total.remove, 12)
self.assertEqual(total.rsync, 15)
self.assertEqual(total.success, 18)
self.assertEqual(total.suffix_count, 21)
self.assertEqual(total.suffix_hash, 24)
self.assertEqual(total.suffix_sync, 27)
self.assertEqual(total.failure_nodes, {
'10.1.1.1': {'sda': 10, 'sdb': 20},
'10.1.1.2': {'sda': 20, 'sdb': 40},
'10.1.1.3': {'sda': 10, 'sdb': 20},
})
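
Merging the nested failure_nodes maps, as asserted just above, is an ordinary
nested-dict merge; a sketch assuming plain {ip: {device: count}} dicts (the
real Stats class may implement this differently):

    def merge_failure_nodes(left, right):
        # Combine two {ip: {device: count}} maps, summing counts per device.
        merged = {ip: dict(devices) for ip, devices in left.items()}
        for ip, devices in right.items():
            node = merged.setdefault(ip, {})
            for device, count in devices.items():
                node[device] = node.get(device, 0) + count
        return merged

    # merge_failure_nodes({'10.1.1.2': {'sda': 10}},
    #                     {'10.1.1.2': {'sda': 10}})
    #     -> {'10.1.1.2': {'sda': 20}}
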
if __name__ == '__main__':
unittest.main()

View File

@ -87,8 +87,9 @@ class TestSender(BaseTest):
def setUp(self):
skip_if_no_xattrs()
super(TestSender, self).setUp()
self.daemon_logger = debug_logger('test-ssync-sender')
self.daemon = ObjectReplicator(self.daemon_conf,
debug_logger('test-ssync-sender'))
self.daemon_logger)
job = {'policy': POLICIES.legacy} # sufficient for Sender.__init__
self.sender = ssync_sender.Sender(self.daemon, None, job, None)
@ -109,7 +110,7 @@ class TestSender(BaseTest):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEqual(candidates, {})
error_lines = self.daemon.logger.get_lines_for_level('error')
error_lines = self.daemon_logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertEqual('1.2.3.4:5678/sda1/9 1 second: test connect',
error_lines[0])
@ -128,7 +129,7 @@ class TestSender(BaseTest):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEqual(candidates, {})
error_lines = self.daemon.logger.get_lines_for_level('error')
error_lines = self.daemon_logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertEqual('1.2.3.4:5678/sda1/9 test connect',
error_lines[0])
@ -143,10 +144,10 @@ class TestSender(BaseTest):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEqual(candidates, {})
error_lines = self.daemon.logger.get_lines_for_level('error')
error_lines = self.daemon_logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:'))
'1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender: '))
def test_call_catches_exception_handling_exception(self):
self.sender.node = None # Will cause inside exception handler to fail
@ -155,7 +156,7 @@ class TestSender(BaseTest):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEqual(candidates, {})
error_lines = self.daemon.logger.get_lines_for_level('error')
error_lines = self.daemon_logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'EXCEPTION in ssync.Sender'))
@ -598,7 +599,7 @@ class TestSender(BaseTest):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEqual(candidates, {})
error_lines = self.daemon.logger.get_lines_for_level('error')
error_lines = self.daemon_logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 0.01 seconds: connect send'))
@ -622,7 +623,7 @@ class TestSender(BaseTest):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEqual(candidates, {})
error_lines = self.daemon.logger.get_lines_for_level('error')
error_lines = self.daemon_logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 0.02 seconds: connect receive'))
@ -650,7 +651,7 @@ class TestSender(BaseTest):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEqual(candidates, {})
error_lines = self.daemon.logger.get_lines_for_level('error')
error_lines = self.daemon_logger.get_lines_for_level('error')
for line in error_lines:
self.assertTrue(line.startswith(
'1.2.3.4:5678/sda1/9 Expected status 200; got 503'))