
Consider two replicas of the same object whose on-disk files have diverged
due to failures:

  A has t2.ts
  B has t1.data, t4.meta

(The DELETE at t2 did not make it to B. The POST at t4 was rejected by A.)

After ssync replication the two on-disk file sets will not be consistent:

  A has t2.ts (ssync cannot POST t4.meta to this node)
  B has t2.ts, t4.meta (ssync should not delete t4.meta, there may be a
    t3.data somewhere)

Consequently the two nodes will report different hashes for the object's
suffix, and replication will repeat, always with the same inconsistent
outcome. This scenario is reproduced by the probe test added in this patch.
(Note that rsync replication does result in (t2.ts, t4.meta) on both nodes.)

The solution is to change the way that suffix hashes are calculated.
Currently the names of *all* files found in each object dir are added to
the hash. With this patch, only the timestamps of those files that could
be used to construct a valid diskfile are added to the hash. File
extensions are appended to the timestamps so that in most 'normal'
situations the result of the hashing is the same as before this patch.
That avoids a storm of hash mismatches when this patch is deployed in an
existing cluster.

In the problem case described above, t4.meta is no longer added to the
hash, since it is not useful for constructing a diskfile. (Note that
t4.meta is not deleted, because it may become useful should a t3.data be
replicated in the future.)

Closes-Bug: 1534276
Change-Id: I99e88b8d5f5d9bc22b42112a99634ba942415e05
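
To illustrate the revised calculation, here is a minimal sketch (the
function name and the simplified "usable file" rule are illustrative
assumptions, not the actual diskfile code):

    import hashlib
    import os

    def hash_suffix_fileset(filenames):
        # Hash timestamp + extension of only those files that could be
        # used to construct a valid diskfile. An orphaned .meta (no
        # .data to apply it to) is skipped, so it cannot perturb the
        # suffix hash; it is still left on disk in case a matching
        # .data arrives later.
        have_data = any(f.endswith('.data') for f in filenames)
        hasher = hashlib.md5()
        for name in sorted(filenames):
            timestamp, ext = os.path.splitext(name)
            if ext == '.meta' and not have_data:
                continue
            hasher.update((timestamp + ext).encode('utf8'))
        return hasher.hexdigest()

    # In the problem case above, A has (t2.ts) and B has (t2.ts, t4.meta)
    # after ssync; the orphaned t4.meta no longer causes a hash mismatch:
    assert hash_suffix_fileset(['t2.ts']) == \
        hash_suffix_fileset(['t2.ts', 't4.meta'])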
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from io import StringIO
from tempfile import mkdtemp
from textwrap import dedent
import unittest

import os
import shutil
import uuid

from swift.common.direct_client import direct_get_suffix_hashes
from swift.common.exceptions import DiskFileDeleted
from swift.common.internal_client import UnexpectedResponse
from swift.container.backend import ContainerBroker
from swift.common import internal_client, utils
from swiftclient import client
from swift.common.ring import Ring
from swift.common.utils import Timestamp, get_logger, hash_path
from swift.obj.diskfile import DiskFileManager
from swift.common.storage_policy import POLICIES

from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest


class Test(ReplProbeTest):
    def setUp(self):
        """
        Reset all environment and start all servers.
        """
        super(Test, self).setUp()
        self.container_name = 'container-%s' % uuid.uuid4()
        self.object_name = 'object-%s' % uuid.uuid4()
        self.brain = BrainSplitter(self.url, self.token, self.container_name,
                                   self.object_name, 'object',
                                   policy=self.policy)
        self.tempdir = mkdtemp()
        conf_path = os.path.join(self.tempdir, 'internal_client.conf')
        conf_body = """
        [DEFAULT]
        swift_dir = /etc/swift

        [pipeline:main]
        pipeline = catch_errors cache proxy-server

        [app:proxy-server]
        use = egg:swift#proxy
        object_post_as_copy = false

        [filter:cache]
        use = egg:swift#memcache

        [filter:catch_errors]
        use = egg:swift#catch_errors
        """
        with open(conf_path, 'w') as f:
            f.write(dedent(conf_body))
        self.int_client = internal_client.InternalClient(conf_path, 'test', 1)

    def tearDown(self):
        super(Test, self).tearDown()
        shutil.rmtree(self.tempdir)
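
    # Read object metadata straight from each object server's disk via
    # DiskFileManager, bypassing the proxy, so that on-disk state can be
    # compared across nodes (returns None if no matching device is found).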
    def _get_object_info(self, account, container, obj, number,
                         policy=None):
        obj_conf = self.configs['object-server']
        config_path = obj_conf[number]
        options = utils.readconf(config_path, 'app:object-server')
        swift_dir = options.get('swift_dir', '/etc/swift')
        ring = POLICIES.get_object_ring(policy, swift_dir)
        part, nodes = ring.get_nodes(account, container, obj)
        for node in nodes:
            # assumes one to one mapping
            if node['port'] == int(options.get('bind_port')):
                device = node['device']
                break
        else:
            return None
        mgr = DiskFileManager(options, get_logger(options))
        disk_file = mgr.get_diskfile(device, part, account, container, obj,
                                     policy)
        info = disk_file.read_metadata()
        return info

    def _assert_consistent_object_metadata(self):
        obj_info = []
        for i in range(1, 5):
            info_i = self._get_object_info(self.account, self.container_name,
                                           self.object_name, i)
            if info_i:
                obj_info.append(info_i)
        self.assertTrue(len(obj_info) > 1)
        for other in obj_info[1:]:
            self.assertEqual(obj_info[0], other,
                             'Object metadata mismatch: %s != %s'
                             % (obj_info[0], other))

    def _assert_consistent_deleted_object(self):
        for i in range(1, 5):
            try:
                info = self._get_object_info(self.account,
                                             self.container_name,
                                             self.object_name, i)
                if info is not None:
                    self.fail('Expected no disk file info but found %s'
                              % info)
            except DiskFileDeleted:
                pass

    def _get_db_info(self, account, container, number):
        server_type = 'container'
        obj_conf = self.configs['%s-server' % server_type]
        config_path = obj_conf[number]
        options = utils.readconf(config_path, 'app:container-server')
        root = options.get('devices')

        swift_dir = options.get('swift_dir', '/etc/swift')
        ring = Ring(swift_dir, ring_name=server_type)
        part, nodes = ring.get_nodes(account, container)
        for node in nodes:
            # assumes one to one mapping
            if node['port'] == int(options.get('bind_port')):
                device = node['device']
                break
        else:
            return None

        path_hash = utils.hash_path(account, container)
        _dir = utils.storage_directory('%ss' % server_type, part, path_hash)
        db_dir = os.path.join(root, device, _dir)
        db_file = os.path.join(db_dir, '%s.db' % path_hash)
        db = ContainerBroker(db_file)
        return db.get_info()

    def _assert_consistent_container_dbs(self):
        db_info = []
        for i in range(1, 5):
            info_i = self._get_db_info(self.account, self.container_name, i)
            if info_i:
                db_info.append(info_i)
        self.assertTrue(len(db_info) > 1)
        for other in db_info[1:]:
            self.assertEqual(db_info[0]['hash'], other['hash'],
                             'Container db hash mismatch: %s != %s'
                             % (db_info[0]['hash'], other['hash']))

    def _assert_object_metadata_matches_listing(self, listing, metadata):
        self.assertEqual(listing['bytes'], int(metadata['content-length']))
        self.assertEqual(listing['hash'], metadata['etag'])
        self.assertEqual(listing['content_type'], metadata['content-type'])
        modified = Timestamp(metadata['x-timestamp']).isoformat
        self.assertEqual(listing['last_modified'], modified)

    def _put_object(self, headers=None, body=u'stuff'):
        headers = headers or {}
        self.int_client.upload_object(StringIO(body), self.account,
                                      self.container_name,
                                      self.object_name, headers)

    def _post_object(self, headers):
        self.int_client.set_object_metadata(self.account, self.container_name,
                                            self.object_name, headers)

    def _delete_object(self):
        self.int_client.delete_object(self.account, self.container_name,
                                      self.object_name)

    def _get_object(self, headers=None, expect_statuses=(2,)):
        return self.int_client.get_object(self.account,
                                          self.container_name,
                                          self.object_name,
                                          headers,
                                          acceptable_statuses=expect_statuses)

    def _get_object_metadata(self):
        return self.int_client.get_object_metadata(self.account,
                                                   self.container_name,
                                                   self.object_name)
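
    # Fetch the suffix hashes that the replicator compares, directly from
    # every object server; per the commit message, an orphaned .meta file
    # could previously keep these permanently mismatched.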
    def _assert_consistent_suffix_hashes(self):
        opart, onodes = self.object_ring.get_nodes(
            self.account, self.container_name, self.object_name)
        name_hash = hash_path(
            self.account, self.container_name, self.object_name)
        results = []
        for node in onodes:
            results.append(
                (node,
                 direct_get_suffix_hashes(node, opart, [name_hash[-3:]])))
        for (node, hashes) in results[1:]:
            self.assertEqual(results[0][1], hashes,
                             'Inconsistent suffix hashes found: %s' % results)

    def test_object_delete_is_replicated(self):
        self.brain.put_container(policy_index=int(self.policy))
        # put object
        self._put_object()

        # put newer object to first server subset
        self.brain.stop_primary_half()
        self._put_object()
        self.brain.start_primary_half()

        # delete object on second server subset
        self.brain.stop_handoff_half()
        self._delete_object()
        self.brain.start_handoff_half()

        # run replicator
        self.get_to_final_state()

        # check object deletion has been replicated on first server set
        self.brain.stop_primary_half()
        self._get_object(expect_statuses=(4,))
        self.brain.start_primary_half()

        # check object deletion persists on second server set
        self.brain.stop_handoff_half()
        self._get_object(expect_statuses=(4,))

        # put newer object to second server set
        self._put_object()
        self.brain.start_handoff_half()

        # run replicator
        self.get_to_final_state()

        # check new object has been replicated on first server set
        self.brain.stop_primary_half()
        self._get_object()
        self.brain.start_primary_half()

        # check new object persists on second server set
        self.brain.stop_handoff_half()
        self._get_object()

    def test_object_after_replication_with_subsequent_post(self):
        self.brain.put_container(policy_index=0)

        # put object
        self._put_object(headers={'Content-Type': 'foo'}, body=u'older')

        # put newer object to first server subset
        self.brain.stop_primary_half()
        self._put_object(headers={'Content-Type': 'bar'}, body=u'newer')
        metadata = self._get_object_metadata()
        etag = metadata['etag']
        self.brain.start_primary_half()

        # post some user meta to all servers
        self._post_object({'x-object-meta-bar': 'meta-bar'})

        # run replicator
        self.get_to_final_state()

        # check that newer data has been replicated to second server subset
        self.brain.stop_handoff_half()
        metadata = self._get_object_metadata()
        self.assertEqual(etag, metadata['etag'])
        self.assertEqual('bar', metadata['content-type'])
        self.assertEqual('meta-bar', metadata['x-object-meta-bar'])
        self.brain.start_handoff_half()

        self._assert_consistent_object_metadata()
        self._assert_consistent_container_dbs()

    def test_sysmeta_after_replication_with_subsequent_put(self):
        sysmeta = {'x-object-sysmeta-foo': 'older'}
        sysmeta2 = {'x-object-sysmeta-foo': 'newer'}
        usermeta = {'x-object-meta-bar': 'meta-bar'}
        self.brain.put_container(policy_index=0)

        # put object with sysmeta to first server subset
        self.brain.stop_primary_half()
        self._put_object(headers=sysmeta)
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        self.brain.start_primary_half()

        # put object with updated sysmeta to second server subset
        self.brain.stop_handoff_half()
        self._put_object(headers=sysmeta2)
        metadata = self._get_object_metadata()
        for key in sysmeta2:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta2[key])
        self._post_object(usermeta)
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        for key in sysmeta2:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta2[key])

        self.brain.start_handoff_half()

        # run replicator
        self.get_to_final_state()

        # check sysmeta has been replicated to first server subset
        self.brain.stop_primary_half()
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        for key in sysmeta2.keys():
            self.assertTrue(key in metadata, key)
            self.assertEqual(metadata[key], sysmeta2[key])
        self.brain.start_primary_half()

        # check user meta and sysmeta ok on second server subset
        self.brain.stop_handoff_half()
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        for key in sysmeta2.keys():
            self.assertTrue(key in metadata, key)
            self.assertEqual(metadata[key], sysmeta2[key])

        self._assert_consistent_object_metadata()
        self._assert_consistent_container_dbs()

    def test_sysmeta_after_replication_with_subsequent_post(self):
        sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
        usermeta = {'x-object-meta-bar': 'meta-bar'}
        self.brain.put_container(policy_index=int(self.policy))
        # put object
        self._put_object()
        # put newer object with sysmeta to first server subset
        self.brain.stop_primary_half()
        self._put_object(headers=sysmeta)
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        self.brain.start_primary_half()

        # post some user meta to second server subset
        self.brain.stop_handoff_half()
        self._post_object(usermeta)
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        for key in sysmeta:
            self.assertFalse(key in metadata)
        self.brain.start_handoff_half()

        # run replicator
        self.get_to_final_state()

        # check user metadata has been replicated to first server subset
        # and sysmeta is unchanged
        self.brain.stop_primary_half()
        metadata = self._get_object_metadata()
        expected = dict(sysmeta)
        expected.update(usermeta)
        for key in expected.keys():
            self.assertTrue(key in metadata, key)
            self.assertEqual(metadata[key], expected[key])
        self.brain.start_primary_half()

        # check user metadata and sysmeta both on second server subset
        self.brain.stop_handoff_half()
        metadata = self._get_object_metadata()
        for key in expected.keys():
            self.assertTrue(key in metadata, key)
            self.assertEqual(metadata[key], expected[key])
        self._assert_consistent_object_metadata()
        self._assert_consistent_container_dbs()

    def test_sysmeta_after_replication_with_prior_post(self):
        sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
        usermeta = {'x-object-meta-bar': 'meta-bar'}
        self.brain.put_container(policy_index=int(self.policy))
        # put object
        self._put_object()

        # put user meta to first server subset
        self.brain.stop_handoff_half()
        self._post_object(headers=usermeta)
        metadata = self._get_object_metadata()
        for key in usermeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], usermeta[key])
        self.brain.start_handoff_half()

        # put newer object with sysmeta to second server subset
        self.brain.stop_primary_half()
        self._put_object(headers=sysmeta)
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        self.brain.start_primary_half()

        # run replicator
        self.get_to_final_state()

        # check stale user metadata is not replicated to first server subset
        # and sysmeta is unchanged
        self.brain.stop_primary_half()
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        for key in usermeta:
            self.assertFalse(key in metadata)
        self.brain.start_primary_half()

        # check stale user metadata is removed from second server subset
        # and sysmeta is replicated
        self.brain.stop_handoff_half()
        metadata = self._get_object_metadata()
        for key in sysmeta:
            self.assertTrue(key in metadata)
            self.assertEqual(metadata[key], sysmeta[key])
        for key in usermeta:
            self.assertFalse(key in metadata)
        self._assert_consistent_object_metadata()
        self._assert_consistent_container_dbs()
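
    # Reproduces the divergent tombstone/.meta scenario from the commit
    # message; the final suffix-hash assertion is the check that could not
    # be satisfied before this patch.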
    def test_post_trumped_by_prior_delete(self):
        # new metadata and content-type posted to subset of nodes should not
        # cause object to persist after replication of an earlier delete on
        # other nodes.
        self.brain.put_container(policy_index=0)
        # incomplete put
        self.brain.stop_primary_half()
        self._put_object(headers={'Content-Type': 'oldest',
                                  'X-Object-Sysmeta-Test': 'oldest',
                                  'X-Object-Meta-Test': 'oldest'})
        self.brain.start_primary_half()

        # incomplete put then delete
        self.brain.stop_handoff_half()
        self._put_object(headers={'Content-Type': 'oldest',
                                  'X-Object-Sysmeta-Test': 'oldest',
                                  'X-Object-Meta-Test': 'oldest'})
        self._delete_object()
        self.brain.start_handoff_half()

        # handoff post
        self.brain.stop_primary_half()
        self._post_object(headers={'Content-Type': 'newest',
                                   'X-Object-Sysmeta-Test': 'ignored',
                                   'X-Object-Meta-Test': 'newest'})

        # check object metadata
        metadata = self._get_object_metadata()
        self.assertEqual(metadata['x-object-sysmeta-test'], 'oldest')
        self.assertEqual(metadata['x-object-meta-test'], 'newest')
        self.assertEqual(metadata['content-type'], 'oldest')

        self.brain.start_primary_half()

        # delete trumps later post
        self.get_to_final_state()

        # check object is now deleted
        self.assertRaises(UnexpectedResponse, self._get_object_metadata)
        container_metadata, objs = client.get_container(self.url, self.token,
                                                        self.container_name)
        self.assertEqual(0, len(objs))
        self._assert_consistent_container_dbs()
        self._assert_consistent_deleted_object()
        self._assert_consistent_suffix_hashes()


if __name__ == "__main__":
    unittest.main()