diff --git a/bin/swift-object-replicator b/bin/swift-object-replicator index 6ad4fa9b..ed3cb852 100755 --- a/bin/swift-object-replicator +++ b/bin/swift-object-replicator @@ -27,5 +27,8 @@ if __name__ == '__main__': parser.add_option('-p', '--partitions', help='Replicate only given partitions. ' 'Comma-separated list') + parser.add_option('-i', '--policies', + help='Replicate only given policy indices. ' + 'Comma-separated list') conf_file, options = parse_options(parser=parser, once=True) run_daemon(ObjectReplicator, conf_file, **options) diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 5cad5d4c..2964a52d 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -456,7 +456,8 @@ class ObjectReplicator(Daemon): continue return jobs - def collect_jobs(self, override_devices=None, override_partitions=None): + def collect_jobs(self, override_devices=None, override_partitions=None, + override_policies=None): """ Returns a sorted list of jobs (dictionaries) that specify the partitions, nodes, etc to be rsynced. @@ -465,11 +466,15 @@ class ObjectReplicator(Daemon): will be returned :param override_partitions: if set, only jobs on these partitions will be returned - + :param override_policies: if set, only jobs in these storage + policies will be returned """ jobs = [] ips = whataremyips() for policy in POLICIES: + if (override_policies is not None + and str(policy.idx) not in override_policies): + continue # may need to branch here for future policy types jobs += self.process_repl(policy, ips, override_devices=override_devices, @@ -481,7 +486,8 @@ class ObjectReplicator(Daemon): self.job_count = len(jobs) return jobs - def replicate(self, override_devices=None, override_partitions=None): + def replicate(self, override_devices=None, override_partitions=None, + override_policies=None): """Run a replication pass""" self.start = time.time() self.suffix_count = 0 @@ -498,7 +504,8 @@ class ObjectReplicator(Daemon): try: self.run_pool = GreenPool(size=self.concurrency) jobs = self.collect_jobs(override_devices=override_devices, - override_partitions=override_partitions) + override_partitions=override_partitions, + override_policies=override_policies) for job in jobs: dev_path = join(self.devices_dir, job['device']) if self.mount_check and not ismount(dev_path): @@ -539,14 +546,18 @@ class ObjectReplicator(Daemon): override_devices = list_from_csv(kwargs.get('devices')) override_partitions = list_from_csv(kwargs.get('partitions')) + override_policies = list_from_csv(kwargs.get('policies')) if not override_devices: override_devices = None if not override_partitions: override_partitions = None + if not override_policies: + override_policies = None self.replicate( override_devices=override_devices, - override_partitions=override_partitions) + override_partitions=override_partitions, + override_policies=override_policies) total = (time.time() - start) / 60 self.logger.info( _("Object replication complete (once). (%.02f minutes)"), total) diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 5d680a29..c9377872 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -261,8 +261,7 @@ class TestObjectReplicator(unittest.TestCase): def blowup_mkdirs(path): raise OSError('Ow!') - mkdirs_orig = object_replicator.mkdirs - try: + with mock.patch.object(object_replicator, 'mkdirs', blowup_mkdirs): rmtree(self.objects, ignore_errors=1) object_replicator.mkdirs = blowup_mkdirs self.replicator.collect_jobs() @@ -275,8 +274,6 @@ class TestObjectReplicator(unittest.TestCase): self.assertTrue(exc_args[0].startswith('ERROR creating ')) self.assertEquals(exc_kwargs, {}) self.assertEquals(exc_str, 'Ow!') - finally: - object_replicator.mkdirs = mkdirs_orig def test_collect_jobs(self): jobs = self.replicator.collect_jobs() @@ -546,6 +543,27 @@ class TestObjectReplicator(unittest.TestCase): override_partitions=['1']) self.assertFalse(os.access(part_path, os.F_OK)) + def test_delete_policy_override_params(self): + df0 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o') + df1 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o', + policy_idx=1) + mkdirs(df0._datadir) + mkdirs(df1._datadir) + + pol0_part_path = os.path.join(self.objects, '99') + pol1_part_path = os.path.join(self.objects_1, '99') + + # sanity checks + self.assertTrue(os.access(pol0_part_path, os.F_OK)) + self.assertTrue(os.access(pol1_part_path, os.F_OK)) + + # a bogus policy index doesn't bother the replicator any more than a + # bogus device or partition does + self.replicator.run_once(policies='1,2,5') + + self.assertFalse(os.access(pol1_part_path, os.F_OK)) + self.assertTrue(os.access(pol0_part_path, os.F_OK)) + def test_run_once_recover_from_failure(self): conf = dict(swift_dir=self.testdir, devices=self.devices, mount_check='false', timeout='300', stats_interval='1')