Fix time-shifting of objects PUT with container-sync
When container-sync PUTs an object to a destination container it uses the timestamp from the container row rather than the actual timestamp of the object being copied. The actual timestamp of the object can be newer, so the sync'd object may end up with the right content but at the wrong, older, timestamp. This patch changes the timestamp sent with the sync'd object to be that of the actual source object being sent. Drive-by fix to make code more readable by removing a variable rename mid-function, fix a typo and remove a redundant function call. Change-Id: I800e6de4cdeea289864414980a96f5929281da04 Closes-Bug: #1540884
This commit is contained in:
parent
a1776b9c1f
commit
f2fca9aafa
@ -170,7 +170,7 @@ class ContainerSync(Daemon):
|
||||
#: running wild on near empty systems.
|
||||
self.interval = int(conf.get('interval', 300))
|
||||
#: Maximum amount of time to spend syncing a container before moving on
|
||||
#: to the next one. If a conatiner sync hasn't finished in this time,
|
||||
#: to the next one. If a container sync hasn't finished in this time,
|
||||
#: it'll just be resumed next scan.
|
||||
self.container_time = int(conf.get('container_time', 60))
|
||||
#: ContainerSyncCluster instance for validating sync-to values.
|
||||
@ -463,27 +463,22 @@ class ContainerSync(Daemon):
|
||||
shuffle(nodes)
|
||||
exc = None
|
||||
looking_for_timestamp = Timestamp(row['created_at'])
|
||||
timestamp = -1
|
||||
headers = body = None
|
||||
# look up for the newest one
|
||||
headers_out = {'X-Newest': True,
|
||||
'X-Backend-Storage-Policy-Index':
|
||||
str(info['storage_policy_index'])}
|
||||
try:
|
||||
source_obj_status, source_obj_info, source_obj_iter = \
|
||||
source_obj_status, headers, body = \
|
||||
self.swift.get_object(info['account'],
|
||||
info['container'], row['name'],
|
||||
headers=headers_out,
|
||||
acceptable_statuses=(2, 4))
|
||||
|
||||
except (Exception, UnexpectedResponse, Timeout) as err:
|
||||
source_obj_info = {}
|
||||
source_obj_iter = None
|
||||
headers = {}
|
||||
body = None
|
||||
exc = err
|
||||
timestamp = Timestamp(source_obj_info.get(
|
||||
'x-timestamp', 0))
|
||||
headers = source_obj_info
|
||||
body = source_obj_iter
|
||||
timestamp = Timestamp(headers.get('x-timestamp', 0))
|
||||
if timestamp < looking_for_timestamp:
|
||||
if exc:
|
||||
raise exc
|
||||
@ -501,7 +496,6 @@ class ContainerSync(Daemon):
|
||||
if 'content-type' in headers:
|
||||
headers['content-type'] = clean_content_type(
|
||||
headers['content-type'])
|
||||
headers['x-timestamp'] = row['created_at']
|
||||
if realm and realm_key:
|
||||
nonce = uuid.uuid4().hex
|
||||
path = urlparse(sync_to).path + '/' + quote(row['name'])
|
||||
|
@ -22,6 +22,7 @@ from swiftclient import client, ClientException
|
||||
|
||||
from swift.common.http import HTTP_NOT_FOUND
|
||||
from swift.common.manager import Manager
|
||||
from test.probe.brain import BrainSplitter
|
||||
from test.probe.common import ReplProbeTest, ENABLED_POLICIES
|
||||
|
||||
|
||||
@ -149,5 +150,70 @@ class TestContainerSync(ReplProbeTest):
|
||||
dest_container, object_name)
|
||||
self.assertEqual(body, 'test-body')
|
||||
|
||||
def test_sync_with_stale_container_rows(self):
|
||||
source_container, dest_container = self._setup_synced_containers()
|
||||
brain = BrainSplitter(self.url, self.token, source_container,
|
||||
None, 'container')
|
||||
|
||||
# upload to source
|
||||
object_name = 'object-%s' % uuid.uuid4()
|
||||
client.put_object(self.url, self.token, source_container, object_name,
|
||||
'test-body')
|
||||
|
||||
# check source container listing
|
||||
_, listing = client.get_container(
|
||||
self.url, self.token, source_container)
|
||||
for expected_obj_dict in listing:
|
||||
if expected_obj_dict['name'] == object_name:
|
||||
break
|
||||
else:
|
||||
self.fail('Failed to find source object %r in container listing %r'
|
||||
% (object_name, listing))
|
||||
|
||||
# stop all container servers
|
||||
brain.stop_primary_half()
|
||||
brain.stop_handoff_half()
|
||||
|
||||
# upload new object content to source - container updates will fail
|
||||
client.put_object(self.url, self.token, source_container, object_name,
|
||||
'new-test-body')
|
||||
source_headers = client.head_object(
|
||||
self.url, self.token, source_container, object_name)
|
||||
|
||||
# start all container servers
|
||||
brain.start_primary_half()
|
||||
brain.start_handoff_half()
|
||||
|
||||
# sanity check: source container listing should not have changed
|
||||
_, listing = client.get_container(
|
||||
self.url, self.token, source_container)
|
||||
for actual_obj_dict in listing:
|
||||
if actual_obj_dict['name'] == object_name:
|
||||
self.assertDictEqual(expected_obj_dict, actual_obj_dict)
|
||||
break
|
||||
else:
|
||||
self.fail('Failed to find source object %r in container listing %r'
|
||||
% (object_name, listing))
|
||||
|
||||
# cycle container-sync - object should be correctly sync'd despite
|
||||
# stale info in container row
|
||||
Manager(['container-sync']).once()
|
||||
|
||||
# verify sync'd object has same content and headers
|
||||
dest_headers, body = client.get_object(self.url, self.token,
|
||||
dest_container, object_name)
|
||||
self.assertEqual(body, 'new-test-body')
|
||||
mismatched_headers = []
|
||||
for k in ('etag', 'content-length', 'content-type', 'x-timestamp',
|
||||
'last-modified'):
|
||||
if source_headers[k] == dest_headers[k]:
|
||||
continue
|
||||
mismatched_headers.append((k, source_headers[k], dest_headers[k]))
|
||||
if mismatched_headers:
|
||||
msg = '\n'.join([('Mismatched header %r, expected %r but got %r'
|
||||
% item) for item in mismatched_headers])
|
||||
self.fail(msg)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
@ -885,6 +885,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
self.assertEqual(logger, self.logger)
|
||||
|
||||
sync.put_object = fake_put_object
|
||||
expected_put_count = 0
|
||||
|
||||
with mock.patch('swift.container.sync.InternalClient'):
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||
@ -909,7 +910,8 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 1)
|
||||
expected_put_count += 1
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
|
||||
def fake_get_object(acct, con, obj, headers, acceptable_statuses):
|
||||
self.assertEqual(headers['X-Newest'], True)
|
||||
@ -935,7 +937,21 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
expected_put_count += 1
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
|
||||
# Success as everything says it worked, also check that PUT
|
||||
# timestamp equals GET timestamp when it is newer than created_at
|
||||
# value.
|
||||
self.assertTrue(cs.container_sync_row(
|
||||
{'deleted': False,
|
||||
'name': 'object',
|
||||
'created_at': '1.1'}, 'http://sync/to/path',
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
expected_put_count += 1
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
|
||||
exc = []
|
||||
|
||||
@ -955,7 +971,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertEqual(len(exc), 1)
|
||||
self.assertEqual(str(exc[-1]), 'test exception')
|
||||
|
||||
@ -978,7 +994,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertEqual(len(exc), 1)
|
||||
self.assertEqual(str(exc[-1]), 'test client exception')
|
||||
|
||||
@ -1003,7 +1019,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertLogMessage('info', 'Unauth')
|
||||
|
||||
def fake_put_object(*args, **kwargs):
|
||||
@ -1018,7 +1034,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertLogMessage('info', 'Not found', 1)
|
||||
|
||||
def fake_put_object(*args, **kwargs):
|
||||
@ -1033,7 +1049,7 @@ class TestContainerSync(unittest.TestCase):
|
||||
'key', FakeContainerBroker('broker'),
|
||||
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
|
||||
realm, realm_key))
|
||||
self.assertEqual(cs.container_puts, 2)
|
||||
self.assertEqual(cs.container_puts, expected_put_count)
|
||||
self.assertLogMessage('error', 'ERROR Syncing')
|
||||
finally:
|
||||
sync.uuid = orig_uuid
|
||||
|
Loading…
x
Reference in New Issue
Block a user