diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py
index f35a7e16b8..38edd90c17 100644
--- a/swift/cli/relinker.py
+++ b/swift/cli/relinker.py
@@ -22,8 +22,6 @@ import logging
 import os
 from functools import partial
 from swift.common.storage_policy import POLICIES
-from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
-    DiskFileQuarantined
 from swift.common.utils import replace_partition_in_path, config_true_value, \
     audit_location_generator, get_logger, readconf, drop_privileges, \
     RateLimitedIterator, lock_path
@@ -215,8 +213,9 @@ def hook_post_partition(logger, states, step, policy, diskfile_manager,
 def hashes_filter(next_part_power, suff_path, hashes):
     hashes = list(hashes)
     for hsh in hashes:
-        fname = os.path.join(suff_path, hsh, 'fake-file-name')
-        if replace_partition_in_path(fname, next_part_power) == fname:
+        fname = os.path.join(suff_path, hsh)
+        if fname == replace_partition_in_path(
+                fname, next_part_power, is_hash_dir=True):
             hashes.remove(hsh)
     return hashes
 
@@ -363,79 +362,109 @@ def cleanup(conf, logger, device):
             partitions_filter=cleanup_partition_filter,
             hook_post_partition=cleanup_hook_post_partition,
             hashes_filter=cleanup_hashes_filter,
-            logger=logger, error_counter=error_counter)
+            logger=logger,
+            error_counter=error_counter,
+            yield_hash_dirs=True)
         if conf['files_per_second'] > 0:
             locations = RateLimitedIterator(
                 locations, conf['files_per_second'])
-        for fname, device, partition in locations:
-            # Relinking will take a while; we'll likely have some tombstones
-            # transition to being reapable during the process. When we open
-            # them in the new partition space, they'll get cleaned up and
-            # raise DiskFileNotExist. Without replicators running, this is
-            # likely the first opportunity for clean-up. To avoid a false-
-            # positive error below, open up in the old space *first* -- if
-            # that raises DiskFileNotExist, ignore it and move on.
-            loc = diskfile.AuditLocation(
-                os.path.dirname(fname), device, partition, policy)
-            df = diskfile_mgr.get_diskfile_from_audit_location(loc)
-            try:
-                with df.open():
-                    pass
-            except DiskFileQuarantined as exc:
-                logger.warning('ERROR Object %(obj)s failed audit and was'
-                               ' quarantined: %(err)r',
-                               {'obj': loc, 'err': exc})
-                errors += 1
-                continue
-            except DiskFileNotExist:
-                logger.debug('Found reapable on-disk file: %s', fname)
+
+        for hash_path, device, partition in locations:
+            # Compare the contents of each hash dir with contents of same hash
+            # dir in its new partition to verify that the new location has the
+            # most up to date set of files. The new location may have newer
+            # files if it has been updated since relinked.
+            new_hash_path = replace_partition_in_path(
+                hash_path, part_power, is_hash_dir=True)
+
+            if new_hash_path == hash_path:
                 continue
 
