From 9abd8ae71ec385e7687a11b50d7473be65eb9341 Mon Sep 17 00:00:00 2001 From: Matthew Oliver Date: Tue, 2 Dec 2025 14:36:40 +0000 Subject: [PATCH] sharder: use correct Timestamp formats Use Timestamp.internal for X-Timestamp header when making a PUT request to send shard range updates to the root container. The unit test already asserts that the internal format is sent, but the test failed to use a Timestamp where the internal and normal formats differed.. Don't try to cast timestamp strings to floats when sorting CleavingContext metadata. Timestamp strings may have the form _ so it is safer to cast a Timestamp string to a Timestamp and then to cast the Timestamp to a float when necessary. Change-Id: I59e140d1226a4b0c2c8f37678bc2dc87d9ffa631 Signed-off-by: Alistair Coles Co-Authored-By: Alistair Coles Signed-off-by: Matthew Oliver --- swift/container/sharder.py | 19 ++++++------- test/unit/container/test_sharder.py | 44 +++++++++++++++++++++++++---- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/swift/container/sharder.py b/swift/container/sharder.py index ef7cb5e871..e878b83ebc 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -1018,27 +1018,26 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if own_shard_range.state not in ShardRange.CLEAVING_STATES: return - sharded_ctx = None + latest_context = latest_context_ts = None if db_state == SHARDED: contexts = CleavingContext.load_all(broker) if not contexts: return - context_ts = max(float(ts) for c, ts in contexts) - if context_ts + self.recon_sharded_timeout \ + contexts_sorted = sorted(contexts, key=lambda x: Timestamp(x[1])) + latest_context = contexts_sorted[-1] + latest_context_ts = Timestamp(latest_context[1]) + if float(latest_context_ts) + self.recon_sharded_timeout \ < float(Timestamp.now()): # last context timestamp too old for the # broker to be recorded return - contexts_sorted = sorted(contexts, key=lambda x: float(x[1])) - sharded_ctx = contexts_sorted[-1] - update_own_shard_range_stats(broker, own_shard_range) info = self._make_stats_info(broker, node, own_shard_range) - if sharded_ctx: - info["total_replicate_time"] = sharded_ctx[0].replication_time - sharding_total_elapsed = (float(sharded_ctx[1]) + if latest_context: + info["total_replicate_time"] = latest_context[0].replication_time + sharding_total_elapsed = (float(latest_context_ts) - float(own_shard_range.epoch)) info['total_sharding_time'] = sharding_total_elapsed @@ -1215,7 +1214,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD, USE_REPLICATION_NETWORK_HEADER: 'True', 'User-Agent': 'container-sharder %s' % os.getpid(), - 'X-Timestamp': Timestamp.now().normal, + 'X-Timestamp': Timestamp.now().internal, 'Content-Length': len(body), 'Content-Type': 'application/json'}) diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 653d6f0ebb..d6c508dc72 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -75,19 +75,32 @@ class BaseTestSharder(unittest.TestCase): def _make_broker(self, account='a', container='c', epoch=None, device='sda', part=0, hash_=None, put_timestamp=None): + """ + Make a ContainerBroker. + + :param account: account name + :param container: container name + :param epoch: epoch; a Timestamp instance + :param device: device name + :param part: partition number + :param hash_: hash of container name + :param put_timestamp: put timestamp; a Timestamp instance + :return: a ContainerBroker instance + """ hash_ = hash_ or md5( container.encode('utf-8'), usedforsecurity=False).hexdigest() datadir = os.path.join( self.tempdir, device, 'containers', str(part), hash_[-3:], hash_) if epoch: - filename = '%s_%s.db' % (hash_, epoch) + filename = '%s_%s.db' % (hash_, epoch.normal) else: filename = hash_ + '.db' db_file = os.path.join(datadir, filename) broker = ContainerBroker( db_file, account=account, container=container, logger=self.logger) - broker.initialize(put_timestamp=put_timestamp) + put_ts_str = None if put_timestamp is None else put_timestamp.internal + broker.initialize(put_timestamp=put_ts_str) return broker def _make_old_style_sharding_broker(self, account='a', container='c', @@ -2266,7 +2279,7 @@ class TestSharder(BaseTestSharder): db_hash = hash_path(acceptor.account, acceptor.container) # NB expected cleaved db name includes acceptor epoch - db_name = '%s_%s.db' % (db_hash, acceptor_epoch.internal) + db_name = '%s_%s.db' % (db_hash, acceptor_epoch.normal) expected_acceptor_dbs.append( os.path.join(self.tempdir, 'sda', 'containers', '0', db_hash[-3:], db_hash, db_name)) @@ -3454,6 +3467,21 @@ class TestSharder(BaseTestSharder): sharder._record_sharding_progress(broker, {}, None, 0.5) mocked.assert_not_called() + def test_sharded_record_sharding_progress_tolerates_timestamp_offset(self): + # CleavingContext metadata might one day have timestamps with offset, + # so verify they can be sorted... + ts_iter_with_offset = (Timestamp(float(ts) + i, offset=i) + for i, ts in enumerate(self.ts_iter, start=1)) + with mock.patch('swift.common.utils.Timestamp.now', + side_effect=ts_iter_with_offset): + broker = self._check_complete_sharding( + 'a', 'c', (('', 'mid'), ('mid', ''))) + + with self._mock_sharder() as sharder: + with mock.patch.object(sharder, '_append_stat') as mocked: + sharder._record_sharding_progress(broker, {}, None, 1234) + mocked.assert_called_once_with('sharding_in_progress', 'all', mock.ANY) + def test_incomplete_sharding_progress_warning_log(self): # test to verify sharder will print warning logs if sharding has been # taking too long. @@ -5368,6 +5396,10 @@ class TestSharder(BaseTestSharder): with self._mock_sharder(replicas=replicas) as sharder: with mocked_http_conn(*resp_codes, give_send=on_send) as conn: with mock_timestamp_now() as now: + # we don't expect these PUTs to have offsets but it's + # used here to verify that the internal format of the + # Timestamp is used for X-Timestamp + now.offset = 1 res = sharder._send_shard_ranges( broker, 'a', 'c', shard_ranges) @@ -5764,7 +5796,7 @@ class TestSharder(BaseTestSharder): def test_process_broker_leader_auto_shard(self): # verify conditions for acting as auto-shard leader - broker = self._make_broker(put_timestamp=next(self.ts_iter).internal) + broker = self._make_broker(put_timestamp=next(self.ts_iter)) objects = [ ['obj%3d' % i, self.ts_encoded(), i, 'text/plain', 'etag%s' % i, 0] for i in range(10)] @@ -5817,7 +5849,7 @@ class TestSharder(BaseTestSharder): 'rows_per_shard': 5, 'shrink_threshold': 1, 'auto_shard': True} - broker = self._make_broker(put_timestamp=next(self.ts_iter).internal) + broker = self._make_broker(put_timestamp=next(self.ts_iter)) broker.delete_db(next(self.ts_iter).internal) self.assertTrue(broker.is_deleted()) # sanity check node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2', @@ -6029,7 +6061,7 @@ class TestSharder(BaseTestSharder): def test_audit_root_container_reset_epoch(self): epoch = next(self.ts_iter) - broker = self._make_broker(epoch=epoch.normal) + broker = self._make_broker(epoch=epoch) shard_bounds = (('', 'j'), ('j', 'k'), ('k', 's'), ('s', 'y'), ('y', '')) shard_ranges = self._make_shard_ranges(shard_bounds,