Merge "Consider tombstone count before shrinking a shard"

This commit is contained in:
Zuul 2021-05-10 05:49:18 +00:00 committed by Gerrit Code Review
commit 876cf34862
13 changed files with 854 additions and 192 deletions

View File

@ -208,9 +208,10 @@ class InvalidSolutionException(ManageShardRangesException):
def _print_shard_range(sr, level=0):
indent = ' ' * level
print(indent + '%r' % sr.name)
print(indent + ' objects: %9d lower: %r' % (sr.object_count,
sr.lower_str))
print(indent + ' state: %9s upper: %r' % (sr.state_text, sr.upper_str))
print(indent + ' objects: %9d, tombstones: %9d, lower: %r'
% (sr.object_count, sr.tombstones, sr.lower_str))
print(indent + ' state: %9s, upper: %r'
% (sr.state_text, sr.upper_str))
@contextmanager
@ -504,8 +505,8 @@ def compact_shard_ranges(broker, args):
for sequence in compactible:
acceptor = sequence[-1]
donors = sequence[:-1]
print('Donor shard range(s) with total of %d objects:'
% donors.object_count)
print('Donor shard range(s) with total of %d rows:'
% donors.row_count)
for donor in donors:
_print_shard_range(donor, level=1)
print('can be compacted into acceptor shard range:')

View File

@ -227,6 +227,82 @@ def get_db_connection(path, timeout=30, logger=None, okay_to_create=False):
return conn
class TombstoneReclaimer(object):
"""Encapsulates reclamation of deleted rows in a database."""
def __init__(self, broker, age_timestamp):
"""
Encapsulates reclamation of deleted rows in a database.
:param broker: an instance of :class:`~swift.common.db.DatabaseBroker`.
:param age_timestamp: a float timestamp: tombstones older than this
time will be deleted.
"""
self.broker = broker
self.age_timestamp = age_timestamp
self.marker = ''
self.remaining_tombstones = self.reclaimed = 0
self.finished = False
# limit 1 offset N gives back the N+1th matching row; that row is used
# as an exclusive end_marker for a batch of deletes, so a batch
# comprises rows satisfying self.marker <= name < end_marker.
self.batch_query = '''
SELECT name FROM %s WHERE deleted = 1
AND name >= ?
ORDER BY NAME LIMIT 1 OFFSET ?
''' % self.broker.db_contains_type
self.clean_batch_query = '''
DELETE FROM %s WHERE deleted = 1
AND name >= ? AND %s < %s
''' % (self.broker.db_contains_type, self.broker.db_reclaim_timestamp,
self.age_timestamp)
def _reclaim(self, conn):
curs = conn.execute(self.batch_query, (self.marker, RECLAIM_PAGE_SIZE))
row = curs.fetchone()
end_marker = row[0] if row else ''
if end_marker:
# do a single book-ended DELETE and bounce out
curs = conn.execute(self.clean_batch_query + ' AND name < ?',
(self.marker, end_marker))
self.marker = end_marker
self.reclaimed += curs.rowcount
self.remaining_tombstones += RECLAIM_PAGE_SIZE - curs.rowcount
else:
# delete off the end
curs = conn.execute(self.clean_batch_query, (self.marker,))
self.finished = True
self.reclaimed += curs.rowcount
def reclaim(self):
"""
Perform reclaim of deleted rows older than ``age_timestamp``.
"""
while not self.finished:
with self.broker.get() as conn:
self._reclaim(conn)
conn.commit()
def get_tombstone_count(self):
"""
Return the number of remaining tombstones newer than ``age_timestamp``.
Executes the ``reclaim`` method if it has not already been called on
this instance.
:return: The number of tombstones in the ``broker`` that are newer than
``age_timestamp``.
"""
if not self.finished:
self.reclaim()
with self.broker.get() as conn:
curs = conn.execute('''
SELECT COUNT(*) FROM %s WHERE deleted = 1
AND name >= ?
''' % (self.broker.db_contains_type,), (self.marker,))
tombstones = curs.fetchone()[0]
self.remaining_tombstones += tombstones
return self.remaining_tombstones
class DatabaseBroker(object):
"""Encapsulates working with a database."""
@ -988,47 +1064,22 @@ class DatabaseBroker(object):
with lock_parent_directory(self.pending_file,
self.pending_timeout):
self._commit_puts()
marker = ''
finished = False
while not finished:
with self.get() as conn:
marker = self._reclaim(conn, age_timestamp, marker)
if not marker:
finished = True
self._reclaim_other_stuff(
conn, age_timestamp, sync_timestamp)
conn.commit()
tombstone_reclaimer = TombstoneReclaimer(self, age_timestamp)
tombstone_reclaimer.reclaim()
with self.get() as conn:
self._reclaim_other_stuff(conn, age_timestamp, sync_timestamp)
conn.commit()
return tombstone_reclaimer
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
"""
This is only called once at the end of reclaim after _reclaim has been
called for each page.
This is only called once at the end of reclaim after tombstone reclaim
has been completed.
"""
self._reclaim_sync(conn, sync_timestamp)
self._reclaim_metadata(conn, age_timestamp)
def _reclaim(self, conn, age_timestamp, marker):
clean_batch_qry = '''
DELETE FROM %s WHERE deleted = 1
AND name >= ? AND %s < ?
''' % (self.db_contains_type, self.db_reclaim_timestamp)
curs = conn.execute('''
SELECT name FROM %s WHERE deleted = 1
AND name >= ?
ORDER BY NAME LIMIT 1 OFFSET ?
''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
row = curs.fetchone()
if row:
# do a single book-ended DELETE and bounce out
end_marker = row[0]
conn.execute(clean_batch_qry + ' AND name < ?', (
marker, age_timestamp, end_marker))
else:
# delete off the end and reset marker to indicate we're done
end_marker = ''
conn.execute(clean_batch_qry, (marker, age_timestamp))
return end_marker
def _reclaim_sync(self, conn, sync_timestamp):
try:
conn.execute('''

View File

@ -566,6 +566,12 @@ class Replicator(Daemon):
self.logger.debug('Successfully deleted db %s', broker.db_file)
return True
def _reclaim(self, broker, now=None):
if not now:
now = time.time()
return broker.reclaim(now - self.reclaim_age,
now - (self.reclaim_age * 2))
def _replicate_object(self, partition, object_file, node_id):
"""
Replicate the db, choosing method based on whether or not it
@ -591,8 +597,7 @@ class Replicator(Daemon):
try:
broker = self.brokerclass(object_file, pending_timeout=30,
logger=self.logger)
broker.reclaim(now - self.reclaim_age,
now - (self.reclaim_age * 2))
self._reclaim(broker, now)
info = broker.get_replication_info()
bpart = self.ring.get_part(
info['account'], info.get('container'))

View File

@ -5042,6 +5042,8 @@ class ShardRange(object):
sharding was enabled for a container.
:param reported: optional indicator that this shard and its stats have
been reported to the root container.
:param tombstones: the number of tombstones in the shard range; defaults to
-1 to indicate that the value is unknown.
"""
FOUND = 10
CREATED = 20
@ -5079,7 +5081,7 @@ class ShardRange(object):
def __init__(self, name, timestamp, lower=MIN, upper=MAX,
object_count=0, bytes_used=0, meta_timestamp=None,
deleted=False, state=None, state_timestamp=None, epoch=None,
reported=False):
reported=False, tombstones=-1):
self.account = self.container = self._timestamp = \
self._meta_timestamp = self._state_timestamp = self._epoch = None
self._lower = ShardRange.MIN
@ -5099,6 +5101,7 @@ class ShardRange(object):
self.state_timestamp = state_timestamp
self.epoch = epoch
self.reported = reported
self.tombstones = tombstones
@classmethod
def sort_key(cls, sr):
@ -5274,6 +5277,24 @@ class ShardRange(object):
raise ValueError('bytes_used cannot be < 0')
self._bytes = bytes_used
@property
def tombstones(self):
return self._tombstones
@tombstones.setter
def tombstones(self, tombstones):
self._tombstones = int(tombstones)
@property
def row_count(self):
"""
Returns the total number of rows in the shard range i.e. the sum of
objects and tombstones.
:return: the row count
"""
return self.object_count + max(self.tombstones, 0)
def update_meta(self, object_count, bytes_used, meta_timestamp=None):
"""
Set the object stats metadata to the given values and update the
@ -5300,6 +5321,27 @@ class ShardRange(object):
else:
self.meta_timestamp = meta_timestamp
def update_tombstones(self, tombstones, meta_timestamp=None):
"""
Set the tombstones metadata to the given values and update the
meta_timestamp to the current time.
:param tombstones: should be an integer
:param meta_timestamp: timestamp for metadata; if not given the
current time will be set.
:raises ValueError: if ``tombstones`` cannot be cast to an int, or
if meta_timestamp is neither None nor can be cast to a
:class:`~swift.common.utils.Timestamp`.
"""
tombstones = int(tombstones)
if 0 <= tombstones != self.tombstones:
self.tombstones = tombstones
self.reported = False
if meta_timestamp is None:
self.meta_timestamp = Timestamp.now()
else:
self.meta_timestamp = meta_timestamp
def increment_meta(self, object_count, bytes_used):
"""
Increment the object stats metadata by the given values and update the
@ -5518,6 +5560,7 @@ class ShardRange(object):
yield 'state_timestamp', self.state_timestamp.internal
yield 'epoch', self.epoch.internal if self.epoch is not None else None
yield 'reported', 1 if self.reported else 0
yield 'tombstones', self.tombstones
def copy(self, timestamp=None, **kwargs):
"""
@ -5550,7 +5593,7 @@ class ShardRange(object):
params['upper'], params['object_count'], params['bytes_used'],
params['meta_timestamp'], params['deleted'], params['state'],
params['state_timestamp'], params['epoch'],
params.get('reported', 0))
params.get('reported', 0), params.get('tombstones', -1))
def expand(self, donors):
"""
@ -5625,6 +5668,15 @@ class ShardRangeList(UserList):
"""
return sum(sr.object_count for sr in self)
@property
def row_count(self):
"""
Returns the total number of rows of all items in the list.
:return: total row count
"""
return sum(sr.row_count for sr in self)
@property
def bytes_used(self):
"""