-            expected_fname = replace_partition_in_path(fname, part_power)
-            if fname == expected_fname:
-                continue
-            # Make sure there is a valid object file in the expected new
-            # location. Note that this could be newer than the original one
-            # (which happens if there is another PUT after partition power
-            # has been increased, but cleanup did not yet run)
-            loc = diskfile.AuditLocation(
-                os.path.dirname(expected_fname), device, partition, policy)
-            df = diskfile_mgr.get_diskfile_from_audit_location(loc)
-            try:
-                with df.open():
-                    pass
-            except DiskFileQuarantined as exc:
-                logger.warning('ERROR Object %(obj)s failed audit and was'
-                               ' quarantined: %(err)r',
-                               {'obj': loc, 'err': exc})
-                errors += 1
-                continue
-            except DiskFileDeleted:
-                pass
-            except DiskFileNotExist as exc:
-                err = False
-                if policy.policy_type == 'erasure_coding':
-                    # Might be a non-durable fragment - check that there is
-                    # a fragment in the new path. Will be fixed by the
-                    # reconstructor then
-                    if not os.path.isfile(expected_fname):
-                        err = True
-                else:
-                    err = True
-                if err:
+            # Get on disk data for new and old locations, cleaning up any
+            # reclaimable or obsolete files in each. The new location is
+            # cleaned up *before* the old location to prevent false negatives
+            # where the old still has a file that has been cleaned up in the
+            # new; cleaning up the new location first ensures that the old will
+            # always be 'cleaner' than the new.
+            new_df_data = diskfile_mgr.cleanup_ondisk_files(new_hash_path)
+            old_df_data = diskfile_mgr.cleanup_ondisk_files(hash_path)
+            # Now determine the most up to date set of on disk files would be
+            # given the content of old and new locations...
+            new_files = set(new_df_data['files'])
+            old_files = set(old_df_data['files'])
+            union_files = new_files.union(old_files)
+            union_data = diskfile_mgr.get_ondisk_files(
+                union_files, '', verify=False)
+            obsolete_files = set(info['filename']
+                                 for info in union_data.get('obsolete', []))
+            required_files = union_files.difference(obsolete_files)
+            required_links = required_files.intersection(old_files)
+
+            missing_links = 0
+            created_links = 0
+            for filename in required_links:
+                # Before removing old files, be sure that the corresponding
+                # required new files exist by calling relink_paths again. There
+                # are several possible outcomes:
+                #  - The common case is that the new file exists, in which case
+                #    relink_paths checks that the new file has the same inode
+                #    as the old file. An exception is raised if the inode of
+                #    the new file is not the same as the old file.
+                #  - The new file may not exist because the relinker failed to
+                #    create the link to the old file and has erroneously moved
+                #    on to cleanup. In this case the relink_paths will create
+                #    the link now or raise an exception if that fails.
+                #  - The new file may not exist because some other process,
+                #    such as an object server handling a request, has cleaned
+                #    it up since we called cleanup_ondisk_files(new_hash_path).
+                #    In this case a new link will be created to the old file.
+                #    This is unnecessary but simpler than repeating the
+                #    evaluation of what links are now required and safer than
+                #    assuming that a non-existent file that *was* required is
+                #    no longer required. The new file will eventually be
+                #    cleaned up again.
+                old_file = os.path.join(hash_path, filename)
+                new_file = os.path.join(new_hash_path, filename)
+                try:
+                    if diskfile.relink_paths(old_file, new_file):
+                        logger.debug(
+                            "Relinking (cleanup) created link: %s to %s",
+                            old_file, new_file)
+                        created_links += 1
+                except OSError as exc:
                     logger.warning(
-                        'Error cleaning up %s: %r', fname, exc)
+                        "Error relinking (cleanup): failed to relink %s to "
+                        "%s: %s", old_file, new_file, exc)
                     errors += 1
-                    continue
+                    missing_links += 1
+            if created_links:
+                diskfile.invalidate_hash(os.path.dirname(new_hash_path))
+            if missing_links:
+                continue
+
+            # the new partition hash dir has the most up to date set of on
+            # disk files so it is safe to delete the old location...
+            rehash = False
             try:
-                os.remove(fname)
-                cleaned_up += 1
-                logger.debug("Removed %s", fname)
-                suffix_dir = os.path.dirname(os.path.dirname(fname))
-                diskfile.invalidate_hash(suffix_dir)
+                for filename in old_files:
+                    os.remove(os.path.join(hash_path, filename))
+                    rehash = True
             except OSError as exc:
-                logger.warning('Error cleaning up %s: %r', fname, exc)
+                logger.warning('Error cleaning up %s: %r', hash_path, exc)
                 errors += 1
