sharder: use correct Timestamp formats
Use Timestamp.internal for X-Timestamp header when making a PUT request to send shard range updates to the root container. The unit test already asserts that the internal format is sent, but the test failed to use a Timestamp where the internal and normal formats differed.. Don't try to cast timestamp strings to floats when sorting CleavingContext metadata. Timestamp strings may have the form <float>_<offset> so it is safer to cast a Timestamp string to a Timestamp and then to cast the Timestamp to a float when necessary. Change-Id: I59e140d1226a4b0c2c8f37678bc2dc87d9ffa631 Signed-off-by: Alistair Coles <alistairncoles@gmail.com> Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Signed-off-by: Matthew Oliver <matt@oliver.net.au>
This commit is contained in:
committed by
Alistair Coles
parent
0eabd90388
commit
9abd8ae71e
@@ -1018,27 +1018,26 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
if own_shard_range.state not in ShardRange.CLEAVING_STATES:
|
if own_shard_range.state not in ShardRange.CLEAVING_STATES:
|
||||||
return
|
return
|
||||||
|
|
||||||
sharded_ctx = None
|
latest_context = latest_context_ts = None
|
||||||
if db_state == SHARDED:
|
if db_state == SHARDED:
|
||||||
contexts = CleavingContext.load_all(broker)
|
contexts = CleavingContext.load_all(broker)
|
||||||
if not contexts:
|
if not contexts:
|
||||||
return
|
return
|
||||||
context_ts = max(float(ts) for c, ts in contexts)
|
contexts_sorted = sorted(contexts, key=lambda x: Timestamp(x[1]))
|
||||||
if context_ts + self.recon_sharded_timeout \
|
latest_context = contexts_sorted[-1]
|
||||||
|
latest_context_ts = Timestamp(latest_context[1])
|
||||||
|
if float(latest_context_ts) + self.recon_sharded_timeout \
|
||||||
< float(Timestamp.now()):
|
< float(Timestamp.now()):
|
||||||
# last context timestamp too old for the
|
# last context timestamp too old for the
|
||||||
# broker to be recorded
|
# broker to be recorded
|
||||||
return
|
return
|
||||||
|
|
||||||
contexts_sorted = sorted(contexts, key=lambda x: float(x[1]))
|
|
||||||
sharded_ctx = contexts_sorted[-1]
|
|
||||||
|
|
||||||
update_own_shard_range_stats(broker, own_shard_range)
|
update_own_shard_range_stats(broker, own_shard_range)
|
||||||
info = self._make_stats_info(broker, node, own_shard_range)
|
info = self._make_stats_info(broker, node, own_shard_range)
|
||||||
|
|
||||||
if sharded_ctx:
|
if latest_context:
|
||||||
info["total_replicate_time"] = sharded_ctx[0].replication_time
|
info["total_replicate_time"] = latest_context[0].replication_time
|
||||||
sharding_total_elapsed = (float(sharded_ctx[1])
|
sharding_total_elapsed = (float(latest_context_ts)
|
||||||
- float(own_shard_range.epoch))
|
- float(own_shard_range.epoch))
|
||||||
info['total_sharding_time'] = sharding_total_elapsed
|
info['total_sharding_time'] = sharding_total_elapsed
|
||||||
|
|
||||||
@@ -1215,7 +1214,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD,
|
headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD,
|
||||||
USE_REPLICATION_NETWORK_HEADER: 'True',
|
USE_REPLICATION_NETWORK_HEADER: 'True',
|
||||||
'User-Agent': 'container-sharder %s' % os.getpid(),
|
'User-Agent': 'container-sharder %s' % os.getpid(),
|
||||||
'X-Timestamp': Timestamp.now().normal,
|
'X-Timestamp': Timestamp.now().internal,
|
||||||
'Content-Length': len(body),
|
'Content-Length': len(body),
|
||||||
'Content-Type': 'application/json'})
|
'Content-Type': 'application/json'})
|
||||||
|
|
||||||
|
|||||||
@@ -75,19 +75,32 @@ class BaseTestSharder(unittest.TestCase):
|
|||||||
|
|
||||||
def _make_broker(self, account='a', container='c', epoch=None,
|
def _make_broker(self, account='a', container='c', epoch=None,
|
||||||
device='sda', part=0, hash_=None, put_timestamp=None):
|
device='sda', part=0, hash_=None, put_timestamp=None):
|
||||||
|
"""
|
||||||
|
Make a ContainerBroker.
|
||||||
|
|
||||||
|
:param account: account name
|
||||||
|
:param container: container name
|
||||||
|
:param epoch: epoch; a Timestamp instance
|
||||||
|
:param device: device name
|
||||||
|
:param part: partition number
|
||||||
|
:param hash_: hash of container name
|
||||||
|
:param put_timestamp: put timestamp; a Timestamp instance
|
||||||
|
:return: a ContainerBroker instance
|
||||||
|
"""
|
||||||
hash_ = hash_ or md5(
|
hash_ = hash_ or md5(
|
||||||
container.encode('utf-8'), usedforsecurity=False).hexdigest()
|
container.encode('utf-8'), usedforsecurity=False).hexdigest()
|
||||||
datadir = os.path.join(
|
datadir = os.path.join(
|
||||||
self.tempdir, device, 'containers', str(part), hash_[-3:], hash_)
|
self.tempdir, device, 'containers', str(part), hash_[-3:], hash_)
|
||||||
if epoch:
|
if epoch:
|
||||||
filename = '%s_%s.db' % (hash_, epoch)
|
filename = '%s_%s.db' % (hash_, epoch.normal)
|
||||||
else:
|
else:
|
||||||
filename = hash_ + '.db'
|
filename = hash_ + '.db'
|
||||||
db_file = os.path.join(datadir, filename)
|
db_file = os.path.join(datadir, filename)
|
||||||
broker = ContainerBroker(
|
broker = ContainerBroker(
|
||||||
db_file, account=account, container=container,
|
db_file, account=account, container=container,
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
broker.initialize(put_timestamp=put_timestamp)
|
put_ts_str = None if put_timestamp is None else put_timestamp.internal
|
||||||
|
broker.initialize(put_timestamp=put_ts_str)
|
||||||
return broker
|
return broker
|
||||||
|
|
||||||
def _make_old_style_sharding_broker(self, account='a', container='c',
|
def _make_old_style_sharding_broker(self, account='a', container='c',
|
||||||
@@ -2266,7 +2279,7 @@ class TestSharder(BaseTestSharder):
|
|||||||
db_hash = hash_path(acceptor.account,
|
db_hash = hash_path(acceptor.account,
|
||||||
acceptor.container)
|
acceptor.container)
|
||||||
# NB expected cleaved db name includes acceptor epoch
|
# NB expected cleaved db name includes acceptor epoch
|
||||||
db_name = '%s_%s.db' % (db_hash, acceptor_epoch.internal)
|
db_name = '%s_%s.db' % (db_hash, acceptor_epoch.normal)
|
||||||
expected_acceptor_dbs.append(
|
expected_acceptor_dbs.append(
|
||||||
os.path.join(self.tempdir, 'sda', 'containers', '0',
|
os.path.join(self.tempdir, 'sda', 'containers', '0',
|
||||||
db_hash[-3:], db_hash, db_name))
|
db_hash[-3:], db_hash, db_name))
|
||||||
@@ -3454,6 +3467,21 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder._record_sharding_progress(broker, {}, None, 0.5)
|
sharder._record_sharding_progress(broker, {}, None, 0.5)
|
||||||
mocked.assert_not_called()
|
mocked.assert_not_called()
|
||||||
|
|
||||||
|
def test_sharded_record_sharding_progress_tolerates_timestamp_offset(self):
|
||||||
|
# CleavingContext metadata might one day have timestamps with offset,
|
||||||
|
# so verify they can be sorted...
|
||||||
|
ts_iter_with_offset = (Timestamp(float(ts) + i, offset=i)
|
||||||
|
for i, ts in enumerate(self.ts_iter, start=1))
|
||||||
|
with mock.patch('swift.common.utils.Timestamp.now',
|
||||||
|
side_effect=ts_iter_with_offset):
|
||||||
|
broker = self._check_complete_sharding(
|
||||||
|
'a', 'c', (('', 'mid'), ('mid', '')))
|
||||||
|
|
||||||
|
with self._mock_sharder() as sharder:
|
||||||
|
with mock.patch.object(sharder, '_append_stat') as mocked:
|
||||||
|
sharder._record_sharding_progress(broker, {}, None, 1234)
|
||||||
|
mocked.assert_called_once_with('sharding_in_progress', 'all', mock.ANY)
|
||||||
|
|
||||||
def test_incomplete_sharding_progress_warning_log(self):
|
def test_incomplete_sharding_progress_warning_log(self):
|
||||||
# test to verify sharder will print warning logs if sharding has been
|
# test to verify sharder will print warning logs if sharding has been
|
||||||
# taking too long.
|
# taking too long.
|
||||||
@@ -5368,6 +5396,10 @@ class TestSharder(BaseTestSharder):
|
|||||||
with self._mock_sharder(replicas=replicas) as sharder:
|
with self._mock_sharder(replicas=replicas) as sharder:
|
||||||
with mocked_http_conn(*resp_codes, give_send=on_send) as conn:
|
with mocked_http_conn(*resp_codes, give_send=on_send) as conn:
|
||||||
with mock_timestamp_now() as now:
|
with mock_timestamp_now() as now:
|
||||||
|
# we don't expect these PUTs to have offsets but it's
|
||||||
|
# used here to verify that the internal format of the
|
||||||
|
# Timestamp is used for X-Timestamp
|
||||||
|
now.offset = 1
|
||||||
res = sharder._send_shard_ranges(
|
res = sharder._send_shard_ranges(
|
||||||
broker, 'a', 'c', shard_ranges)
|
broker, 'a', 'c', shard_ranges)
|
||||||
|
|
||||||
@@ -5764,7 +5796,7 @@ class TestSharder(BaseTestSharder):
|
|||||||
|
|
||||||
def test_process_broker_leader_auto_shard(self):
|
def test_process_broker_leader_auto_shard(self):
|
||||||
# verify conditions for acting as auto-shard leader
|
# verify conditions for acting as auto-shard leader
|
||||||
broker = self._make_broker(put_timestamp=next(self.ts_iter).internal)
|
broker = self._make_broker(put_timestamp=next(self.ts_iter))
|
||||||
objects = [
|
objects = [
|
||||||
['obj%3d' % i, self.ts_encoded(), i, 'text/plain',
|
['obj%3d' % i, self.ts_encoded(), i, 'text/plain',
|
||||||
'etag%s' % i, 0] for i in range(10)]
|
'etag%s' % i, 0] for i in range(10)]
|
||||||
@@ -5817,7 +5849,7 @@ class TestSharder(BaseTestSharder):
|
|||||||
'rows_per_shard': 5,
|
'rows_per_shard': 5,
|
||||||
'shrink_threshold': 1,
|
'shrink_threshold': 1,
|
||||||
'auto_shard': True}
|
'auto_shard': True}
|
||||||
broker = self._make_broker(put_timestamp=next(self.ts_iter).internal)
|
broker = self._make_broker(put_timestamp=next(self.ts_iter))
|
||||||
broker.delete_db(next(self.ts_iter).internal)
|
broker.delete_db(next(self.ts_iter).internal)
|
||||||
self.assertTrue(broker.is_deleted()) # sanity check
|
self.assertTrue(broker.is_deleted()) # sanity check
|
||||||
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
||||||
@@ -6029,7 +6061,7 @@ class TestSharder(BaseTestSharder):
|
|||||||
|
|
||||||
def test_audit_root_container_reset_epoch(self):
|
def test_audit_root_container_reset_epoch(self):
|
||||||
epoch = next(self.ts_iter)
|
epoch = next(self.ts_iter)
|
||||||
broker = self._make_broker(epoch=epoch.normal)
|
broker = self._make_broker(epoch=epoch)
|
||||||
shard_bounds = (('', 'j'), ('j', 'k'), ('k', 's'),
|
shard_bounds = (('', 'j'), ('j', 'k'), ('k', 's'),
|
||||||
('s', 'y'), ('y', ''))
|
('s', 'y'), ('y', ''))
|
||||||
shard_ranges = self._make_shard_ranges(shard_bounds,
|
shard_ranges = self._make_shard_ranges(shard_bounds,
|
||||||
|
|||||||
Reference in New Issue
Block a user