From c28004deb0e938a9a9532c9c2e0f3197b6e572cb Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Thu, 22 Mar 2018 17:08:48 -0700 Subject: [PATCH] Multiprocess object replicator Add a multiprocess mode to the object replicator. Setting the "replicator_workers" setting to a positive value N will result in the replicator using up to N worker processes to perform replication tasks. At most one worker per disk will be spawned, so one can set replicator_workers=99999999 to always get one worker per disk regardless of the number of disks in each node. This is the same behavior that the object reconstructor has. Worker process logs will have a bit of information prepended so operators can tell which messages came from which worker. It looks like this: [worker 1/2 pid=16529] 154/154 (100.00%) partitions replicated in 1.02s (150.87/sec, 0s remaining) The prefix is "[worker M/N pid=P] ", where M is the worker's index, N is the total number of workers, and P is the process ID. Every message from the replicator's logger will have the prefix; this includes messages from down in diskfile, but does not include things printed to stdout or stderr. Drive-by fix: don't dump recon stats when replicating only certain policies. When running the object replicator with replicator_workers > 0 and "--policies=X,Y,Z", the replicator would update recon stats after running. Since it only ran on a subset of objects, it should not update recon, much like it doesn't update recon when run with --devices or --partitions. Change-Id: I6802a9ad9f1f9b9dafb99d8b095af0fdbf174dc5 --- doc/source/deployment_guide.rst | 10 +- etc/object-server.conf-sample | 9 + swift/common/daemon.py | 11 + swift/common/db_replicator.py | 8 +- swift/common/utils.py | 139 +++++-- swift/obj/reconstructor.py | 75 ++-- swift/obj/replicator.py | 415 +++++++++++++++----- test/unit/common/test_daemon.py | 8 + test/unit/common/test_utils.py | 79 +++- test/unit/obj/test_replicator.py | 582 ++++++++++++++++++++++++++++- test/unit/obj/test_ssync_sender.py | 19 +- 11 files changed, 1144 insertions(+), 211 deletions(-) diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 3ea2344a47..d740b32c20 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -692,8 +692,14 @@ daemonize yes Whether or not to run rep as a daemon interval 30 Time in seconds to wait between replication passes -concurrency 1 Number of replication workers to - spawn +concurrency 1 Number of replication jobs to + run per worker process +replicator_workers 0 Number of worker processes to use. + No matter how big this number is, + at most one worker per disk will + be used. The default value of 0 + means no forking; all work is done + in the main process. sync_method rsync The sync method to use; default is rsync but you can use ssync to try the EXPERIMENTAL diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 8aae6ba457..3f03c23e65 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -225,7 +225,16 @@ use = egg:swift#recon # run_pause is deprecated, use interval instead # run_pause = 30 # +# Number of concurrent replication jobs to run. This is per-process, +# so replicator_workers=W and concurrency=C will result in W*C +# replication jobs running at once. # concurrency = 1 +# +# Number of worker processes to use. No matter how big this number is, +# at most one worker per disk will be used. 0 means no forking; all work +# is done in the main process. 
+# replicator_workers = 0 +# # stats_interval = 300 # # default is rsync, alternative is ssync diff --git a/swift/common/daemon.py b/swift/common/daemon.py index 32e6b696b7..9eee9419d2 100644 --- a/swift/common/daemon.py +++ b/swift/common/daemon.py @@ -64,6 +64,16 @@ class Daemon(object): else: self.run_forever(**kwargs) + def post_multiprocess_run(self): + """ + Override this to do something after running using multiple worker + processes. This method is called in the parent process. + + This is probably only useful for run-once mode since there is no + "after running" in run-forever mode. + """ + pass + def get_worker_args(self, once=False, **kwargs): """ For each worker yield a (possibly empty) dict of kwargs to pass along @@ -229,6 +239,7 @@ class DaemonStrategy(object): self.logger.notice('Finished %s', os.getpid()) break time.sleep(0.1) + self.daemon.post_multiprocess_run() return 0 def cleanup(self): diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 3b456cd8b8..c464341b21 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -33,7 +33,7 @@ from swift.common.direct_client import quote from swift.common.utils import get_logger, whataremyips, storage_directory, \ renamer, mkdirs, lock_parent_directory, config_true_value, \ unlink_older_than, dump_recon_cache, rsync_module_interpolation, \ - json, Timestamp, parse_overrides, round_robin_iter + json, Timestamp, parse_override_options, round_robin_iter, Everything from swift.common import ring from swift.common.ring.utils import is_local_device from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE @@ -655,8 +655,10 @@ class Replicator(Daemon): def run_once(self, *args, **kwargs): """Run a replication pass once.""" - devices_to_replicate, partitions_to_replicate = parse_overrides( - **kwargs) + override_options = parse_override_options(once=True, **kwargs) + + devices_to_replicate = override_options.devices or Everything() + partitions_to_replicate = override_options.partitions or Everything() self._zero_stats() dirs = [] diff --git a/swift/common/utils.py b/swift/common/utils.py index a177d523c1..709886cfeb 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -19,6 +19,7 @@ from __future__ import print_function import base64 import binascii +import collections import errno import fcntl import grp @@ -1697,6 +1698,51 @@ def timing_stats(**dec_kwargs): return decorating_func +class SwiftLoggerAdapter(logging.LoggerAdapter): + """ + A logging.LoggerAdapter subclass that also passes through StatsD method + calls. + + Like logging.LoggerAdapter, you have to subclass this and override the + process() method to accomplish anything useful. + """ + def update_stats(self, *a, **kw): + return self.logger.update_stats(*a, **kw) + + def increment(self, *a, **kw): + return self.logger.increment(*a, **kw) + + def decrement(self, *a, **kw): + return self.logger.decrement(*a, **kw) + + def timing(self, *a, **kw): + return self.logger.timing(*a, **kw) + + def timing_since(self, *a, **kw): + return self.logger.timing_since(*a, **kw) + + def transfer_rate(self, *a, **kw): + return self.logger.transfer_rate(*a, **kw) + + +class PrefixLoggerAdapter(SwiftLoggerAdapter): + """ + Adds an optional prefix to all its log messages. When the prefix has not + been set, messages are unchanged. 
+ """ + def set_prefix(self, prefix): + self.extra['prefix'] = prefix + + def exception(self, *a, **kw): + self.logger.exception(*a, **kw) + + def process(self, msg, kwargs): + msg, kwargs = super(PrefixLoggerAdapter, self).process(msg, kwargs) + if 'prefix' in self.extra: + msg = self.extra['prefix'] + msg + return (msg, kwargs) + + # double inheritance to support property with setter class LogAdapter(logging.LoggerAdapter, object): """ @@ -3262,8 +3308,24 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2, except OSError as err: if err.errno != errno.ENOENT: raise - except (Exception, Timeout): - logger.exception(_('Exception dumping recon cache')) + except (Exception, Timeout) as err: + logger.exception('Exception dumping recon cache: %s' % err) + + +def load_recon_cache(cache_file): + """ + Load a recon cache file. Treats missing file as empty. + """ + try: + with open(cache_file) as fh: + return json.load(fh) + except IOError as e: + if e.errno == errno.ENOENT: + return {} + else: + raise + except ValueError: # invalid JSON + return {} def listdir(path): @@ -3436,27 +3498,6 @@ def list_from_csv(comma_separated_str): return [] -def parse_overrides(devices='', partitions='', **kwargs): - """ - Given daemon kwargs parse out device and partition overrides or Everything. - - :returns: a tuple of (devices, partitions) which an used like containers to - check if a given partition (integer) or device (string) is "in" - the collection on which we should act. - """ - devices = list_from_csv(devices) - if not devices: - devices = Everything() - - partitions = [ - int(part) for part in - list_from_csv(partitions)] - if not partitions: - partitions = Everything() - - return devices, partitions - - def csv_append(csv_string, item): """ Appends an item to a comma-separated string. @@ -4647,3 +4688,55 @@ def round_robin_iter(its): yield next(it) except StopIteration: its.remove(it) + + +OverrideOptions = collections.namedtuple( + 'OverrideOptions', ['devices', 'partitions', 'policies']) + + +def parse_override_options(**kwargs): + """ + Figure out which policies, devices, and partitions we should operate on, + based on kwargs. + + If 'override_policies' is already present in kwargs, then return that + value. This happens when using multiple worker processes; the parent + process supplies override_policies=X to each child process. + + Otherwise, in run-once mode, look at the 'policies' keyword argument. + This is the value of the "--policies" command-line option. In + run-forever mode or if no --policies option was provided, an empty list + will be returned. + + The procedures for devices and partitions are similar. + + :returns: a named tuple with fields "devices", "partitions", and + "policies". 
+ """ + run_once = kwargs.get('once', False) + + if 'override_policies' in kwargs: + policies = kwargs['override_policies'] + elif run_once: + policies = [ + int(p) for p in list_from_csv(kwargs.get('policies'))] + else: + policies = [] + + if 'override_devices' in kwargs: + devices = kwargs['override_devices'] + elif run_once: + devices = list_from_csv(kwargs.get('devices')) + else: + devices = [] + + if 'override_partitions' in kwargs: + partitions = kwargs['override_partitions'] + elif run_once: + partitions = [ + int(p) for p in list_from_csv(kwargs.get('partitions'))] + else: + partitions = [] + + return OverrideOptions(devices=devices, partitions=partitions, + policies=policies) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index e1b230d194..4abe0ea56e 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -32,8 +32,9 @@ from eventlet.support.greenlets import GreenletExit from swift import gettext_ as _ from swift.common.utils import ( whataremyips, unlink_older_than, compute_eta, get_logger, - dump_recon_cache, mkdirs, config_true_value, list_from_csv, - tpool_reraise, GreenAsyncPile, Timestamp, remove_file) + dump_recon_cache, mkdirs, config_true_value, + tpool_reraise, GreenAsyncPile, Timestamp, remove_file, + load_recon_cache, parse_override_options) from swift.common.header_key_dict import HeaderKeyDict from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon @@ -90,30 +91,6 @@ def _full_path(node, part, relative_path, policy): } -def parse_override_options(**kwargs): - """ - Return a dict with keys `override_devices` and `override_partitions` whose - values have been parsed from `kwargs`. If either key is found in `kwargs` - then copy its value from kwargs. Otherwise, if `once` is set in `kwargs` - then parse `devices` and `partitions` keys for the value of - `override_devices` and `override_partitions` respectively. - - :return: a dict with keys `override_devices` and `override_partitions` - """ - if kwargs.get('once', False): - devices = list_from_csv(kwargs.get('devices')) - partitions = [ - int(p) for p in list_from_csv(kwargs.get('partitions'))] - else: - devices = [] - partitions = [] - - return { - 'override_devices': kwargs.get('override_devices', devices), - 'override_partitions': kwargs.get('override_partitions', partitions), - } - - class RebuildingECDiskFileStream(object): """ This class wraps the reconstructed fragment archive data and @@ -236,19 +213,20 @@ class ObjectReconstructor(Daemon): """ if self.reconstructor_workers < 1: return - override_options = parse_override_options(once=once, **kwargs) + override_opts = parse_override_options(once=once, **kwargs) # Note that this get re-used when dumping stats and in is_healthy self.all_local_devices = self.get_local_devices() - if override_options['override_devices']: - devices = [d for d in override_options['override_devices'] + if override_opts.devices: + devices = [d for d in override_opts.devices if d in self.all_local_devices] else: devices = list(self.all_local_devices) if not devices: # we only need a single worker to do nothing until a ring change - yield dict(override_options) + yield dict(override_devices=override_opts.devices, + override_partitions=override_opts.partitions) return # for somewhat uniform load per worker use same max_devices_per_worker # when handling all devices or just override devices... 
@@ -257,8 +235,9 @@ class ObjectReconstructor(Daemon): # ...but only use enough workers for the actual devices being handled n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker)) override_devices_per_worker = [devices[i::n] for i in range(n)] - for override_devices in override_devices_per_worker: - yield dict(override_options, override_devices=override_devices) + for override_devices_pw in override_devices_per_worker: + yield dict(override_devices=override_devices_pw, + override_partitions=override_opts.partitions) def is_healthy(self): """ @@ -276,14 +255,7 @@ class ObjectReconstructor(Daemon): """ Aggregate per-disk rcache updates from child workers. """ - try: - with open(self.rcache) as f: - existing_data = json.load(f) - except IOError as e: - if e.errno != errno.ENOENT: - raise - # dump_recon_cache will create new file and dirs - existing_data = {} + existing_data = load_recon_cache(self.rcache) first_start = time.time() last_finish = 0 all_devices_reporting = True @@ -1247,18 +1219,20 @@ class ObjectReconstructor(Daemon): def run_once(self, *args, **kwargs): start = time.time() self.logger.info(_("Running object reconstructor in script mode.")) - override_options = parse_override_options(once=True, **kwargs) - self.reconstruct(**override_options) + override_opts = parse_override_options(once=True, **kwargs) + self.reconstruct(override_devices=override_opts.devices, + override_partitions=override_opts.partitions) total = (time.time() - start) / 60 self.logger.info( _("Object reconstruction complete (once). (%.02f minutes)"), total) # Only dump stats if they would actually be meaningful -- i.e. we're # collecting per-disk stats and covering all partitions, or we're # covering all partitions, all disks. - if not override_options['override_partitions'] and ( - self.reconstructor_workers > 0 or - not override_options['override_devices']): - self.final_recon_dump(total, **override_options) + if not override_opts.partitions and ( + self.reconstructor_workers > 0 or not override_opts.devices): + self.final_recon_dump( + total, override_devices=override_opts.devices, + override_partitions=override_opts.partitions) def run_forever(self, *args, **kwargs): self.logger.info(_("Starting object reconstructor in daemon mode.")) @@ -1266,13 +1240,16 @@ class ObjectReconstructor(Daemon): while True: start = time.time() self.logger.info(_("Starting object reconstruction pass.")) - override_options = parse_override_options(**kwargs) + override_opts = parse_override_options(**kwargs) # Run the reconstructor - self.reconstruct(**override_options) + self.reconstruct(override_devices=override_opts.devices, + override_partitions=override_opts.partitions) total = (time.time() - start) / 60 self.logger.info( _("Object reconstruction complete. (%.02f minutes)"), total) - self.final_recon_dump(total, **override_options) + self.final_recon_dump( + total, override_devices=override_opts.devices, + override_partitions=override_opts.partitions) self.logger.debug('reconstruction sleeping for %s seconds.', self.interval) sleep(self.interval) diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 01354f987a..ba1e3b8a78 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from collections import defaultdict import os import errno from os.path import isdir, isfile, join, dirname @@ -32,8 +33,9 @@ from swift.common.constraints import check_drive from swift.common.ring.utils import is_local_device from swift.common.utils import whataremyips, unlink_older_than, \ compute_eta, get_logger, dump_recon_cache, \ - rsync_module_interpolation, mkdirs, config_true_value, list_from_csv, \ - tpool_reraise, config_auto_int_value, storage_directory + rsync_module_interpolation, mkdirs, config_true_value, \ + tpool_reraise, config_auto_int_value, storage_directory, \ + load_recon_cache, PrefixLoggerAdapter, parse_override_options from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE @@ -48,6 +50,68 @@ def _do_listdir(partition, replication_cycle): return (((partition + replication_cycle) % 10) == 0) +class Stats(object): + fields = ['attempted', 'failure', 'hashmatch', 'remove', 'rsync', + 'success', 'suffix_count', 'suffix_hash', 'suffix_sync', + 'failure_nodes'] + + @classmethod + def from_recon(cls, dct): + return cls(**{k: v for k, v in dct.items() if k in cls.fields}) + + def to_recon(self): + return {k: getattr(self, k) for k in self.fields} + + def __init__(self, attempted=0, failure=0, hashmatch=0, remove=0, rsync=0, + success=0, suffix_count=0, suffix_hash=0, + suffix_sync=0, failure_nodes=None): + self.attempted = attempted + self.failure = failure + self.hashmatch = hashmatch + self.remove = remove + self.rsync = rsync + self.success = success + self.suffix_count = suffix_count + self.suffix_hash = suffix_hash + self.suffix_sync = suffix_sync + self.failure_nodes = defaultdict(lambda: defaultdict(int), + (failure_nodes or {})) + + def __add__(self, other): + total = type(self)() + total.attempted = self.attempted + other.attempted + total.failure = self.failure + other.failure + total.hashmatch = self.hashmatch + other.hashmatch + total.remove = self.remove + other.remove + total.rsync = self.rsync + other.rsync + total.success = self.success + other.success + total.suffix_count = self.suffix_count + other.suffix_count + total.suffix_hash = self.suffix_hash + other.suffix_hash + total.suffix_sync = self.suffix_sync + other.suffix_sync + + all_failed_ips = (set(self.failure_nodes.keys() + + other.failure_nodes.keys())) + for ip in all_failed_ips: + self_devs = self.failure_nodes.get(ip, {}) + other_devs = other.failure_nodes.get(ip, {}) + this_ip_failures = {} + for dev in set(self_devs.keys() + other_devs.keys()): + this_ip_failures[dev] = ( + self_devs.get(dev, 0) + other_devs.get(dev, 0)) + total.failure_nodes[ip] = this_ip_failures + return total + + def add_failure_stats(self, failures): + """ + Note the failure of one or more devices. + + :param failures: a list of (ip, device-name) pairs that failed + """ + self.failure += len(failures) + for ip, device in failures: + self.failure_nodes[ip][device] += 1 + + class ObjectReplicator(Daemon): """ Replicate objects. 
@@ -63,7 +127,8 @@ class ObjectReplicator(Daemon): :param logger: logging object """ self.conf = conf - self.logger = logger or get_logger(conf, log_route='object-replicator') + self.logger = PrefixLoggerAdapter( + logger or get_logger(conf, log_route='object-replicator'), {}) self.devices_dir = conf.get('devices', '/srv/node') self.mount_check = config_true_value(conf.get('mount_check', 'true')) self.swift_dir = conf.get('swift_dir', '/etc/swift') @@ -72,6 +137,7 @@ class ObjectReplicator(Daemon): self.port = None if self.servers_per_port else \ int(conf.get('bind_port', 6200)) self.concurrency = int(conf.get('concurrency', 1)) + self.replicator_workers = int(conf.get('replicator_workers', 0)) self.stats_interval = int(conf.get('stats_interval', '300')) self.ring_check_interval = int(conf.get('ring_check_interval', 15)) self.next_check = time.time() + self.ring_check_interval @@ -92,6 +158,7 @@ class ObjectReplicator(Daemon): self.recon_cache_path = conf.get('recon_cache_path', '/var/cache/swift') self.rcache = os.path.join(self.recon_cache_path, "object.recon") + self._next_rcache_update = time.time() + self.stats_interval self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.node_timeout = float(conf.get('node_timeout', 10)) self.sync_method = getattr(self, conf.get('sync_method') or 'rsync') @@ -110,21 +177,22 @@ class ObjectReplicator(Daemon): 'operation, please disable handoffs_first and ' 'handoff_delete before the next ' 'normal rebalance') + self.is_multiprocess_worker = None self._df_router = DiskFileRouter(conf, self.logger) self._child_process_reaper_queue = queue.LightQueue() def _zero_stats(self): - """Zero out the stats.""" - self.stats = {'attempted': 0, 'success': 0, 'failure': 0, - 'hashmatch': 0, 'rsync': 0, 'remove': 0, - 'start': time.time(), 'failure_nodes': {}} + self.stats_for_dev = defaultdict(Stats) - def _add_failure_stats(self, failure_devs_info): - for node, dev in failure_devs_info: - self.stats['failure'] += 1 - failure_devs = self.stats['failure_nodes'].setdefault(node, {}) - failure_devs.setdefault(dev, 0) - failure_devs[dev] += 1 + @property + def total_stats(self): + return sum(self.stats_for_dev.values(), Stats()) + + def _emplace_log_prefix(self, worker_index): + self.logger.set_prefix("[worker %d/%d pid=%d] " % ( + worker_index + 1, # use 1-based indexing for more readable logs + self.replicator_workers, + os.getpid())) def _get_my_replication_ips(self): my_replication_ips = set() @@ -167,6 +235,87 @@ class ObjectReplicator(Daemon): pass procs -= reaped_procs + def get_worker_args(self, once=False, **kwargs): + if self.replicator_workers < 1: + return [] + + override_opts = parse_override_options(once=once, **kwargs) + have_overrides = bool(override_opts.devices or override_opts.partitions + or override_opts.policies) + + # save this off for ring-change detection later in is_healthy() + self.all_local_devices = self.get_local_devices() + + if override_opts.devices: + devices_to_replicate = [ + d for d in override_opts.devices + if d in self.all_local_devices] + else: + # The sort isn't strictly necessary since we're just trying to + # spread devices around evenly, but it makes testing easier. 
+ devices_to_replicate = sorted(self.all_local_devices) + + # Distribute devices among workers as evenly as possible + self.replicator_workers = min(self.replicator_workers, + len(devices_to_replicate)) + worker_args = [ + { + 'override_devices': [], + 'override_partitions': override_opts.partitions, + 'override_policies': override_opts.policies, + 'have_overrides': have_overrides, + 'multiprocess_worker_index': i, + } + for i in range(self.replicator_workers)] + for index, device in enumerate(devices_to_replicate): + idx = index % self.replicator_workers + worker_args[idx]['override_devices'].append(device) + return worker_args + + def is_healthy(self): + """ + Check whether our set of local devices remains the same. + + If devices have been added or removed, then we return False here so + that we can kill off any worker processes and then distribute the + new set of local devices across a new set of workers so that all + devices are, once again, being worked on. + + This function may also cause recon stats to be updated. + + :returns: False if any local devices have been added or removed, + True otherwise + """ + # We update recon here because this is the only function we have in + # a multiprocess replicator that gets called periodically in the + # parent process. + if time.time() >= self._next_rcache_update: + update = self.aggregate_recon_update() + dump_recon_cache(update, self.rcache, self.logger) + return self.get_local_devices() == self.all_local_devices + + def get_local_devices(self): + """ + Returns a set of all local devices in all replication-type storage + policies. + + This is the device names, e.g. "sdq" or "d1234" or something, not + the full ring entries. + """ + ips = whataremyips(self.bind_ip) + local_devices = set() + for policy in POLICIES: + if policy.policy_type != REPL_POLICY: + continue + self.load_object_ring(policy) + for device in policy.object_ring.devs: + if device and is_local_device( + ips, self.port, + device['replication_ip'], + device['replication_port']): + local_devices.add(device['device']) + return local_devices + # Just exists for doc anchor point def sync(self, node, job, suffixes, *args, **kwargs): """ @@ -319,7 +468,9 @@ class ObjectReplicator(Daemon): def tpool_get_suffixes(path): return [suff for suff in os.listdir(path) if len(suff) == 3 and isdir(join(path, suff))] - self.replication_count += 1 + + stats = self.stats_for_dev[job['device']] + stats.attempted += 1 self.logger.increment('partition.delete.count.%s' % (job['device'],)) headers = dict(self.default_headers) headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) @@ -333,7 +484,7 @@ class ObjectReplicator(Daemon): delete_objs = None if suffixes: for node in job['nodes']: - self.stats['rsync'] += 1 + stats.rsync += 1 kwargs = {} if node['region'] in synced_remote_regions and \ self.conf.get('sync_method', 'rsync') == 'ssync': @@ -373,7 +524,7 @@ class ObjectReplicator(Daemon): delete_handoff = len(responses) == len(job['nodes']) and \ all(responses) if delete_handoff: - self.stats['remove'] += 1 + stats.remove += 1 if (self.conf.get('sync_method', 'rsync') == 'ssync' and delete_objs is not None): self.logger.info(_("Removing %s objects"), @@ -398,12 +549,12 @@ class ObjectReplicator(Daemon): handoff_partition_deleted = True except (Exception, Timeout): self.logger.exception(_("Error syncing handoff partition")) - self._add_failure_stats(failure_devs_info) + stats.add_failure_stats(failure_devs_info) finally: target_devs_info = set([(target_dev['replication_ip'], 
target_dev['device']) for target_dev in job['nodes']]) - self.stats['success'] += len(target_devs_info - failure_devs_info) + stats.success += len(target_devs_info - failure_devs_info) if not handoff_partition_deleted: self.handoffs_remaining += 1 self.partition_times.append(time.time() - begin) @@ -438,7 +589,8 @@ class ObjectReplicator(Daemon): :param job: a dict containing info about the partition to be replicated """ - self.replication_count += 1 + stats = self.stats_for_dev[job['device']] + stats.attempted += 1 self.logger.increment('partition.update.count.%s' % (job['device'],)) headers = dict(self.default_headers) headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) @@ -453,7 +605,7 @@ class ObjectReplicator(Daemon): do_listdir=_do_listdir( int(job['partition']), self.replication_cycle)) - self.suffix_hash += hashed + stats.suffix_hash += hashed self.logger.update_stats('suffix.hashes', hashed) attempts_left = len(job['nodes']) synced_remote_regions = set() @@ -499,7 +651,7 @@ class ObjectReplicator(Daemon): local_hash[suffix] != remote_hash.get(suffix, -1)] if not suffixes: - self.stats['hashmatch'] += 1 + stats.hashmatch += 1 continue hashed, recalc_hash = tpool_reraise( df_mgr._get_hashes, @@ -510,7 +662,7 @@ class ObjectReplicator(Daemon): suffixes = [suffix for suffix in local_hash if local_hash[suffix] != remote_hash.get(suffix, -1)] - self.stats['rsync'] += 1 + stats.rsync += 1 success, _junk = self.sync(node, job, suffixes) with Timeout(self.http_timeout): conn = http_connect( @@ -525,14 +677,14 @@ class ObjectReplicator(Daemon): # add only remote region when replicate succeeded if success and node['region'] != job['region']: synced_remote_regions.add(node['region']) - self.suffix_sync += len(suffixes) + stats.suffix_sync += len(suffixes) self.logger.update_stats('suffix.syncs', len(suffixes)) except (Exception, Timeout): failure_devs_info.add((node['replication_ip'], node['device'])) self.logger.exception(_("Error syncing with node: %s") % node) - self.suffix_count += len(local_hash) + stats.suffix_count += len(local_hash) except StopIteration: self.logger.error('Ran out of handoffs while replicating ' 'partition %s of policy %d', @@ -541,8 +693,8 @@ class ObjectReplicator(Daemon): failure_devs_info.update(target_devs_info) self.logger.exception(_("Error syncing partition")) finally: - self._add_failure_stats(failure_devs_info) - self.stats['success'] += len(target_devs_info - failure_devs_info) + stats.add_failure_stats(failure_devs_info) + stats.success += len(target_devs_info - failure_devs_info) self.partition_times.append(time.time() - begin) self.logger.timing_since('partition.update.timing', begin) @@ -550,29 +702,35 @@ class ObjectReplicator(Daemon): """ Logs various stats for the currently running replication pass. 
""" - if self.replication_count: + stats = self.total_stats + replication_count = stats.attempted + if replication_count > self.last_replication_count: + self.last_replication_count = replication_count elapsed = (time.time() - self.start) or 0.000001 - rate = self.replication_count / elapsed + rate = replication_count / elapsed self.logger.info( _("%(replicated)d/%(total)d (%(percentage).2f%%)" " partitions replicated in %(time).2fs (%(rate).2f/sec, " "%(remaining)s remaining)"), - {'replicated': self.replication_count, 'total': self.job_count, - 'percentage': self.replication_count * 100.0 / self.job_count, + {'replicated': replication_count, 'total': self.job_count, + 'percentage': replication_count * 100.0 / self.job_count, 'time': time.time() - self.start, 'rate': rate, 'remaining': '%d%s' % compute_eta(self.start, - self.replication_count, + replication_count, self.job_count)}) self.logger.info(_('%(success)s successes, %(failure)s failures') - % self.stats) + % dict(success=stats.success, + failure=stats.failure)) - if self.suffix_count: + if stats.suffix_count: self.logger.info( _("%(checked)d suffixes checked - " "%(hashed).2f%% hashed, %(synced).2f%% synced"), - {'checked': self.suffix_count, - 'hashed': (self.suffix_hash * 100.0) / self.suffix_count, - 'synced': (self.suffix_sync * 100.0) / self.suffix_count}) + {'checked': stats.suffix_count, + 'hashed': + (stats.suffix_hash * 100.0) / stats.suffix_count, + 'synced': + (stats.suffix_sync * 100.0) / stats.suffix_count}) self.partition_times.sort() self.logger.info( _("Partition times: max %(max).4fs, " @@ -619,8 +777,9 @@ class ObjectReplicator(Daemon): found_local = True dev_path = check_drive(self.devices_dir, local_dev['device'], self.mount_check) + local_dev_stats = self.stats_for_dev[local_dev['device']] if not dev_path: - self._add_failure_stats( + local_dev_stats.add_failure_stats( [(failure_dev['replication_ip'], failure_dev['device']) for failure_dev in policy.object_ring.devs @@ -666,12 +825,12 @@ class ObjectReplicator(Daemon): region=local_dev['region'])) except ValueError: if part_nodes: - self._add_failure_stats( + local_dev_stats.add_failure_stats( [(failure_dev['replication_ip'], failure_dev['device']) for failure_dev in nodes]) else: - self._add_failure_stats( + local_dev_stats.add_failure_stats( [(failure_dev['replication_ip'], failure_dev['device']) for failure_dev in policy.object_ring.devs @@ -715,7 +874,7 @@ class ObjectReplicator(Daemon): if policy.policy_type == REPL_POLICY: if (override_policies is not None and - str(policy.idx) not in override_policies): + policy.idx not in override_policies): continue # ensure rings are loaded for policy self.load_object_ring(policy) @@ -730,14 +889,12 @@ class ObjectReplicator(Daemon): return jobs def replicate(self, override_devices=None, override_partitions=None, - override_policies=None): + override_policies=None, start_time=None): """Run a replication pass""" - self.start = time.time() - self.suffix_count = 0 - self.suffix_sync = 0 - self.suffix_hash = 0 - self.replication_count = 0 - self.last_replication_count = -1 + if start_time is None: + start_time = time.time() + self.start = start_time + self.last_replication_count = 0 self.replication_cycle = (self.replication_cycle + 1) % 10 self.partition_times = [] self.my_replication_ips = self._get_my_replication_ips() @@ -748,19 +905,23 @@ class ObjectReplicator(Daemon): eventlet.sleep() # Give spawns a cycle current_nodes = None + dev_stats = None + num_jobs = 0 try: self.run_pool = GreenPool(size=self.concurrency) 
jobs = self.collect_jobs(override_devices=override_devices, override_partitions=override_partitions, override_policies=override_policies) for job in jobs: + dev_stats = self.stats_for_dev[job['device']] + num_jobs += 1 current_nodes = job['nodes'] dev_path = check_drive(self.devices_dir, job['device'], self.mount_check) if not dev_path: - self._add_failure_stats([(failure_dev['replication_ip'], - failure_dev['device']) - for failure_dev in job['nodes']]) + dev_stats.add_failure_stats([ + (failure_dev['replication_ip'], failure_dev['device']) + for failure_dev in job['nodes']]) self.logger.warning(_('%s is not mounted'), job['device']) continue if self.handoffs_first and not job['delete']: @@ -794,57 +955,126 @@ class ObjectReplicator(Daemon): self.run_pool.spawn(self.update, job) current_nodes = None self.run_pool.waitall() - except (Exception, Timeout): - if current_nodes: - self._add_failure_stats([(failure_dev['replication_ip'], - failure_dev['device']) - for failure_dev in current_nodes]) - else: - self._add_failure_stats(self.all_devs_info) - self.logger.exception(_("Exception in top-level replication loop")) + except (Exception, Timeout) as err: + if dev_stats: + if current_nodes: + dev_stats.add_failure_stats( + [(failure_dev['replication_ip'], + failure_dev['device']) + for failure_dev in current_nodes]) + else: + dev_stats.add_failure_stats(self.all_devs_info) + self.logger.exception( + _("Exception in top-level replication loop: %s"), err) finally: stats.kill() self.stats_line() - self.stats['attempted'] = self.replication_count - def run_once(self, *args, **kwargs): + def update_recon(self, total, end_time, override_devices): + # Called at the end of a replication pass to update recon stats. + if self.is_multiprocess_worker: + # If it weren't for the failure_nodes field, we could do this as + # a bunch of shared memory using multiprocessing.Value, which + # would be nice because it'd avoid dealing with existing data + # during an upgrade. + update = { + 'object_replication_per_disk': { + od: {'replication_stats': + self.stats_for_dev[od].to_recon(), + 'replication_time': total, + 'replication_last': end_time, + 'object_replication_time': total, + 'object_replication_last': end_time} + for od in override_devices}} + else: + update = {'replication_stats': self.total_stats.to_recon(), + 'replication_time': total, + 'replication_last': end_time, + 'object_replication_time': total, + 'object_replication_last': end_time} + dump_recon_cache(update, self.rcache, self.logger) + + def aggregate_recon_update(self): + per_disk_stats = load_recon_cache(self.rcache).get( + 'object_replication_per_disk', {}) + recon_update = {} + min_repl_last = float('inf') + min_repl_time = float('inf') + + # If every child has reported some stats, then aggregate things. 
+ if all(ld in per_disk_stats for ld in self.all_local_devices): + aggregated = Stats() + for device_name, data in per_disk_stats.items(): + aggregated += Stats.from_recon(data['replication_stats']) + min_repl_time = min( + min_repl_time, data['object_replication_time']) + min_repl_last = min( + min_repl_last, data['object_replication_last']) + recon_update['replication_stats'] = aggregated.to_recon() + recon_update['replication_last'] = min_repl_last + recon_update['replication_time'] = min_repl_time + recon_update['object_replication_last'] = min_repl_last + recon_update['object_replication_time'] = min_repl_time + + # Clear out entries for old local devices that we no longer have + devices_to_remove = set(per_disk_stats) - set(self.all_local_devices) + if devices_to_remove: + recon_update['object_replication_per_disk'] = { + dtr: {} for dtr in devices_to_remove} + + return recon_update + + def run_once(self, multiprocess_worker_index=None, + have_overrides=False, *args, **kwargs): + if multiprocess_worker_index is not None: + self.is_multiprocess_worker = True + self._emplace_log_prefix(multiprocess_worker_index) + rsync_reaper = eventlet.spawn(self._child_process_reaper) - self._zero_stats() self.logger.info(_("Running object replicator in script mode.")) - override_devices = list_from_csv(kwargs.get('devices')) - override_partitions = list_from_csv(kwargs.get('partitions')) - override_policies = list_from_csv(kwargs.get('policies')) - if not override_devices: - override_devices = None - if not override_partitions: - override_partitions = None - if not override_policies: - override_policies = None + override_opts = parse_override_options(once=True, **kwargs) + devices = override_opts.devices or None + partitions = override_opts.partitions or None + policies = override_opts.policies or None + start_time = time.time() self.replicate( - override_devices=override_devices, - override_partitions=override_partitions, - override_policies=override_policies) - total = (time.time() - self.stats['start']) / 60 + override_devices=devices, + override_partitions=partitions, + override_policies=policies, + start_time=start_time) + end_time = time.time() + total = (end_time - start_time) / 60 self.logger.info( _("Object replication complete (once). (%.02f minutes)"), total) - if not (override_partitions or override_devices): - replication_last = time.time() - dump_recon_cache({'replication_stats': self.stats, - 'replication_time': total, - 'replication_last': replication_last, - 'object_replication_time': total, - 'object_replication_last': replication_last}, - self.rcache, self.logger) + + # If we've been manually run on a subset of + # policies/devices/partitions, then our recon stats are not + # representative of how replication is doing, so we don't publish + # them. 
+ if self.is_multiprocess_worker: + # The main process checked for overrides and determined that + # there were none + should_update_recon = not have_overrides + else: + # We are single-process, so update recon only if we worked on + # everything + should_update_recon = not (partitions or devices or policies) + if should_update_recon: + self.update_recon(total, end_time, devices) # Give rsync processes one last chance to exit, then bail out and # let them be init's problem self._child_process_reaper_queue.put(None) rsync_reaper.wait() - def run_forever(self, *args, **kwargs): + def run_forever(self, multiprocess_worker_index=None, + override_devices=None, *args, **kwargs): + if multiprocess_worker_index is not None: + self.is_multiprocess_worker = True + self._emplace_log_prefix(multiprocess_worker_index) self.logger.info(_("Starting object replicator in daemon mode.")) eventlet.spawn_n(self._child_process_reaper) # Run the replicator continually @@ -852,17 +1082,18 @@ class ObjectReplicator(Daemon): self._zero_stats() self.logger.info(_("Starting object replication pass.")) # Run the replicator - self.replicate() - total = (time.time() - self.stats['start']) / 60 + start = time.time() + self.replicate(override_devices=override_devices) + end = time.time() + total = (end - start) / 60 self.logger.info( _("Object replication complete. (%.02f minutes)"), total) - replication_last = time.time() - dump_recon_cache({'replication_stats': self.stats, - 'replication_time': total, - 'replication_last': replication_last, - 'object_replication_time': total, - 'object_replication_last': replication_last}, - self.rcache, self.logger) + self.update_recon(total, end, override_devices) self.logger.debug('Replication sleeping for %s seconds.', self.interval) sleep(self.interval) + + def post_multiprocess_run(self): + # This method is called after run_once using multiple workers. 
+ update = self.aggregate_recon_update() + dump_recon_cache(update, self.rcache, self.logger) diff --git a/test/unit/common/test_daemon.py b/test/unit/common/test_daemon.py index 8754b513dd..6f37066a6e 100644 --- a/test/unit/common/test_daemon.py +++ b/test/unit/common/test_daemon.py @@ -67,6 +67,10 @@ class TestDaemon(unittest.TestCase): class MyWorkerDaemon(MyDaemon): + def __init__(self, *a, **kw): + super(MyWorkerDaemon, self).__init__(*a, **kw) + MyWorkerDaemon.post_multiprocess_run_called = False + def get_worker_args(self, once=False, **kwargs): return [kwargs for i in range(int(self.conf.get('workers', 0)))] @@ -76,6 +80,9 @@ class MyWorkerDaemon(MyDaemon): except IndexError: return True + def post_multiprocess_run(self): + MyWorkerDaemon.post_multiprocess_run_called = True + class TestWorkerDaemon(unittest.TestCase): @@ -231,6 +238,7 @@ class TestRunDaemon(unittest.TestCase): }) self.assertEqual([], self.mock_kill.call_args_list) self.assertIn('Finished', d.logger.get_lines_for_level('notice')[-1]) + self.assertTrue(MyWorkerDaemon.post_multiprocess_run_called) def test_forked_worker(self): d = MyWorkerDaemon({'workers': 3}) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 6e98edaf4e..c71b572e31 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -1533,6 +1533,25 @@ class TestUtils(unittest.TestCase): finally: rmtree(testdir_base) + def test_load_recon_cache(self): + stub_data = {'test': 'foo'} + with NamedTemporaryFile() as f: + f.write(json.dumps(stub_data).encode("utf-8")) + f.flush() + self.assertEqual(stub_data, utils.load_recon_cache(f.name)) + + # missing files are treated as empty + self.assertFalse(os.path.exists(f.name)) # sanity + self.assertEqual({}, utils.load_recon_cache(f.name)) + + # Corrupt files are treated as empty. We could crash and make an + # operator fix the corrupt file, but they'll "fix" it with "rm -f + # /var/cache/swift/*.recon", so let's just do it for them. + with NamedTemporaryFile() as f: + f.write(b"{not [valid (json") + f.flush() + self.assertEqual({}, utils.load_recon_cache(f.name)) + def test_get_logger(self): sio = StringIO() logger = logging.getLogger('server') @@ -3557,21 +3576,51 @@ cluster_dfw1 = http://dfw1.host/v1/ utils.get_hmac('GET', '/path', 1, 'abc'), 'b17f6ff8da0e251737aa9e3ee69a881e3e092e2f') - def test_parse_overrides(self): - devices, partitions = utils.parse_overrides(devices='sdb1,sdb2') - self.assertIn('sdb1', devices) - self.assertIn('sdb2', devices) - self.assertNotIn('sdb3', devices) - self.assertIn(1, partitions) - self.assertIn('1', partitions) # matches because of Everything - self.assertIn(None, partitions) # matches because of Everything - devices, partitions = utils.parse_overrides(partitions='1,2,3') - self.assertIn('sdb1', devices) - self.assertIn('1', devices) # matches because of Everything - self.assertIn(None, devices) # matches because of Everything - self.assertIn(1, partitions) - self.assertNotIn('1', partitions) - self.assertNotIn(None, partitions) + def test_parse_override_options(self): + # When override_ is passed in, it takes precedence. 
+ opts = utils.parse_override_options( + override_policies=[0, 1], + override_devices=['sda', 'sdb'], + override_partitions=[100, 200], + policies='0,1,2,3', + devices='sda,sdb,sdc,sdd', + partitions='100,200,300,400') + self.assertEqual(opts.policies, [0, 1]) + self.assertEqual(opts.devices, ['sda', 'sdb']) + self.assertEqual(opts.partitions, [100, 200]) + + # When override_ is passed in, it applies even in run-once + # mode. + opts = utils.parse_override_options( + once=True, + override_policies=[0, 1], + override_devices=['sda', 'sdb'], + override_partitions=[100, 200], + policies='0,1,2,3', + devices='sda,sdb,sdc,sdd', + partitions='100,200,300,400') + self.assertEqual(opts.policies, [0, 1]) + self.assertEqual(opts.devices, ['sda', 'sdb']) + self.assertEqual(opts.partitions, [100, 200]) + + # In run-once mode, we honor the passed-in overrides. + opts = utils.parse_override_options( + once=True, + policies='0,1,2,3', + devices='sda,sdb,sdc,sdd', + partitions='100,200,300,400') + self.assertEqual(opts.policies, [0, 1, 2, 3]) + self.assertEqual(opts.devices, ['sda', 'sdb', 'sdc', 'sdd']) + self.assertEqual(opts.partitions, [100, 200, 300, 400]) + + # In run-forever mode, we ignore the passed-in overrides. + opts = utils.parse_override_options( + policies='0,1,2,3', + devices='sda,sdb,sdc,sdd', + partitions='100,200,300,400') + self.assertEqual(opts.policies, []) + self.assertEqual(opts.devices, []) + self.assertEqual(opts.partitions, []) def test_get_policy_index(self): # Account has no information about a policy diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 0d061b4ec9..86c4599203 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -234,7 +234,8 @@ class TestObjectReplicator(unittest.TestCase): self.conf = dict( bind_ip=_ips()[0], bind_port=6200, swift_dir=self.testdir, devices=self.devices, mount_check='false', - timeout='300', stats_interval='1', sync_method='rsync') + timeout='300', stats_interval='1', sync_method='rsync', + recon_cache_path=self.recon_cache) self._create_replicator() self.ts = make_timestamp_iter() @@ -370,7 +371,6 @@ class TestObjectReplicator(unittest.TestCase): self.assertEqual((start + 1 + cycle) % 10, replicator.replication_cycle) - self.assertEqual(0, replicator.stats['start']) recon_fname = os.path.join(self.recon_cache, "object.recon") with open(recon_fname) as cachefile: recon = json.loads(cachefile.read()) @@ -527,7 +527,7 @@ class TestObjectReplicator(unittest.TestCase): _create_test_rings(self.testdir, devs) self.replicator.collect_jobs() - self.assertEqual(self.replicator.stats['failure'], 0) + self.assertEqual(self.replicator.total_stats.failure, 0) @mock.patch('swift.obj.replicator.random.shuffle', side_effect=lambda l: l) def test_collect_jobs_multi_disk(self, mock_shuffle): @@ -876,7 +876,7 @@ class TestObjectReplicator(unittest.TestCase): self.replicator.replicate() # all jobs processed! 
self.assertEqual(self.replicator.job_count, - self.replicator.replication_count) + self.replicator.total_stats.attempted) self.assertFalse(self.replicator.handoffs_remaining) # sanity, all the handoffs suffixes we filled in were rsync'd @@ -942,7 +942,7 @@ class TestObjectReplicator(unittest.TestCase): # jobs may have been spawned into the pool before the failed # update_deleted job incremented handoffs_remaining and caused the # handoffs_first check to abort the current pass - self.assertLessEqual(self.replicator.replication_count, + self.assertLessEqual(self.replicator.total_stats.attempted, 2 + self.replicator.concurrency) # sanity, all the handoffs suffixes we filled in were rsync'd @@ -979,10 +979,11 @@ class TestObjectReplicator(unittest.TestCase): side_effect=_ips), \ mocked_http_conn(*[200] * 14, body=stub_body) as conn_log: self.replicator.handoff_delete = 2 + self.replicator._zero_stats() self.replicator.replicate() # all jobs processed! self.assertEqual(self.replicator.job_count, - self.replicator.replication_count) + self.replicator.total_stats.attempted) self.assertFalse(self.replicator.handoffs_remaining) # sanity, all parts got replicated found_replicate_calls = defaultdict(int) @@ -1418,6 +1419,11 @@ class TestObjectReplicator(unittest.TestCase): self.assertFalse(os.access(pol1_part_path, os.F_OK)) self.assertTrue(os.access(pol0_part_path, os.F_OK)) + # since we weren't operating on everything, but only a subset of + # storage policies, we didn't dump any recon stats. + self.assertFalse(os.path.exists( + os.path.join(self.recon_cache, 'object.recon'))) + def test_delete_partition_ssync(self): with mock.patch('swift.obj.replicator.http_connect', mock_http_connect(200)): @@ -1735,7 +1741,7 @@ class TestObjectReplicator(unittest.TestCase): with mock.patch.object(replicator, 'sync', fake_sync): replicator.run_once() - log_lines = replicator.logger.get_lines_for_level('error') + log_lines = replicator.logger.logger.get_lines_for_level('error') self.assertIn("Error syncing with node:", log_lines[0]) self.assertFalse(log_lines[1:]) # setup creates 4 partitions; partition 1 does not map to local dev id @@ -1770,7 +1776,7 @@ class TestObjectReplicator(unittest.TestCase): # attempt to 16 times but succeeded only 15 times due to Timeout suffix_hashes = sum( count for (metric, count), _junk in - replicator.logger.log_dict['update_stats'] + replicator.logger.logger.log_dict['update_stats'] if metric == 'suffix.hashes') self.assertEqual(15, suffix_hashes) @@ -1801,7 +1807,8 @@ class TestObjectReplicator(unittest.TestCase): self.replicator.suffix_count = 0 self.replicator.suffix_sync = 0 self.replicator.suffix_hash = 0 - self.replicator.replication_count = 0 + self.replicator.last_replication_count = 0 + self.replicator._zero_stats() self.replicator.partition_times = [] self.headers = {'Content-Length': '0', @@ -1926,10 +1933,11 @@ class TestObjectReplicator(unittest.TestCase): reqs.append(mock.call(node, local_job, ['a83'])) fake_func.assert_has_calls(reqs, any_order=True) self.assertEqual(fake_func.call_count, 2) - self.assertEqual(self.replicator.replication_count, 1) - self.assertEqual(self.replicator.suffix_sync, 2) - self.assertEqual(self.replicator.suffix_hash, 1) - self.assertEqual(self.replicator.suffix_count, 1) + stats = self.replicator.total_stats + self.assertEqual(stats.attempted, 1) + self.assertEqual(stats.suffix_sync, 2) + self.assertEqual(stats.suffix_hash, 1) + self.assertEqual(stats.suffix_count, 1) # Efficient Replication Case set_default(self) @@ -1945,10 +1953,11 @@ 
class TestObjectReplicator(unittest.TestCase): # belong to another region self.replicator.update(job) self.assertEqual(fake_func.call_count, 1) - self.assertEqual(self.replicator.replication_count, 1) - self.assertEqual(self.replicator.suffix_sync, 1) - self.assertEqual(self.replicator.suffix_hash, 1) - self.assertEqual(self.replicator.suffix_count, 1) + stats = self.replicator.total_stats + self.assertEqual(stats.attempted, 1) + self.assertEqual(stats.suffix_sync, 1) + self.assertEqual(stats.suffix_hash, 1) + self.assertEqual(stats.suffix_count, 1) mock_http.reset_mock() self.logger.clear() @@ -2038,7 +2047,7 @@ class TestObjectReplicator(unittest.TestCase): _create_test_rings(self.testdir, next_part_power=4) self.replicator.replicate() self.assertEqual(0, self.replicator.job_count) - self.assertEqual(0, self.replicator.replication_count) + self.assertEqual(0, self.replicator.total_stats.attempted) warnings = self.logger.get_lines_for_level('warning') self.assertIn( "next_part_power set in policy 'one'. Skipping", warnings) @@ -2107,5 +2116,542 @@ class TestObjectReplicator(unittest.TestCase): self.assertEqual(len(mock_procs), 2) +@patch_policies([StoragePolicy(0, 'zero', False), + StoragePolicy(1, 'one', True)]) +class TestMultiProcessReplicator(unittest.TestCase): + def setUp(self): + # recon cache path + self.recon_cache = tempfile.mkdtemp() + rmtree(self.recon_cache, ignore_errors=1) + os.mkdir(self.recon_cache) + self.recon_file = os.path.join(self.recon_cache, 'object.recon') + + bind_port = 6200 + + # Set up some rings + self.testdir = tempfile.mkdtemp() + _create_test_rings(self.testdir, devs=[ + {'id': 0, 'device': 'sda', 'zone': 0, + 'region': 1, 'ip': '127.0.0.1', 'port': bind_port}, + {'id': 1, 'device': 'sdb', 'zone': 0, + 'region': 1, 'ip': '127.0.0.1', 'port': bind_port}, + {'id': 2, 'device': 'sdc', 'zone': 0, + 'region': 1, 'ip': '127.0.0.1', 'port': bind_port}, + {'id': 3, 'device': 'sdd', 'zone': 0, + 'region': 1, 'ip': '127.0.0.1', 'port': bind_port}, + {'id': 4, 'device': 'sde', 'zone': 0, + 'region': 1, 'ip': '127.0.0.1', 'port': bind_port}, + {'id': 100, 'device': 'notme0', 'zone': 0, + 'region': 1, 'ip': '127.99.99.99', 'port': bind_port}]) + + self.logger = debug_logger('test-replicator') + self.conf = dict( + bind_ip='127.0.0.1', bind_port=bind_port, + swift_dir=self.testdir, + mount_check='false', recon_cache_path=self.recon_cache, + timeout='300', stats_interval='1', sync_method='rsync') + + self.replicator = object_replicator.ObjectReplicator( + self.conf, logger=self.logger) + + def tearDown(self): + self.assertFalse(process_errors) + rmtree(self.testdir, ignore_errors=1) + rmtree(self.recon_cache, ignore_errors=1) + + def fake_replicate(self, override_devices, **kw): + # Faked-out replicate() method. Just updates the stats, but doesn't + # do any work. 
+        for device in override_devices:
+            stats = self.replicator.stats_for_dev[device]
+            if device == 'sda':
+                stats.attempted = 1
+                stats.success = 10
+                stats.failure = 100
+                stats.hashmatch = 1000
+                stats.rsync = 10000
+                stats.remove = 100000
+                stats.suffix_count = 1000000
+                stats.suffix_hash = 10000000
+                stats.suffix_sync = 100000000
+                stats.failure_nodes = {
+                    '10.1.1.1': {'d11': 1}}
+            elif device == 'sdb':
+                stats.attempted = 2
+                stats.success = 20
+                stats.failure = 200
+                stats.hashmatch = 2000
+                stats.rsync = 20000
+                stats.remove = 200000
+                stats.suffix_count = 2000000
+                stats.suffix_hash = 20000000
+                stats.suffix_sync = 200000000
+                stats.failure_nodes = {
+                    '10.2.2.2': {'d22': 2}}
+            elif device == 'sdc':
+                stats.attempted = 3
+                stats.success = 30
+                stats.failure = 300
+                stats.hashmatch = 3000
+                stats.rsync = 30000
+                stats.remove = 300000
+                stats.suffix_count = 3000000
+                stats.suffix_hash = 30000000
+                stats.suffix_sync = 300000000
+                stats.failure_nodes = {
+                    '10.3.3.3': {'d33': 3}}
+            elif device == 'sdd':
+                stats.attempted = 4
+                stats.success = 40
+                stats.failure = 400
+                stats.hashmatch = 4000
+                stats.rsync = 40000
+                stats.remove = 400000
+                stats.suffix_count = 4000000
+                stats.suffix_hash = 40000000
+                stats.suffix_sync = 400000000
+                stats.failure_nodes = {
+                    '10.4.4.4': {'d44': 4}}
+            elif device == 'sde':
+                stats.attempted = 5
+                stats.success = 50
+                stats.failure = 500
+                stats.hashmatch = 5000
+                stats.rsync = 50000
+                stats.remove = 500000
+                stats.suffix_count = 5000000
+                stats.suffix_hash = 50000000
+                stats.suffix_sync = 500000000
+                stats.failure_nodes = {
+                    '10.5.5.5': {'d55': 5}}
+            else:
+                raise Exception("mock can't handle %r" % device)
+
+    def test_no_multiprocessing(self):
+        self.replicator.replicator_workers = 0
+        self.assertEqual(self.replicator.get_worker_args(), [])
+
+    def test_device_distribution(self):
+        self.replicator.replicator_workers = 2
+        self.assertEqual(self.replicator.get_worker_args(), [{
+            'override_devices': ['sda', 'sdc', 'sde'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 0,
+        }, {
+            'override_devices': ['sdb', 'sdd'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 1,
+        }])
+
+    def test_override_policies(self):
+        self.replicator.replicator_workers = 2
+        args = self.replicator.get_worker_args(policies="3,5,7", once=True)
+        self.assertEqual(args, [{
+            'override_devices': ['sda', 'sdc', 'sde'],
+            'override_partitions': [],
+            'override_policies': [3, 5, 7],
+            'have_overrides': True,
+            'multiprocess_worker_index': 0,
+        }, {
+            'override_devices': ['sdb', 'sdd'],
+            'override_partitions': [],
+            'override_policies': [3, 5, 7],
+            'have_overrides': True,
+            'multiprocess_worker_index': 1,
+        }])
+
+        # override policies don't apply in run-forever mode
+        args = self.replicator.get_worker_args(policies="3,5,7", once=False)
+        self.assertEqual(args, [{
+            'override_devices': ['sda', 'sdc', 'sde'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 0,
+        }, {
+            'override_devices': ['sdb', 'sdd'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 1,
+        }])
+
+    def test_more_workers_than_disks(self):
+        self.replicator.replicator_workers = 999
+        self.assertEqual(self.replicator.get_worker_args(), [{
+            'override_devices': ['sda'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 0,
+        }, {
+            'override_devices': ['sdb'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 1,
+        }, {
+            'override_devices': ['sdc'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 2,
+        }, {
+            'override_devices': ['sdd'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 3,
+        }, {
+            'override_devices': ['sde'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 4,
+        }])
+
+        # Remember how many workers we actually have so that the log-line
+        # prefixes are reasonable. Otherwise, we'd have five workers, each
+        # logging lines starting with things like "[worker X/999 pid=P]"
+        # despite there being only five.
+        self.assertEqual(self.replicator.replicator_workers, 5)
+
+    def test_command_line_overrides(self):
+        self.replicator.replicator_workers = 2
+
+        args = self.replicator.get_worker_args(
+            devices="sda,sdc,sdd", partitions="12,34,56", once=True)
+        self.assertEqual(args, [{
+            'override_devices': ['sda', 'sdd'],
+            'override_partitions': [12, 34, 56],
+            'override_policies': [],
+            'have_overrides': True,
+            'multiprocess_worker_index': 0,
+        }, {
+            'override_devices': ['sdc'],
+            'override_partitions': [12, 34, 56],
+            'override_policies': [],
+            'have_overrides': True,
+            'multiprocess_worker_index': 1,
+        }])
+
+        args = self.replicator.get_worker_args(
+            devices="sda,sdc,sdd", once=True)
+        self.assertEqual(args, [{
+            'override_devices': ['sda', 'sdd'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': True,
+            'multiprocess_worker_index': 0,
+        }, {
+            'override_devices': ['sdc'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': True,
+            'multiprocess_worker_index': 1,
+        }])
+
+        # no overrides apply in run-forever mode
+        args = self.replicator.get_worker_args(
+            devices="sda,sdc,sdd", partitions="12,34,56", once=False)
+        self.assertEqual(args, [{
+            'override_devices': ['sda', 'sdc', 'sde'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 0,
+        }, {
+            'override_devices': ['sdb', 'sdd'],
+            'override_partitions': [],
+            'override_policies': [],
+            'have_overrides': False,
+            'multiprocess_worker_index': 1,
+        }])
+
+    def test_worker_logging(self):
+        self.replicator.replicator_workers = 3
+
+        def log_some_stuff(*a, **kw):
+            self.replicator.logger.debug("debug message")
+            self.replicator.logger.info("info message")
+            self.replicator.logger.warning("warning message")
+            self.replicator.logger.error("error message")
+
+        with mock.patch.object(self.replicator, 'replicate', log_some_stuff), \
+                mock.patch("os.getpid", lambda: 8804):
+            self.replicator.get_worker_args()
+            self.replicator.run_once(multiprocess_worker_index=0,
+                                     override_devices=['sda', 'sdb'])
+
+        prefix = "[worker 1/3 pid=8804] "
+        for level, lines in self.logger.logger.all_log_lines().items():
+            for line in lines:
+                self.assertTrue(
+                    line.startswith(prefix),
+                    "%r doesn't start with %r (level %s)" % (
+                        line, prefix, level))
+
+    def test_recon_run_once(self):
+        self.replicator.replicator_workers = 3
+
+        the_time = [1521680000]
+
+        def mock_time():
+            rv = the_time[0]
+            the_time[0] += 120
+            return rv
+
+        # Simulate a couple child processes
+        with mock.patch.object(self.replicator, 'replicate',
+                               self.fake_replicate), \
+                mock.patch('time.time', mock_time):
+            self.replicator.get_worker_args()
+            self.replicator.run_once(multiprocess_worker_index=0,
+                                     override_devices=['sda', 'sdb'])
+            self.replicator.run_once(multiprocess_worker_index=1,
+                                     override_devices=['sdc'])
+            self.replicator.run_once(multiprocess_worker_index=2,
+                                     override_devices=['sdd', 'sde'])
+
+        with open(self.recon_file) as fh:
+            recon_data = json.load(fh)
+        self.assertIn('object_replication_per_disk', recon_data)
+        self.assertIn('sda', recon_data['object_replication_per_disk'])
+        self.assertIn('sdb', recon_data['object_replication_per_disk'])
+        self.assertIn('sdc', recon_data['object_replication_per_disk'])
+        self.assertIn('sdd', recon_data['object_replication_per_disk'])
+        self.assertIn('sde', recon_data['object_replication_per_disk'])
+        sda = recon_data['object_replication_per_disk']['sda']
+
+        # Spot-check a couple of fields
+        self.assertEqual(sda['replication_stats']['attempted'], 1)
+        self.assertEqual(sda['replication_stats']['success'], 10)
+        self.assertEqual(sda['object_replication_time'], 2)  # minutes
+        self.assertEqual(sda['object_replication_last'], 1521680120)
+
+        # Aggregate the workers' recon updates
+        self.replicator.post_multiprocess_run()
+        with open(self.recon_file) as fh:
+            recon_data = json.load(fh)
+        self.assertEqual(recon_data['replication_stats']['attempted'], 15)
+        self.assertEqual(recon_data['replication_stats']['failure'], 1500)
+        self.assertEqual(recon_data['replication_stats']['hashmatch'], 15000)
+        self.assertEqual(recon_data['replication_stats']['remove'], 1500000)
+        self.assertEqual(recon_data['replication_stats']['rsync'], 150000)
+        self.assertEqual(recon_data['replication_stats']['success'], 150)
+        self.assertEqual(recon_data['replication_stats']['suffix_count'],
+                         15000000)
+        self.assertEqual(recon_data['replication_stats']['suffix_hash'],
+                         150000000)
+        self.assertEqual(recon_data['replication_stats']['suffix_sync'],
+                         1500000000)
+        self.assertEqual(recon_data['replication_stats']['failure_nodes'], {
+            '10.1.1.1': {'d11': 1},
+            '10.2.2.2': {'d22': 2},
+            '10.3.3.3': {'d33': 3},
+            '10.4.4.4': {'d44': 4},
+            '10.5.5.5': {'d55': 5},
+        })
+        self.assertEqual(recon_data['object_replication_time'], 2)  # minutes
+        self.assertEqual(recon_data['object_replication_last'], 1521680120)
+
+    def test_recon_skipped_with_overrides(self):
+        self.replicator.replicator_workers = 3
+
+        the_time = [1521680000]
+
+        def mock_time():
+            rv = the_time[0]
+            the_time[0] += 120
+            return rv
+
+        with mock.patch.object(self.replicator, 'replicate',
+                               self.fake_replicate), \
+                mock.patch('time.time', mock_time):
+            self.replicator.get_worker_args()
+            self.replicator.run_once(multiprocess_worker_index=0,
+                                     have_overrides=True,
+                                     override_devices=['sda', 'sdb'])
+        self.assertFalse(os.path.exists(self.recon_file))
+
+        # have_overrides=False makes us get recon stats
+        with mock.patch.object(self.replicator, 'replicate',
+                               self.fake_replicate), \
+                mock.patch('time.time', mock_time):
+            self.replicator.get_worker_args()
+            self.replicator.run_once(multiprocess_worker_index=0,
+                                     have_overrides=False,
+                                     override_devices=['sda', 'sdb'])
+        with open(self.recon_file) as fh:
+            recon_data = json.load(fh)
+        self.assertIn('sda', recon_data['object_replication_per_disk'])
+
+    def test_recon_run_forever(self):
+        the_time = [1521521521.52152]
+
+        def mock_time():
+            rv = the_time[0]
+            the_time[0] += 120
+            return rv
+
+        self.replicator.replicator_workers = 2
+        self.replicator._next_rcache_update = the_time[0]
+
+        # One worker has finished a pass, the other hasn't.
+        with mock.patch.object(self.replicator, 'replicate',
+                               self.fake_replicate), \
+                mock.patch('time.time', mock_time):
+            self.replicator.get_worker_args()
+            # Yes, this says run_once, but this is only to populate
+            # object.recon with some stats. The real test is for the
+            # aggregation.
+            self.replicator.run_once(multiprocess_worker_index=0,
+                                     override_devices=['sda', 'sdb', 'sdc'])
+
+        # This will not produce aggregate stats since not every device has
+        # finished a pass.
+        the_time[0] += self.replicator.stats_interval
+        with mock.patch('time.time', mock_time):
+            rv = self.replicator.is_healthy()
+        self.assertTrue(rv)
+        with open(self.recon_file) as fh:
+            recon_data = json.load(fh)
+        self.assertNotIn('replication_stats', recon_data)
+
+        # Now all the local devices have completed a replication pass, so we
+        # will produce aggregate stats.
+        with mock.patch.object(self.replicator, 'replicate',
+                               self.fake_replicate), \
+                mock.patch('time.time', mock_time):
+            self.replicator.get_worker_args()
+            self.replicator.run_once(multiprocess_worker_index=1,
+                                     override_devices=['sdd', 'sde'])
+        the_time[0] += self.replicator.stats_interval
+        with mock.patch('time.time', mock_time):
+            rv = self.replicator.is_healthy()
+        self.assertTrue(rv)
+        with open(self.recon_file) as fh:
+            recon_data = json.load(fh)
+        self.assertIn('replication_stats', recon_data)
+
+        # no need to exhaustively check every sum
+        self.assertEqual(recon_data['replication_stats']['attempted'], 15)
+        self.assertEqual(recon_data['replication_stats']['success'], 150)
+
+        self.assertEqual(
+            recon_data['replication_last'],
+            min(pd['replication_last']
+                for pd in recon_data['object_replication_per_disk'].values()))
+
+
+class TestReplicatorStats(unittest.TestCase):
+    def test_to_recon(self):
+        st = object_replicator.Stats(
+            attempted=1, failure=2, hashmatch=3, remove=4,
+            rsync=5, success=7,
+            suffix_count=8, suffix_hash=9, suffix_sync=10,
+            failure_nodes={'10.1.2.3': {'sda': 100, 'sdb': 200}})
+        # This is what appears in the recon dump
+        self.assertEqual(st.to_recon(), {
+            'attempted': 1,
+            'failure': 2,
+            'hashmatch': 3,
+            'remove': 4,
+            'rsync': 5,
+            'success': 7,
+            'suffix_count': 8,
+            'suffix_hash': 9,
+            'suffix_sync': 10,
+            'failure_nodes': {'10.1.2.3': {'sda': 100, 'sdb': 200}},
+        })
+
+    def test_recon_roundtrip(self):
+        before = object_replicator.Stats(
+            attempted=1, failure=2, hashmatch=3, remove=4,
+            rsync=5, success=7,
+            suffix_count=8, suffix_hash=9, suffix_sync=10,
+            failure_nodes={'10.1.2.3': {'sda': 100, 'sdb': 200}})
+        after = object_replicator.Stats.from_recon(before.to_recon())
+        self.assertEqual(after.attempted, before.attempted)
+        self.assertEqual(after.failure, before.failure)
+        self.assertEqual(after.hashmatch, before.hashmatch)
+        self.assertEqual(after.remove, before.remove)
+        self.assertEqual(after.rsync, before.rsync)
+        self.assertEqual(after.success, before.success)
+        self.assertEqual(after.suffix_count, before.suffix_count)
+        self.assertEqual(after.suffix_hash, before.suffix_hash)
+        self.assertEqual(after.suffix_sync, before.suffix_sync)
+        self.assertEqual(after.failure_nodes, before.failure_nodes)
+
+    def test_from_recon_skips_extra_fields(self):
+        # If another attribute ever sneaks its way in, we should ignore it.
+        # This will make aborted upgrades a little less painful for
+        # operators.
+        recon_dict = {'attempted': 1, 'failure': 2, 'hashmatch': 3,
+                      'spices': 5, 'treasures': 8}
+        stats = object_replicator.Stats.from_recon(recon_dict)
+        self.assertEqual(stats.attempted, 1)
+        self.assertEqual(stats.failure, 2)
+        self.assertEqual(stats.hashmatch, 3)
+        # We don't gain attributes just because they're in object.recon.
+        self.assertFalse(hasattr(stats, 'spices'))
+        self.assertFalse(hasattr(stats, 'treasures'))
+
+    def test_add_failure_stats(self):
+        st = object_replicator.Stats()
+        st.add_failure_stats([('10.1.1.1', 'd10'), ('10.1.1.1', 'd11')])
+        st.add_failure_stats([('10.1.1.1', 'd10')])
+        st.add_failure_stats([('10.1.1.1', 'd12'), ('10.2.2.2', 'd20'),
+                              ('10.2.2.2', 'd21'), ('10.2.2.2', 'd21'),
+                              ('10.2.2.2', 'd21')])
+        self.assertEqual(st.failure, 8)
+
+        as_dict = st.to_recon()
+        self.assertEqual(as_dict['failure_nodes'], {
+            '10.1.1.1': {
+                'd10': 2,
+                'd11': 1,
+                'd12': 1,
+            },
+            '10.2.2.2': {
+                'd20': 1,
+                'd21': 3,
+            },
+        })
+
+    def test_add(self):
+        st1 = object_replicator.Stats(
+            attempted=1, failure=2, hashmatch=3, remove=4, rsync=5,
+            success=6, suffix_count=7, suffix_hash=8, suffix_sync=9,
+            failure_nodes={
+                '10.1.1.1': {'sda': 10, 'sdb': 20},
+                '10.1.1.2': {'sda': 10, 'sdb': 20}})
+        st2 = object_replicator.Stats(
+            attempted=2, failure=4, hashmatch=6, remove=8, rsync=10,
+            success=12, suffix_count=14, suffix_hash=16, suffix_sync=18,
+            failure_nodes={
+                '10.1.1.2': {'sda': 10, 'sdb': 20},
+                '10.1.1.3': {'sda': 10, 'sdb': 20}})
+        total = st1 + st2
+        self.assertEqual(total.attempted, 3)
+        self.assertEqual(total.failure, 6)
+        self.assertEqual(total.hashmatch, 9)
+        self.assertEqual(total.remove, 12)
+        self.assertEqual(total.rsync, 15)
+        self.assertEqual(total.success, 18)
+        self.assertEqual(total.suffix_count, 21)
+        self.assertEqual(total.suffix_hash, 24)
+        self.assertEqual(total.suffix_sync, 27)
+        self.assertEqual(total.failure_nodes, {
+            '10.1.1.1': {'sda': 10, 'sdb': 20},
+            '10.1.1.2': {'sda': 20, 'sdb': 40},
+            '10.1.1.3': {'sda': 10, 'sdb': 20},
+        })
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index ddb3f44023..77c08f9d33 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -87,8 +87,9 @@ class TestSender(BaseTest):
     def setUp(self):
         skip_if_no_xattrs()
         super(TestSender, self).setUp()
+        self.daemon_logger = debug_logger('test-ssync-sender')
         self.daemon = ObjectReplicator(self.daemon_conf,
-                                       debug_logger('test-ssync-sender'))
+                                       self.daemon_logger)
         job = {'policy': POLICIES.legacy}  # sufficient for Sender.__init__
         self.sender = ssync_sender.Sender(self.daemon, None, job, None)
 
@@ -109,7 +110,7 @@ class TestSender(BaseTest):
         success, candidates = self.sender()
         self.assertFalse(success)
         self.assertEqual(candidates, {})
-        error_lines = self.daemon.logger.get_lines_for_level('error')
+        error_lines = self.daemon_logger.get_lines_for_level('error')
         self.assertEqual(1, len(error_lines))
         self.assertEqual('1.2.3.4:5678/sda1/9 1 second: test connect',
                          error_lines[0])
@@ -128,7 +129,7 @@ class TestSender(BaseTest):
         success, candidates = self.sender()
         self.assertFalse(success)
         self.assertEqual(candidates, {})
-        error_lines = self.daemon.logger.get_lines_for_level('error')
+        error_lines = self.daemon_logger.get_lines_for_level('error')
         self.assertEqual(1, len(error_lines))
         self.assertEqual('1.2.3.4:5678/sda1/9 test connect',
                          error_lines[0])
@@ -143,10 +144,10 @@ class TestSender(BaseTest):
         success, candidates = self.sender()
         self.assertFalse(success)
         self.assertEqual(candidates, {})
-        error_lines = self.daemon.logger.get_lines_for_level('error')
+        error_lines = self.daemon_logger.get_lines_for_level('error')
         for line in error_lines:
             self.assertTrue(line.startswith(
-                '1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:'))
+                '1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender: '))
 
     def test_call_catches_exception_handling_exception(self):
         self.sender.node = None  # Will cause inside exception handler to fail
@@ -155,7 +156,7 @@ class TestSender(BaseTest):
         success, candidates = self.sender()
         self.assertFalse(success)
         self.assertEqual(candidates, {})
-        error_lines = self.daemon.logger.get_lines_for_level('error')
+        error_lines = self.daemon_logger.get_lines_for_level('error')
         for line in error_lines:
             self.assertTrue(line.startswith(
                 'EXCEPTION in ssync.Sender'))
@@ -598,7 +599,7 @@ class TestSender(BaseTest):
         success, candidates = self.sender()
         self.assertFalse(success)
         self.assertEqual(candidates, {})
-        error_lines = self.daemon.logger.get_lines_for_level('error')
+        error_lines = self.daemon_logger.get_lines_for_level('error')
         for line in error_lines:
             self.assertTrue(line.startswith(
                 '1.2.3.4:5678/sda1/9 0.01 seconds: connect send'))
@@ -622,7 +623,7 @@ class TestSender(BaseTest):
         success, candidates = self.sender()
         self.assertFalse(success)
         self.assertEqual(candidates, {})
-        error_lines = self.daemon.logger.get_lines_for_level('error')
+        error_lines = self.daemon_logger.get_lines_for_level('error')
         for line in error_lines:
             self.assertTrue(line.startswith(
                 '1.2.3.4:5678/sda1/9 0.02 seconds: connect receive'))
@@ -650,7 +651,7 @@ class TestSender(BaseTest):
         success, candidates = self.sender()
         self.assertFalse(success)
         self.assertEqual(candidates, {})
-        error_lines = self.daemon.logger.get_lines_for_level('error')
+        error_lines = self.daemon_logger.get_lines_for_level('error')
         for line in error_lines:
             self.assertTrue(line.startswith(
                 '1.2.3.4:5678/sda1/9 Expected status 200; got 503'))