+            else:
+                cleaned_up += 1
+                logger.debug("Removed %s", hash_path)
+
+            if rehash:
+                try:
+                    diskfile.invalidate_hash(os.path.dirname(hash_path))
+                except Exception as exc:
+                    # note: not counted as an error
+                    logger.warning(
+                        'Error invalidating suffix for %s: %r',
+                        hash_path, exc)
+
     return determine_exit_code(
         logger=logger,
         found_policy=found_policy,
diff --git a/swift/common/utils.py b/swift/common/utils.py
index e91f67e168..3094b99870 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3217,7 +3217,7 @@ def audit_location_generator(devices, datadir, suffix='',
                              hook_pre_partition=None, hook_post_partition=None,
                              hook_pre_suffix=None, hook_post_suffix=None,
                              hook_pre_hash=None, hook_post_hash=None,
-                             error_counter=None):
+                             error_counter=None, yield_hash_dirs=False):
     """
     Given a devices path and a data directory, yield (path, device,
     partition) for all files in that directory
@@ -3236,6 +3236,7 @@ def audit_location_generator(devices, datadir, suffix='',
                     one of the DATADIR constants defined in the account,
                     container, and object servers.
     :param suffix: path name suffix required for all names returned
+                   (ignored if yield_hash_dirs is True)
     :param mount_check: Flag to check if a mount check should be performed
                     on devices
     :param logger: a logger object
@@ -3257,6 +3258,8 @@ def audit_location_generator(devices, datadir, suffix='',
     :param hook_post_hash: a callable taking hash_path as parameter
     :param error_counter: a dictionary used to accumulate error counts; may
                           add keys 'unmounted' and 'unlistable_partitions'
+    :param yield_hash_dirs: if True, yield hash dirs instead of individual
+                            files
     """
     device_dir = listdir(devices)
     # randomize devices in case of process restart before sweep completed