View File

@ -66,7 +66,7 @@ SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
# tuples and vice-versa
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
'bytes_used', 'meta_timestamp', 'deleted', 'state',
'state_timestamp', 'epoch', 'reported')
'state_timestamp', 'epoch', 'reported', 'tombstones')
POLICY_STAT_TABLE_CREATE = '''
CREATE TABLE policy_stat (
@ -287,6 +287,7 @@ def merge_shards(shard_data, existing):
if existing['meta_timestamp'] >= shard_data['meta_timestamp']:
for k in ('object_count', 'bytes_used', 'meta_timestamp'):
shard_data[k] = existing[k]
shard_data['tombstones'] = existing.get('tombstones', -1)
else:
new_content = True
@ -294,6 +295,7 @@ def merge_shards(shard_data, existing):
if existing['reported'] and \
existing['object_count'] == shard_data['object_count'] and \
existing['bytes_used'] == shard_data['bytes_used'] and \
existing.get('tombstones', -1) == shard_data['tombstones'] and \
existing['state'] == shard_data['state'] and \
existing['epoch'] == shard_data['epoch']:
shard_data['reported'] = 1
@ -618,7 +620,8 @@ class ContainerBroker(DatabaseBroker):
state INTEGER,
state_timestamp TEXT,
epoch TEXT,
reported INTEGER DEFAULT 0
reported INTEGER DEFAULT 0,
tombstones INTEGER DEFAULT -1
);
""" % SHARD_RANGE_TABLE)
@ -1450,22 +1453,34 @@ class ContainerBroker(DatabaseBroker):
for item in to_add.values()))
conn.commit()
migrations = {
'no such column: reported':
self._migrate_add_shard_range_reported,
'no such column: tombstones':
self._migrate_add_shard_range_tombstones,
('no such table: %s' % SHARD_RANGE_TABLE):
self.create_shard_range_table,
}
migrations_done = set()
with self.get() as conn:
try:
return _really_merge_items(conn)
except sqlite3.OperationalError as err:
# Without the rollback, new enough (>= py37) python/sqlite3
# will panic:
# sqlite3.OperationalError: cannot start a transaction
# within a transaction
conn.rollback()
if 'no such column: reported' in str(err):
self._migrate_add_shard_range_reported(conn)
while True:
try:
return _really_merge_items(conn)
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
self.create_shard_range_table(conn)
return _really_merge_items(conn)
raise
except sqlite3.OperationalError as err:
# Without the rollback, new enough (>= py37) python/sqlite3
# will panic:
# sqlite3.OperationalError: cannot start a transaction
# within a transaction
conn.rollback()
for err_str, migration in migrations.items():
if err_str in migrations_done:
continue
if err_str in str(err):
migration(conn)
migrations_done.add(err_str)
break
else:
raise
def get_reconciler_sync(self):
with self.get() as conn:
@ -1624,6 +1639,17 @@ class ContainerBroker(DatabaseBroker):
COMMIT;
''' % SHARD_RANGE_TABLE)
def _migrate_add_shard_range_tombstones(self, conn):
"""
Add the tombstones column to the 'shard_range' table.
"""
conn.executescript('''
BEGIN;
ALTER TABLE %s
ADD COLUMN tombstones INTEGER DEFAULT -1;
COMMIT;
''' % SHARD_RANGE_TABLE)
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
super(ContainerBroker, self)._reclaim_other_stuff(
conn, age_timestamp, sync_timestamp)
@ -1673,7 +1699,11 @@ class ContainerBroker(DatabaseBroker):
elif states is not None:
included_states.add(states)
def do_query(conn, use_reported_column=True):
# defaults to be used when legacy db's are missing columns
default_values = {'reported': 0,
'tombstones': -1}
def do_query(conn, defaults=None):
condition = ''
conditions = []
params = []
@ -1691,10 +1721,13 @@ class ContainerBroker(DatabaseBroker):
params.append(self.path)
if conditions:
condition = ' WHERE ' + ' AND '.join(conditions)
if use_reported_column:
columns = SHARD_RANGE_KEYS
else:
columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', )
columns = SHARD_RANGE_KEYS[:-2]
for column in SHARD_RANGE_KEYS[-2:]:
if column in defaults:
columns += (('%s as %s' %
(default_values[column], column)),)
else:
columns += (column,)
sql = '''
SELECT %s
FROM %s%s;
@ -1704,14 +1737,26 @@ class ContainerBroker(DatabaseBroker):
return [row for row in data]
with self.maybe_get(connection) as conn:
try:
return do_query(conn)
except sqlite3.OperationalError as err:
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
return []
if 'no such column: reported' in str(err):
return do_query(conn, use_reported_column=False)
raise
defaults = set()
attempts = len(default_values) + 1
while attempts:
attempts -= 1
try:
return do_query(conn, defaults)
except sqlite3.OperationalError as err:
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
return []
if not attempts:
raise
new_defaults = set()
for column in default_values.keys():
if 'no such column: %s' % column in str(err):
new_defaults.add(column)
if not new_defaults:
raise
if new_defaults.intersection(defaults):
raise
defaults.update(new_defaults)
@classmethod
def resolve_shard_range_states(cls, states):

View File

@ -120,6 +120,8 @@ def find_overlapping_ranges(shard_ranges):
def is_sharding_candidate(shard_range, threshold):
# note: use *object* count as the condition for sharding: tombstones will
# eventually be reclaimed so should not trigger sharding
return (shard_range.state == ShardRange.ACTIVE and
shard_range.object_count >= threshold)
@ -127,10 +129,13 @@ def is_sharding_candidate(shard_range, threshold):
def is_shrinking_candidate(shard_range, shrink_threshold, merge_size,
states=None):
# typically shrink_threshold < merge_size but check both just in case
# note: use *row* count (objects plus tombstones) as the condition for
# shrinking to avoid inadvertently moving large numbers of tombstones into
# an acceptor
states = states or (ShardRange.ACTIVE,)
return (shard_range.state in states and
shard_range.object_count < shrink_threshold and
shard_range.object_count <= merge_size)
shard_range.row_count < shrink_threshold and
shard_range.row_count <= merge_size)
def find_sharding_candidates(broker, threshold, shard_ranges=None):
@ -186,6 +191,8 @@ def find_compactible_shard_sequences(broker,
compacted into each acceptor; -1 implies unlimited.
:param max_expanding: the maximum number of acceptors to be found (i.e. the
maximum number of sequences to be returned); -1 implies unlimited.
:param include_shrinking: if True then existing compactible sequences are
included in the results; default is False.
:returns: A list of :class:`~swift.common.utils.ShardRangeList` each
containing a sequence of neighbouring shard ranges that may be
compacted; the final shard range in the list is the acceptor
@ -196,10 +203,6 @@ def find_compactible_shard_sequences(broker,
# First cut is simple: assume root container shard usage stats are good
# enough to make decision; only merge with upper neighbour so that
# upper bounds never change (shard names include upper bound).
# TODO: object counts may well not be the appropriate metric for
# deciding to shrink because a shard with low object_count may have a
# large number of deleted object rows that will need to be merged with
# a neighbour. We may need to expose row count as well as object count.
shard_ranges = broker.get_shard_ranges()
own_shard_range = broker.get_own_shard_range()
@ -216,7 +219,7 @@ def find_compactible_shard_sequences(broker,
sequence[-1], shrink_threshold, merge_size,
states=(ShardRange.ACTIVE, ShardRange.SHRINKING)) or
0 < max_shrinking < len(sequence) or
sequence.object_count >= merge_size)):
sequence.row_count >= merge_size)):
return True
return False
@ -250,7 +253,7 @@ def find_compactible_shard_sequences(broker,
if shard_range.state == ShardRange.SHRINKING:
# already shrinking: add to sequence unconditionally
sequence.append(shard_range)
elif (sequence.object_count + shard_range.object_count
elif (sequence.row_count + shard_range.row_count
<= merge_size):
# add to sequence: could be a donor or acceptor
sequence.append(shard_range)
@ -1839,7 +1842,18 @@ class ContainerSharder(ContainerReplicator):
def _update_root_container(self, broker):
own_shard_range = broker.get_own_shard_range(no_default=True)
if not own_shard_range or own_shard_range.reported:
if not own_shard_range:
return
# do a reclaim *now* in order to get best estimate of tombstone count
# that is consistent with the current object_count
reclaimer = self._reclaim(broker)
tombstones = reclaimer.get_tombstone_count()
self.logger.debug('tombstones in %s = %d',
quote(broker.path), tombstones)
own_shard_range.update_tombstones(tombstones)
if own_shard_range.reported:
return
# persist the reported shard metadata

View File

@ -1682,8 +1682,13 @@ class TestContainerSharding(BaseTestContainerSharding):
orig_range_data, range_data,
excludes=['meta_timestamp', 'state_timestamp'])
# ...until the sharders run and update root
self.run_sharders(orig_shard_ranges[0])
# ...until the sharders run and update root; reclaim tombstones so
# that the shard is shrinkable
shard_0_part = self.get_part_and_node_numbers(
orig_shard_ranges[0])[0]
for conf_index in self.configs['container-sharder'].keys():
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
override_partitions=[shard_0_part])
exp_obj_count = len(second_shard_objects) + 1
self.assert_container_object_count(exp_obj_count)
self.assert_container_listing([alpha] + second_shard_objects)
@ -1748,15 +1753,37 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_container_listing([alpha])
# runs sharders so second range shrinks away, requires up to 3
# cycles
self.sharders.once() # shard updates root stats
# run sharders: second range should not shrink away yet because it
# has tombstones
self.sharders.once() # second shard updates root stats
self.assert_container_listing([alpha])
self.sharders.once() # root finds shrinkable shard
self.assert_container_listing([alpha])
self.sharders.once() # shards shrink themselves
self.assert_container_listing([alpha])
# the acceptor shard is intact...
shard_nodes_data = self.direct_get_container_shard_ranges(
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
obj_count, bytes_used = check_shard_nodes_data(shard_nodes_data)
self.assertEqual(1, obj_count)
# run sharders to reclaim tombstones so that the second shard is
# shrinkable
shard_1_part = self.get_part_and_node_numbers(
orig_shard_ranges[1])[0]
for conf_index in self.configs['container-sharder'].keys():
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
override_partitions=[shard_1_part])
self.assert_container_listing([alpha])
# run sharders so second range shrinks away, requires up to 2
# cycles
self.sharders.once() # root finds shrinkable shard
self.assert_container_listing([alpha])
self.sharders.once() # shards shrink themselves
self.assert_container_listing([alpha])
# the second shard range has sharded and is empty
shard_nodes_data = self.direct_get_container_shard_ranges(
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
@ -2215,10 +2242,15 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_container_listing([])
self.assert_container_post_ok('has objects')
# run sharder on shard containers to update root stats
# run sharder on shard containers to update root stats; reclaim
# the tombstones so that the shards appear to be shrinkable
shard_ranges = self.get_container_shard_ranges()
self.assertLengthEqual(shard_ranges, 2)
self.run_sharders(shard_ranges)
shard_partitions = [self.get_part_and_node_numbers(sr)[0]
for sr in shard_ranges]
for conf_index in self.configs['container-sharder'].keys():
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
override_partitions=shard_partitions)
self.assert_container_object_count(0)
# First, test a misplaced object moving from one shard to another.
@ -2349,8 +2381,12 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_container_listing(shard_1_objects)
self.assert_container_post_ok('has objects')
# run sharder on first shard container to update root stats
self.run_sharders(shard_ranges[0])
# run sharder on first shard container to update root stats; reclaim
# the tombstones so that the shard appears to be shrinkable
shard_0_part = self.get_part_and_node_numbers(shard_ranges[0])[0]
for conf_index in self.configs['container-sharder'].keys():
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
override_partitions=[shard_0_part])
self.assert_container_object_count(len(shard_1_objects))
# First, test a misplaced object moving from one shard to another.
@ -2384,10 +2420,13 @@ class TestContainerSharding(BaseTestContainerSharding):
# Now we have just one active shard, test a misplaced object moving
# from that shard to the root.
# delete most objects from second shard range and run sharder on root
# to discover second shrink candidate
# delete most objects from second shard range, reclaim the tombstones,
# and run sharder on root to discover second shrink candidate
self.delete_objects(shard_1_objects)
self.run_sharders(shard_ranges[1])
shard_1_part = self.get_part_and_node_numbers(shard_ranges[1])[0]
for conf_index in self.configs['container-sharder'].keys():
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
override_partitions=[shard_1_part])
self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
# then run sharder on the shard node to shrink it to root - note this
# moves alpha to the root db
@ -2457,7 +2496,10 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_container_post_ok('has objects')
# run sharder on first shard container to update root stats
self.run_sharders(shard_ranges[0])
shard_0_part = self.get_part_and_node_numbers(shard_ranges[0])[0]
for conf_index in self.configs['container-sharder'].keys():
self.run_custom_sharder(conf_index, {'reclaim_age': 0},
override_partitions=[shard_0_part])
self.assert_container_object_count(len(shard_1_objects))
# First, test a misplaced object moving from one shard to another.

View File

@ -36,7 +36,7 @@ import six
from swift.account.backend import AccountBroker
from swift.common.utils import Timestamp
from test.unit import patch_policies, with_tempdir, make_timestamp_iter
from swift.common.db import DatabaseConnectionError
from swift.common.db import DatabaseConnectionError, TombstoneReclaimer
from swift.common.request_helpers import get_reserved_name
from swift.common.storage_policy import StoragePolicy, POLICIES
from swift.common.utils import md5
@ -218,15 +218,17 @@ class TestAccountBroker(unittest.TestCase):
self.assertEqual(count_reclaimable(conn, reclaim_age),
num_of_containers / 4)
orig__reclaim = broker._reclaim
trace = []
def tracing_reclaim(conn, age_timestamp, marker):
trace.append((age_timestamp, marker,
count_reclaimable(conn, age_timestamp)))
return orig__reclaim(conn, age_timestamp, marker)
class TracingReclaimer(TombstoneReclaimer):
def _reclaim(self, conn):
trace.append(
(self.age_timestamp, self.marker,
count_reclaimable(conn, self.age_timestamp)))
return super(TracingReclaimer, self)._reclaim(conn)
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
with mock.patch(
'swift.common.db.TombstoneReclaimer', TracingReclaimer), \
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
broker.reclaim(reclaim_age, reclaim_age)
with broker.get() as conn:

View File

@ -429,6 +429,7 @@ class TestManageShardRanges(unittest.TestCase):
' "state": "sharding",',
' "state_timestamp": "%s",' % now.internal,
' "timestamp": "%s",' % now.internal,
' "tombstones": -1,',
' "upper": ""',
'}',
'db_state = sharding',
@ -472,6 +473,7 @@ class TestManageShardRanges(unittest.TestCase):
' "state": "sharding",',
' "state_timestamp": "%s",' % now.internal,
' "timestamp": "%s",' % now.internal,
' "tombstones": -1,',
' "upper": ""',
'}',
'db_state = sharded',
@ -861,12 +863,37 @@ class TestManageShardRanges(unittest.TestCase):
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.tombstones = 999
if i not in small_ranges:
sr.object_count = 100001
sr.update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
expected_base = [
'Donor shard range(s) with total of 2018 rows:',
" '.shards_a",
" objects: 10, tombstones: 999, lower: 'obj29'",
" state: active, upper: 'obj39'",
" '.shards_a",
" objects: 10, tombstones: 999, lower: 'obj39'",
" state: active, upper: 'obj49'",
'can be compacted into acceptor shard range:',
" '.shards_a",
" objects: 100001, tombstones: 999, lower: 'obj49'",
" state: active, upper: 'obj59'",
'Donor shard range(s) with total of 1009 rows:',
" '.shards_a",
" objects: 10, tombstones: 999, lower: 'obj69'",
" state: active, upper: 'obj79'",
'can be compacted into acceptor shard range:',
" '.shards_a",
" objects: 100001, tombstones: 999, lower: 'obj79'",
" state: active, upper: 'obj89'",
'Once applied to the broker these changes will result in '
'shard range compaction the next time the sharder runs.',
]
def do_compact(user_input, exit_code):
out = StringIO()
err = StringIO()
@ -880,29 +907,7 @@ class TestManageShardRanges(unittest.TestCase):
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
expected = [
'Donor shard range(s) with total of 20 objects:',
" '.shards_a",
" objects: 10 lower: 'obj29'",
" state: active upper: 'obj39'",
" '.shards_a",
" objects: 10 lower: 'obj39'",
" state: active upper: 'obj49'",
'can be compacted into acceptor shard range:',
" '.shards_a",
" objects: 100001 lower: 'obj49'",
" state: active upper: 'obj59'",
'Donor shard range(s) with total of 10 objects:',
" '.shards_a",
" objects: 10 lower: 'obj69'",
" state: active upper: 'obj79'",
'can be compacted into acceptor shard range:',
" '.shards_a",
" objects: 100001 lower: 'obj79'",
" state: active upper: 'obj89'",
'Once applied to the broker these changes will result in '
'shard range compaction the next time the sharder runs.',
]
expected = list(expected_base)
if user_input == 'yes':
expected.extend([
'Updated 2 shard sequences for compaction.',
@ -1334,14 +1339,12 @@ class TestManageShardRanges(unittest.TestCase):
['No shards identified for compaction.'],
out_lines[:1])
def test_compact_shrink_threshold(self):
def _do_test_compact_shrink_threshold(self, broker, shard_ranges):
# verify option to set the shrink threshold for compaction;
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
# (n-2)th shard range has one extra object
shard_ranges[-2].object_count = 11
shard_ranges[-2].object_count = shard_ranges[-2].object_count + 1
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
# with threshold set to 10 no shard ranges can be shrunk
@ -1384,6 +1387,19 @@ class TestManageShardRanges(unittest.TestCase):
self.assertEqual([ShardRange.SHRINKING] * 8 + [ShardRange.ACTIVE] * 2,
[sr.state for sr in updated_ranges])
def test_compact_shrink_threshold(self):
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
self._do_test_compact_shrink_threshold(broker, shard_ranges)
def test_compact_shrink_threshold_with_tombstones(self):
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.object_count = sr.object_count - i
sr.tombstones = i
self._do_test_compact_shrink_threshold(broker, shard_ranges)
def test_repair_not_root(self):
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')

View File

@ -41,7 +41,7 @@ from swift.common.constraints import \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE
from swift.common.db import chexor, dict_factory, get_db_connection, \
DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
GreenDBConnection, PICKLE_PROTOCOL, zero_like
GreenDBConnection, PICKLE_PROTOCOL, zero_like, TombstoneReclaimer
from swift.common.utils import normalize_timestamp, mkdirs, Timestamp
from swift.common.exceptions import LockTimeout
from swift.common.swob import HTTPException
@ -1093,6 +1093,7 @@ class TestDatabaseBroker(unittest.TestCase):
broker = DatabaseBroker(':memory:', account='a')
broker.db_type = 'test'
broker.db_contains_type = 'test'
broker.db_reclaim_timestamp = 'created_at'
broker_creation = normalize_timestamp(1)
broker_uuid = str(uuid4())
broker_metadata = metadata and json.dumps(
@ -1183,7 +1184,7 @@ class TestDatabaseBroker(unittest.TestCase):
return broker
# only testing _reclaim_metadata here
@patch.object(DatabaseBroker, '_reclaim', return_value='')
@patch.object(TombstoneReclaimer, 'reclaim')
def test_metadata(self, mock_reclaim):
# Initializes a good broker for us
broker = self.get_replication_info_tester(metadata=True)
@ -1569,7 +1570,7 @@ class TestDatabaseBroker(unittest.TestCase):
self.assertFalse(pending)
class TestTombstoneReclaim(unittest.TestCase):
class TestTombstoneReclaimer(unittest.TestCase):
def _make_object(self, broker, obj_name, ts, deleted):
if deleted:
broker.delete_test(obj_name, ts.internal)
@ -1586,29 +1587,32 @@ class TestTombstoneReclaim(unittest.TestCase):
with broker.get() as conn:
return self._count_reclaimable(conn, reclaim_age)
def _setup_reclaimable_active(self):
def _setup_tombstones(self, reverse_names=True):
broker = ExampleBroker(':memory:', account='test_account',
container='test_container')
broker.initialize(Timestamp('1').internal, 0)
now = time.time()
top_of_the_minute = now - (now % 60)
# namespace:
# namespace if reverse:
# a-* has 70 'active' tombstones followed by 70 reclaimable
# b-* has 70 'active' tombstones followed by 70 reclaimable
# else:
# a-* has 70 reclaimable followed by 70 'active' tombstones
# b-* has 70 reclaimable followed by 70 'active' tombstones
for i in range(0, 560, 4):
self._make_object(broker, 'a_%3d' % (560 - i),
Timestamp(top_of_the_minute - (i * 60)),
True)
self._make_object(broker, 'a_%3d' % (559 - i),
Timestamp(top_of_the_minute - ((i + 1) * 60)),
False)
self._make_object(broker, 'b_%3d' % (560 - i),
Timestamp(top_of_the_minute - ((i + 2) * 60)),
True)
self._make_object(broker, 'b_%3d' % (559 - i),
Timestamp(top_of_the_minute - ((i + 3) * 60)),
False)
self._make_object(
broker, 'a_%3d' % (560 - i if reverse_names else i),
Timestamp(top_of_the_minute - (i * 60)), True)
self._make_object(
broker, 'a_%3d' % (559 - i if reverse_names else i + 1),
Timestamp(top_of_the_minute - ((i + 1) * 60)), False)
self._make_object(
broker, 'b_%3d' % (560 - i if reverse_names else i),
Timestamp(top_of_the_minute - ((i + 2) * 60)), True)
self._make_object(
broker, 'b_%3d' % (559 - i if reverse_names else i + 1),
Timestamp(top_of_the_minute - ((i + 3) * 60)), False)
broker._commit_puts()
# divide the set of timestamps exactly in half for reclaim
@ -1635,11 +1639,12 @@ class TestTombstoneReclaim(unittest.TestCase):
yield reclaimable
def test_batched_reclaim_several_small_batches(self):
broker, totm, reclaim_age = self._setup_reclaimable_active()
broker, totm, reclaim_age = self._setup_tombstones()
with self._mock_broker_get(broker, reclaim_age) as reclaimable:
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 50):
broker.reclaim(reclaim_age, reclaim_age)
reclaimer = TombstoneReclaimer(broker, reclaim_age)
reclaimer.reclaim()
expected_reclaimable = [140, # 0 rows fetched
90, # 50 rows fetched, 50 reclaimed
@ -1652,11 +1657,12 @@ class TestTombstoneReclaim(unittest.TestCase):
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
def test_batched_reclaim_exactly_two_batches(self):
broker, totm, reclaim_age = self._setup_reclaimable_active()
broker, totm, reclaim_age = self._setup_tombstones()
with self._mock_broker_get(broker, reclaim_age) as reclaimable:
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 140):
broker.reclaim(reclaim_age, reclaim_age)
reclaimer = TombstoneReclaimer(broker, reclaim_age)
reclaimer.reclaim()
expected_reclaimable = [140, # 0 rows fetched
70, # 140 rows fetched, 70 reclaimed
@ -1665,16 +1671,57 @@ class TestTombstoneReclaim(unittest.TestCase):
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
def test_batched_reclaim_one_large_batch(self):
broker, totm, reclaim_age = self._setup_reclaimable_active()
broker, totm, reclaim_age = self._setup_tombstones()
with self._mock_broker_get(broker, reclaim_age) as reclaimable:
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 1000):
broker.reclaim(reclaim_age, reclaim_age)
reclaimer = TombstoneReclaimer(broker, reclaim_age)
reclaimer.reclaim()
expected_reclaimable = [140] # 0 rows fetched
self.assertEqual(expected_reclaimable, reclaimable)
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
def test_reclaim_get_tombstone_count(self):
broker, totm, reclaim_age = self._setup_tombstones(reverse_names=False)
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122):
reclaimer = TombstoneReclaimer(broker, reclaim_age)
reclaimer.reclaim()
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
tombstones = self._get_reclaimable(broker, totm + 1)
self.assertEqual(140, tombstones)
# in this scenario the reclaim phase finds the remaining tombstone
# count (140)
self.assertEqual(140, reclaimer.remaining_tombstones)
self.assertEqual(140, reclaimer.get_tombstone_count())
def test_reclaim_get_tombstone_count_with_leftover(self):
broker, totm, reclaim_age = self._setup_tombstones()
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122):
reclaimer = TombstoneReclaimer(broker, reclaim_age)
reclaimer.reclaim()
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
tombstones = self._get_reclaimable(broker, totm + 1)
self.assertEqual(140, tombstones)
# in this scenario the reclaim phase finds a subset (104) of all
# tombstones (140)
self.assertEqual(104, reclaimer.remaining_tombstones)
# get_tombstone_count finds the rest
actual = reclaimer.get_tombstone_count()
self.assertEqual(140, actual)
def test_get_tombstone_count_with_leftover(self):
# verify that a call to get_tombstone_count() will invoke a reclaim if
# reclaim not already invoked
broker, totm, reclaim_age = self._setup_tombstones()
with patch('swift.common.db.RECLAIM_PAGE_SIZE', 122):
reclaimer = TombstoneReclaimer(broker, reclaim_age)
actual = reclaimer.get_tombstone_count()
self.assertEqual(0, self._get_reclaimable(broker, reclaim_age))
self.assertEqual(140, actual)
if __name__ == '__main__':
unittest.main()

View File

@ -7846,7 +7846,7 @@ class TestShardRange(unittest.TestCase):
meta_timestamp=ts_1.internal, deleted=0,
state=utils.ShardRange.FOUND,
state_timestamp=ts_1.internal, epoch=None,
reported=0)
reported=0, tombstones=-1)
assert_initialisation_ok(dict(empty_run, name='a/c', timestamp=ts_1),
expect)
assert_initialisation_ok(dict(name='a/c', timestamp=ts_1), expect)
@ -7856,17 +7856,18 @@ class TestShardRange(unittest.TestCase):
meta_timestamp=ts_2, deleted=0,
state=utils.ShardRange.CREATED,
state_timestamp=ts_3.internal, epoch=ts_4,
reported=0)
reported=0, tombstones=11)
expect.update({'lower': 'l', 'upper': 'u', 'object_count': 2,
'bytes_used': 10, 'meta_timestamp': ts_2.internal,
'state': utils.ShardRange.CREATED,
'state_timestamp': ts_3.internal, 'epoch': ts_4,
'reported': 0})
'reported': 0, 'tombstones': 11})
assert_initialisation_ok(good_run.copy(), expect)
# obj count and bytes used as int strings
# obj count, tombstones and bytes used as int strings
good_str_run = good_run.copy()
good_str_run.update({'object_count': '2', 'bytes_used': '10'})
good_str_run.update({'object_count': '2', 'bytes_used': '10',
'tombstones': '11'})
assert_initialisation_ok(good_str_run, expect)
good_no_meta = good_run.copy()
@ -7922,7 +7923,7 @@ class TestShardRange(unittest.TestCase):
'upper': upper, 'object_count': 10, 'bytes_used': 100,
'meta_timestamp': ts_2.internal, 'deleted': 0,
'state': utils.ShardRange.FOUND, 'state_timestamp': ts_3.internal,
'epoch': ts_4, 'reported': 0}
'epoch': ts_4, 'reported': 0, 'tombstones': -1}
self.assertEqual(expected, sr_dict)
self.assertIsInstance(sr_dict['lower'], six.string_types)
self.assertIsInstance(sr_dict['upper'], six.string_types)
@ -7937,9 +7938,9 @@ class TestShardRange(unittest.TestCase):
for key in sr_dict:
bad_dict = dict(sr_dict)
bad_dict.pop(key)
if key == 'reported':
# This was added after the fact, and we need to be able to eat
# data from old servers
if key in ('reported', 'tombstones'):
# These were added after the fact, and we need to be able to
# eat data from old servers
utils.ShardRange.from_dict(bad_dict)
utils.ShardRange(**bad_dict)
continue
@ -8053,6 +8054,62 @@ class TestShardRange(unittest.TestCase):
check_bad_args('bad', 10)
check_bad_args(10, 'bad')
def test_update_tombstones(self):
ts_1 = next(self.ts_iter)
sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None)
self.assertEqual(-1, sr.tombstones)
self.assertFalse(sr.reported)
with mock_timestamp_now(next(self.ts_iter)) as now:
sr.update_tombstones(1)
self.assertEqual(1, sr.tombstones)
self.assertEqual(now, sr.meta_timestamp)
self.assertFalse(sr.reported)
sr.reported = True
with mock_timestamp_now(next(self.ts_iter)) as now:
sr.update_tombstones(3, None)
self.assertEqual(3, sr.tombstones)
self.assertEqual(now, sr.meta_timestamp)
self.assertFalse(sr.reported)
sr.reported = True
ts_2 = next(self.ts_iter)
sr.update_tombstones(5, ts_2)
self.assertEqual(5, sr.tombstones)
self.assertEqual(ts_2, sr.meta_timestamp)
self.assertFalse(sr.reported)
# no change in value -> no change in reported
sr.reported = True
ts_3 = next(self.ts_iter)
sr.update_tombstones(5, ts_3)
self.assertEqual(5, sr.tombstones)
self.assertEqual(ts_3, sr.meta_timestamp)
self.assertTrue(sr.reported)
sr.update_meta('11', '12')
self.assertEqual(11, sr.object_count)
self.assertEqual(12, sr.bytes_used)
def check_bad_args(*args):
with self.assertRaises(ValueError):
sr.update_tombstones(*args)
check_bad_args('bad')
check_bad_args(10, 'bad')
def test_row_count(self):
ts_1 = next(self.ts_iter)
sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None)
self.assertEqual(0, sr.row_count)
sr.update_meta(11, 123)
self.assertEqual(11, sr.row_count)
sr.update_tombstones(13)
self.assertEqual(24, sr.row_count)
sr.update_meta(0, 0)
self.assertEqual(13, sr.row_count)
def test_state_timestamp_setter(self):
ts_1 = next(self.ts_iter)
sr = utils.ShardRange('a/test', ts_1, 'l', 'u', 0, 0, None)
@ -8662,9 +8719,9 @@ class TestShardRangeList(unittest.TestCase):
self.ts_iter = make_timestamp_iter()
self.shard_ranges = [
utils.ShardRange('a/b', self.t1, 'a', 'b',
object_count=2, bytes_used=22),
object_count=2, bytes_used=22, tombstones=222),
utils.ShardRange('b/c', self.t2, 'b', 'c',
object_count=4, bytes_used=44),
object_count=4, bytes_used=44, tombstones=444),
utils.ShardRange('c/y', self.t1, 'c', 'y',
object_count=6, bytes_used=66),
]
@ -8676,6 +8733,7 @@ class TestShardRangeList(unittest.TestCase):
self.assertEqual(utils.ShardRange.MIN, srl.upper)
self.assertEqual(0, srl.object_count)
self.assertEqual(0, srl.bytes_used)
self.assertEqual(0, srl.row_count)
def test_init_with_list(self):
srl = ShardRangeList(self.shard_ranges[:2])
@ -8684,6 +8742,7 @@ class TestShardRangeList(unittest.TestCase):
self.assertEqual('c', srl.upper)
self.assertEqual(6, srl.object_count)
self.assertEqual(66, srl.bytes_used)
self.assertEqual(672, srl.row_count)
srl.append(self.shard_ranges[2])
self.assertEqual(3, len(srl))
@ -8691,6 +8750,8 @@ class TestShardRangeList(unittest.TestCase):
self.assertEqual('y', srl.upper)
self.assertEqual(12, srl.object_count)
self.assertEqual(132, srl.bytes_used)
self.assertEqual(-1, self.shard_ranges[2].tombstones) # sanity check
self.assertEqual(678, srl.row_count) # NB: tombstones=-1 not counted
def test_pop(self):
srl = ShardRangeList(self.shard_ranges[:2])
@ -8700,6 +8761,7 @@ class TestShardRangeList(unittest.TestCase):
self.assertEqual('b', srl.upper)
self.assertEqual(2, srl.object_count)
self.assertEqual(22, srl.bytes_used)
self.assertEqual(224, srl.row_count)
def test_slice(self):
srl = ShardRangeList(self.shard_ranges)
@ -8710,6 +8772,7 @@ class TestShardRangeList(unittest.TestCase):
self.assertEqual('b', sublist.upper)
self.assertEqual(2, sublist.object_count)
self.assertEqual(22, sublist.bytes_used)
self.assertEqual(224, sublist.row_count)
sublist = srl[1:]
self.assertIsInstance(sublist, ShardRangeList)
@ -8718,6 +8781,7 @@ class TestShardRangeList(unittest.TestCase):
self.assertEqual('y', sublist.upper)
self.assertEqual(10, sublist.object_count)
self.assertEqual(110, sublist.bytes_used)
self.assertEqual(454, sublist.row_count)
def test_includes(self):
srl = ShardRangeList(self.shard_ranges)

View File

@ -36,7 +36,8 @@ from swift.common.exceptions import LockTimeout
from swift.container.backend import ContainerBroker, \
update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \
TombstoneReclaimer
from swift.common.request_helpers import get_reserved_name
from swift.common.utils import Timestamp, encode_timestamps, hash_path, \
ShardRange, make_db_file_path, md5, ShardRangeList
@ -715,15 +716,17 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(count_reclaimable(conn, reclaim_age),
num_of_objects / 4)
orig__reclaim = broker._reclaim
trace = []
def tracing_reclaim(conn, age_timestamp, marker):
trace.append((age_timestamp, marker,
count_reclaimable(conn, age_timestamp)))
return orig__reclaim(conn, age_timestamp, marker)
class TracingReclaimer(TombstoneReclaimer):
def _reclaim(self, conn):
trace.append(
(self.age_timestamp, self.marker,
count_reclaimable(conn, self.age_timestamp)))
return super(TracingReclaimer, self)._reclaim(conn)
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
with mock.patch(
'swift.common.db.TombstoneReclaimer', TracingReclaimer), \
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
broker.reclaim(reclaim_age, reclaim_age)
@ -856,7 +859,8 @@ class TestContainerBroker(unittest.TestCase):
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges,
TestContainerBrokerBeforeShardRangeReportedColumn):
TestContainerBrokerBeforeShardRangeReportedColumn,
TestContainerBrokerBeforeShardRangeTombstonesColumn):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(info['status_changed_at'],
@ -2210,7 +2214,8 @@ class TestContainerBroker(unittest.TestCase):
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges,
TestContainerBrokerBeforeShardRangeReportedColumn):
TestContainerBrokerBeforeShardRangeReportedColumn,
TestContainerBrokerBeforeShardRangeTombstonesColumn):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(info['status_changed_at'],
@ -3509,7 +3514,8 @@ class TestContainerBroker(unittest.TestCase):
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges,
TestContainerBrokerBeforeShardRangeReportedColumn):
TestContainerBrokerBeforeShardRangeReportedColumn,
TestContainerBrokerBeforeShardRangeTombstonesColumn):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(timestamp.internal, info['status_changed_at'])
@ -4843,7 +4849,7 @@ class TestContainerBroker(unittest.TestCase):
@with_tempdir
def test_merge_shard_ranges(self, tempdir):
ts = [next(self.ts) for _ in range(14)]
ts = [next(self.ts) for _ in range(16)]
db_path = os.path.join(
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(
@ -4947,6 +4953,20 @@ class TestContainerBroker(unittest.TestCase):
broker.merge_shard_ranges(ShardRangeList([sr_c_13, sr_b_13]))
self._assert_shard_ranges(
broker, [sr_b_13, sr_c_13])
# merge with tombstones but same meta_timestamp
sr_c_13_tombs = ShardRange('a/c_c', ts[13], lower='b', upper='c',
object_count=10, meta_timestamp=ts[13],
tombstones=999)
broker.merge_shard_ranges(sr_c_13_tombs)
self._assert_shard_ranges(
broker, [sr_b_13, sr_c_13])
# merge with tombstones at newer meta_timestamp
sr_c_13_tombs = ShardRange('a/c_c', ts[13], lower='b', upper='c',
object_count=1, meta_timestamp=ts[14],
tombstones=999)
broker.merge_shard_ranges(sr_c_13_tombs)
self._assert_shard_ranges(
broker, [sr_b_13, sr_c_13_tombs])
@with_tempdir
def test_merge_shard_ranges_state(self, tempdir):
@ -5670,7 +5690,7 @@ class TestContainerBrokerBeforeShardRangeReportedColumn(
ContainerBrokerMigrationMixin, TestContainerBroker):
"""
Tests for ContainerBroker against databases created
before the shard_ranges table was added.
before the shard_ranges table reported column was added.
"""
# *grumble grumble* This should include container_info/policy_stat :-/
expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object',
@ -5699,6 +5719,234 @@ class TestContainerBrokerBeforeShardRangeReportedColumn(
conn.execute('''SELECT reported
FROM shard_range''')
@with_tempdir
def test_get_shard_ranges_attempts(self, tempdir):
# verify that old broker handles new sql query for shard range rows
db_path = os.path.join(tempdir, 'container.db')
broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(self.ts).internal, 0)
@contextmanager
def patch_execute():
with broker.get() as conn:
mock_conn = mock.MagicMock()
mock_execute = mock.MagicMock()
mock_conn.execute = mock_execute
@contextmanager
def mock_get():
yield mock_conn
with mock.patch.object(broker, 'get', mock_get):
yield mock_execute, conn
with patch_execute() as (mock_execute, conn):
mock_execute.side_effect = conn.execute
broker.get_shard_ranges()
expected = [
mock.call('\n SELECT name, timestamp, lower, upper, '
'object_count, bytes_used, meta_timestamp, deleted, '
'state, state_timestamp, epoch, reported, '
'tombstones\n '
'FROM shard_range WHERE deleted=0 AND name != ?;\n'
' ', ['a/c']),
mock.call('\n SELECT name, timestamp, lower, upper, '
'object_count, bytes_used, meta_timestamp, deleted, '
'state, state_timestamp, epoch, 0 as reported, '
'tombstones\n '
'FROM shard_range WHERE deleted=0 AND name != ?;\n'
' ', ['a/c']),
mock.call('\n SELECT name, timestamp, lower, upper, '
'object_count, bytes_used, meta_timestamp, deleted, '
'state, state_timestamp, epoch, 0 as reported, '
'-1 as tombstones\n '
'FROM shard_range WHERE deleted=0 AND name != ?;\n'
' ', ['a/c']),
]
self.assertEqual(expected, mock_execute.call_args_list,
mock_execute.call_args_list)
# if unexpectedly the call to execute continues to fail for reported,
# verify that the exception is raised after a retry
with patch_execute() as (mock_execute, conn):
def mock_execute_handler(*args, **kwargs):
if len(mock_execute.call_args_list) < 3:
return conn.execute(*args, **kwargs)
else:
raise sqlite3.OperationalError('no such column: reported')
mock_execute.side_effect = mock_execute_handler
with self.assertRaises(sqlite3.OperationalError):
broker.get_shard_ranges()
self.assertEqual(expected, mock_execute.call_args_list,
mock_execute.call_args_list)
# if unexpectedly the call to execute continues to fail for tombstones,
# verify that the exception is raised after a retry
with patch_execute() as (mock_execute, conn):
def mock_execute_handler(*args, **kwargs):
if len(mock_execute.call_args_list) < 3:
return conn.execute(*args, **kwargs)
else:
raise sqlite3.OperationalError(
'no such column: tombstones')
mock_execute.side_effect = mock_execute_handler
with self.assertRaises(sqlite3.OperationalError):
broker.get_shard_ranges()
self.assertEqual(expected, mock_execute.call_args_list,
mock_execute.call_args_list)
@with_tempdir
def test_merge_shard_ranges_migrates_table(self, tempdir):
# verify that old broker migrates shard range table
db_path = os.path.join(tempdir, 'container.db')
broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(self.ts).internal, 0)
shard_ranges = [ShardRange('.shards_a/c_0', next(self.ts), 'a', 'b'),
ShardRange('.shards_a/c_1', next(self.ts), 'b', 'c')]
orig_migrate_reported = broker._migrate_add_shard_range_reported
orig_migrate_tombstones = broker._migrate_add_shard_range_tombstones
with mock.patch.object(
broker, '_migrate_add_shard_range_reported',
side_effect=orig_migrate_reported) as mocked_reported:
with mock.patch.object(
broker, '_migrate_add_shard_range_tombstones',
side_effect=orig_migrate_tombstones) as mocked_tombstones:
broker.merge_shard_ranges(shard_ranges[:1])
mocked_reported.assert_called_once_with(mock.ANY)
mocked_tombstones.assert_called_once_with(mock.ANY)
self._assert_shard_ranges(broker, shard_ranges[:1])
with mock.patch.object(
broker, '_migrate_add_shard_range_reported',
side_effect=orig_migrate_reported) as mocked_reported:
with mock.patch.object(
broker, '_migrate_add_shard_range_tombstones',
side_effect=orig_migrate_tombstones) as mocked_tombstones:
broker.merge_shard_ranges(shard_ranges[1:])
mocked_reported.assert_not_called()
mocked_tombstones.assert_not_called()
self._assert_shard_ranges(broker, shard_ranges)
@with_tempdir
def test_merge_shard_ranges_fails_to_migrate_table(self, tempdir):
# verify that old broker will raise exception if it unexpectedly fails
# to migrate shard range table
db_path = os.path.join(tempdir, 'container.db')
broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(self.ts).internal, 0)
shard_ranges = [ShardRange('.shards_a/c_0', next(self.ts), 'a', 'b'),
ShardRange('.shards_a/c_1', next(self.ts), 'b', 'c')]
# unexpected error during migration
with mock.patch.object(
broker, '_migrate_add_shard_range_reported',
side_effect=sqlite3.OperationalError('unexpected')) \
as mocked_reported:
with self.assertRaises(sqlite3.OperationalError):
broker.merge_shard_ranges(shard_ranges)
# one failed attempt was made to add reported column
self.assertEqual(1, mocked_reported.call_count)
# migration silently fails
with mock.patch.object(
broker, '_migrate_add_shard_range_reported') \
as mocked_reported:
with self.assertRaises(sqlite3.OperationalError):
broker.merge_shard_ranges(shard_ranges)
# one failed attempt was made to add reported column
self.assertEqual(1, mocked_reported.call_count)
with mock.patch.object(
broker, '_migrate_add_shard_range_tombstones') \
as mocked_tombstones:
with self.assertRaises(sqlite3.OperationalError):
broker.merge_shard_ranges(shard_ranges)
# first migration adds reported column
# one failed attempt was made to add tombstones column
self.assertEqual(1, mocked_tombstones.call_count)
def pre_tombstones_create_shard_range_table(self, conn):
"""
Copied from ContainerBroker before the
tombstones column was added; used for testing with
TestContainerBrokerBeforeShardRangeTombstonesColumn.
Create a shard_range table with no 'tombstones' column.
:param conn: DB connection object
"""
# Use execute (not executescript) so we get the benefits of our
# GreenDBConnection. Creating a table requires a whole-DB lock;
# *any* in-progress cursor will otherwise trip a "database is locked"
# error.
conn.execute("""
CREATE TABLE shard_range (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
timestamp TEXT,
lower TEXT,
upper TEXT,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0,
meta_timestamp TEXT,
deleted INTEGER DEFAULT 0,
state INTEGER,
state_timestamp TEXT,
epoch TEXT,
reported INTEGER DEFAULT 0
);
""")
conn.execute("""
CREATE TRIGGER shard_range_update BEFORE UPDATE ON shard_range
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
""")
class TestContainerBrokerBeforeShardRangeTombstonesColumn(
ContainerBrokerMigrationMixin, TestContainerBroker):
"""
Tests for ContainerBroker against databases created
before the shard_ranges table tombstones column was added.
"""
expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object',
'sqlite_sequence', 'container_stat', 'shard_range'}
def setUp(self):
super(TestContainerBrokerBeforeShardRangeTombstonesColumn,
self).setUp()
ContainerBroker.create_shard_range_table = \
pre_tombstones_create_shard_range_table
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(Timestamp('1').internal, 0)
with self.assertRaises(sqlite3.DatabaseError) as raised, \
broker.get() as conn:
conn.execute('''SELECT tombstones
FROM shard_range''')
self.assertIn('no such column: tombstones', str(raised.exception))
def tearDown(self):
super(TestContainerBrokerBeforeShardRangeTombstonesColumn,
self).tearDown()
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(Timestamp('1').internal, 0)
with broker.get() as conn:
conn.execute('''SELECT tombstones
FROM shard_range''')
class TestUpdateNewItemFromExisting(unittest.TestCase):
# TODO: add test scenarios that have swift_bytes in content_type

View File

@ -113,13 +113,13 @@ class BaseTestSharder(unittest.TestCase):
return broker
def _make_shard_ranges(self, bounds, state=None, object_count=0,
timestamp=Timestamp.now()):
timestamp=Timestamp.now(), **kwargs):
if not isinstance(state, (tuple, list)):
state = [state] * len(bounds)
state_iter = iter(state)
return [ShardRange('.shards_a/c_%s' % upper, timestamp,
lower, upper, state=next(state_iter),
object_count=object_count)
object_count=object_count, **kwargs)
for lower, upper in bounds]
def ts_encoded(self):
@ -4739,10 +4739,39 @@ class TestSharder(BaseTestSharder):
bytes_used=own_shard_range.bytes_used + 1)]
self.check_shard_ranges_sent(broker, expected_sent)
# initialise tombstones
with mock_timestamp_now(next(self.ts_iter)):
own_shard_range = broker.get_own_shard_range()
own_shard_range.update_tombstones(0)
broker.merge_shard_ranges([own_shard_range])
for state in ShardRange.STATES:
with annotate_failure(state):
check_only_own_shard_range_sent(state)
def check_tombstones_sent(state):
own_shard_range = broker.get_own_shard_range()
self.assertTrue(own_shard_range.update_state(
state, state_timestamp=next(self.ts_iter)))
broker.merge_shard_ranges([own_shard_range])
# delete an object, expect to see it reflected in the own shard
# range that is sent
broker.delete_object(str(own_shard_range.object_count),
next(self.ts_iter).internal)
with mock_timestamp_now() as now:
# force own shard range meta updates to be at fixed timestamp
expected_sent = [
dict(own_shard_range,
meta_timestamp=now.internal,
object_count=own_shard_range.object_count - 1,
bytes_used=own_shard_range.bytes_used - 1,
tombstones=own_shard_range.tombstones + 1)]
self.check_shard_ranges_sent(broker, expected_sent)
for state in ShardRange.STATES:
with annotate_failure(state):
check_tombstones_sent(state)
def test_update_root_container_already_reported(self):
broker = self._make_broker()
@ -4765,6 +4794,12 @@ class TestSharder(BaseTestSharder):
sharder._update_root_container(broker)
self.assertFalse(mock_conn.requests)
# initialise tombstones
with mock_timestamp_now(next(self.ts_iter)):
own_shard_range = broker.get_own_shard_range()
own_shard_range.update_tombstones(0)
broker.merge_shard_ranges([own_shard_range])
for state in ShardRange.STATES:
with annotate_failure(state):
check_already_reported_not_sent(state)
@ -4796,7 +4831,8 @@ class TestSharder(BaseTestSharder):
own_shard_range.copy(
meta_timestamp=now.internal,
object_count=own_shard_range.object_count + 1,
bytes_used=own_shard_range.bytes_used + 1)] +
bytes_used=own_shard_range.bytes_used + 1,
tombstones=0)] +
shard_ranges,
key=lambda sr: (sr.upper, sr.state, sr.lower))
self.check_shard_ranges_sent(
@ -5471,8 +5507,11 @@ class TestSharder(BaseTestSharder):
shard_bounds = (('', 'here'), ('here', 'there'), ('there', ''))
size = (DEFAULT_SHARD_SHRINK_POINT *
DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
# all shard ranges too big to shrink
shard_ranges = self._make_shard_ranges(
shard_bounds, state=ShardRange.ACTIVE, object_count=size)
shard_bounds, state=ShardRange.ACTIVE, object_count=size - 1,
tombstones=1)
own_sr = broker.get_own_shard_range()
own_sr.update_state(ShardRange.SHARDED, Timestamp.now())
broker.merge_shard_ranges(shard_ranges + [own_sr])
@ -5484,7 +5523,7 @@ class TestSharder(BaseTestSharder):
broker.get_shard_ranges())
# one range just below threshold
shard_ranges[0].update_meta(size - 1, 0)
shard_ranges[0].update_meta(size - 2, 0)
broker.merge_shard_ranges(shard_ranges[0])
with self._mock_sharder() as sharder:
with mock_timestamp_now() as now:
@ -6843,13 +6882,8 @@ class TestSharderFunctions(BaseTestSharder):
sequences = find_compactible_shard_sequences(broker, 10, 999, 3, 3)
self.assertEqual([shard_ranges[8:]], sequences)
def test_find_compactible_shrink_threshold(self):
def _do_test_find_compactible_shrink_threshold(self, broker, shard_ranges):
# verify option to set the shrink threshold for compaction;
broker = self._make_broker()
shard_ranges = self._make_shard_ranges(
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
state=ShardRange.ACTIVE, object_count=10)
# (n-2)th shard range has one extra object
shard_ranges[-2].object_count = 11
broker.merge_shard_ranges(shard_ranges)
@ -6861,13 +6895,24 @@ class TestSharderFunctions(BaseTestSharder):
sequences = find_compactible_shard_sequences(broker, 11, 999, -1, -1)
self.assertEqual([shard_ranges[:9]], sequences)
def test_find_compactible_expansion_limit(self):
# verify option to limit the size of each acceptor after compaction
def test_find_compactible_shrink_threshold(self):
broker = self._make_broker()
shard_ranges = self._make_shard_ranges(
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
state=ShardRange.ACTIVE, object_count=6)
state=ShardRange.ACTIVE, object_count=10)
self._do_test_find_compactible_shrink_threshold(broker, shard_ranges)
def test_find_compactible_shrink_threshold_with_tombstones(self):
broker = self._make_broker()
shard_ranges = self._make_shard_ranges(
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
state=ShardRange.ACTIVE, object_count=7, tombstones=3)
self._do_test_find_compactible_shrink_threshold(broker, shard_ranges)
def _do_test_find_compactible_expansion_limit(self, broker, shard_ranges):
# verify option to limit the size of each acceptor after compaction
broker.merge_shard_ranges(shard_ranges)
sequences = find_compactible_shard_sequences(broker, 10, 33, -1, -1)
self.assertEqual([shard_ranges[:5], shard_ranges[5:]], sequences)
@ -6877,11 +6922,30 @@ class TestSharderFunctions(BaseTestSharder):
sequences = find_compactible_shard_sequences(broker, 10, 33, -1, -1)
self.assertEqual([shard_ranges[:4], shard_ranges[7:]], sequences)
def test_find_compactible_expansion_limit(self):
# verify option to limit the size of each acceptor after compaction
broker = self._make_broker()
shard_ranges = self._make_shard_ranges(
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
state=ShardRange.ACTIVE, object_count=6)
self._do_test_find_compactible_expansion_limit(broker, shard_ranges)
def test_find_compactible_expansion_limit_with_tombstones(self):
# verify option to limit the size of each acceptor after compaction
broker = self._make_broker()
shard_ranges = self._make_shard_ranges(
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
state=ShardRange.ACTIVE, object_count=1, tombstones=5)
self._do_test_find_compactible_expansion_limit(broker, shard_ranges)
def test_is_sharding_candidate(self):
for state in ShardRange.STATES:
for object_count in (9, 10, 11):
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
state=state, object_count=object_count)
state=state, object_count=object_count,
tombstones=100) # tombstones not considered
with annotate_failure('%s %s' % (state, object_count)):
if state == ShardRange.ACTIVE and object_count >= 10:
self.assertTrue(is_sharding_candidate(sr, 10))
@ -6894,6 +6958,10 @@ class TestSharderFunctions(BaseTestSharder):
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
state=state, object_count=9)
self.assertTrue(is_shrinking_candidate(sr, 10, 9, ok_states))
# shard range has 9 rows
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
state=state, object_count=4, tombstones=5)
self.assertTrue(is_shrinking_candidate(sr, 10, 9, ok_states))
do_check_true(ShardRange.ACTIVE, (ShardRange.ACTIVE,))
do_check_true(ShardRange.ACTIVE,
@ -6901,11 +6969,12 @@ class TestSharderFunctions(BaseTestSharder):
do_check_true(ShardRange.SHRINKING,
(ShardRange.ACTIVE, ShardRange.SHRINKING))
def do_check_false(state, object_count):
def do_check_false(state, object_count, tombstones):
states = (ShardRange.ACTIVE, ShardRange.SHRINKING)
# shard range has 10 objects
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
state=state, object_count=object_count)
state=state, object_count=object_count,
tombstones=tombstones)
self.assertFalse(is_shrinking_candidate(sr, 10, 20))
self.assertFalse(is_shrinking_candidate(sr, 10, 20, states))
self.assertFalse(is_shrinking_candidate(sr, 10, 9))
@ -6916,7 +6985,13 @@ class TestSharderFunctions(BaseTestSharder):
for state in ShardRange.STATES:
for object_count in (10, 11):
with annotate_failure('%s %s' % (state, object_count)):
do_check_false(state, object_count)
do_check_false(state, object_count, 0)
for tombstones in (10, 11):
with annotate_failure('%s %s' % (state, tombstones)):
do_check_false(state, 0, tombstones)
for tombstones in (5, 6):
with annotate_failure('%s %s' % (state, tombstones)):
do_check_false(state, 5, tombstones)
def test_find_and_rank_whole_path_split(self):
ts_0 = next(self.ts_iter)