ssync: fix decoding of ts_meta when ts_data has offset
The SsyncSender encodes object file timestamps in a compact form and
the SsyncReceiver decodes the timestamps and compares them to its own
object file set. The encoding represents the meta file timestamp as a
delta from the data file timestamp, NOT INCLUDING the data file
timestamp offset.

Previously, the decoding was erroneously calculating the meta file
timestamp as the sum of the delta plus the data file timestamp
INCLUDING the offset. For example, if the SsyncSender has object file
timestamps:

  ts_data = t0_1.data
  ts_meta = t1.meta

then the receiver would erroneously perceive that the sender has:

  ts_data = t0_1.data
  ts_meta = t1_1.meta

As described in the referenced bug report, this erroneous decoding
could cause the SsyncReceiver to request that the SsyncSender sync an
object that is already in sync, which results in a 409 Conflict at the
receiver. The 409 causes the ssync session to terminate, and the same
process repeats on the next attempt.

Closes-Bug: #2007643
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Change-Id: I74a0aac0ac29577026743f87f4b654d85e8fcc80
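The root cause is easiest to see with swift.common.utils.Timestamp
directly. A minimal sketch (the timestamp values below are made up for
illustration) contrasting the old and new decoding:

from swift.common.utils import Timestamp

t0_1 = Timestamp(1677000000.0, offset=1)  # ts_data with an offset ("t0_1")
t1 = Timestamp(1677000001.0)              # ts_meta one second later ("t1")

# The sender encodes ts_meta as a delta of the raw values, which do
# not include the offset:
delta = t1.raw - t0_1.raw

# Old decoding: applying the delta to the offset-bearing internal form
# leaks the offset into ts_meta, yielding "t1_1" instead of "t1":
assert Timestamp(t0_1.internal, delta=delta) != t1

# New decoding: .normal drops the offset, so encode and decode invert:
assert Timestamp(t0_1.normal, delta=delta) == t1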
commit 2fe18b24cd
parent 8778b2c132
swift/obj/ssync_receiver.py

@@ -45,8 +45,8 @@ def decode_missing(line):
     parts = line.decode('ascii').split()
     result['object_hash'] = urllib.parse.unquote(parts[0])
     t_data = urllib.parse.unquote(parts[1])
-    result['ts_data'] = Timestamp(t_data)
-    result['ts_meta'] = result['ts_ctype'] = result['ts_data']
+    result['ts_data'] = ts_data = Timestamp(t_data)
+    result['ts_meta'] = result['ts_ctype'] = ts_data
     result['durable'] = True  # default to True in case this key isn't sent
     if len(parts) > 2:
         # allow for a comma separated list of k:v pairs to future-proof
@@ -54,9 +54,13 @@ def decode_missing(line):
         for item in [subpart for subpart in subparts if ':' in subpart]:
             k, v = item.split(':')
             if k == 'm':
-                result['ts_meta'] = Timestamp(t_data, delta=int(v, 16))
+                # ignore ts_data offset when calculating ts_meta
+                result['ts_meta'] = Timestamp(ts_data.normal,
+                                              delta=int(v, 16))
             elif k == 't':
-                result['ts_ctype'] = Timestamp(t_data, delta=int(v, 16))
+                # ignore ts_data offset when calculating ts_ctype
+                result['ts_ctype'] = Timestamp(ts_data.normal,
+                                               delta=int(v, 16))
             elif k == 'durable':
                 result['durable'] = utils.config_true_value(v)
     return result
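For context, decode_missing() parses the MISSING_CHECK lines produced
by ssync_sender.encode_missing(). A sketch with made-up values: the
second field is ts_data in internal form (here carrying offset 1), and
the hex value after m: is the meta delta in raw units (one second is
0x186a0):

from swift.obj.ssync_receiver import decode_missing

# Illustrative wire line (hash and timestamps are made up):
line = (b'd41d8cd98f00b204e9800998ecf8427e '
        b'1677000000.00000_0000000000000001 '
        b'm:186a0')
result = decode_missing(line)

# With the fix, the delta is applied to the offset-free ts_data.normal,
# so ts_meta decodes with no offset while ts_data keeps its offset:
assert result['ts_meta'].internal == '1677000001.00000'
assert result['ts_data'].internal == '1677000000.00000_0000000000000001'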
test/probe/test_object_versioning.py

@@ -15,12 +15,15 @@
 # limitations under the License.
 
 from unittest import main
+import random
 
 from swiftclient import client
 
+from swift.common import direct_client
 from swift.common.request_helpers import get_reserved_name
+from swift.obj import reconstructor
 
-from test.probe.common import ReplProbeTest
+from test.probe.common import ReplProbeTest, ECProbeTest
 
 
 class TestObjectVersioning(ReplProbeTest):
@@ -229,5 +232,99 @@ class TestObjectVersioning(ReplProbeTest):
         self.assertEqual(data, b'new version')
 
 
+class TestECObjectVersioning(ECProbeTest):
+
+    def setUp(self):
+        super(TestECObjectVersioning, self).setUp()
+        self.part, self.nodes = self.object_ring.get_nodes(
+            self.account, self.container_name, self.object_name)
+
+    def test_versioning_with_metadata_replication(self):
+        # Enable versioning
+        client.put_container(self.url, self.token, self.container_name,
+                             headers={
+                                 'X-Storage-Policy': self.policy.name,
+                                 'X-Versions-Enabled': 'True',
+                             })
+        # create version with metadata in a handoff location
+        failed_primary = random.choice(self.nodes)
+        failed_primary_device_path = self.device_dir(failed_primary)
+        self.kill_drive(failed_primary_device_path)
+        headers = {'x-object-meta-foo': 'meta-foo'}
+        client.put_object(self.url, self.token, self.container_name,
+                          self.object_name, contents='some data',
+                          headers=headers)
+        headers_post = {'x-object-meta-bar': 'meta-bar'}
+        client.post_object(self.url, self.token, self.container_name,
+                           self.object_name, headers=headers_post)
+        # find the handoff
+        primary_ids = [n['id'] for n in self.nodes]
+        for handoff in self.object_ring.devs:
+            if handoff['id'] in primary_ids:
+                continue
+            try:
+                headers, etag = self.direct_get(handoff, self.part)
+            except direct_client.DirectClientException as err:
+                if err.http_status != 404:
+                    raise
+            else:
+                break
+        else:
+            self.fail('unable to find object on handoffs')
+        # we want to repair the fault, but avoid doing the handoff revert
+        self.revive_drive(failed_primary_device_path)
+        handoff_config = (handoff['id'] + 1) % 4
+        failed_config = (failed_primary['id'] + 1) % 4
+        partner_nodes = reconstructor._get_partners(
+            failed_primary['index'], self.nodes)
+        random.shuffle(partner_nodes)
+        for partner in partner_nodes:
+            fix_config = (partner['id'] + 1) % 4
+            if fix_config not in (handoff_config, failed_config):
+                break
+        else:
+            self.fail('unable to find fix_config in %r excluding %r & %r' % (
+                [(d['device'], (d['id'] + 1) % 4) for d in partner_nodes],
+                handoff_config, failed_config))
+
+        self.reconstructor.once(number=fix_config)
+        # validate object in all locations
+        missing = []
+        etags = set()
+        metadata = []
+        for node in self.nodes:
+            try:
+                headers, etag = self.direct_get(node, self.part)
+            except direct_client.DirectClientException as err:
+                if err.http_status != 404:
+                    raise
+                missing.append(node)
+                continue
+            etags.add(headers['X-Object-Sysmeta-Ec-Etag'])
+            metadata.append(headers['X-Object-Meta-Bar'])
+        if missing:
+            self.fail('Ran reconstructor config #%s to repair %r but '
+                      'found 404 on primary: %r' % (
+                          fix_config, failed_primary['device'],
+                          [d['device'] for d in missing]))
+        self.assertEqual(1, len(etags))
+        self.assertEqual(['meta-bar'] * len(self.nodes), metadata)
+        # process revert
+        self.reconstructor.once(number=handoff_config)
+        # validate object (still?) in primary locations
+        etags = set()
+        metadata = []
+        for node in self.nodes:
+            headers, etag = self.direct_get(node, self.part)
+            etags.add(headers['X-Object-Sysmeta-Ec-Etag'])
+            metadata.append(headers['X-Object-Meta-Bar'])
+        self.assertEqual(1, len(etags))
+        self.assertEqual(['meta-bar'] * len(self.nodes), metadata)
+        # and removed from handoff
+        with self.assertRaises(direct_client.DirectClientException) as ctx:
+            headers, etag = self.direct_get(handoff, self.part)
+        self.assertEqual(ctx.exception.http_status, 404)
+
+
 if __name__ == '__main__':
     main()
test/unit/obj/test_ssync.py

@@ -1464,6 +1464,7 @@ class TestSsyncReplication(TestBaseSsync):
 
         # o5 is on tx with meta, rx is in sync with data and meta
         t5 = next(self.ts_iter)
+        t5 = utils.Timestamp(t5, offset=1)  # note: use an offset for this test
         rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5)
         tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5)
         t5_meta = next(self.ts_iter)
@@ -2053,6 +2053,15 @@ class TestModuleMethods(unittest.TestCase):
         actual = ssync_receiver.decode_missing(msg)
         self.assertEqual(expected, actual)
 
+        # test encode and decode functions invert with offset
+        t_data_offset = utils.Timestamp(t_data, offset=1)
+        expected = {'object_hash': object_hash, 'ts_meta': t_meta,
+                    'ts_data': t_data_offset, 'ts_ctype': t_type,
+                    'durable': False}
+        msg = ssync_sender.encode_missing(**expected)
+        actual = ssync_receiver.decode_missing(msg)
+        self.assertEqual(expected, actual)
+
     def test_decode_wanted(self):
         parts = ['d']
         expected = {'data': True}