@@ -3316,17 +3319,21 @@ def audit_location_generator(devices, datadir, suffix='',
                     hash_path = os.path.join(suff_path, hsh)
                     if hook_pre_hash:
                         hook_pre_hash(hash_path)
-                    try:
-                        files = sorted(listdir(hash_path), reverse=True)
-                    except OSError as e:
-                        if e.errno != errno.ENOTDIR:
-                            raise
-                        continue
-                    for fname in files:
-                        if suffix and not fname.endswith(suffix):
+                    if yield_hash_dirs:
+                        if os.path.isdir(hash_path):
+                            yield hash_path, device, partition
+                    else:
+                        try:
+                            files = sorted(listdir(hash_path), reverse=True)
+                        except OSError as e:
+                            if e.errno != errno.ENOTDIR:
+                                raise
                             continue
-                        path = os.path.join(hash_path, fname)
-                        yield path, device, partition
+                        for fname in files:
+                            if suffix and not fname.endswith(suffix):
+                                continue
+                            path = os.path.join(hash_path, fname)
+                            yield path, device, partition
                     if hook_post_hash:
                         hook_post_hash(hash_path)
                 if hook_post_suffix:
@@ -5792,19 +5799,21 @@ def get_partition_for_hash(hex_hash, part_power):
     return struct.unpack_from('>I', raw_hash)[0] >> part_shift
 
 
-def replace_partition_in_path(path, part_power):
+def replace_partition_in_path(path, part_power, is_hash_dir=False):
     """
-    Takes a full path to a file and a partition power and returns
-    the same path, but with the correct partition number. Most useful when
-    increasing the partition power.
+    Takes a path and a partition power and returns the same path, but with the
+    correct partition number. Most useful when increasing the partition power.
 
     :param path: full path to a file, for example object .data file
     :param part_power: partition power to compute correct partition number
+    :param is_hash_dir: if True then ``path`` is the path to a hash dir,
+        otherwise ``path`` is the path to a file in a hash dir.
     :returns: Path with re-computed partition power
     """
     path_components = path.split(os.sep)
-    part = get_partition_for_hash(path_components[-2], part_power)
-    path_components[-4] = "%d" % part
+    part = get_partition_for_hash(path_components[-1 if is_hash_dir else -2],
+                                  part_power)
+    path_components[-3 if is_hash_dir else -4] = "%d" % part
     return os.sep.join(path_components)
 
 
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 6423d02530..950e73670b 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -935,7 +935,9 @@ class BaseDiskFileManager(object):
         valid fileset, or None.
 
         :param files: a list of file names.
-        :param datadir: directory name files are from.
+        :param datadir: directory name files are from; this is used to
+                        construct file paths in the results, but the datadir is
+                        not modified by this method.
         :param verify: if True verify that the ondisk file contract has not
                        been violated, otherwise do not verify.
         :param policy: storage policy used to store the files. Used to
diff --git a/test/unit/cli/test_relinker.py b/test/unit/cli/test_relinker.py
index eab781c3cf..317854a920 100644
--- a/test/unit/cli/test_relinker.py
+++ b/test/unit/cli/test_relinker.py
@@ -31,7 +31,7 @@ import uuid
 from six.moves import cStringIO as StringIO
 
 from swift.cli import relinker
-from swift.common import exceptions, ring, utils
+from swift.common import ring, utils
 from swift.common import storage_policy
 from swift.common.exceptions import PathNotDir
 from swift.common.storage_policy import (
@@ -81,7 +81,7 @@ class TestRelinker(unittest.TestCase):
             digest = binascii.unhexlify(self._hash)
             self.part = struct.unpack_from('>I', digest)[0] >> 24
             self.next_part = struct.unpack_from('>I', digest)[0] >> 23
-            path = os.path.join(os.path.sep, account, container, obj)
+            self.obj_path = os.path.join(os.path.sep, account, container, obj)
             # There's 1/512 chance that both old and new parts will be 0;
             # that's not a terribly interesting case, as there's nothing to do
             attempts.append((self.part, self.next_part, 2**PART_POWER))
@@ -97,21 +97,23 @@ class TestRelinker(unittest.TestCase):
         self.objdir = os.path.join(
             self.objects, str(self.part), self._hash[-3:], self._hash)
         os.makedirs(self.objdir)
-        self.object_fname = utils.Timestamp.now().internal + ".data"
+        self.obj_ts = utils.Timestamp.now()
+        self.object_fname = self.obj_ts.internal + ".data"
 
         self.objname = os.path.join(self.objdir, self.object_fname)
         with open(self.objname, "wb") as dummy:
             dummy.write(b"Hello World!")
-            write_metadata(dummy, {'name': path, 'Content-Length': '12'})
+            write_metadata(dummy,
+                           {'name': self.obj_path, 'Content-Length': '12'})
 
         self.policy = StoragePolicy(0, 'platinum', True)
         storage_policy._POLICIES = StoragePolicyCollection([self.policy])
 
         self.part_dir = os.path.join(self.objects, str(self.part))
-        self.suffix_dir = os.path.join(self.part_dir, self._hash[-3:])
+        self.suffix = self._hash[-3:]
+        self.suffix_dir = os.path.join(self.part_dir, self.suffix)
         self.next_part_dir = os.path.join(self.objects, str(self.next_part))
-        self.next_suffix_dir = os.path.join(
-            self.next_part_dir, self._hash[-3:])
+        self.next_suffix_dir = os.path.join(self.next_part_dir, self.suffix)
         self.expected_dir = os.path.join(self.next_suffix_dir, self._hash)
         self.expected_file = os.path.join(self.expected_dir, self.object_fname)
 
@@ -599,7 +601,7 @@ class TestRelinker(unittest.TestCase):
         # partition!
         self._setup_object(lambda part: part < 2 ** (PART_POWER - 1))
         with mock.patch('swift.cli.relinker.replace_partition_in_path',
-                        lambda *args: args[0]):
+                        lambda *args, **kwargs: args[0]):
             self.assertEqual(0, relinker.main([
                 'cleanup',
                 '--swift-dir', self.testdir,
@@ -611,11 +613,11 @@ class TestRelinker(unittest.TestCase):
     def test_cleanup_second_quartile_no_rehash(self):
         # we need a part in upper half of current part power
         self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
-        self.assertGreater(self.part, 2 ** (PART_POWER - 1))
+        self.assertGreaterEqual(self.part, 2 ** (PART_POWER - 1))
         self._common_test_cleanup()
 
         def fake_hash_suffix(suffix_dir, policy):
-            # check that the suffix dir is empty and remove it just like the
+            # check that the hash dir is empty and remove it just like the
             # real _hash_suffix
             self.assertEqual([self._hash], os.listdir(suffix_dir))
             hash_dir = os.path.join(suffix_dir, self._hash)
@@ -984,21 +986,120 @@ class TestRelinker(unittest.TestCase):
         os.close(locks[0])  # Release the lock
 
     def test_cleanup_not_yet_relinked(self):
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
         self._common_test_cleanup(relink=False)
-        self.assertEqual(1, relinker.main([
-            'cleanup',
-            '--swift-dir', self.testdir,
-            '--devices', self.devices,
-            '--skip-mount',
-        ]))
+        with mock.patch.object(relinker.logging, 'getLogger',
+                               return_value=self.logger):
+            self.assertEqual(0, relinker.main([
+                'cleanup',
+                '--swift-dir', self.testdir,
+                '--devices', self.devices,
+                '--skip-mount',
+            ]))
 
-        self.assertTrue(os.path.isfile(
-            os.path.join(self.objdir, self.object_fname)))
+        self.assertFalse(os.path.isfile(self.objname))  # old file removed
+        self.assertTrue(os.path.isfile(self.expected_file))  # link created
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+        self.assertIn(
+            'Relinking (cleanup) created link: %s to %s'
+            % (self.objname, self.expected_file),
+            self.logger.get_lines_for_level('debug'))
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+        # old partition should be cleaned up
+        self.assertFalse(os.path.exists(self.part_dir))
+        # suffix should be invalidated in new partition
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual(str(self.suffix), fd.read().strip())
+
+    def test_cleanup_same_object_different_inode_in_new_partition(self):
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
+        self._common_test_cleanup(relink=False)
+        # new file in the new partition but different inode
+        os.makedirs(self.expected_dir)
+        with open(self.expected_file, 'w') as fd:
+            fd.write('same but different')
+
+        with mock.patch.object(relinker.logging, 'getLogger',
+                               return_value=self.logger):
+            res = relinker.main([
+                'cleanup',
+                '--swift-dir', self.testdir,
+                '--devices', self.devices,
+                '--skip-mount',
+            ])
+
+        self.assertEqual(1, res)
+        self.assertTrue(os.path.isfile(self.objname))
+        with open(self.objname, 'r') as fd:
+            self.assertEqual('Hello World!', fd.read())
+        self.assertTrue(os.path.isfile(self.expected_file))
+        with open(self.expected_file, 'r') as fd:
+            self.assertEqual('same but different', fd.read())
+        warning_lines = self.logger.get_lines_for_level('warning')
+        self.assertEqual(1, len(warning_lines), warning_lines)
+        self.assertIn('Error relinking (cleanup): failed to relink %s to %s'
+                      % (self.objname, self.expected_file), warning_lines[0])
+        # suffix should not be invalidated in new partition
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertFalse(os.path.exists(hashes_invalid))
+
+    def test_cleanup_older_object_in_new_partition(self):
+        # relink of the current object failed, but there is an older version of
+        # same object in the new partition
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
+        self._common_test_cleanup(relink=False)
+        os.makedirs(self.expected_dir)
+        older_obj_file = os.path.join(
+            self.expected_dir,
+            utils.Timestamp(int(self.obj_ts) - 1).internal + '.data')
+        with open(older_obj_file, "wb") as fd:
+            fd.write(b"Hello Olde Worlde!")
+            write_metadata(fd, {'name': self.obj_path, 'Content-Length': '18'})
+
+        with mock.patch.object(relinker.logging, 'getLogger',
+                               return_value=self.logger):
+            res = relinker.main([
+                'cleanup',
+                '--swift-dir', self.testdir,
+                '--devices', self.devices,
+                '--skip-mount',
+            ])
+
+        self.assertEqual(0, res)
+        self.assertFalse(os.path.isfile(self.objname))  # old file removed
+        self.assertTrue(os.path.isfile(older_obj_file))
+        self.assertTrue(os.path.isfile(self.expected_file))  # link created
+        self.assertIn(
+            'Relinking (cleanup) created link: %s to %s'
+            % (self.objname, self.expected_file),
+            self.logger.get_lines_for_level('debug'))
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+        # old partition should be cleaned up
+        self.assertFalse(os.path.exists(self.part_dir))
+        # suffix should be invalidated in new partition
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual(str(self.suffix), fd.read().strip())
 
     def test_cleanup_deleted(self):
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
         self._common_test_cleanup()
+        # rehash during relink creates hashes.invalid...
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
 
         # Pretend the object got deleted in between and there is a tombstone
+        # note: the tombstone would normally be at a newer timestamp but here
+        # we make the tombstone at same timestamp - it  is treated as the
+        # 'required' file in the new partition, so the .data is deleted in the
+        # old partition
         fname_ts = self.expected_file[:-4] + "ts"
         os.rename(self.expected_file, fname_ts)
 
@@ -1008,6 +1109,14 @@ class TestRelinker(unittest.TestCase):
             '--devices', self.devices,
             '--skip-mount',
         ]))
+        self.assertTrue(os.path.isfile(fname_ts))
+        self.assertFalse(os.path.exists(self.objname))
+        # old partition should be cleaned up
+        self.assertFalse(os.path.exists(self.part_dir))
+        # suffix should not be invalidated in new partition
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual('', fd.read().strip())
 
     def test_cleanup_reapable(self):
         # relink a tombstone
@@ -1029,32 +1138,68 @@ class TestRelinker(unittest.TestCase):
             ]))
         self.assertEqual(self.logger.get_lines_for_level('error'), [])
         self.assertEqual(self.logger.get_lines_for_level('warning'), [])
