From e67b272e2ff250f06995b6a34d6f35427061019d Mon Sep 17 00:00:00 2001 From: TuanLuong Date: Tue, 24 Jan 2017 12:31:03 +0100 Subject: [PATCH] Deleting the expired execution with batch size If there is a huge number of expired executions, only one sqlalchemy session could not afford it and it raises a DBExceptionError with timeout. This patch will truncate the expried executions by using batch size. Change-Id: I894db17b1c2fc5c292e7979ab46ea00fd14ff833 Closes-Bug: #1658946 --- mistral/config.py | 8 ++ mistral/db/v2/api.py | 4 +- mistral/db/v2/sqlalchemy/api.py | 5 +- mistral/services/expiration_policy.py | 84 ++++++++++++------ .../test_expired_executions_policy.py | 88 +++++++++++++++++++ 5 files changed, 158 insertions(+), 31 deletions(-) diff --git a/mistral/config.py b/mistral/config.py index 4ed2f536..0ae7073e 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -223,6 +223,14 @@ execution_expiration_policy_opts = [ 'Minimum value is 1. ' 'Note that only final state execution will remove ' '( SUCCESS / ERROR ).') + ), + cfg.IntOpt( + 'batch_size', + default=0, + help=_('Size of batch of expired executions to be deleted.' + 'The default value is 0. If it is set to 0, ' + 'size of batch is total number of expired executions' + 'that is going to be deleted.') ) ] diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 8c4170de..9b36e850 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -393,8 +393,8 @@ def get_next_cron_triggers(time): return IMPL.get_next_cron_triggers(time) -def get_expired_executions(time): - return IMPL.get_expired_executions(time) +def get_expired_executions(time, limit=None): + return IMPL.get_expired_executions(time, limit=limit) def create_cron_trigger(values): diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 4b6f65b2..b0f61c1f 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -1038,7 +1038,7 @@ def delete_delayed_calls(session=None, **kwargs): @b.session_aware() -def get_expired_executions(time, session=None): +def get_expired_executions(time, session=None, limit=None): query = b.model_query(models.WorkflowExecution) # Only WorkflowExecution that are not a child of other WorkflowExecution. @@ -1053,6 +1053,9 @@ def get_expired_executions(time, session=None): ) ) + if limit: + query = query.limit(limit) + return query.all() diff --git a/mistral/services/expiration_policy.py b/mistral/services/expiration_policy.py index 3daa7efe..d9fbd5bb 100644 --- a/mistral/services/expiration_policy.py +++ b/mistral/services/expiration_policy.py @@ -57,40 +57,68 @@ class ExecutionExpirationPolicy(periodic_task.PeriodicTasks): "is not configured or older_than < '1'.") +def _delete_expired_executions(batch_size, expired_time): + if batch_size: + while True: + with db_api.transaction(): + # TODO(gpaz): In the future should use generic method with + # filters params and not specific method that filter by time. + expired_executions = db_api.get_expired_executions( + expired_time, + limit=batch_size + ) + + if not expired_executions: + break + _delete(expired_executions) + else: + with db_api.transaction(): + expired_executions = db_api.get_expired_executions(expired_time) + _delete(expired_executions) + + +def _delete(exp_executions): + for execution in exp_executions: + try: + # Setup project_id for _secure_query delete execution. + # TODO(tuan_luong): Manipulation with auth_ctx should be + # out of db transaction scope. + ctx = auth_ctx.MistralContext( + user_id=None, + project_id=execution.project_id, + auth_token=None, + is_admin=True + ) + auth_ctx.set_ctx(ctx) + + LOG.debug( + 'DELETE execution id : %s from date : %s ' + 'according to expiration policy', + execution.id, + execution.updated_at + ) + db_api.delete_workflow_execution(execution.id) + except Exception as e: + msg = ("Failed to delete [execution_id=%s]\n %s" + % (execution.id, traceback.format_exc(e))) + LOG.warning(msg) + finally: + auth_ctx.set_ctx(None) + + def run_execution_expiration_policy(self, ctx): - LOG.debug("Starting expiration policy task.") + LOG.debug("Starting expiration policy.") older_than = CONF.execution_expiration_policy.older_than - exp_time = (datetime.datetime.utcnow() + exp_time = (datetime.datetime.now() - datetime.timedelta(minutes=older_than)) - with db_api.transaction(): - # TODO(gpaz): In the future should use generic method with - # filters params and not specific method that filter by time. - for execution in db_api.get_expired_executions(exp_time): - try: - # Setup project_id for _secure_query delete execution. - ctx = auth_ctx.MistralContext( - user_id=None, - project_id=execution.project_id, - auth_token=None, - is_admin=True - ) - auth_ctx.set_ctx(ctx) + batch_size = CONF.execution_expiration_policy.batch_size - LOG.debug( - 'DELETE execution id : %s from date : %s ' - 'according to expiration policy', - execution.id, - execution.updated_at - ) - db_api.delete_workflow_execution(execution.id) - except Exception as e: - msg = ("Failed to delete [execution_id=%s]\n %s" - % (execution.id, traceback.format_exc(e))) - LOG.warning(msg) - finally: - auth_ctx.set_ctx(None) + # The default value of batch size is 0 + # If it is not set, size of batch will be the size + # of total number of expired executions. + _delete_expired_executions(batch_size, exp_time) def setup(): diff --git a/mistral/tests/unit/services/test_expired_executions_policy.py b/mistral/tests/unit/services/test_expired_executions_policy.py index 29813ec6..c95bc1b6 100644 --- a/mistral/tests/unit/services/test_expired_executions_policy.py +++ b/mistral/tests/unit/services/test_expired_executions_policy.py @@ -159,6 +159,94 @@ class ExpirationPolicyTest(base.DbTestCase): self.assertEqual(0, len(execs)) + def test_deletion_of_expired_executions_with_batch_size_scenario1(self): + # Scenario1 test will test with batch_size of 3, + # 5 expired executions and different values of "older_than" + # which is 30 and 5 minutes respectively. + # Expected_result: All expired executions are successfully deleted. + + cfg.CONF.set_default( + 'batch_size', + 3, + group='execution_expiration_policy' + ) + + # Since we are removing other projects execution, + # we want to load the executions with other project_id. + _switch_context('non_admin_project', False) + + _create_workflow_executions() + + # Switch context to Admin since expiration policy running as Admin. + _switch_context(None, True) + + _set_expiration_policy_config(1, 30) + + # Call for all expired wfs execs. + now = datetime.datetime.now() + execs = db_api.get_expired_executions(now) + + # Should be only 5, the RUNNING execution shouldn't return, + # so the child wf (that has parent task id). + self.assertEqual(5, len(execs)) + + older_than = cfg.CONF.execution_expiration_policy.older_than + exp_time = (datetime.datetime.now() + - datetime.timedelta(minutes=older_than)) + batch_size = cfg.CONF.execution_expiration_policy.batch_size + expiration_policy._delete_expired_executions( + batch_size, + exp_time + ) + execs = db_api.get_expired_executions(now) + self.assertEqual(2, len(execs)) + + _set_expiration_policy_config(1, 5) + expiration_policy.run_execution_expiration_policy(self, ctx) + execs = db_api.get_expired_executions(now) + self.assertEqual(0, len(execs)) + + def test_deletion_of_expired_executions_with_batch_size_scenario2(self): + # Scenario2 will test with batch_size of 2, 5 expired executions + # with value of "older_than" that is 5 minutes. + # Expected_result: All expired executions are successfully deleted. + + cfg.CONF.set_default( + 'batch_size', + 2, + group='execution_expiration_policy' + ) + + # Since we are removing other projects execution, + # we want to load the executions with other project_id. + _switch_context('non_admin_project', False) + + _create_workflow_executions() + + # Switch context to Admin since expiration policy running as Admin. + _switch_context(None, True) + + _set_expiration_policy_config(1, 5) + + # Call for all expired wfs execs. + now = datetime.datetime.now() + execs = db_api.get_expired_executions(now) + + # Should be only 5, the RUNNING execution shouldn't return, + # so the child wf (that has parent task id). + self.assertEqual(5, len(execs)) + + older_than = cfg.CONF.execution_expiration_policy.older_than + exp_time = (datetime.datetime.now() + - datetime.timedelta(minutes=older_than)) + batch_size = cfg.CONF.execution_expiration_policy.batch_size + expiration_policy._delete_expired_executions( + batch_size, + exp_time + ) + execs = db_api.get_expired_executions(now) + self.assertEqual(0, len(execs)) + def test_negative_wrong_conf_values(self): _set_expiration_policy_config(None, None) e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)