From a14d2c857c9bde7e55554d15eca89a33dba9e339 Mon Sep 17 00:00:00 2001
From: Clay Gerrard <clay.gerrard@gmail.com>
Date: Tue, 29 Apr 2014 00:21:08 -0700
Subject: [PATCH] Enqueue misplaced objects during container replication

After a container database is replicated, a _post_replicate_hook will enqueue
misplaced objects for the container-reconciler into the .misplaced_objects
containers.  Items to be reconciled are "batch loaded" into the reconciler
queue and the end of a container replication cycle by levering container
replication itself.

DocImpact
Implements: blueprint storage-policies
Change-Id: I3627efcdea75403586dffee46537a60add08bfda
---
 swift/common/db_replicator.py                 |  21 +-
 swift/common/exceptions.py                    |   4 +
 swift/container/replicator.py                 | 178 ++++++++-
 .../test_container_merge_policy_index.py      | 366 +++++++++++++++++-
 test/unit/common/test_db_replicator.py        |  23 +-
 test/unit/container/test_replicator.py        | 190 +++++++++
 6 files changed, 769 insertions(+), 13 deletions(-)

diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py
index 9e1282c0ba..952130ae8e 100644
--- a/swift/common/db_replicator.py
+++ b/swift/common/db_replicator.py
@@ -156,6 +156,7 @@ class Replicator(Daemon):
         self.cpool = GreenPool(size=concurrency)
         swift_dir = conf.get('swift_dir', '/etc/swift')
         self.ring = ring.Ring(swift_dir, ring_name=self.server_type)
