sharder: update own_sr stats explicitly
Previously, when fetching a shard range from a container DB using ContainerBroker.get_own_shard_range(), the stats of the returned shard range were updated as a side effect. However, the stats were not persisted in the own shard range row in the DB. Often the extra DB queries to get the stats are unnecessary because we don't need up-to-date stats in the returned shard range. The behavior also leads to potential confusion because the object stats of the returned shard range object are not necessarily consistent with the object stats of the same shard range in the DB. This patch therefore removes the stats updating behavior from get_own_shard_range() and makes the stats updating happen as an explicit separate operation, when needed. This is also more consistent with how the tombstone count is updated. Up-to-date own shard range stats are persisted when a container is first enabled for sharding, and then each time a shard container reports its stats to the root container. Change-Id: Ib10ef918c8983ca006a3740db8cfd07df2dfecf7
This commit is contained in:
parent
a1939cba03
commit
001d931e6a
|
@ -173,7 +173,8 @@ from swift.container.sharder import make_shard_ranges, sharding_enabled, \
|
|||
CleavingContext, process_compactible_shard_sequences, \
|
||||
find_compactible_shard_sequences, find_overlapping_ranges, \
|
||||
find_paths, rank_paths, finalize_shrinking, DEFAULT_SHARDER_CONF, \
|
||||
ContainerSharderConf, find_paths_with_gaps, combine_shard_ranges
|
||||
ContainerSharderConf, find_paths_with_gaps, combine_shard_ranges, \
|
||||
update_own_shard_range_stats
|
||||
|
||||
EXIT_SUCCESS = 0
|
||||
EXIT_ERROR = 1
|
||||
|
@ -377,6 +378,9 @@ def db_info(broker, args):
|
|||
if own_sr else None))
|
||||
db_state = broker.get_db_state()
|
||||
print('db_state = %s' % db_state)
|
||||
info = broker.get_info()
|
||||
print('object_count = %d' % info['object_count'])
|
||||
print('bytes_used = %d' % info['bytes_used'])
|
||||
if db_state == 'sharding':
|
||||
print('Retiring db id: %s' % broker.get_brokers()[0].get_info()['id'])
|
||||
print('Cleaving context: %s' %
|
||||
|
@ -504,6 +508,8 @@ def _enable_sharding(broker, own_shard_range, args):
|
|||
if own_shard_range.update_state(ShardRange.SHARDING):
|
||||
own_shard_range.epoch = Timestamp.now()
|
||||
own_shard_range.state_timestamp = own_shard_range.epoch
|
||||
# initialise own_shard_range with current broker object stats...
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
|
||||
with broker.updated_timeout(args.enable_timeout):
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
|
|
|
@ -442,7 +442,7 @@ class ContainerBroker(DatabaseBroker):
|
|||
if self.db_epoch is None:
|
||||
# never been sharded
|
||||
return UNSHARDED
|
||||
if self.db_epoch != self._own_shard_range().epoch:
|
||||
if self.db_epoch != self.get_own_shard_range().epoch:
|
||||
return UNSHARDED
|
||||
if not self.get_shard_ranges():
|
||||
return COLLAPSED
|
||||
|
@ -1870,7 +1870,7 @@ class ContainerBroker(DatabaseBroker):
|
|||
marker, end_marker)
|
||||
|
||||
if fill_gaps:
|
||||
own_shard_range = self._own_shard_range()
|
||||
own_shard_range = self.get_own_shard_range()
|
||||
if shard_ranges:
|
||||
last_upper = shard_ranges[-1].upper
|
||||
else:
|
||||
|
@ -1879,7 +1879,7 @@ class ContainerBroker(DatabaseBroker):
|
|||
required_upper = min(end_marker or own_shard_range.upper,
|
||||
own_shard_range.upper)
|
||||
if required_upper > last_upper:
|
||||
filler_sr = self.get_own_shard_range()
|
||||
filler_sr = own_shard_range
|
||||
filler_sr.lower = last_upper
|
||||
filler_sr.upper = required_upper
|
||||
shard_ranges.append(filler_sr)
|
||||
|
@ -1889,7 +1889,22 @@ class ContainerBroker(DatabaseBroker):
|
|||
|
||||
return shard_ranges
|
||||
|
||||
def _own_shard_range(self, no_default=False):
|
||||
def get_own_shard_range(self, no_default=False):
|
||||
"""
|
||||
Returns a shard range representing this broker's own shard range. If no
|
||||
such range has been persisted in the broker's shard ranges table then a
|
||||
default shard range representing the entire namespace will be returned.
|
||||
|
||||
The ``object_count`` and ``bytes_used`` of the returned shard range are
|
||||
not guaranteed to be up-to-date with the current object stats for this
|
||||
broker. Callers that require up-to-date stats should use the
|
||||
``get_info`` method.
|
||||
|
||||
:param no_default: if True and the broker's own shard range is not
|
||||
found in the shard ranges table then None is returned, otherwise a
|
||||
default shard range is returned.
|
||||
:return: an instance of :class:`~swift.common.utils.ShardRange`
|
||||
"""
|
||||
shard_ranges = self.get_shard_ranges(include_own=True,
|
||||
include_deleted=True,
|
||||
exclude_others=True)
|
||||
|
@ -1903,28 +1918,6 @@ class ContainerBroker(DatabaseBroker):
|
|||
state=ShardRange.ACTIVE)
|
||||
return own_shard_range
|
||||
|
||||
def get_own_shard_range(self, no_default=False):
|
||||
"""
|
||||
Returns a shard range representing this broker's own shard range. If no
|
||||
such range has been persisted in the broker's shard ranges table then a
|
||||
default shard range representing the entire namespace will be returned.
|
||||
|
||||
The returned shard range will be updated with the current object stats
|
||||
for this broker and a meta timestamp set to the current time. For these
|
||||
values to be persisted the caller must merge the shard range.
|
||||
|
||||
:param no_default: if True and the broker's own shard range is not
|
||||
found in the shard ranges table then None is returned, otherwise a
|
||||
default shard range is returned.
|
||||
:return: an instance of :class:`~swift.common.utils.ShardRange`
|
||||
"""
|
||||
own_shard_range = self._own_shard_range(no_default=no_default)
|
||||
if own_shard_range:
|
||||
info = self.get_info()
|
||||
own_shard_range.update_meta(
|
||||
info['object_count'], info['bytes_used'])
|
||||
return own_shard_range
|
||||
|
||||
def is_own_shard_range(self, shard_range):
|
||||
return shard_range.name == self.path
|
||||
|
||||
|
@ -1936,7 +1929,7 @@ class ContainerBroker(DatabaseBroker):
|
|||
:param epoch: a :class:`~swift.utils.common.Timestamp`
|
||||
:return: the broker's updated own shard range.
|
||||
"""
|
||||
own_shard_range = self._own_shard_range()
|
||||
own_shard_range = self.get_own_shard_range()
|
||||
own_shard_range.update_state(ShardRange.SHARDING, epoch)
|
||||
own_shard_range.epoch = epoch
|
||||
self.merge_shard_ranges(own_shard_range)
|
||||
|
@ -2232,9 +2225,7 @@ class ContainerBroker(DatabaseBroker):
|
|||
|
||||
# Else, we're either a root or a legacy deleted shard whose sharding
|
||||
# sysmeta was deleted
|
||||
|
||||
# Use internal method so we don't try to update stats.
|
||||
own_shard_range = self._own_shard_range(no_default=True)
|
||||
own_shard_range = self.get_own_shard_range(no_default=True)
|
||||
if not own_shard_range:
|
||||
return True # Never been sharded
|
||||
|
||||
|
|
|
@ -559,6 +559,26 @@ def combine_shard_ranges(new_shard_ranges, existing_shard_ranges):
|
|||
key=ShardRange.sort_key)
|
||||
|
||||
|
||||
def update_own_shard_range_stats(broker, own_shard_range):
|
||||
"""
|
||||
Update the ``own_shard_range`` with the up-to-date object stats from
|
||||
the ``broker``.
|
||||
|
||||
Note: this method does not persist the updated ``own_shard_range``;
|
||||
callers should use ``broker.merge_shard_ranges`` if the updated stats
|
||||
need to be persisted.
|
||||
|
||||
:param broker: an instance of ``ContainerBroker``.
|
||||
:param own_shard_range: and instance of ``ShardRange``.
|
||||
:returns: ``own_shard_range`` with up-to-date ``object_count``
|
||||
and ``bytes_used``.
|
||||
"""
|
||||
info = broker.get_info()
|
||||
own_shard_range.update_meta(
|
||||
info['object_count'], info['bytes_used'])
|
||||
return own_shard_range
|
||||
|
||||
|
||||
class CleavingContext(object):
|
||||
"""
|
||||
Encapsulates metadata associated with the process of cleaving a retiring
|
||||
|
@ -943,6 +963,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||
|
||||
def _identify_sharding_candidate(self, broker, node):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
if is_sharding_candidate(
|
||||
own_shard_range, self.shard_container_threshold):
|
||||
self.sharding_candidates.append(
|
||||
|
@ -957,6 +978,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||
|
||||
if compactible_ranges:
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
shrink_candidate = self._make_stats_info(
|
||||
broker, node, own_shard_range)
|
||||
# The number of ranges/donors that can be shrunk if the
|
||||
|
@ -992,6 +1014,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||
# broker to be recorded
|
||||
return
|
||||
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
info = self._make_stats_info(broker, node, own_shard_range)
|
||||
info['state'] = own_shard_range.state_text
|
||||
info['db_state'] = broker.get_db_state()
|
||||
|
@ -2244,11 +2267,12 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||
self.logger.debug('tombstones in %s = %d',
|
||||
quote(broker.path), tombstones)
|
||||
own_shard_range.update_tombstones(tombstones)
|
||||
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
if own_shard_range.reported:
|
||||
# no change to the stats metadata
|
||||
return
|
||||
|
||||
# persist the reported shard metadata
|
||||
# stats metadata has been updated so persist it
|
||||
broker.merge_shard_ranges(own_shard_range)
|
||||
# now get a consistent list of own and other shard ranges
|
||||
shard_ranges = broker.get_shard_ranges(
|
||||
|
@ -2285,8 +2309,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||
if state in (UNSHARDED, COLLAPSED):
|
||||
if is_leader and broker.is_root_container():
|
||||
# bootstrap sharding of root container
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
self._find_and_enable_sharding_candidates(
|
||||
broker, shard_ranges=[broker.get_own_shard_range()])
|
||||
broker, shard_ranges=[own_shard_range])
|
||||
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
if own_shard_range.state in ShardRange.CLEAVING_STATES:
|
||||
|
|
|
@ -1864,7 +1864,7 @@ class TestContainerSharding(BaseAutoContainerSharding):
|
|||
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
|
||||
check_shard_nodes_data(
|
||||
shard_nodes_data, expected_state='sharded', expected_shards=1,
|
||||
exp_obj_count=1)
|
||||
exp_obj_count=0)
|
||||
|
||||
# check root container
|
||||
root_nodes_data = self.direct_get_container_shard_ranges()
|
||||
|
@ -3183,7 +3183,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
|||
self.maxDiff = None
|
||||
self.assertEqual(exp_shard_ranges, shard_ranges)
|
||||
self.assertEqual(ShardRange.SHARDED,
|
||||
broker._own_shard_range().state)
|
||||
broker.get_own_shard_range().state)
|
||||
|
||||
# Sadly, the first replica to start sharding is still reporting its db
|
||||
# state to be 'unsharded' because, although it has sharded, its shard
|
||||
|
@ -3219,7 +3219,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
|||
self.assertLengthEqual(shard_ranges, len(exp_shard_ranges))
|
||||
self.assertEqual(exp_shard_ranges, shard_ranges)
|
||||
self.assertEqual(ShardRange.SHARDED,
|
||||
broker._own_shard_range().state)
|
||||
broker.get_own_shard_range().state)
|
||||
self.assertEqual(epoch_1, broker.db_epoch)
|
||||
self.assertIn(brokers[0].get_db_state(), (SHARDING, SHARDED))
|
||||
self.assertEqual(SHARDED, brokers[1].get_db_state())
|
||||
|
@ -3263,7 +3263,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
|||
self.assertLengthEqual(shard_ranges, len(exp_shard_ranges))
|
||||
self.assertEqual(exp_shard_ranges, shard_ranges)
|
||||
self.assertEqual(ShardRange.SHARDED,
|
||||
broker._own_shard_range().state)
|
||||
broker.get_own_shard_range().state)
|
||||
self.assertEqual(epoch_1, broker.db_epoch)
|
||||
self.assertEqual(SHARDED, broker.get_db_state())
|
||||
|
||||
|
|
|
@ -590,6 +590,13 @@ class TestManageShardRanges(unittest.TestCase):
|
|||
|
||||
def test_info(self):
|
||||
broker = self._make_broker()
|
||||
ts = next(self.ts_iter)
|
||||
broker.merge_items([
|
||||
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 9,
|
||||
'content_type': 'application/octet-stream', 'etag': 'not-really',
|
||||
'deleted': 0, 'storage_policy_index': 0,
|
||||
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
|
||||
for i in range(100)])
|
||||
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||
(True, Timestamp.now().internal)})
|
||||
out = StringIO()
|
||||
|
@ -600,6 +607,8 @@ class TestManageShardRanges(unittest.TestCase):
|
|||
expected = ['Sharding enabled = True',
|
||||
'Own shard range: None',
|
||||
'db_state = unsharded',
|
||||
'object_count = 100',
|
||||
'bytes_used = 900',
|
||||
'Metadata:',
|
||||
' X-Container-Sysmeta-Sharding = True']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
|
@ -635,13 +644,15 @@ class TestManageShardRanges(unittest.TestCase):
|
|||
' "upper": ""',
|
||||
'}',
|
||||
'db_state = sharding',
|
||||
'object_count = 100',
|
||||
'bytes_used = 900',
|
||||
'Retiring db id: %s' % retiring_db_id,
|
||||
'Cleaving context: {',
|
||||
' "cleave_to_row": null,',
|
||||
' "cleaving_done": false,',
|
||||
' "cursor": "",',
|
||||
' "last_cleave_to_row": null,',
|
||||
' "max_row": -1,',
|
||||
' "max_row": 100,',
|
||||
' "misplaced_done": false,',
|
||||
' "ranges_done": 0,',
|
||||
' "ranges_todo": 0,',
|
||||
|
@ -679,6 +690,10 @@ class TestManageShardRanges(unittest.TestCase):
|
|||
' "upper": ""',
|
||||
'}',
|
||||
'db_state = sharded',
|
||||
# in sharded state the object stats are determined by the
|
||||
# shard ranges, and we haven't created any in the test...
|
||||
'object_count = 0',
|
||||
'bytes_used = 0',
|
||||
'Metadata:',
|
||||
' X-Container-Sysmeta-Sharding = True']
|
||||
self.assertEqual(expected,
|
||||
|
@ -1063,6 +1078,13 @@ class TestManageShardRanges(unittest.TestCase):
|
|||
|
||||
def test_enable(self):
|
||||
broker = self._make_broker()
|
||||
ts = next(self.ts_iter)
|
||||
broker.merge_items([
|
||||
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 9,
|
||||
'content_type': 'application/octet-stream', 'etag': 'not-really',
|
||||
'deleted': 0, 'storage_policy_index': 0,
|
||||
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
|
||||
for i in range(100)])
|
||||
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||
(True, Timestamp.now().internal)})
|
||||
# no shard ranges
|
||||
|
@ -1085,7 +1107,7 @@ class TestManageShardRanges(unittest.TestCase):
|
|||
'.shards_a', 'c', 'c', Timestamp.now(), data['index'])
|
||||
shard_ranges.append(
|
||||
ShardRange(path, Timestamp.now(), data['lower'],
|
||||
data['upper'], data['object_count']))
|
||||
data['upper'], data['object_count'], bytes_used=9))
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
out = StringIO()
|
||||
err = StringIO()
|
||||
|
@ -1101,6 +1123,10 @@ class TestManageShardRanges(unittest.TestCase):
|
|||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
self._assert_enabled(broker, now)
|
||||
self.assertEqual(100, broker.get_info()['object_count'])
|
||||
self.assertEqual(100, broker.get_own_shard_range().object_count)
|
||||
self.assertEqual(900, broker.get_info()['bytes_used'])
|
||||
self.assertEqual(900, broker.get_own_shard_range().bytes_used)
|
||||
|
||||
# already enabled
|
||||
out = StringIO()
|
||||
|
|
|
@ -372,7 +372,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
broker.put_object('o', next(self.ts).internal, 0, 'text/plain',
|
||||
EMPTY_ETAG)
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(1, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
self.assertFalse(broker.empty())
|
||||
broker.delete_object('o', next(self.ts).internal)
|
||||
|
@ -461,7 +461,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
broker.put_object('o', next(self.ts).internal, 0, 'text/plain',
|
||||
EMPTY_ETAG)
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(1, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
self.assertFalse(broker.empty())
|
||||
broker.delete_object('o', next(self.ts).internal)
|
||||
|
@ -543,7 +543,7 @@ class TestContainerBroker(unittest.TestCase):
|
|||
broker.put_object('o', next(self.ts).internal, 0, 'text/plain',
|
||||
EMPTY_ETAG)
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(1, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
self.assertFalse(broker.empty())
|
||||
broker.delete_object('o', next(self.ts).internal)
|
||||
|
@ -4117,24 +4117,16 @@ class TestContainerBroker(unittest.TestCase):
|
|||
# fill gaps
|
||||
filler = own_shard_range.copy()
|
||||
filler.lower = 'h'
|
||||
with mock_timestamp_now() as now:
|
||||
actual = broker.get_shard_ranges(fill_gaps=True)
|
||||
filler.meta_timestamp = now
|
||||
actual = broker.get_shard_ranges(fill_gaps=True)
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
with mock_timestamp_now() as now:
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, marker='a')
|
||||
filler.meta_timestamp = now
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, marker='a')
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
with mock_timestamp_now() as now:
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='z')
|
||||
filler.meta_timestamp = now
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='z')
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
with mock_timestamp_now() as now:
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='k')
|
||||
filler.meta_timestamp = now
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='k')
|
||||
filler.upper = 'k'
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
|
@ -4342,14 +4334,15 @@ class TestContainerBroker(unittest.TestCase):
|
|||
db_path, account='.shards_a', container='shard_c')
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
|
||||
# no row for own shard range - expect entire namespace default
|
||||
# no row for own shard range - expect a default own shard range
|
||||
# covering the entire namespace default
|
||||
now = Timestamp.now()
|
||||
expected = ShardRange(broker.path, now, '', '', 0, 0, now,
|
||||
state=ShardRange.ACTIVE)
|
||||
own_sr = ShardRange(broker.path, now, '', '', 0, 0, now,
|
||||
state=ShardRange.ACTIVE)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
actual = broker.get_own_shard_range(no_default=True)
|
||||
self.assertIsNone(actual)
|
||||
|
@ -4361,52 +4354,44 @@ class TestContainerBroker(unittest.TestCase):
|
|||
[own_sr,
|
||||
ShardRange('.a/c1', next(self.ts), 'b', 'c'),
|
||||
ShardRange('.a/c2', next(self.ts), 'c', 'd')])
|
||||
expected = ShardRange(broker.path, ts_1, 'l', 'u', 0, 0, now)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# check stats get updated
|
||||
# check stats are not automatically updated
|
||||
broker.put_object(
|
||||
'o1', next(self.ts).internal, 100, 'text/plain', 'etag1')
|
||||
broker.put_object(
|
||||
'o2', next(self.ts).internal, 99, 'text/plain', 'etag2')
|
||||
expected = ShardRange(
|
||||
broker.path, ts_1, 'l', 'u', 2, 199, now)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# check non-zero stats returned
|
||||
own_sr.update_meta(object_count=2, bytes_used=199,
|
||||
meta_timestamp=next(self.ts))
|
||||
broker.merge_shard_ranges(own_sr)
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# still returned when deleted
|
||||
own_sr.update_meta(object_count=0, bytes_used=0,
|
||||
meta_timestamp=next(self.ts))
|
||||
delete_ts = next(self.ts)
|
||||
own_sr.set_deleted(timestamp=delete_ts)
|
||||
broker.merge_shard_ranges(own_sr)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
expected = ShardRange(
|
||||
broker.path, delete_ts, 'l', 'u', 2, 199, now, deleted=True)
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# still in table after reclaim_age
|
||||
broker.reclaim(next(self.ts).internal, next(self.ts).internal)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# entire namespace
|
||||
ts_2 = next(self.ts)
|
||||
broker.merge_shard_ranges(
|
||||
[ShardRange(broker.path, ts_2, '', '')])
|
||||
expected = ShardRange(
|
||||
broker.path, ts_2, '', '', 2, 199, now)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
own_sr = ShardRange(broker.path, ts_2, '', '')
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
@with_tempdir
|
||||
def test_enable_sharding(self, tempdir):
|
||||
|
|
|
@ -2778,7 +2778,7 @@ class TestContainerController(unittest.TestCase):
|
|||
|
||||
# listing shards don't cover entire namespace so expect an extra filler
|
||||
extra_shard_range = ShardRange(
|
||||
'a/c', ts_now, shard_ranges[2].upper, ShardRange.MAX, 2, 1024,
|
||||
'a/c', ts_now, shard_ranges[2].upper, ShardRange.MAX, 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
expected = shard_ranges[:3] + [extra_shard_range]
|
||||
check_shard_GET(expected, 'a/c', params='&states=listing')
|
||||
|
@ -2792,7 +2792,7 @@ class TestContainerController(unittest.TestCase):
|
|||
params='&states=listing&reverse=true&end_marker=pickle')
|
||||
# updating shards don't cover entire namespace so expect a filler
|
||||
extra_shard_range = ShardRange(
|
||||
'a/c', ts_now, shard_ranges[3].upper, ShardRange.MAX, 2, 1024,
|
||||
'a/c', ts_now, shard_ranges[3].upper, ShardRange.MAX, 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
expected = shard_ranges[1:4] + [extra_shard_range]
|
||||
check_shard_GET(expected, 'a/c', params='&states=updating')
|
||||
|
@ -2801,7 +2801,7 @@ class TestContainerController(unittest.TestCase):
|
|||
# when no listing shard ranges cover the requested namespace range then
|
||||
# filler is for entire requested namespace
|
||||
extra_shard_range = ShardRange(
|
||||
'a/c', ts_now, 'treacle', ShardRange.MAX, 2, 1024,
|
||||
'a/c', ts_now, 'treacle', ShardRange.MAX, 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
check_shard_GET([extra_shard_range], 'a/c',
|
||||
params='&states=listing&marker=treacle')
|
||||
|
@ -2809,7 +2809,7 @@ class TestContainerController(unittest.TestCase):
|
|||
[extra_shard_range], 'a/c',
|
||||
params='&states=listing&reverse=true&end_marker=treacle')
|
||||
extra_shard_range = ShardRange(
|
||||
'a/c', ts_now, 'treacle', 'walnut', 2, 1024,
|
||||
'a/c', ts_now, 'treacle', 'walnut', 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
params = '&states=listing&marker=treacle&end_marker=walnut'
|
||||
check_shard_GET([extra_shard_range], 'a/c', params=params)
|
||||
|
@ -3073,7 +3073,6 @@ class TestContainerController(unittest.TestCase):
|
|||
def test_GET_shard_ranges_using_state_aliases(self):
|
||||
# make a shard container
|
||||
ts_iter = make_timestamp_iter()
|
||||
ts_now = Timestamp.now() # used when mocking Timestamp.now()
|
||||
shard_ranges = []
|
||||
lower = ''
|
||||
for state in sorted(ShardRange.STATES.keys()):
|
||||
|
@ -3090,8 +3089,7 @@ class TestContainerController(unittest.TestCase):
|
|||
sr for sr in shard_ranges if sr.state in expected_states]
|
||||
own_shard_range = ShardRange(path, next(ts_iter), '', '',
|
||||
state=ShardRange.ACTIVE)
|
||||
expected.append(own_shard_range.copy(
|
||||
lower=expected[-1].upper, meta_timestamp=ts_now))
|
||||
expected.append(own_shard_range.copy(lower=expected[-1].upper))
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in expected]
|
||||
headers = {'X-Timestamp': next(ts_iter).normal}
|
||||
|
@ -3114,8 +3112,7 @@ class TestContainerController(unittest.TestCase):
|
|||
req = Request.blank('/sda1/p/%s?format=json%s' %
|
||||
(path, params), method='GET',
|
||||
headers={'X-Backend-Record-Type': 'shard'})
|
||||
with mock_timestamp_now(ts_now):
|
||||
resp = req.get_response(self.controller)
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.content_type, 'application/json')
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
|
|
@ -43,7 +43,8 @@ from swift.container.sharder import ContainerSharder, sharding_enabled, \
|
|||
find_shrinking_candidates, process_compactible_shard_sequences, \
|
||||
find_compactible_shard_sequences, is_shrinking_candidate, \
|
||||
is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf, \
|
||||
find_paths_with_gaps, combine_shard_ranges, find_overlapping_ranges
|
||||
find_paths_with_gaps, combine_shard_ranges, find_overlapping_ranges, \
|
||||
update_own_shard_range_stats
|
||||
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5, \
|
||||
ShardName
|
||||
|
@ -5001,14 +5002,13 @@ class TestSharder(BaseTestSharder):
|
|||
with mock.patch.object(
|
||||
broker, 'set_sharding_state') as mock_set_sharding_state:
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
with mock_timestamp_now():
|
||||
with mock.patch.object(sharder, '_audit_container'):
|
||||
sharder._process_broker(broker, node, 99)
|
||||
own_shard_range = broker.get_own_shard_range(
|
||||
no_default=True)
|
||||
mock_set_sharding_state.assert_not_called()
|
||||
self.assertEqual(dict(own_sr, meta_timestamp=now),
|
||||
dict(own_shard_range))
|
||||
self.assertEqual(dict(own_sr), dict(own_shard_range))
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
||||
|
@ -5112,12 +5112,11 @@ class TestSharder(BaseTestSharder):
|
|||
own_sr.epoch = epoch
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
with mock_timestamp_now():
|
||||
sharder._process_broker(broker, node, 99)
|
||||
own_shard_range = broker.get_own_shard_range(
|
||||
no_default=True)
|
||||
self.assertEqual(dict(own_sr, meta_timestamp=now),
|
||||
dict(own_shard_range))
|
||||
self.assertEqual(dict(own_sr), dict(own_shard_range))
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
if epoch:
|
||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||
|
@ -5148,15 +5147,14 @@ class TestSharder(BaseTestSharder):
|
|||
own_sr.epoch = epoch
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
with mock_timestamp_now():
|
||||
# we're not testing rest of the process here so prevent any
|
||||
# attempt to progress shard range states
|
||||
sharder._create_shard_containers = lambda *args: 0
|
||||
sharder._process_broker(broker, node, 99)
|
||||
own_shard_range = broker.get_own_shard_range(no_default=True)
|
||||
|
||||
self.assertEqual(dict(own_sr, meta_timestamp=now),
|
||||
dict(own_shard_range))
|
||||
self.assertEqual(dict(own_sr), dict(own_shard_range))
|
||||
self.assertEqual(SHARDING, broker.get_db_state())
|
||||
self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1])
|
||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||
|
@ -5752,14 +5750,13 @@ class TestSharder(BaseTestSharder):
|
|||
self.assertTrue(shard_ranges[1].update_state(ShardRange.ACTIVE,
|
||||
state_timestamp=root_ts))
|
||||
shard_ranges[1].timestamp = root_ts
|
||||
with mock_timestamp_now() as ts_now:
|
||||
with mock_timestamp_now():
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, shard_ranges)
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
self.assertEqual(['Updating own shard range from root'],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
own_shard_range.meta_timestamp = ts_now
|
||||
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
|
||||
expected = shard_ranges[1].copy()
|
||||
self.assertEqual(['Updated own shard range from %s to %s'
|
||||
% (own_shard_range, expected)],
|
||||
sharder.logger.get_lines_for_level('info'))
|
||||
|
@ -5854,15 +5851,14 @@ class TestSharder(BaseTestSharder):
|
|||
root_ts = next(self.ts_iter)
|
||||
shard_ranges[1].update_state(ShardRange.SHARDING,
|
||||
state_timestamp=root_ts)
|
||||
with mock_timestamp_now() as ts_now:
|
||||
with mock_timestamp_now():
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, shard_ranges)
|
||||
self.assert_no_audit_messages(sharder, mock_swift)
|
||||
self.assertFalse(broker.is_deleted())
|
||||
self.assertEqual(['Updating own shard range from root'],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
own_shard_range.meta_timestamp = ts_now
|
||||
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
|
||||
expected = shard_ranges[1].copy()
|
||||
self.assertEqual(['Updated own shard range from %s to %s'
|
||||
% (own_shard_range, expected)],
|
||||
sharder.logger.get_lines_for_level('info'))
|
||||
|
@ -6783,8 +6779,7 @@ class TestSharder(BaseTestSharder):
|
|||
# children ranges from root are merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertEqual(dict(parent_sr), dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
|
@ -6801,8 +6796,7 @@ class TestSharder(BaseTestSharder):
|
|||
# children ranges from root are merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertEqual(dict(parent_sr), dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
|
@ -6822,8 +6816,7 @@ class TestSharder(BaseTestSharder):
|
|||
# children ranges from root are NOT merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertEqual(dict(parent_sr), dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
|
@ -6950,8 +6943,10 @@ class TestSharder(BaseTestSharder):
|
|||
with self._mock_sharder(
|
||||
conf={'shard_container_threshold': 2}) as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
own_sr = update_own_shard_range_stats(
|
||||
broker, broker.get_own_shard_range())
|
||||
sharder._find_and_enable_sharding_candidates(
|
||||
broker, [broker.get_own_shard_range()])
|
||||
broker, [own_sr])
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(ShardRange.SHARDING, own_sr.state)
|
||||
self.assertEqual(now, own_sr.state_timestamp)
|
||||
|
@ -6961,8 +6956,10 @@ class TestSharder(BaseTestSharder):
|
|||
with self._mock_sharder(
|
||||
conf={'shard_container_threshold': 2}) as sharder:
|
||||
with mock_timestamp_now():
|
||||
own_sr = update_own_shard_range_stats(
|
||||
broker, broker.get_own_shard_range())
|
||||
sharder._find_and_enable_sharding_candidates(
|
||||
broker, [broker.get_own_shard_range()])
|
||||
broker, [own_sr])
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(ShardRange.SHARDING, own_sr.state)
|
||||
self.assertEqual(now, own_sr.state_timestamp)
|
||||
|
@ -7341,7 +7338,7 @@ class TestSharder(BaseTestSharder):
|
|||
'found': 3,
|
||||
'top': [
|
||||
{
|
||||
'object_count': mock.ANY,
|
||||
'object_count': 500000,
|
||||
'account': brokers[C3].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C3].container,
|
||||
|
@ -7351,7 +7348,7 @@ class TestSharder(BaseTestSharder):
|
|||
'node_index': 0,
|
||||
'compactible_ranges': 3
|
||||
}, {
|
||||
'object_count': mock.ANY,
|
||||
'object_count': 2500000,
|
||||
'account': brokers[C2].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C2].container,
|
||||
|
@ -7361,7 +7358,7 @@ class TestSharder(BaseTestSharder):
|
|||
'node_index': 0,
|
||||
'compactible_ranges': 2
|
||||
}, {
|
||||
'object_count': mock.ANY,
|
||||
'object_count': 2999999,
|
||||
'account': brokers[C1].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C1].container,
|
||||
|
@ -8850,6 +8847,31 @@ class TestSharderFunctions(BaseTestSharder):
|
|||
ranges[3])},
|
||||
overlapping_ranges)
|
||||
|
||||
def test_update_own_shard_range_stats(self):
|
||||
broker = self._make_broker()
|
||||
ts = next(self.ts_iter)
|
||||
broker.merge_items([
|
||||
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 9,
|
||||
'content_type': 'application/octet-stream', 'etag': 'not-really',
|
||||
'deleted': 0, 'storage_policy_index': 0,
|
||||
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
|
||||
for i in range(100)])
|
||||
|
||||
self.assertEqual(100, broker.get_info()['object_count'])
|
||||
self.assertEqual(900, broker.get_info()['bytes_used'])
|
||||
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.bytes_used)
|
||||
# own_sr is updated...
|
||||
update_own_shard_range_stats(broker, own_sr)
|
||||
self.assertEqual(100, own_sr.object_count)
|
||||
self.assertEqual(900, own_sr.bytes_used)
|
||||
# ...but not persisted
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.bytes_used)
|
||||
|
||||
|
||||
class TestContainerSharderConf(unittest.TestCase):
|
||||
def test_default(self):
|
||||
|
|
Loading…
Reference in New Issue