Refactor purge_deleted, operate on batches of stacks
Avoid large sql "in" clauses by operating on smaller batches of stacks at a time. To avoid transaction overhead and contention on the resource table, the first deletions occur outside of a transaction (they are autocommitted). This is OK because the purge is re-entrant -- we won't lose any stack ids to delete if something goes wrong before the conn.begin() block. That is, we will not orphan any rows if the purge is run multiple times and an error occurs. Change-Id: I9edf0558ed54820842193560e323df6501411d1d
This commit is contained in:
parent
902990097b
commit
882a640f18
|
@ -133,7 +133,8 @@ def purge_deleted():
|
|||
"""Remove database records that have been previously soft deleted."""
|
||||
utils.purge_deleted(CONF.command.age,
|
||||
CONF.command.granularity,
|
||||
CONF.command.project_id)
|
||||
CONF.command.project_id,
|
||||
CONF.command.batch_size)
|
||||
|
||||
|
||||
def do_crypt_parameters_and_properties():
|
||||
|
@ -179,6 +180,13 @@ def add_command_parsers(subparsers):
|
|||
parser.add_argument(
|
||||
'-p', '--project-id',
|
||||
help=_('Project ID to purge deleted stacks.'))
|
||||
# optional parameter, can be skipped. default='20'
|
||||
parser.add_argument(
|
||||
'-b', '--batch_size', default='20',
|
||||
help=_('Number of stacks to delete at a time (per transaction). '
|
||||
'Note that a single stack may have many db rows '
|
||||
'(events, etc.) associated with it.'))
|
||||
|
||||
# update_params parser
|
||||
parser = subparsers.add_parser('update_params')
|
||||
parser.set_defaults(func=do_crypt_parameters_and_properties)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
"""Implementation of SQLAlchemy backend."""
|
||||
import datetime
|
||||
import itertools
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
|
@ -1176,13 +1177,18 @@ def service_get_all_by_args(context, host, binary, hostname):
|
|||
filter_by(hostname=hostname).all())
|
||||
|
||||
|
||||
def purge_deleted(age, granularity='days', project_id=None):
|
||||
try:
|
||||
age = int(age)
|
||||
except ValueError:
|
||||
raise exception.Error(_("age should be an integer"))
|
||||
if age < 0:
|
||||
raise exception.Error(_("age should be a positive integer"))
|
||||
def purge_deleted(age, granularity='days', project_id=None, batch_size=20):
|
||||
def _validate_positive_integer(val, argname):
|
||||
try:
|
||||
return int(val)
|
||||
except ValueError:
|
||||
raise exception.Error(_("%s should be an integer") % argname)
|
||||
if val < 0:
|
||||
raise exception.Error(_("%s should be a positive integer")
|
||||
% argname)
|
||||
|
||||
age = _validate_positive_integer(age, 'age')
|
||||
batch_size = _validate_positive_integer(batch_size, 'batch_size')
|
||||
|
||||
if granularity not in ('days', 'hours', 'minutes', 'seconds'):
|
||||
raise exception.Error(
|
||||
|
@ -1200,6 +1206,46 @@ def purge_deleted(age, granularity='days', project_id=None):
|
|||
meta = sqlalchemy.MetaData()
|
||||
meta.bind = engine
|
||||
|
||||
stack = sqlalchemy.Table('stack', meta, autoload=True)
|
||||
service = sqlalchemy.Table('service', meta, autoload=True)
|
||||
|
||||
# Purge deleted services
|
||||
srvc_del = service.delete().where(service.c.deleted_at < time_line)
|
||||
engine.execute(srvc_del)
|
||||
|
||||
# find the soft-deleted stacks that are past their expiry
|
||||
sel = sqlalchemy.select([stack.c.id, stack.c.raw_template_id,
|
||||
stack.c.prev_raw_template_id,
|
||||
stack.c.user_creds_id,
|
||||
stack.c.action,
|
||||
stack.c.status,
|
||||
stack.c.name])
|
||||
if project_id:
|
||||
stack_where = sel.where(and_(
|
||||
stack.c.tenant == project_id,
|
||||
stack.c.deleted_at < time_line))
|
||||
else:
|
||||
stack_where = sel.where(
|
||||
stack.c.deleted_at < time_line)
|
||||
|
||||
stacks = engine.execute(stack_where)
|
||||
|
||||
while True:
|
||||
next_stacks_to_purge = list(itertools.islice(stacks, batch_size))
|
||||
if len(next_stacks_to_purge):
|
||||
_purge_stacks(next_stacks_to_purge, engine, meta)
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
def _purge_stacks(stack_infos, engine, meta):
|
||||
"""Purge some stacks and their releated events, raw_templates, etc.
|
||||
|
||||
stack_infos is a list of lists of selected stack columns:
|
||||
[[id, raw_template_id, prev_raw_template_id, user_creds_id,
|
||||
action, status, name], ...]
|
||||
"""
|
||||
|
||||
stack = sqlalchemy.Table('stack', meta, autoload=True)
|
||||
stack_lock = sqlalchemy.Table('stack_lock', meta, autoload=True)
|
||||
stack_tag = sqlalchemy.Table('stack_tag', meta, autoload=True)
|
||||
|
@ -1210,102 +1256,88 @@ def purge_deleted(age, granularity='days', project_id=None):
|
|||
raw_template_files = sqlalchemy.Table('raw_template_files', meta,
|
||||
autoload=True)
|
||||
user_creds = sqlalchemy.Table('user_creds', meta, autoload=True)
|
||||
service = sqlalchemy.Table('service', meta, autoload=True)
|
||||
syncpoint = sqlalchemy.Table('sync_point', meta, autoload=True)
|
||||
|
||||
# find the soft-deleted stacks that are past their expiry
|
||||
if project_id:
|
||||
stack_where = sqlalchemy.select([
|
||||
stack.c.id, stack.c.raw_template_id,
|
||||
stack.c.prev_raw_template_id,
|
||||
stack.c.user_creds_id]).where(and_(
|
||||
stack.c.tenant == project_id,
|
||||
stack.c.deleted_at < time_line))
|
||||
else:
|
||||
stack_where = sqlalchemy.select([
|
||||
stack.c.id, stack.c.raw_template_id,
|
||||
stack.c.prev_raw_template_id,
|
||||
stack.c.user_creds_id]).where(
|
||||
stack.c.deleted_at < time_line)
|
||||
stack_info_str = ','.join([str(i) for i in stack_infos])
|
||||
LOG.info("Purging stacks %s" % stack_info_str)
|
||||
|
||||
stacks = list(engine.execute(stack_where))
|
||||
if stacks:
|
||||
stack_ids = [i[0] for i in stacks]
|
||||
# delete stack locks (just in case some got stuck)
|
||||
stack_lock_del = stack_lock.delete().where(
|
||||
stack_lock.c.stack_id.in_(stack_ids))
|
||||
engine.execute(stack_lock_del)
|
||||
# delete stack tags
|
||||
stack_tag_del = stack_tag.delete().where(
|
||||
stack_tag.c.stack_id.in_(stack_ids))
|
||||
engine.execute(stack_tag_del)
|
||||
# delete resource_data
|
||||
res_where = sqlalchemy.select([resource.c.id]).where(
|
||||
resource.c.stack_id.in_(stack_ids))
|
||||
res_data_del = resource_data.delete().where(
|
||||
resource_data.c.resource_id.in_(res_where))
|
||||
engine.execute(res_data_del)
|
||||
# delete resources
|
||||
res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids))
|
||||
engine.execute(res_del)
|
||||
# delete events
|
||||
event_del = event.delete().where(event.c.stack_id.in_(stack_ids))
|
||||
engine.execute(event_del)
|
||||
# clean up any sync_points that may have lingered
|
||||
sync_del = syncpoint.delete().where(
|
||||
syncpoint.c.stack_id.in_(stack_ids))
|
||||
engine.execute(sync_del)
|
||||
stack_ids = [stack_info[0] for stack_info in stack_infos]
|
||||
# delete stack locks (just in case some got stuck)
|
||||
stack_lock_del = stack_lock.delete().where(
|
||||
stack_lock.c.stack_id.in_(stack_ids))
|
||||
engine.execute(stack_lock_del)
|
||||
# delete stack tags
|
||||
stack_tag_del = stack_tag.delete().where(
|
||||
stack_tag.c.stack_id.in_(stack_ids))
|
||||
engine.execute(stack_tag_del)
|
||||
# delete resource_data
|
||||
res_where = sqlalchemy.select([resource.c.id]).where(
|
||||
resource.c.stack_id.in_(stack_ids))
|
||||
res_data_del = resource_data.delete().where(
|
||||
resource_data.c.resource_id.in_(res_where))
|
||||
engine.execute(res_data_del)
|
||||
# delete resources (normally there shouldn't be any)
|
||||
res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids))
|
||||
engine.execute(res_del)
|
||||
# delete events
|
||||
event_del = event.delete().where(event.c.stack_id.in_(stack_ids))
|
||||
engine.execute(event_del)
|
||||
# clean up any sync_points that may have lingered
|
||||
sync_del = syncpoint.delete().where(
|
||||
syncpoint.c.stack_id.in_(stack_ids))
|
||||
engine.execute(sync_del)
|
||||
|
||||
conn = engine.connect()
|
||||
with conn.begin(): # these deletes in a transaction
|
||||
# delete the stacks
|
||||
stack_del = stack.delete().where(stack.c.id.in_(stack_ids))
|
||||
engine.execute(stack_del)
|
||||
conn.execute(stack_del)
|
||||
# delete orphaned raw templates
|
||||
raw_template_ids = [i[1] for i in stacks if i[1] is not None]
|
||||
raw_template_ids.extend(i[2] for i in stacks if i[2] is not None)
|
||||
if raw_template_ids:
|
||||
# keep those still referenced
|
||||
raw_template_ids = [i[1] for i in stack_infos if i[1] is not None]
|
||||
raw_template_ids.extend(i[2] for i in stack_infos if i[2] is not None)
|
||||
if raw_template_ids: # keep those still referenced
|
||||
raw_tmpl_sel = sqlalchemy.select([stack.c.raw_template_id]).where(
|
||||
stack.c.raw_template_id.in_(raw_template_ids))
|
||||
raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)]
|
||||
raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)]
|
||||
raw_template_ids = set(raw_template_ids) - set(raw_tmpl)
|
||||
if raw_template_ids: # keep those still referenced (previous tmpl)
|
||||
raw_tmpl_sel = sqlalchemy.select(
|
||||
[stack.c.prev_raw_template_id]).where(
|
||||
stack.c.prev_raw_template_id.in_(raw_template_ids))
|
||||
raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)]
|
||||
raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)]
|
||||
raw_template_ids = raw_template_ids - set(raw_tmpl)
|
||||
if raw_template_ids: # delete raw_templates if we have any
|
||||
raw_tmpl_file_sel = sqlalchemy.select(
|
||||
[raw_template.c.files_id]).where(
|
||||
raw_template.c.id.in_(raw_template_ids))
|
||||
raw_tmpl_file_ids = [i[0] for i in engine.execute(
|
||||
raw_tmpl_file_ids = [i[0] for i in conn.execute(
|
||||
raw_tmpl_file_sel)]
|
||||
raw_templ_del = raw_template.delete().where(
|
||||
raw_template.c.id.in_(raw_template_ids))
|
||||
engine.execute(raw_templ_del)
|
||||
# purge any raw_template_files that are no longer referenced
|
||||
if raw_tmpl_file_ids:
|
||||
conn.execute(raw_templ_del)
|
||||
if raw_tmpl_file_ids: # keep _files still referenced
|
||||
raw_tmpl_file_sel = sqlalchemy.select(
|
||||
[raw_template.c.files_id]).where(
|
||||
raw_template.c.files_id.in_(raw_tmpl_file_ids))
|
||||
raw_tmpl_files = [i[0] for i in engine.execute(
|
||||
raw_tmpl_files = [i[0] for i in conn.execute(
|
||||
raw_tmpl_file_sel)]
|
||||
raw_tmpl_file_ids = set(raw_tmpl_file_ids) \
|
||||
- set(raw_tmpl_files)
|
||||
if raw_tmpl_file_ids: # delete _files if we have any
|
||||
raw_tmpl_file_del = raw_template_files.delete().where(
|
||||
raw_template_files.c.id.in_(raw_tmpl_file_ids))
|
||||
engine.execute(raw_tmpl_file_del)
|
||||
conn.execute(raw_tmpl_file_del)
|
||||
# purge any user creds that are no longer referenced
|
||||
user_creds_ids = [i[3] for i in stacks if i[3] is not None]
|
||||
if user_creds_ids:
|
||||
# keep those still referenced
|
||||
user_creds_ids = [i[3] for i in stack_infos if i[3] is not None]
|
||||
if user_creds_ids: # keep those still referenced
|
||||
user_sel = sqlalchemy.select([stack.c.user_creds_id]).where(
|
||||
stack.c.user_creds_id.in_(user_creds_ids))
|
||||
users = [i[0] for i in engine.execute(user_sel)]
|
||||
users = [i[0] for i in conn.execute(user_sel)]
|
||||
user_creds_ids = set(user_creds_ids) - set(users)
|
||||
if user_creds_ids: # delete if we have any
|
||||
usr_creds_del = user_creds.delete().where(
|
||||
user_creds.c.id.in_(user_creds_ids))
|
||||
engine.execute(usr_creds_del)
|
||||
# Purge deleted services
|
||||
srvc_del = service.delete().where(service.c.deleted_at < time_line)
|
||||
engine.execute(srvc_del)
|
||||
conn.execute(usr_creds_del)
|
||||
|
||||
|
||||
def sync_point_delete_all_by_stack_and_traversal(context, stack_id,
|
||||
|
|
|
@ -43,8 +43,8 @@ IMPL = LazyPluggable('backend',
|
|||
sqlalchemy='heat.db.sqlalchemy.api')
|
||||
|
||||
|
||||
def purge_deleted(age, granularity='days', project_id=None):
|
||||
IMPL.purge_deleted(age, granularity, project_id)
|
||||
def purge_deleted(age, granularity='days', project_id=None, batch_size=20):
    """Remove soft-deleted database rows older than the given age.

    Thin facade: forwards all arguments to the configured backend
    implementation (see ``IMPL``). ``batch_size`` bounds how many stacks
    are purged per transaction.
    """
    IMPL.purge_deleted(age, granularity, project_id, batch_size)
|
||||
|
||||
|
||||
def encrypt_parameters_and_properties(ctxt, encryption_key, verbose):
|
||||
|
|
|
@ -2171,6 +2171,18 @@ class DBAPIStackTest(common.HeatTestCase):
|
|||
self.assertIsNone(db_api.user_creds_get(
|
||||
self.ctx, stacks[s].user_creds_id))
|
||||
|
||||
def test_purge_deleted_batch_arg(self):
    """Purging 7 expired stacks with batch_size=2 uses 4 batches."""
    # Create seven stacks soft-deleted one hour in the past so they are
    # all past expiry when purging with age=0.
    cutoff = timeutils.utcnow() - datetime.timedelta(seconds=3600)
    for _ in range(7):
        create_stack(self.ctx, self.template, self.user_creds,
                     deleted_at=cutoff)

    # ceil(7 / 2) == 4 batches expected.
    with mock.patch('heat.db.sqlalchemy.api._purge_stacks') as mock_ps:
        db_api.purge_deleted(age=0, batch_size=2)
        self.assertEqual(4, mock_ps.call_count)
|
||||
|
||||
def test_stack_get_root_id(self):
|
||||
root = create_stack(self.ctx, self.template, self.user_creds,
|
||||
name='root stack')
|
||||
|
|
Loading…
Reference in New Issue