+        self._local_device_ids = set()
         self.per_diff = int(conf.get('per_diff', 1000))
         self.max_diffs = int(conf.get('max_diffs') or 100)
         self.interval = int(conf.get('interval') or
@@ -406,6 +407,14 @@ class Replicator(Daemon):
             return self._usync_db(max(rinfo['point'], local_sync),
                                   broker, http, rinfo['id'], info['id'])
 
+    def _post_replicate_hook(self, broker, info, responses):
+        """
+        :param broker: the container that just replicated
+        :param info: pre-replication full info dict
+        :param responses: a list of bools indicating success from nodes
+        """
+        pass
+
     def _replicate_object(self, partition, object_file, node_id):
         """
         Replicate the db, choosing method based on whether or not it
@@ -490,13 +499,19 @@ class Replicator(Daemon):
             self.stats['success' if success else 'failure'] += 1
             self.logger.increment('successes' if success else 'failures')
             responses.append(success)
+        try:
+            self._post_replicate_hook(broker, info, responses)
+        except (Exception, Timeout):
+            self.logger.exception('UNHANDLED EXCEPTION: in post replicate '
+                                  'hook for %s', broker.db_file)
         if not shouldbehere and all(responses):
             # If the db shouldn't be on this node and has been successfully
             # synced to all of its peers, it can be removed.
-            self.delete_db(object_file)
+            self.delete_db(broker)
         self.logger.timing_since('timing', start_time)
 
-    def delete_db(self, object_file):
+    def delete_db(self, broker):
+        object_file = broker.db_file
         hash_dir = os.path.dirname(object_file)
         suf_dir = os.path.dirname(hash_dir)
         with lock_parent_directory(object_file):
@@ -534,6 +549,7 @@ class Replicator(Daemon):
         if not ips:
             self.logger.error(_('ERROR Failed to get my own IPs?'))
             return
+        self._local_device_ids = set()
         for node in self.ring.devs:
             if (node and node['replication_ip'] in ips and
                     node['replication_port'] == self.port):
@@ -547,6 +563,7 @@ class Replicator(Daemon):
                     time.time() - self.reclaim_age)
                 datadir = os.path.join(self.root, node['device'], self.datadir)
                 if os.path.isdir(datadir):
+                    self._local_device_ids.add(node['id'])
                     dirs.append((datadir, node['id']))
         self.logger.info(_('Beginning replication run'))
         for part, object_file, node_id in roundrobin_datadirs(dirs):
diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
index 29f0631689..a3e6cba0f7 100644
--- a/swift/common/exceptions.py
+++ b/swift/common/exceptions.py
@@ -69,6 +69,10 @@ class DiskFileDeviceUnavailable(DiskFileError):
     pass
 
 
+class DeviceUnavailable(SwiftException):
+    pass
+
+
 class PathNotDir(OSError):
     pass
 
diff --git a/swift/container/replicator.py b/swift/container/replicator.py
index b0a2e911a3..d60b53c305 100644
--- a/swift/container/replicator.py
+++ b/swift/container/replicator.py
@@ -13,14 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+import itertools
 import time
+from collections import defaultdict
+from eventlet import Timeout
 
 from swift.container.backend import ContainerBroker, DATADIR
-from swift.container.reconciler import incorrect_policy_index
+from swift.container.reconciler import (
+    MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index,
+    get_reconciler_container_name, get_row_to_q_entry_translater)
 from swift.common import db_replicator
-from swift.common.utils import json, normalize_timestamp
-from swift.common.http import is_success
 from swift.common.storage_policy import POLICIES
+from swift.common.exceptions import DeviceUnavailable
+from swift.common.http import is_success
+from swift.common.db import DatabaseAlreadyExists
+from swift.common.utils import (json, normalize_timestamp, hash_path,
+                                storage_directory, quorum_size)
 
 
 class ContainerReplicator(db_replicator.Replicator):
@@ -60,6 +69,169 @@ class ContainerReplicator(db_replicator.Replicator):
             node, response, info, broker, http)
         return rv
 
+    def find_local_handoff_for_part(self, part):
+        """
+        Look through devices in the ring for the first handoff devie that was
+        identified during job creation as available on this node.
+
+        :returns: a node entry from the ring
+        """
+        nodes = self.ring.get_part_nodes(part)
+        more_nodes = self.ring.get_more_nodes(part)
+
+        for node in itertools.chain(nodes, more_nodes):
+            if node['id'] in self._local_device_ids:
+                return node
+        return None
+
+    def get_reconciler_broker(self, timestamp):
+        """
+        Get a local instance of the reconciler container broker that is
+        appropriate to enqueue the given timestamp.
+
+        :param timestamp: the timestamp of the row to be enqueued
+
+        :returns: a local reconciler broker
+        """
+        container = get_reconciler_container_name(timestamp)
+        if self.reconciler_containers and \
+                container in self.reconciler_containers:
+            return self.reconciler_containers[container][1]
+        account = MISPLACED_OBJECTS_ACCOUNT
+        part = self.ring.get_part(account, container)
+        node = self.find_local_handoff_for_part(part)
+        if not node:
+            raise DeviceUnavailable(
+                'No mounted devices found suitable to Handoff reconciler '
+                'container %s in partition %s' % (container, part))
+        hsh = hash_path(account, container)
+        db_dir = storage_directory(DATADIR, part, hsh)
+        db_path = os.path.join(self.root, node['device'], db_dir, hsh + '.db')
+        broker = ContainerBroker(db_path, account=account, container=container)
+        if not os.path.exists(broker.db_file):
+            try:
+                broker.initialize(timestamp, 0)
+            except DatabaseAlreadyExists:
+                pass
+        if self.reconciler_containers is not None:
+            self.reconciler_containers[container] = part, broker, node['id']
+        return broker
+
+    def feed_reconciler(self, container, item_list):
+        """
+        Add queue entries for rows in item_list to the local reconciler
+        container database.
+
+        :param container: the name of the reconciler container
+        :param item_list: the list of rows to enqueue
+
+        :returns: True if successfully enqueued
+        """
+
+        try:
+            reconciler = self.get_reconciler_broker(container)
+        except DeviceUnavailable as e:
+            self.logger.warning('DeviceUnavailable: %s', e)
+            return False
+        self.logger.debug('Adding %d objects to the reconciler at %s',
+                          len(item_list), reconciler.db_file)
+        try:
+            reconciler.merge_items(item_list)
+        except (Exception, Timeout):
+            self.logger.exception('UNHANDLED EXCEPTION: trying to merge '
+                                  '%d items to reconciler container %s',
+                                  len(item_list), reconciler.db_file)
+            return False
+        return True
+
+    def dump_to_reconciler(self, broker, point):
+        """
+        Look for object rows for objects updates in the wrong storage policy
+        in broker with a ``ROWID`` greater than the rowid given as point.
+
+        :param broker: the container broker with misplaced objects
+        :param point: the last verified ``reconciler_sync_point``
+
+        :returns: the last successfull enqueued rowid
+        """
+        max_sync = broker.get_max_row()
+        misplaced = broker.get_misplaced_since(point, self.per_diff)
+        if not misplaced:
+            return max_sync
+        translater = get_row_to_q_entry_translater(broker)
+        errors = False
+        low_sync = point
+        while misplaced:
+            batches = defaultdict(list)
+            for item in misplaced:
+                container = get_reconciler_container_name(item['created_at'])
+                batches[container].append(translater(item))
+            for container, item_list in batches.items():
+                success = self.feed_reconciler(container, item_list)
+                if not success:
+                    errors = True
+            point = misplaced[-1]['ROWID']
+            if not errors:
+                low_sync = point
+            misplaced = broker.get_misplaced_since(point, self.per_diff)
+        return low_sync
+
+    def _post_replicate_hook(self, broker, info, responses):
+        if info['account'] == MISPLACED_OBJECTS_ACCOUNT:
+            return
+        if not broker.has_multiple_policies():
+            broker.update_reconciler_sync(info['max_row'])
+            return
+        point = broker.get_reconciler_sync()
+        max_sync = self.dump_to_reconciler(broker, point)
+        success = responses.count(True) >= quorum_size(len(responses))
+        if max_sync > point and success:
+            # to be safe, only slide up the sync point with a quorum on
+            # replication
+            broker.update_reconciler_sync(max_sync)
+
+    def delete_db(self, broker):
+        """
+        Ensure that reconciler databases are only cleaned up at the end of the
+        replication run.
+        """
+        if (self.reconciler_cleanups is not None and
+                broker.account == MISPLACED_OBJECTS_ACCOUNT):
+            # this container shouldn't be here, make sure it's cleaned up
+            self.reconciler_cleanups[broker.container] = broker
+            return
+        return super(ContainerReplicator, self).delete_db(broker)
+
+    def replicate_reconcilers(self):
+        """
+        Ensure any items merged to reconciler containers during replication
+        are pushed out to correct nodes and any reconciler containers that do
+        not belong on this node are removed.
+        """
+        self.logger.info('Replicating %d reconciler containers',
+                         len(self.reconciler_containers))
+        for part, reconciler, node_id in self.reconciler_containers.values():
+            self.cpool.spawn_n(
+                self._replicate_object, part, reconciler.db_file, node_id)
+        self.cpool.waitall()
+        # wipe out the cache do disable bypass in delete_db
+        cleanups = self.reconciler_cleanups
+        self.reconciler_cleanups = self.reconciler_containers = None
+        self.logger.info('Cleaning up %d reconciler containers',
+                         len(cleanups))
+        for reconciler in cleanups.values():
+            self.cpool.spawn_n(self.delete_db, reconciler)
+        self.cpool.waitall()
+        self.logger.info('Finished reconciler replication')
+
+    def run_once(self, *args, **kwargs):
+        self.reconciler_containers = {}
+        self.reconciler_cleanups = {}
+        rv = super(ContainerReplicator, self).run_once(*args, **kwargs)
+        if any([self.reconciler_containers, self.reconciler_cleanups]):
+            self.replicate_reconcilers()
+        return rv
+
 
 class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):
 
diff --git a/test/probe/test_container_merge_policy_index.py b/test/probe/test_container_merge_policy_index.py
index da6b12315e..3f26f1dac1 100644
--- a/test/probe/test_container_merge_policy_index.py
+++ b/test/probe/test_container_merge_policy_index.py
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from hashlib import md5
 import sys
 import itertools
+import time
 import unittest
 import uuid
 from optparse import OptionParser
@@ -23,8 +25,9 @@ import random
 from nose import SkipTest
 
 from swift.common.manager import Manager
-from swift.common.storage_policy import POLICIES
-from swift.common import utils, ring, direct_client
+from swift.common.internal_client import InternalClient
+from swift.common import utils, direct_client, ring
+from swift.common.storage_policy import POLICIES, POLICY_INDEX
 from swift.common.http import HTTP_NOT_FOUND
 from test.probe.common import reset_environment, get_to_final_state
 
@@ -200,23 +203,378 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             metadata = direct_client.direct_head_container(
                 node, container_part, self.account, self.container_name)
             head_responses.append((node, metadata))
-        found_policy_indexes = set(metadata['x-storage-policy-index'] for
+        found_policy_indexes = set(metadata[POLICY_INDEX] for
                                    node, metadata in head_responses)
         self.assert_(len(found_policy_indexes) > 1,
                      'primary nodes did not disagree about policy index %r' %
                      head_responses)
+        # find our object
+        orig_policy_index = None
+        for policy_index in found_policy_indexes:
+            object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
+            part, nodes = object_ring.get_nodes(
+                self.account, self.container_name, self.object_name)
+            for node in nodes:
+                try:
+                    direct_client.direct_head_object(
+                        node, part, self.account, self.container_name,
+                        self.object_name, headers={POLICY_INDEX: policy_index})
+                except direct_client.ClientException as err:
+                    continue
+                orig_policy_index = policy_index
+                break
+            if orig_policy_index is not None:
+                break
+        else:
+            self.fail('Unable to find /%s/%s/%s in %r' % (
+                self.account, self.container_name, self.object_name,
+                found_policy_indexes))
         get_to_final_state()
+        Manager(['container-reconciler']).once()
+        # validate containers
         head_responses = []
         for node in container_nodes:
             metadata = direct_client.direct_head_container(
                 node, container_part, self.account, self.container_name)
             head_responses.append((node, metadata))
-        found_policy_indexes = set(metadata['x-storage-policy-index'] for
+        found_policy_indexes = set(metadata[POLICY_INDEX] for
                                    node, metadata in head_responses)
         self.assert_(len(found_policy_indexes) == 1,
                      'primary nodes disagree about policy index %r' %
                      head_responses)
 
+        expected_policy_index = found_policy_indexes.pop()
+        self.assertNotEqual(orig_policy_index, expected_policy_index)
+        # validate object placement
+        orig_policy_ring = POLICIES.get_object_ring(orig_policy_index,
+                                                    '/etc/swift')
+        for node in orig_policy_ring.devs:
+            try:
+                direct_client.direct_head_object(
+                    node, part, self.account, self.container_name,
+                    self.object_name, headers={
+                        POLICY_INDEX: orig_policy_index})
+            except direct_client.ClientException as err:
+                if err.http_status == HTTP_NOT_FOUND:
+                    continue
+                raise
+            else:
+                self.fail('Found /%s/%s/%s in %s' % (
+                    self.account, self.container_name, self.object_name,
+                    orig_policy_index))
+        # use proxy to access object (bad container info might be cached...)
+        timeout = time.time() + TIMEOUT
+        while time.time() < timeout:
+            try:
+                metadata = client.head_object(self.url, self.token,
+                                              self.container_name,
+                                              self.object_name)
+            except ClientException as err:
+                if err.http_status != HTTP_NOT_FOUND:
+                    raise
+                time.sleep(1)
+            else:
+                break
+        else:
+            self.fail('could not HEAD /%s/%s/%s/ from policy %s '
+                      'after %s seconds.' % (
+                          self.account, self.container_name, self.object_name,
+                          expected_policy_index, TIMEOUT))
+
+    def test_reconcile_delete(self):
+        # generic split brain
+        self.brain.stop_primary_half()
+        self.brain.put_container()
+        self.brain.put_object()
+        self.brain.start_primary_half()
+        self.brain.stop_handoff_half()
+        self.brain.put_container()
+        self.brain.delete_object()
+        self.brain.start_handoff_half()
+        # make sure we have some manner of split brain
+        container_part, container_nodes = self.container_ring.get_nodes(
+            self.account, self.container_name)
+        head_responses = []
+        for node in container_nodes:
+            metadata = direct_client.direct_head_container(
+                node, container_part, self.account, self.container_name)
+            head_responses.append((node, metadata))
+        found_policy_indexes = set(metadata[POLICY_INDEX] for
+                                   node, metadata in head_responses)
+        self.assert_(len(found_policy_indexes) > 1,
+                     'primary nodes did not disagree about policy index %r' %
+                     head_responses)
+        # find our object
+        orig_policy_index = ts_policy_index = None
+        for policy_index in found_policy_indexes:
+            object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
+            part, nodes = object_ring.get_nodes(
+                self.account, self.container_name, self.object_name)
+            for node in nodes:
+                try:
+                    direct_client.direct_head_object(
+                        node, part, self.account, self.container_name,
+                        self.object_name, headers={POLICY_INDEX: policy_index})
+                except direct_client.ClientException as err:
+                    if 'x-backend-timestamp' in err.http_headers:
+                        ts_policy_index = policy_index
+                        break
+                else:
+                    orig_policy_index = policy_index
+                    break
+        if not orig_policy_index:
+            self.fail('Unable to find /%s/%s/%s in %r' % (
+                self.account, self.container_name, self.object_name,
+                found_policy_indexes))
+        if not ts_policy_index:
+            self.fail('Unable to find tombstone /%s/%s/%s in %r' % (
+                self.account, self.container_name, self.object_name,
+                found_policy_indexes))
+        get_to_final_state()
+        Manager(['container-reconciler']).once()
+        # validate containers
+        head_responses = []
+        for node in container_nodes:
+            metadata = direct_client.direct_head_container(
+                node, container_part, self.account, self.container_name)
+            head_responses.append((node, metadata))
+        new_found_policy_indexes = set(metadata[POLICY_INDEX] for node,
+                                       metadata in head_responses)
+        self.assert_(len(new_found_policy_indexes) == 1,
+                     'primary nodes disagree about policy index %r' %
+                     dict((node['port'], metadata[POLICY_INDEX])
+                          for node, metadata in head_responses))
+        expected_policy_index = new_found_policy_indexes.pop()
+        self.assertEqual(orig_policy_index, expected_policy_index)
+        # validate object fully deleted
+        for policy_index in found_policy_indexes:
+            object_ring = POLICIES.get_object_ring(policy_index, '/etc/swift')
+            part, nodes = object_ring.get_nodes(
+                self.account, self.container_name, self.object_name)
+            for node in nodes:
+                try:
+                    direct_client.direct_head_object(
+                        node, part, self.account, self.container_name,
+                        self.object_name, headers={POLICY_INDEX: policy_index})
+                except direct_client.ClientException as err:
+                    if err.http_status == HTTP_NOT_FOUND:
+                        continue
+                else:
+                    self.fail('Found /%s/%s/%s in %s on %s' % (
+                        self.account, self.container_name, self.object_name,
+                        orig_policy_index, node))
+
+    def test_reconcile_manifest(self):
+        manifest_data = []
+
+        def write_part(i):
+            body = 'VERIFY%0.2d' % i + '\x00' * 1048576
+            part_name = 'manifest_part_%0.2d' % i
+            manifest_entry = {
+                "path": "/%s/%s" % (self.container_name, part_name),
+                "etag": md5(body).hexdigest(),
+                "size_bytes": len(body),
+            }
+            client.put_object(self.url, self.token, self.container_name,
+                              part_name, contents=body)
+            manifest_data.append(manifest_entry)
+
+        # get an old container stashed
+        self.brain.stop_primary_half()
+        policy = random.choice(list(POLICIES))
+        self.brain.put_container(policy.idx)
+        self.brain.start_primary_half()
+        # write some parts
+        for i in range(10):
+            write_part(i)
+
+        self.brain.stop_handoff_half()
+        wrong_policy = random.choice([p for p in POLICIES if p is not policy])
+        self.brain.put_container(wrong_policy.idx)
+        # write some more parts
+        for i in range(10, 20):
+            write_part(i)
+
+        # write manifest
+        try:
+            client.put_object(self.url, self.token, self.container_name,
+                              self.object_name,
+                              contents=utils.json.dumps(manifest_data),
+                              query_string='multipart-manifest=put')
+        except ClientException as err:
+            # so as it works out, you can't really upload a multi-part
+            # manifest for objects that are currently misplaced - you have to
+            # wait until they're all available - which is about the same as
+            # some other failure that causes data to be unavailable to the
+            # proxy at the time of upload
+            self.assertEqual(err.http_status, 400)
+
+        # but what the heck, we'll sneak one in just to see what happens...
+        direct_manifest_name = self.object_name + '-direct-test'
+        object_ring = POLICIES.get_object_ring(wrong_policy.idx, '/etc/swift')
+        part, nodes = object_ring.get_nodes(
+            self.account, self.container_name, direct_manifest_name)
+        container_part = self.container_ring.get_part(self.account,
+                                                      self.container_name)
+
+        def translate_direct(data):
+            return {
+                'hash': data['etag'],
+                'bytes': data['size_bytes'],
+                'name': data['path'],
+            }
+        direct_manifest_data = map(translate_direct, manifest_data)
+        headers = {
+            'x-container-host': ','.join('%s:%s' % (n['ip'], n['port']) for n
+                                         in self.container_ring.devs),
+            'x-container-device': ','.join(n['device'] for n in
+                                           self.container_ring.devs),
+            'x-container-partition': container_part,
+            POLICY_INDEX: wrong_policy.idx,
+            'X-Static-Large-Object': 'True',
+        }
+        for node in nodes:
+            direct_client.direct_put_object(
+                node, part, self.account, self.container_name,
+                direct_manifest_name,
+                contents=utils.json.dumps(direct_manifest_data),
+                headers=headers)
+            break  # one should do it...
+
+        self.brain.start_handoff_half()
+        get_to_final_state()
+        Manager(['container-reconciler']).once()
+        # clear proxy cache
+        client.post_container(self.url, self.token, self.container_name, {})
+
+        # let's see how that direct upload worked out...
+        metadata, body = client.get_object(
+            self.url, self.token, self.container_name, direct_manifest_name,
+            query_string='multipart-manifest=get')
+        self.assertEqual(metadata['x-static-large-object'].lower(), 'true')
+        for i, entry in enumerate(utils.json.loads(body)):
+            for key in ('hash', 'bytes', 'name'):
+                self.assertEquals(entry[key], direct_manifest_data[i][key])
+        metadata, body = client.get_object(
+            self.url, self.token, self.container_name, direct_manifest_name)
+        self.assertEqual(metadata['x-static-large-object'].lower(), 'true')
+        self.assertEqual(int(metadata['content-length']),
+                         sum(part['size_bytes'] for part in manifest_data))
+        self.assertEqual(body, ''.join('VERIFY%0.2d' % i + '\x00' * 1048576
+                                       for i in range(20)))
+
+        # and regular upload should work now too
+        client.put_object(self.url, self.token, self.container_name,
+                          self.object_name,
+                          contents=utils.json.dumps(manifest_data),
+                          query_string='multipart-manifest=put')
+        metadata = client.head_object(self.url, self.token,
+                                      self.container_name,
+                                      self.object_name)
+        self.assertEqual(int(metadata['content-length']),
+                         sum(part['size_bytes'] for part in manifest_data))
+
+    def test_reconciler_move_object_twice(self):
+        # select some policies
+        old_policy = random.choice(list(POLICIES))
+        new_policy = random.choice([p for p in POLICIES if p != old_policy])
+
+        # setup a split brain
+        self.brain.stop_handoff_half()
+        # get old_policy on two primaries
+        self.brain.put_container(policy_index=int(old_policy))
+        self.brain.start_handoff_half()
+        self.brain.stop_primary_half()
+        # force a recreate on handoffs
+        self.brain.put_container(policy_index=int(old_policy))
+        self.brain.delete_container()
+        self.brain.put_container(policy_index=int(new_policy))
+        self.brain.put_object()  # populate memcache with new_policy
+        self.brain.start_primary_half()
+
+        # at this point two primaries have old policy
+        container_part, container_nodes = self.container_ring.get_nodes(
+            self.account, self.container_name)
+        head_responses = []
+        for node in container_nodes:
+            metadata = direct_client.direct_head_container(
+                node, container_part, self.account, self.container_name)
+            head_responses.append((node, metadata))
+        old_container_node_ids = [
+            node['id'] for node, metadata in head_responses
+            if int(old_policy) ==
+            int(metadata['X-Backend-Storage-Policy-Index'])]
+        self.assertEqual(2, len(old_container_node_ids))
+
+        # hopefully memcache still has the new policy cached
+        self.brain.put_object()
+        # double-check object correctly written to new policy
+        conf_files = []
+        for server in Manager(['container-reconciler']).servers:
+            conf_files.extend(server.conf_files())
+        conf_file = conf_files[0]
+        client = InternalClient(conf_file, 'probe-test', 3)
+        client.get_object_metadata(
+            self.account, self.container_name, self.object_name,
+            headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
+        client.get_object_metadata(
+            self.account, self.container_name, self.object_name,
+            acceptable_statuses=(4,),
+            headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
+
+        # shutdown the containers that know about the new policy
+        self.brain.stop_handoff_half()
+
+        # and get rows enqueued from old nodes
+        for server_type in ('container-replicator', 'container-updater'):
+            server = Manager([server_type])
+            tuple(server.once(number=n + 1) for n in old_container_node_ids)
+
+        # verify entry in the queue for the "misplaced" new_policy
+        for container in client.iter_containers('.misplaced_objects'):
+            for obj in client.iter_objects('.misplaced_objects',
+                                           container['name']):
+                expected = '%d:/%s/%s/%s' % (new_policy, self.account,
+                                             self.container_name,
+                                             self.object_name)
+                self.assertEqual(obj['name'], expected)
+
+        Manager(['container-reconciler']).once()
+
+        # verify object in old_policy
+        client.get_object_metadata(
+            self.account, self.container_name, self.object_name,
+            headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
+
+        # verify object is *not* in new_policy
+        client.get_object_metadata(
+            self.account, self.container_name, self.object_name,
+            acceptable_statuses=(4,),
+            headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
+
+        get_to_final_state()
+
+        # verify entry in the queue
+        client = InternalClient(conf_file, 'probe-test', 3)
+        for container in client.iter_containers('.misplaced_objects'):
+            for obj in client.iter_objects('.misplaced_objects',
+                                           container['name']):
+                expected = '%d:/%s/%s/%s' % (old_policy, self.account,
+                                             self.container_name,
+                                             self.object_name)
+                self.assertEqual(obj['name'], expected)
+
+        Manager(['container-reconciler']).once()
+
+        # and now it flops back
+        client.get_object_metadata(
+            self.account, self.container_name, self.object_name,
+            headers={'X-Backend-Storage-Policy-Index': int(new_policy)})
+        client.get_object_metadata(
+            self.account, self.container_name, self.object_name,
+            acceptable_statuses=(4,),
+            headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
+
 
 def main():
     options, commands = parser.parse_args()
diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py
index 181c44d496..67faa330d9 100644
--- a/test/unit/common/test_db_replicator.py
+++ b/test/unit/common/test_db_replicator.py
@@ -274,8 +274,8 @@ class TestDBReplicator(unittest.TestCase):
         self._patchers.append(patcher)
         return patched_thing
 
-    def stub_delete_db(self, object_file):
-        self.delete_db_calls.append(object_file)
+    def stub_delete_db(self, broker):
+        self.delete_db_calls.append('/path/to/file')
 
     def test_repl_connection(self):
         node = {'replication_ip': '127.0.0.1', 'replication_port': 80,
@@ -657,7 +657,8 @@ class TestDBReplicator(unittest.TestCase):
             self.assertTrue(os.path.exists(temp_file2.name))
             self.assertEqual(0, replicator.stats['remove'])
 
-            replicator.delete_db(temp_file.name)
+            temp_file.db_file = temp_file.name
+            replicator.delete_db(temp_file)
 
             self.assertTrue(os.path.exists(temp_dir))
             self.assertTrue(os.path.exists(temp_suf_dir))
@@ -669,7 +670,8 @@ class TestDBReplicator(unittest.TestCase):
                              replicator.logger.log_dict['increment'])
             self.assertEqual(1, replicator.stats['remove'])
 
-            replicator.delete_db(temp_file2.name)
+            temp_file2.db_file = temp_file2.name
+            replicator.delete_db(temp_file2)
 
             self.assertTrue(os.path.exists(temp_dir))
             self.assertFalse(os.path.exists(temp_suf_dir))
@@ -1343,6 +1345,19 @@ class TestReplicatorSync(unittest.TestCase):
             daemon.run_once()
         return daemon
 
+    def test_local_ids(self):
+        if self.datadir is None:
+            # base test case
+            return
+        for drive in ('sda', 'sdb', 'sdd'):
+            os.makedirs(os.path.join(self.root, drive, self.datadir))
+        for node in self._ring.devs:
+            daemon = self._run_once(node)
+            if node['device'] == 'sdc':
+                self.assertEqual(daemon._local_device_ids, set())
+            else:
+                self.assertEqual(daemon._local_device_ids,
+                                 set([node['id']]))
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py
index cd88518c9a..a1888c4031 100644
--- a/test/unit/container/test_replicator.py
+++ b/test/unit/container/test_replicator.py
@@ -24,6 +24,8 @@ import sqlite3
 
 from swift.common import db_replicator
 from swift.container import replicator, backend, server
+from swift.container.reconciler import (
+    MISPLACED_OBJECTS_ACCOUNT, get_reconciler_container_name)
 from swift.common.utils import normalize_timestamp
 from swift.common.storage_policy import POLICIES
 
@@ -664,6 +666,194 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             remote_broker.update_put_timestamp(remote_recreate_timestamp)
             remote_broker.update_status_changed_at(remote_recreate_timestamp)
 
+    def test_sync_to_remote_with_misplaced(self):
+        ts = itertools.count(int(time.time()))
+        # create "local" broker
+        policy = random.choice(list(POLICIES))
+        broker = self._get_broker('a', 'c', node_index=0)
+        broker.initialize(normalize_timestamp(ts.next()),
+                          policy.idx)
+
+        # create "remote" broker
+        remote_policy = random.choice([p for p in POLICIES if p is not
+                                       policy])
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        remote_broker.initialize(normalize_timestamp(ts.next()),
+                                 remote_policy.idx)
+        # add misplaced row to remote_broker
+        remote_broker.put_object(
+            '/a/c/o', normalize_timestamp(ts.next()), 0, 'content-type',
+            'etag', storage_policy_index=remote_broker.storage_policy_index)
+        # since this row matches policy index or remote, it shows up in count
+        self.assertEqual(remote_broker.get_info()['object_count'], 1)
+        self.assertEqual([], remote_broker.get_misplaced_since(-1, 1))
+
+        #replicate
+        part, node = self._get_broker_part_node(broker)
+        daemon = self._run_once(node)
+        # since our local broker has no rows to push it logs as no_change
+        self.assertEqual(1, daemon.stats['no_change'])
+        self.assertEqual(0, broker.get_info()['object_count'])
+
+        # remote broker updates it's policy index; this makes the remote
+        # broker's object count change
+        info = remote_broker.get_info()
+        expectations = {
+            'object_count': 0,
+            'storage_policy_index': policy.idx,
+        }
+        for key, value in expectations.items():
+            self.assertEqual(info[key], value)
+        # but it also knows those objects are misplaced now
+        misplaced = remote_broker.get_misplaced_since(-1, 100)
+        self.assertEqual(len(misplaced), 1)
+
+        # we also pushed out to node 3 with rsync
+        self.assertEqual(1, daemon.stats['rsync'])
+        third_broker = self._get_broker('a', 'c', node_index=2)
+        info = third_broker.get_info()
+        for key, value in expectations.items():
+            self.assertEqual(info[key], value)
+
+    def test_misplaced_rows_replicate_and_enqueue(self):
+        ts = itertools.count(int(time.time()))
+        policy = random.choice(list(POLICIES))
+        broker = self._get_broker('a', 'c', node_index=0)
+        broker.initialize(normalize_timestamp(ts.next()),
+                          policy.idx)
+        remote_policy = random.choice([p for p in POLICIES if p is not
+                                       policy])
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        remote_broker.initialize(normalize_timestamp(ts.next()),
+                                 remote_policy.idx)
+
+        # add a misplaced row to *local* broker
+        obj_put_timestamp = normalize_timestamp(ts.next())
+        broker.put_object(
+            'o', obj_put_timestamp, 0, 'content-type',
+            'etag', storage_policy_index=remote_policy.idx)
+        misplaced = broker.get_misplaced_since(-1, 1)
+        self.assertEqual(len(misplaced), 1)
+        # since this row is misplaced it doesn't show up in count
+        self.assertEqual(broker.get_info()['object_count'], 0)
+
+        # replicate
+        part, node = self._get_broker_part_node(broker)
+        daemon = self._run_once(node)
+        # push to remote, and third node was missing (also maybe reconciler)
+        self.assert_(2 < daemon.stats['rsync'] <= 3)
+
+        # grab the rsynced instance of remote_broker
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+
+        # remote has misplaced rows too now
+        misplaced = remote_broker.get_misplaced_since(-1, 1)
+        self.assertEqual(len(misplaced), 1)
+
+        # and the correct policy_index and object_count
+        info = remote_broker.get_info()
+        expectations = {
+            'object_count': 0,
+            'storage_policy_index': policy.idx,
+        }
+        for key, value in expectations.items():
+            self.assertEqual(info[key], value)
+
+        # and we should have also enqeued these rows in the reconciler
+        reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at'])
+        # but it may not be on the same node as us anymore though...
+        reconciler = self._get_broker(reconciler.account,
+                                      reconciler.container, node_index=0)
+        self.assertEqual(reconciler.get_info()['object_count'], 1)
+        objects = reconciler.list_objects_iter(
+            1, '', None, None, None, None, storage_policy_index=0)
+        self.assertEqual(len(objects), 1)
+        expected = ('%s:/a/c/o' % remote_policy.idx, obj_put_timestamp, 0,
+                    'application/x-put', obj_put_timestamp)
+        self.assertEqual(objects[0], expected)
+
+        # having safely enqueued to the reconciler we can advance
+        # our sync pointer
+        self.assertEqual(broker.get_reconciler_sync(), 1)
+
+    def test_multiple_out_sync_reconciler_enqueue_normalize(self):
+        ts = itertools.count(int(time.time()))
+        policy = random.choice(list(POLICIES))
+        broker = self._get_broker('a', 'c', node_index=0)
+        broker.initialize(normalize_timestamp(ts.next()), policy.idx)
+        remote_policy = random.choice([p for p in POLICIES if p is not
+                                       policy])
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        remote_broker.initialize(normalize_timestamp(ts.next()),
+                                 remote_policy.idx)
+
+        # add some rows to brokers
+        for db in (broker, remote_broker):
+            for p in (policy, remote_policy):
+                db.put_object('o-%s' % p.name, normalize_timestamp(ts.next()),
+                              0, 'content-type', 'etag',
+                              storage_policy_index=p.idx)
+            db._commit_puts()
+
+        expected_policy_stats = {
+            policy.idx: {'object_count': 1, 'bytes_used': 0},
+            remote_policy.idx: {'object_count': 1, 'bytes_used': 0},
+        }
+        for db in (broker, remote_broker):
+            policy_stats = db.get_policy_stats()
+            self.assertEqual(policy_stats, expected_policy_stats)
+
+        # each db has 2 rows, 4 total
+        all_items = set()
+        for db in (broker, remote_broker):
+            items = db.get_items_since(-1, 4)
+            all_items.update(
+                (item['name'], item['created_at']) for item in items)
+        self.assertEqual(4, len(all_items))
+
+        # replicate both ways
+        part, node = self._get_broker_part_node(broker)
+        self._run_once(node)
+        part, node = self._get_broker_part_node(remote_broker)
+        self._run_once(node)
+
+        # only the latest timestamps should survive
+        most_recent_items = {}
+        for name, timestamp in all_items:
+            most_recent_items[name] = max(
+                timestamp, most_recent_items.get(name, -1))
+        self.assertEqual(2, len(most_recent_items))
+
+        for db in (broker, remote_broker):
+            items = db.get_items_since(-1, 4)
+            self.assertEqual(len(items), len(most_recent_items))
+            for item in items:
+                self.assertEqual(most_recent_items[item['name']],
+                                 item['created_at'])
+
+        # and the reconciler also collapses updates
+        reconciler_containers = set()
+        for item in all_items:
+            _name, timestamp = item
+            reconciler_containers.add(
+                get_reconciler_container_name(timestamp))
+
+        reconciler_items = set()
+        for reconciler_container in reconciler_containers:
+            for node_index in range(3):
+                reconciler = self._get_broker(MISPLACED_OBJECTS_ACCOUNT,
+                                              reconciler_container,
+                                              node_index=node_index)
+                items = reconciler.get_items_since(-1, 4)
+                reconciler_items.update(
+                    (item['name'], item['created_at']) for item in items)
+        # they can't *both* be in the wrong policy ;)
+        self.assertEqual(1, len(reconciler_items))
+        for reconciler_name, timestamp in reconciler_items:
+            _policy_index, path = reconciler_name.split(':', 1)
+            a, c, name = path.lstrip('/').split('/')
+            self.assertEqual(most_recent_items[name], timestamp)
+
 
 if __name__ == '__main__':
     unittest.main()