diff --git a/swift/container/reconciler.py b/swift/container/reconciler.py index 4a295da2d0..7a8b5fd1e9 100644 --- a/swift/container/reconciler.py +++ b/swift/container/reconciler.py @@ -159,8 +159,8 @@ def get_reconciler_content_type(op): def get_row_to_q_entry_translator(broker): - account = broker.account - container = broker.container + account = broker.root_account + container = broker.root_container op_type = { 0: get_reconciler_content_type('put'), 1: get_reconciler_content_type('delete'), diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 7548ab2f56..98b43e6fcb 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -924,6 +924,91 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): # our sync pointer self.assertEqual(broker.get_reconciler_sync(), 2) + def test_misplaced_rows_replicate_and_enqueue_from_shard(self): + # force all timestamps to fall in same hour + ts = (Timestamp(t) for t in + itertools.count(int(time.time()) // 3600 * 3600)) + policy = random.choice(list(POLICIES)) + broker = self._get_broker('.shards_a', 'some-other-c', node_index=0) + broker.initialize(next(ts).internal, policy.idx) + broker.set_sharding_sysmeta('Root', 'a/c') + remote_policy = random.choice([p for p in POLICIES if p is not + policy]) + remote_broker = self._get_broker( + '.shards_a', 'some-other-c', node_index=1) + remote_broker.initialize(next(ts).internal, remote_policy.idx) + + # add a misplaced row to *local* broker + obj_put_timestamp = next(ts).internal + broker.put_object( + 'o', obj_put_timestamp, 0, 'content-type', + 'etag', storage_policy_index=remote_policy.idx) + misplaced = broker.get_misplaced_since(-1, 10) + self.assertEqual(len(misplaced), 1) + # since this row is misplaced it doesn't show up in count + self.assertEqual(broker.get_info()['object_count'], 0) + + # add another misplaced row to *local* broker with composite timestamp + ts_data = next(ts) + ts_ctype = next(ts) + ts_meta = next(ts) + broker.put_object( + 'o2', ts_data.internal, 0, 'content-type', + 'etag', storage_policy_index=remote_policy.idx, + ctype_timestamp=ts_ctype.internal, meta_timestamp=ts_meta.internal) + misplaced = broker.get_misplaced_since(-1, 10) + self.assertEqual(len(misplaced), 2) + # since this row is misplaced it doesn't show up in count + self.assertEqual(broker.get_info()['object_count'], 0) + + # replicate + part, node = self._get_broker_part_node(broker) + daemon = self._run_once(node) + # push to remote, and third node was missing (also maybe reconciler) + self.assertTrue(2 < daemon.stats['rsync'] <= 3, daemon.stats['rsync']) + + # grab the rsynced instance of remote_broker + remote_broker = self._get_broker( + '.shards_a', 'some-other-c', node_index=1) + + # remote has misplaced rows too now + misplaced = remote_broker.get_misplaced_since(-1, 10) + self.assertEqual(len(misplaced), 2) + + # and the correct policy_index and object_count + info = remote_broker.get_info() + expectations = { + 'object_count': 0, + 'storage_policy_index': policy.idx, + } + for key, value in expectations.items(): + self.assertEqual(info[key], value) + + # and we should have also enqueued these rows in a single reconciler, + # since we forced the object timestamps to be in the same hour. + reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at']) + # but it may not be on the same node as us anymore though... + reconciler = self._get_broker(reconciler.account, + reconciler.container, node_index=0) + self.assertEqual(reconciler.get_info()['object_count'], 2) + objects = reconciler.list_objects_iter( + 10, '', None, None, None, None, storage_policy_index=0) + self.assertEqual(len(objects), 2) + # NB: reconciler work is for the *root* container! + expected = ('%s:/a/c/o' % remote_policy.idx, obj_put_timestamp, 0, + 'application/x-put', obj_put_timestamp) + self.assertEqual(objects[0], expected) + # the second object's listing has ts_meta as its last modified time + # but its full composite timestamp is in the hash field. + expected = ('%s:/a/c/o2' % remote_policy.idx, ts_meta.internal, 0, + 'application/x-put', + encode_timestamps(ts_data, ts_ctype, ts_meta)) + self.assertEqual(objects[1], expected) + + # having safely enqueued to the reconciler we can advance + # our sync pointer + self.assertEqual(broker.get_reconciler_sync(), 2) + def test_multiple_out_sync_reconciler_enqueue_normalize(self): ts = (Timestamp(t).internal for t in itertools.count(int(time.time())))