From 804776b3794e137aa2bfa4c8184add9f614e06d2 Mon Sep 17 00:00:00 2001 From: Romain LE DISEZ Date: Thu, 26 Mar 2020 15:32:42 -0400 Subject: [PATCH] Optimize obj replicator/reconstructor healthchecks DaemonStrategy class calls Daemon.is_healthy() method every 0.1 seconds to ensure that all workers are running as wanted. On object replicator/reconstructor daemons, is_healthy() check if the rings changed to decide if workers must be created/killed. With large rings, this operation can be CPU intensive, especially on low-end CPU. This patch: - increases the check interval to 5 seconds by default, because none of these daemons are critical for performance (they are not in the datapath). But it allows each daemon to change this value if necessary - ensures that before doing a computation of all devices in the ring, object replicator/reconstructor checks that the ring really changed (by checking the mtime of the ring.gz files) On an Atom N2800 processor, this patch reduced the CPU usage of the main object replicator/reconstructor from 70% of a core to 0%. Change-Id: I2867e2be539f325778e2f044a151fd0773a7c390 --- swift/common/daemon.py | 3 ++- swift/obj/reconstructor.py | 6 +++++ swift/obj/replicator.py | 33 +++++++++++++----------- test/unit/__init__.py | 1 + test/unit/obj/test_reconstructor.py | 39 +++++++++++++++++++++++------ 5 files changed, 59 insertions(+), 23 deletions(-) diff --git a/swift/common/daemon.py b/swift/common/daemon.py index 53b4099090..4f838e8731 100644 --- a/swift/common/daemon.py +++ b/swift/common/daemon.py @@ -45,6 +45,7 @@ class Daemon(object): multiple daemonized workers, they simply provide the behavior of the daemon and context specific knowledge about how workers should be started. """ + WORKERS_HEALTHCHECK_INTERVAL = 5.0 def __init__(self, conf): self.conf = conf @@ -239,7 +240,7 @@ class DaemonStrategy(object): if not self.spawned_pids(): self.logger.notice('Finished %s', os.getpid()) break - time.sleep(0.1) + time.sleep(self.daemon.WORKERS_HEALTHCHECK_INTERVAL) self.daemon.post_multiprocess_run() return 0 diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 6169cc7ec0..135f9d5f9f 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -210,6 +210,7 @@ class ObjectReconstructor(Daemon): 'rebuild_handoff_node_count', 2)) self._df_router = DiskFileRouter(conf, self.logger) self.all_local_devices = self.get_local_devices() + self.rings_mtime = None def get_worker_args(self, once=False, **kwargs): """ @@ -263,6 +264,11 @@ class ObjectReconstructor(Daemon): if now > self._next_rcache_update: self._next_rcache_update = now + self.stats_interval self.aggregate_recon_update() + rings_mtime = [os.path.getmtime(self.load_object_ring( + policy).serialized_path) for policy in self.policies] + if self.rings_mtime == rings_mtime: + return True + self.rings_mtime = rings_mtime return self.get_local_devices() == self.all_local_devices def aggregate_recon_update(self): diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 762a77ea46..e3634bb8f8 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -139,6 +139,8 @@ class ObjectReplicator(Daemon): int(conf.get('bind_port', 6200)) self.concurrency = int(conf.get('concurrency', 1)) self.replicator_workers = int(conf.get('replicator_workers', 0)) + self.policies = [policy for policy in POLICIES + if policy.policy_type == REPL_POLICY] self.stats_interval = int(conf.get('stats_interval', '300')) self.ring_check_interval = int(conf.get('ring_check_interval', 15)) self.next_check = time.time() + self.ring_check_interval @@ -187,6 +189,7 @@ class ObjectReplicator(Daemon): self.is_multiprocess_worker = None self._df_router = DiskFileRouter(conf, self.logger) self._child_process_reaper_queue = queue.LightQueue() + self.rings_mtime = None def _zero_stats(self): self.stats_for_dev = defaultdict(Stats) @@ -204,7 +207,7 @@ class ObjectReplicator(Daemon): def _get_my_replication_ips(self): my_replication_ips = set() ips = whataremyips() - for policy in POLICIES: + for policy in self.policies: self.load_object_ring(policy) for local_dev in [dev for dev in policy.object_ring.devs if dev and dev['replication_ip'] in ips and @@ -291,6 +294,11 @@ class ObjectReplicator(Daemon): if time.time() >= self._next_rcache_update: update = self.aggregate_recon_update() dump_recon_cache(update, self.rcache, self.logger) + rings_mtime = [os.path.getmtime(self.load_object_ring( + policy).serialized_path) for policy in self.policies] + if self.rings_mtime == rings_mtime: + return True + self.rings_mtime = rings_mtime return self.get_local_devices() == self.all_local_devices def get_local_devices(self): @@ -303,9 +311,7 @@ class ObjectReplicator(Daemon): """ ips = whataremyips(self.bind_ip) local_devices = set() - for policy in POLICIES: - if policy.policy_type != REPL_POLICY: - continue + for policy in self.policies: self.load_object_ring(policy) for device in policy.object_ring.devs: if device and is_local_device( @@ -877,7 +883,7 @@ class ObjectReplicator(Daemon): """ jobs = [] ips = whataremyips(self.bind_ip) - for policy in POLICIES: + for policy in self.policies: # Skip replication if next_part_power is set. In this case # every object is hard-linked twice, but the replicator can't # detect them and would create a second copy of the file if not @@ -891,15 +897,14 @@ class ObjectReplicator(Daemon): policy.name) continue - if policy.policy_type == REPL_POLICY: - if (override_policies is not None and - policy.idx not in override_policies): - continue - # ensure rings are loaded for policy - self.load_object_ring(policy) - jobs += self.build_replication_jobs( - policy, ips, override_devices=override_devices, - override_partitions=override_partitions) + if (override_policies is not None and + policy.idx not in override_policies): + continue + # ensure rings are loaded for policy + self.load_object_ring(policy) + jobs += self.build_replication_jobs( + policy, ips, override_devices=override_devices, + override_partitions=override_partitions) random.shuffle(jobs) if self.handoffs_first: # Move the handoff parts to the front of the list diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 5e6f5e99d7..990347537c 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -215,6 +215,7 @@ class FakeRing(Ring): def __init__(self, replicas=3, max_more_nodes=0, part_power=0, base_port=1000): + self.serialized_path = '/foo/bar/object.ring.gz' self._base_port = base_port self.max_more_nodes = max_more_nodes self._part_shift = 32 - part_power diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 075246751d..92d0a3d338 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -1811,14 +1811,18 @@ class TestWorkerReconstructor(unittest.TestCase): logger=self.logger) # file does not exist to start self.assertFalse(os.path.exists(self.rcache)) - self.assertTrue(reconstructor.is_healthy()) + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=10): + self.assertTrue(reconstructor.is_healthy()) # ... and isn't created until _next_rcache_update self.assertFalse(os.path.exists(self.rcache)) # ... but if we wait 5 mins (by default) orig_next_update = reconstructor._next_rcache_update with mock.patch('swift.obj.reconstructor.time.time', return_value=now + 301): - self.assertTrue(reconstructor.is_healthy()) + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=11): + self.assertTrue(reconstructor.is_healthy()) self.assertGreater(reconstructor._next_rcache_update, orig_next_update) # ... it will be created self.assertTrue(os.path.exists(self.rcache)) @@ -1831,13 +1835,19 @@ class TestWorkerReconstructor(unittest.TestCase): reconstructor = object_reconstructor.ObjectReconstructor( {'recon_cache_path': self.recon_cache_path}, logger=self.logger) - self.assertTrue(reconstructor.is_healthy()) + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=10): + self.assertTrue(reconstructor.is_healthy()) reconstructor.get_local_devices = lambda: { 'sdb%d' % p for p in reconstructor.policies} - self.assertFalse(reconstructor.is_healthy()) + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=11): + self.assertFalse(reconstructor.is_healthy()) reconstructor.all_local_devices = { 'sdb%d' % p for p in reconstructor.policies} - self.assertTrue(reconstructor.is_healthy()) + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=12): + self.assertTrue(reconstructor.is_healthy()) def test_is_healthy_detects_ring_change(self): reconstructor = object_reconstructor.ObjectReconstructor( @@ -1850,13 +1860,26 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual(14, len(p.object_ring.devs)) # sanity check worker_args = list(reconstructor.get_worker_args()) self.assertFalse(worker_args[0]['override_devices']) # no local devs - self.assertTrue(reconstructor.is_healthy()) + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=10): + self.assertTrue(reconstructor.is_healthy()) # expand ring - now there are local devices p.object_ring.set_replicas(28) self.assertEqual(28, len(p.object_ring.devs)) # sanity check - self.assertFalse(reconstructor.is_healthy()) + + # If ring.gz mtime did not change, there is no change to detect + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=10): + self.assertTrue(reconstructor.is_healthy()) + # Now, ring.gz mtime changed, so the change will be detected + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=11): + self.assertFalse(reconstructor.is_healthy()) + self.assertNotEqual(worker_args, list(reconstructor.get_worker_args())) - self.assertTrue(reconstructor.is_healthy()) + with mock.patch('swift.obj.reconstructor.os.path.getmtime', + return_value=12): + self.assertTrue(reconstructor.is_healthy()) def test_final_recon_dump(self): reconstructor = object_reconstructor.ObjectReconstructor(