Break up reclaim into batches

We want to do the table scan without locking and group the locking
deletes into small indexed operations to minimize the impact of
background processes calling reclaim each cycle.

Change-Id: I3ccd145c14a9b68ff8a9da61f79034549c9bc127
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Closes-Bug: #1877651
(cherry picked from commit aab45880f8)
This commit is contained in:
Clay Gerrard 2020-05-13 13:32:18 -05:00 committed by Tim Burke
parent 635c71c59e
commit 170dddaacc
5 changed files with 208 additions and 16 deletions

View File

@ -51,6 +51,9 @@ PICKLE_PROTOCOL = 2
# records will be merged.
PENDING_CAP = 131072
SQLITE_ARG_LIMIT = 999
RECLAIM_PAGE_SIZE = 10000
def utf8encode(*args):
return [(s.encode('utf8') if isinstance(s, six.text_type) else s)
@ -961,16 +964,48 @@ class DatabaseBroker(object):
with lock_parent_directory(self.pending_file,
self.pending_timeout):
self._commit_puts()
with self.get() as conn:
self._reclaim(conn, age_timestamp, sync_timestamp)
self._reclaim_metadata(conn, age_timestamp)
conn.commit()
marker = ''
finished = False
while not finished:
with self.get() as conn:
marker = self._reclaim(conn, age_timestamp, marker)
if not marker:
finished = True
self._reclaim_other_stuff(
conn, age_timestamp, sync_timestamp)
conn.commit()
def _reclaim(self, conn, age_timestamp, sync_timestamp):
conn.execute('''
DELETE FROM %s WHERE deleted = 1 AND %s < ?
''' % (self.db_contains_type, self.db_reclaim_timestamp),
(age_timestamp,))
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
"""
This is only called once at the end of reclaim after _reclaim has been
called for each page.
"""
self._reclaim_sync(conn, sync_timestamp)
self._reclaim_metadata(conn, age_timestamp)
def _reclaim(self, conn, age_timestamp, marker):
clean_batch_qry = '''
DELETE FROM %s WHERE deleted = 1
AND name > ? AND %s < ?
''' % (self.db_contains_type, self.db_reclaim_timestamp)
curs = conn.execute('''
SELECT name FROM %s WHERE deleted = 1
AND name > ?
ORDER BY NAME LIMIT 1 OFFSET ?
''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
row = curs.fetchone()
if row:
# do a single book-ended DELETE and bounce out
end_marker = row[0]
conn.execute(clean_batch_qry + ' AND name <= ?', (
marker, age_timestamp, end_marker))
else:
# delete off the end and reset marker to indicate we're done
end_marker = ''
conn.execute(clean_batch_qry, (marker, age_timestamp))
return end_marker
def _reclaim_sync(self, conn, sync_timestamp):
try:
conn.execute('''
DELETE FROM outgoing_sync WHERE updated_at < ?

View File

@ -32,9 +32,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
ShardRange, renamer, find_shard_range, MD5_OF_EMPTY_STRING, mkdirs, \
get_db_files, parse_db_filename, make_db_file_path, split_path
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
zero_like, DatabaseAlreadyExists
SQLITE_ARG_LIMIT = 999
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
DATADIR = 'containers'
@ -1587,9 +1585,9 @@ class ContainerBroker(DatabaseBroker):
CONTAINER_STAT_VIEW_SCRIPT +
'COMMIT;')
def _reclaim(self, conn, age_timestamp, sync_timestamp):
super(ContainerBroker, self)._reclaim(conn, age_timestamp,
sync_timestamp)
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
super(ContainerBroker, self)._reclaim_other_stuff(
conn, age_timestamp, sync_timestamp)
# populate instance cache, but use existing conn to avoid deadlock
# when it has a pending update
self._populate_instance_cache(conn=conn)

View File

@ -175,6 +175,72 @@ class TestAccountBroker(unittest.TestCase):
broker.delete_db(Timestamp.now().internal)
broker.reclaim(Timestamp.now().internal, time())
def test_batched_reclaim(self):
num_of_containers = 60
container_specs = []
now = time()
top_of_the_minute = now - (now % 60)
c = itertools.cycle([True, False])
for m, is_deleted in six.moves.zip(range(num_of_containers), c):
offset = top_of_the_minute - (m * 60)
container_specs.append((Timestamp(offset), is_deleted))
random.seed(now)
random.shuffle(container_specs)
policy_indexes = list(p.idx for p in POLICIES)
broker = AccountBroker(':memory:', account='test_account')
broker.initialize(Timestamp('1').internal)
for i, container_spec in enumerate(container_specs):
# with container12 before container2 and shuffled ts.internal we
# shouldn't be able to accidently rely on any implicit ordering
name = 'container%s' % i
pidx = random.choice(policy_indexes)
ts, is_deleted = container_spec
if is_deleted:
broker.put_container(name, 0, ts.internal, 0, 0, pidx)
else:
broker.put_container(name, ts.internal, 0, 0, 0, pidx)
def count_reclaimable(conn, reclaim_age):
return conn.execute(
"SELECT count(*) FROM container "
"WHERE deleted = 1 AND delete_timestamp < ?", (reclaim_age,)
).fetchone()[0]
# This is intended to divide the set of timestamps exactly in half
# regardless of the value of now
reclaim_age = top_of_the_minute + 1 - (num_of_containers / 2 * 60)
with broker.get() as conn:
self.assertEqual(count_reclaimable(conn, reclaim_age),
num_of_containers / 4)
orig__reclaim = broker._reclaim
trace = []
def tracing_reclaim(conn, age_timestamp, marker):
trace.append((age_timestamp, marker,
count_reclaimable(conn, age_timestamp)))
return orig__reclaim(conn, age_timestamp, marker)
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
broker.reclaim(reclaim_age, reclaim_age)
with broker.get() as conn:
self.assertEqual(count_reclaimable(conn, reclaim_age), 0)
self.assertEqual(3, len(trace), trace)
self.assertEqual([age for age, marker, reclaimable in trace],
[reclaim_age] * 3)
# markers are in-order
self.assertLess(trace[0][1], trace[1][1])
self.assertLess(trace[1][1], trace[2][1])
# reclaimable count gradually decreases
# generally, count1 > count2 > count3, but because of the randomness
# we may occassionally have count1 == count2 or count2 == count3
self.assertGreaterEqual(trace[0][2], trace[1][2])
self.assertGreaterEqual(trace[1][2], trace[2][2])
# technically, this might happen occasionally, but *really* rarely
self.assertTrue(trace[0][2] > trace[1][2] or
trace[1][2] > trace[2][2])
def test_delete_db_status(self):
ts = (Timestamp(t).internal for t in itertools.count(int(time())))
start = next(ts)

View File

@ -1141,7 +1141,7 @@ class TestDatabaseBroker(unittest.TestCase):
return broker
# only testing _reclaim_metadata here
@patch.object(DatabaseBroker, '_reclaim')
@patch.object(DatabaseBroker, '_reclaim', return_value='')
def test_metadata(self, mock_reclaim):
# Initializes a good broker for us
broker = self.get_replication_info_tester(metadata=True)

View File

@ -28,6 +28,7 @@ from contextlib import contextmanager
import sqlite3
import pickle
import json
import itertools
import six
@ -478,6 +479,98 @@ class TestContainerBroker(unittest.TestCase):
broker.reclaim(Timestamp.now().internal, time())
broker.delete_db(Timestamp.now().internal)
def test_batch_reclaim(self):
num_of_objects = 60
obj_specs = []
now = time()
top_of_the_minute = now - (now % 60)
c = itertools.cycle([True, False])
for m, is_deleted in six.moves.zip(range(num_of_objects), c):
offset = top_of_the_minute - (m * 60)
obj_specs.append((Timestamp(offset), is_deleted))
random.seed(now)
random.shuffle(obj_specs)
policy_indexes = list(p.idx for p in POLICIES)
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
broker.initialize(Timestamp('1').internal, 0)
for i, obj_spec in enumerate(obj_specs):
# with object12 before object2 and shuffled ts.internal we
# shouldn't be able to accidently rely on any implicit ordering
obj_name = 'object%s' % i
pidx = random.choice(policy_indexes)
ts, is_deleted = obj_spec
if is_deleted:
broker.delete_object(obj_name, ts.internal, pidx)
else:
broker.put_object(obj_name, ts.internal, 0, 'text/plain',
'etag', storage_policy_index=pidx)
def count_reclaimable(conn, reclaim_age):
return conn.execute(
"SELECT count(*) FROM object "
"WHERE deleted = 1 AND created_at < ?", (reclaim_age,)
).fetchone()[0]
# This is intended to divide the set of timestamps exactly in half
# regardless of the value of now
reclaim_age = top_of_the_minute + 1 - (num_of_objects / 2 * 60)
with broker.get() as conn:
self.assertEqual(count_reclaimable(conn, reclaim_age),
num_of_objects / 4)
orig__reclaim = broker._reclaim
trace = []
def tracing_reclaim(conn, age_timestamp, marker):
trace.append((age_timestamp, marker,
count_reclaimable(conn, age_timestamp)))
return orig__reclaim(conn, age_timestamp, marker)
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
broker.reclaim(reclaim_age, reclaim_age)
with broker.get() as conn:
self.assertEqual(count_reclaimable(conn, reclaim_age), 0)
self.assertEqual(3, len(trace), trace)
self.assertEqual([age for age, marker, reclaimable in trace],
[reclaim_age] * 3)
# markers are in-order
self.assertLess(trace[0][1], trace[1][1])
self.assertLess(trace[1][1], trace[2][1])
# reclaimable count gradually decreases
# generally, count1 > count2 > count3, but because of the randomness
# we may occassionally have count1 == count2 or count2 == count3
self.assertGreaterEqual(trace[0][2], trace[1][2])
self.assertGreaterEqual(trace[1][2], trace[2][2])
# technically, this might happen occasionally, but *really* rarely
self.assertTrue(trace[0][2] > trace[1][2] or
trace[1][2] > trace[2][2])
def test_reclaim_with_duplicate_names(self):
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
broker.initialize(Timestamp('1').internal, 0)
now = time()
ages_ago = Timestamp(now - (3 * 7 * 24 * 60 * 60))
for i in range(10):
for spidx in range(10):
obj_name = 'object%s' % i
broker.delete_object(obj_name, ages_ago.internal, spidx)
reclaim_age = now - (2 * 7 * 24 * 60 * 60)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM object "
"WHERE created_at < ?", (reclaim_age,)
).fetchone()[0], 100)
with mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
broker.reclaim(reclaim_age, reclaim_age)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM object "
).fetchone()[0], 0)
@with_tempdir
def test_reclaim_deadlock(self, tempdir):
db_path = os.path.join(