Merge "Deleting the expired execution with batch size"

This commit is contained in:
Jenkins 2017-03-10 11:31:52 +00:00 committed by Gerrit Code Review
commit e62d30c305
5 changed files with 158 additions and 31 deletions

View File

@ -223,6 +223,14 @@ execution_expiration_policy_opts = [
'Minimum value is 1. '
'Note that only final state execution will remove '
'( SUCCESS / ERROR ).')
),
cfg.IntOpt(
'batch_size',
default=0,
help=_('Size of batch of expired executions to be deleted.'
'The default value is 0. If it is set to 0, '
'size of batch is total number of expired executions'
'that is going to be deleted.')
)
]

View File

@ -393,8 +393,8 @@ def get_next_cron_triggers(time):
return IMPL.get_next_cron_triggers(time)
def get_expired_executions(time):
return IMPL.get_expired_executions(time)
def get_expired_executions(time, limit=None):
return IMPL.get_expired_executions(time, limit=limit)
def create_cron_trigger(values):

View File

@ -1041,7 +1041,7 @@ def delete_delayed_calls(session=None, **kwargs):
@b.session_aware()
def get_expired_executions(time, session=None):
def get_expired_executions(time, session=None, limit=None):
query = b.model_query(models.WorkflowExecution)
# Only WorkflowExecution that are not a child of other WorkflowExecution.
@ -1056,6 +1056,9 @@ def get_expired_executions(time, session=None):
)
)
if limit:
query = query.limit(limit)
return query.all()

View File

@ -57,40 +57,68 @@ class ExecutionExpirationPolicy(periodic_task.PeriodicTasks):
"is not configured or older_than < '1'.")
def _delete_expired_executions(batch_size, expired_time):
if batch_size:
while True:
with db_api.transaction():
# TODO(gpaz): In the future should use generic method with
# filters params and not specific method that filter by time.
expired_executions = db_api.get_expired_executions(
expired_time,
limit=batch_size
)
if not expired_executions:
break
_delete(expired_executions)
else:
with db_api.transaction():
expired_executions = db_api.get_expired_executions(expired_time)
_delete(expired_executions)
def _delete(exp_executions):
for execution in exp_executions:
try:
# Setup project_id for _secure_query delete execution.
# TODO(tuan_luong): Manipulation with auth_ctx should be
# out of db transaction scope.
ctx = auth_ctx.MistralContext(
user_id=None,
project_id=execution.project_id,
auth_token=None,
is_admin=True
)
auth_ctx.set_ctx(ctx)
LOG.debug(
'DELETE execution id : %s from date : %s '
'according to expiration policy',
execution.id,
execution.updated_at
)
db_api.delete_workflow_execution(execution.id)
except Exception as e:
msg = ("Failed to delete [execution_id=%s]\n %s"
% (execution.id, traceback.format_exc(e)))
LOG.warning(msg)
finally:
auth_ctx.set_ctx(None)
def run_execution_expiration_policy(self, ctx):
LOG.debug("Starting expiration policy task.")
LOG.debug("Starting expiration policy.")
older_than = CONF.execution_expiration_policy.older_than
exp_time = (datetime.datetime.utcnow()
exp_time = (datetime.datetime.now()
- datetime.timedelta(minutes=older_than))
with db_api.transaction():
# TODO(gpaz): In the future should use generic method with
# filters params and not specific method that filter by time.
for execution in db_api.get_expired_executions(exp_time):
try:
# Setup project_id for _secure_query delete execution.
ctx = auth_ctx.MistralContext(
user_id=None,
project_id=execution.project_id,
auth_token=None,
is_admin=True
)
auth_ctx.set_ctx(ctx)
batch_size = CONF.execution_expiration_policy.batch_size
LOG.debug(
'DELETE execution id : %s from date : %s '
'according to expiration policy',
execution.id,
execution.updated_at
)
db_api.delete_workflow_execution(execution.id)
except Exception as e:
msg = ("Failed to delete [execution_id=%s]\n %s"
% (execution.id, traceback.format_exc(e)))
LOG.warning(msg)
finally:
auth_ctx.set_ctx(None)
# The default value of batch size is 0
# If it is not set, size of batch will be the size
# of total number of expired executions.
_delete_expired_executions(batch_size, exp_time)
def setup():

View File

@ -159,6 +159,94 @@ class ExpirationPolicyTest(base.DbTestCase):
self.assertEqual(0, len(execs))
def test_deletion_of_expired_executions_with_batch_size_scenario1(self):
# Scenario1 test will test with batch_size of 3,
# 5 expired executions and different values of "older_than"
# which is 30 and 5 minutes respectively.
# Expected_result: All expired executions are successfully deleted.
cfg.CONF.set_default(
'batch_size',
3,
group='execution_expiration_policy'
)
# Since we are removing other projects execution,
# we want to load the executions with other project_id.
_switch_context('non_admin_project', False)
_create_workflow_executions()
# Switch context to Admin since expiration policy running as Admin.
_switch_context(None, True)
_set_expiration_policy_config(1, 30)
# Call for all expired wfs execs.
now = datetime.datetime.now()
execs = db_api.get_expired_executions(now)
# Should be only 5, the RUNNING execution shouldn't return,
# so the child wf (that has parent task id).
self.assertEqual(5, len(execs))
older_than = cfg.CONF.execution_expiration_policy.older_than
exp_time = (datetime.datetime.now()
- datetime.timedelta(minutes=older_than))
batch_size = cfg.CONF.execution_expiration_policy.batch_size
expiration_policy._delete_expired_executions(
batch_size,
exp_time
)
execs = db_api.get_expired_executions(now)
self.assertEqual(2, len(execs))
_set_expiration_policy_config(1, 5)
expiration_policy.run_execution_expiration_policy(self, ctx)
execs = db_api.get_expired_executions(now)
self.assertEqual(0, len(execs))
def test_deletion_of_expired_executions_with_batch_size_scenario2(self):
# Scenario2 will test with batch_size of 2, 5 expired executions
# with value of "older_than" that is 5 minutes.
# Expected_result: All expired executions are successfully deleted.
cfg.CONF.set_default(
'batch_size',
2,
group='execution_expiration_policy'
)
# Since we are removing other projects execution,
# we want to load the executions with other project_id.
_switch_context('non_admin_project', False)
_create_workflow_executions()
# Switch context to Admin since expiration policy running as Admin.
_switch_context(None, True)
_set_expiration_policy_config(1, 5)
# Call for all expired wfs execs.
now = datetime.datetime.now()
execs = db_api.get_expired_executions(now)
# Should be only 5, the RUNNING execution shouldn't return,
# so the child wf (that has parent task id).
self.assertEqual(5, len(execs))
older_than = cfg.CONF.execution_expiration_policy.older_than
exp_time = (datetime.datetime.now()
- datetime.timedelta(minutes=older_than))
batch_size = cfg.CONF.execution_expiration_policy.batch_size
expiration_policy._delete_expired_executions(
batch_size,
exp_time
)
execs = db_api.get_expired_executions(now)
self.assertEqual(0, len(execs))
def test_negative_wrong_conf_values(self):
_set_expiration_policy_config(None, None)
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)