-        self.assertIn(
-            "Found reapable on-disk file: %s" % self.objname,
-            self.logger.get_lines_for_level('debug'))
-        # self.expected_file may or may not exist; it depends on whether the
-        # object was in the upper-half of the partition space. ultimately,
-        # that part doesn't really matter much -- but we definitely *don't*
-        # want self.objname around polluting the old partition space.
+        # reclaimed during relinker cleanup...
         self.assertFalse(os.path.exists(self.objname))
+        # reclaimed during relinker relink or relinker cleanup, depending on
+        # which quartile the partition is in ...
+        self.assertFalse(os.path.exists(self.expected_file))
 
-    def test_cleanup_doesnotexist(self):
+    def test_cleanup_new_does_not_exist(self):
         self._common_test_cleanup()
-
-        # Pretend the file in the new place got deleted inbetween
+        # Pretend the file in the new place got deleted in between relink and
+        # cleanup: cleanup should re-create the link
         os.remove(self.expected_file)
 
         with mock.patch.object(relinker.logging, 'getLogger',
                                return_value=self.logger):
-            self.assertEqual(1, relinker.main([
+            self.assertEqual(0, relinker.main([
                 'cleanup',
                 '--swift-dir', self.testdir,
                 '--devices', self.devices,
                 '--skip-mount',
             ]))
-        self.assertEqual(self.logger.get_lines_for_level('warning'),
-                         ['Error cleaning up %s: %s' % (self.objname,
-                          repr(exceptions.DiskFileNotExist()))])
+        self.assertTrue(os.path.isfile(self.expected_file))  # link created
+        self.assertFalse(os.path.exists(self.objname))  # link created
+        self.assertIn(
+            'Relinking (cleanup) created link: %s to %s'
+            % (self.objname, self.expected_file),
+            self.logger.get_lines_for_level('debug'))
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+
+    def test_cleanup_new_does_not_exist_and_relink_fails(self):
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
+        self._common_test_cleanup()
+        # rehash during relink creates hashes.invalid...
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
+        # Pretend the file in the new place got deleted in between relink and
+        # cleanup: cleanup attempts to re-create the link but fails
+        os.remove(self.expected_file)
+
+        with mock.patch('swift.obj.diskfile.os.link', side_effect=OSError):
+            with mock.patch.object(relinker.logging, 'getLogger',
+                                   return_value=self.logger):
+                self.assertEqual(1, relinker.main([
+                    'cleanup',
+                    '--swift-dir', self.testdir,
+                    '--devices', self.devices,
+                    '--skip-mount',
+                ]))
+        self.assertFalse(os.path.isfile(self.expected_file))
+        self.assertTrue(os.path.isfile(self.objname))  # old file intact
+        self.assertEqual(
+            self.logger.get_lines_for_level('warning'),
+            ['Error relinking (cleanup): failed to relink %s to %s: '
+             % (self.objname, self.expected_file)]
+        )
+        # suffix should not be invalidated in new partition
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual('', fd.read().strip())
+        # nor in the old partition
+        old_hashes_invalid = os.path.join(self.part_dir, 'hashes.invalid')
+        self.assertFalse(os.path.exists(old_hashes_invalid))
 
     @patch_policies(
         [ECStoragePolicy(
@@ -1062,7 +1207,6 @@ class TestRelinker(unittest.TestCase):
          ec_ndata=4, ec_nparity=2)])
     def test_cleanup_diskfile_error(self):
         self._common_test_cleanup()
-
         # Switch the policy type so all fragments raise DiskFileError.
         with mock.patch.object(relinker.logging, 'getLogger',
                                return_value=self.logger):
@@ -1073,36 +1217,12 @@ class TestRelinker(unittest.TestCase):
                 '--skip-mount',
             ]))
         log_lines = self.logger.get_lines_for_level('warning')
-        self.assertEqual(3, len(log_lines),
-                         'Expected 3 log lines, got %r' % log_lines)
-        # Once to check the old partition space...
-        self.assertIn('Bad fragment index: None', log_lines[0])
-        # ... again for the new partition ...
-        self.assertIn('Bad fragment index: None', log_lines[0])
-        # ... and one last time for the rehash
-        self.assertIn('Bad fragment index: None', log_lines[1])
-
-    def test_cleanup_quarantined(self):
-        self._common_test_cleanup()
-        # Pretend the object in the new place got corrupted
-        with open(self.expected_file, "wb") as obj:
-            obj.write(b'trash')
-
-        with mock.patch.object(relinker.logging, 'getLogger',
-                               return_value=self.logger):
-            self.assertEqual(1, relinker.main([
-                'cleanup',
-                '--swift-dir', self.testdir,
-                '--devices', self.devices,
-                '--skip-mount',
-            ]))
-
-        log_lines = self.logger.get_lines_for_level('warning')
-        self.assertEqual(2, len(log_lines),
-                         'Expected 2 log lines, got %r' % log_lines)
-        self.assertIn('metadata content-length 12 does not match '
-                      'actual object size 5', log_lines[0])
-        self.assertIn('failed audit and was quarantined', log_lines[1])
+        # once for cleanup_ondisk_files in old and new location, once for
+        # get_ondisk_files of union of files, once for the rehash
+        self.assertEqual(4, len(log_lines),
+                         'Expected 5 log lines, got %r' % log_lines)
+        for line in log_lines:
+            self.assertIn('Bad fragment index: None', line, log_lines)
 
     def test_rehashing(self):
         calls = []
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index bbaa21bf2d..65408fbc18 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -4358,6 +4358,14 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertEqual(utils.replace_partition_in_path(old, 10), old)
         self.assertEqual(utils.replace_partition_in_path(new, 11), new)
 
+        # check hash_dir option
+        old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77'
+        exp = '/s/n/d/o/1400/c77/af088baea4806dcaba30bf07d9e64c77'
+        actual = utils.replace_partition_in_path(old, 11, is_hash_dir=True)
+        self.assertEqual(exp, actual)
+        actual = utils.replace_partition_in_path(exp, 11, is_hash_dir=True)
+        self.assertEqual(exp, actual)
+
     def test_round_robin_iter(self):
         it1 = iter([1, 2, 3])
         it2 = iter([4, 5])