diff --git a/doc/source/main_features.rst b/doc/source/main_features.rst index 46f3f850..e5383036 100644 --- a/doc/source/main_features.rst +++ b/doc/source/main_features.rst @@ -288,27 +288,37 @@ can be resolved by setting an expiration policy. **By default this feature is disabled.** -When enabled, the policy will define the maximum age of an execution in -minutes since the last updated time. To enable and set a policy, edit the -Mistral configuration file and specify ``older_than`` and -``evaluation_interval`` in minutes. +This policy defines the maximum age of an execution since the last updated time +(in minutes) and the maximum number of finished executions. Each evaluation will +satisfy these conditions, so the expired executions (older than specified) will +be deleted, and the number of execution in finished state (regardless of expiration) +will be limited to max_finished_executions. + +To enable the policy, edit the Mistral configuration file and specify +``evaluation_interval`` and at least one of the ``older_than`` +or ``evaluation_interval`` options. .. code-block:: cfg [execution_expiration_policy] - older_than = 10080 # 1 week evaluation_interval = 120 # 2 hours - -For the expiration policy to be enabled, both of these configuration options -must be set. - -- **older_than** - - This defines the maximum age of an execution in minutes since it was last - updated. It must be greater or equal to ``1``. + older_than = 10080 # 1 week + max_finished_executions = 500 - **evaluation_interval** - The evaluation interval defines how frequently Mistral will check and expire - old executions. In the above example it is set to two hours, so every two - hours Mistral will clean up and look for expired executions. + The evaluation interval defines how frequently Mistral will check and ensure + the above mentioned constraints. In the above example it is set to two hours, + so every two hours Mistral will remove executions older than 1 week, and + keep only the 500 latest finished executions. + +- **older_than** + + Defines the maximum age of an execution in minutes since it was last + updated. It must be greater or equal to ``1``. + +- **max_finished_executions** + + Defines the maximum number of finished executions. + It must be greater or equal to ``1``. + diff --git a/mistral/config.py b/mistral/config.py index 4bb12e2c..335a5a38 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -213,16 +213,27 @@ execution_expiration_policy_opts = [ 'evaluation_interval', help=_('How often will the executions be evaluated ' '(in minutes). For example for value 120 the interval ' - 'will be 2 hours (every 2 hours).') + 'will be 2 hours (every 2 hours).' + 'Note that only final state executions will be removed: ' + '( SUCCESS / ERROR / CANCELLED ).') ), cfg.IntOpt( 'older_than', help=_('Evaluate from which time remove executions in minutes. ' 'For example when older_than = 60, remove all executions ' 'that finished a 60 minutes ago or more. ' - 'Minimum value is 1. ' - 'Note that only final state execution will remove ' - '( SUCCESS / ERROR ).') + 'Minimum value is 1.') + ), + cfg.IntOpt( + 'max_finished_executions', + default=0, + help=_('The maximum number of finished workflow executions' + 'to be stored. For example when max_finished_executions = 100,' + 'only the 100 latest finished executions will be preserved.' + 'This means that even unexpired executions are eligible' + 'for deletion, to decrease the number of executions in the' + 'database. The default value is 0. If it is set to 0,' + 'this constraint won\'t be applied.') ), cfg.IntOpt( 'batch_size', diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 9b36e850..9e5a615c 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -393,8 +393,14 @@ def get_next_cron_triggers(time): return IMPL.get_next_cron_triggers(time) -def get_expired_executions(time, limit=None): - return IMPL.get_expired_executions(time, limit=limit) +def get_executions_to_clean(expiration_time, limit=None, + max_finished_executions=None, columns=()): + return IMPL.get_executions_to_clean( + expiration_time, + limit, + max_finished_executions, + columns + ) def create_cron_trigger(values): diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 2835125e..5f991d67 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -35,6 +35,7 @@ from mistral.db.sqlalchemy import sqlite_lock from mistral.db import utils as m_dbutils from mistral.db.v2.sqlalchemy import filters as db_filters from mistral.db.v2.sqlalchemy import models +from mistral.db.v2.sqlalchemy.models import WorkflowExecution from mistral import exceptions as exc from mistral.services import security from mistral import utils @@ -1034,13 +1035,37 @@ def delete_delayed_calls(session=None, **kwargs): @b.session_aware() -def get_expired_executions(time, session=None, limit=None): - query = b.model_query(models.WorkflowExecution) +def get_executions_to_clean(expiration_time, limit=None, + max_finished_executions=None, columns=(), + session=None): + # Get the ids of the executions that won't be deleted. + # These are the not expired executions, + # limited by the new max_finished_executions constraint. + query = _get_completed_root_executions_query((WorkflowExecution.id,)) + query = query.filter( + models.WorkflowExecution.updated_at >= expiration_time + ) + query = query.order_by(models.WorkflowExecution.updated_at.desc()) + if max_finished_executions: + query = query.limit(max_finished_executions) + + # And take the inverse of that set. + inverse = _get_completed_root_executions_query(columns) + inverse = inverse.filter(~WorkflowExecution.id.in_(query)) + inverse = inverse.order_by(models.WorkflowExecution.updated_at.asc()) + + if limit: + inverse.limit(limit) + + return inverse.all() + + +def _get_completed_root_executions_query(columns): + query = b.model_query(models.WorkflowExecution, columns=columns) # Only WorkflowExecution that are not a child of other WorkflowExecution. query = query.filter(models.WorkflowExecution. task_execution_id == sa.null()) - query = query.filter(models.WorkflowExecution.updated_at < time) query = query.filter( sa.or_( models.WorkflowExecution.state == states.SUCCESS, @@ -1048,14 +1073,8 @@ def get_expired_executions(time, session=None, limit=None): models.WorkflowExecution.state == states.CANCELLED ) ) + return query - if limit: - query = query.limit(limit) - - return query.all() - - -# Cron triggers. @b.session_aware() def get_cron_trigger(name, session=None): diff --git a/mistral/services/expiration_policy.py b/mistral/services/expiration_policy.py index d9fbd5bb..89586c1d 100644 --- a/mistral/services/expiration_policy.py +++ b/mistral/services/expiration_policy.py @@ -41,9 +41,10 @@ class ExecutionExpirationPolicy(periodic_task.PeriodicTasks): super(ExecutionExpirationPolicy, self).__init__(conf) interval = CONF.execution_expiration_policy.evaluation_interval - older_than = CONF.execution_expiration_policy.older_than + ot = CONF.execution_expiration_policy.older_than + mfe = CONF.execution_expiration_policy.max_finished_executions - if (interval and older_than and older_than >= 1): + if interval and ((ot and ot >= 1) or (mfe and mfe >= 1)): _periodic_task = periodic_task.periodic_task( spacing=interval * 60, run_immediately=True @@ -54,31 +55,29 @@ class ExecutionExpirationPolicy(periodic_task.PeriodicTasks): ) else: LOG.debug("Expiration policy disabled. Evaluation_interval " - "is not configured or older_than < '1'.") + "is not configured or both older_than and " + "max_finished_executions < '1'.") -def _delete_expired_executions(batch_size, expired_time): - if batch_size: - while True: - with db_api.transaction(): - # TODO(gpaz): In the future should use generic method with - # filters params and not specific method that filter by time. - expired_executions = db_api.get_expired_executions( - expired_time, - limit=batch_size - ) - - if not expired_executions: - break - _delete(expired_executions) - else: +def _delete_executions(batch_size, expiration_time, + max_finished_executions): + while True: with db_api.transaction(): - expired_executions = db_api.get_expired_executions(expired_time) - _delete(expired_executions) + # TODO(gpaz): In the future should use generic method with + # filters params and not specific method that filter by time. + execs = db_api.get_executions_to_clean( + expiration_time, + limit=batch_size, + max_finished_executions=max_finished_executions + ) + + if not execs: + break + _delete(execs) -def _delete(exp_executions): - for execution in exp_executions: +def _delete(executions): + for execution in executions: try: # Setup project_id for _secure_query delete execution. # TODO(tuan_luong): Manipulation with auth_ctx should be @@ -110,15 +109,16 @@ def run_execution_expiration_policy(self, ctx): LOG.debug("Starting expiration policy.") older_than = CONF.execution_expiration_policy.older_than - exp_time = (datetime.datetime.now() + exp_time = (datetime.datetime.utcnow() - datetime.timedelta(minutes=older_than)) batch_size = CONF.execution_expiration_policy.batch_size + max_executions = CONF.execution_expiration_policy.max_finished_executions # The default value of batch size is 0 # If it is not set, size of batch will be the size # of total number of expired executions. - _delete_expired_executions(batch_size, exp_time) + _delete_executions(batch_size, exp_time, max_executions) def setup(): diff --git a/mistral/tests/unit/services/test_expiration_policy.py b/mistral/tests/unit/services/test_expiration_policy.py new file mode 100644 index 00000000..4a719c5a --- /dev/null +++ b/mistral/tests/unit/services/test_expiration_policy.py @@ -0,0 +1,409 @@ +# Copyright 2015 - Alcatel-lucent, Inc. +# Copyright 2015 - StackStorm, Inc. +# Copyright 2016 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +from mistral import context as ctx +from mistral.db.v2 import api as db_api +from mistral.services import expiration_policy +from mistral.services.expiration_policy import ExecutionExpirationPolicy +from mistral.tests.unit import base +from mistral.tests.unit.base import get_context +from oslo_config import cfg + + +def _create_workflow_executions(): + time_now = datetime.datetime.utcnow() + + wf_execs = [ + { + 'id': 'success_expired', + 'name': 'success_expired', + 'created_at': time_now - datetime.timedelta(minutes=60), + 'updated_at': time_now - datetime.timedelta(minutes=59), + 'workflow_name': 'test_exec', + 'state': "SUCCESS", + }, + { + 'id': 'error_expired', + 'name': 'error_expired', + 'created_at': time_now - datetime.timedelta(days=3, minutes=10), + 'updated_at': time_now - datetime.timedelta(days=3), + 'workflow_name': 'test_exec', + 'state': "ERROR", + }, + { + 'id': 'running_not_expired', + 'name': 'running_not_expired', + 'created_at': time_now - datetime.timedelta(days=3, minutes=10), + 'updated_at': time_now - datetime.timedelta(days=3), + 'workflow_name': 'test_exec', + 'state': "RUNNING", + }, + { + 'id': 'running_not_expired2', + 'name': 'running_not_expired2', + 'created_at': time_now - datetime.timedelta(days=3, minutes=10), + 'updated_at': time_now - datetime.timedelta(days=4), + 'workflow_name': 'test_exec', + 'state': "RUNNING", + }, + { + 'id': 'success_not_expired', + 'name': 'success_not_expired', + 'created_at': time_now - datetime.timedelta(minutes=15), + 'updated_at': time_now - datetime.timedelta(minutes=5), + 'workflow_name': 'test_exec', + 'state': "SUCCESS", + }, + { + 'id': 'abc', + 'name': 'cancelled_expired', + 'created_at': time_now - datetime.timedelta(minutes=60), + 'updated_at': time_now - datetime.timedelta(minutes=59), + 'workflow_name': 'test_exec', + 'state': "CANCELLED", + }, + { + 'id': 'cancelled_not_expired', + 'name': 'cancelled_not_expired', + 'created_at': time_now - datetime.timedelta(minutes=15), + 'updated_at': time_now - datetime.timedelta(minutes=6), + 'workflow_name': 'test_exec', + 'state': "CANCELLED", + } + ] + + for wf_exec in wf_execs: + db_api.create_workflow_execution(wf_exec) + + # Create a nested workflow execution. + + db_api.create_task_execution( + { + 'id': 'running_not_expired', + 'workflow_execution_id': 'success_not_expired', + 'name': 'my_task' + } + ) + + db_api.create_workflow_execution( + { + 'id': 'expired_but_not_a_parent', + 'name': 'expired_but_not_a_parent', + 'created_at': time_now - datetime.timedelta(days=15), + 'updated_at': time_now - datetime.timedelta(days=10), + 'workflow_name': 'test_exec', + 'state': "SUCCESS", + 'task_execution_id': 'running_not_expired' + } + ) + + +def _switch_context(is_default, is_admin): + ctx.set_ctx(get_context(is_default, is_admin)) + + +class ExpirationPolicyTest(base.DbTestCase): + def test_expiration_policy_for_executions_with_different_project_id(self): + # Delete execution uses a secured filtering and we need + # to verify that admin able to do that for other projects. + cfg.CONF.set_default('auth_enable', True, group='pecan') + + # Since we are removing other projects execution, + # we want to load the executions with other project_id. + _switch_context(False, False) + + _create_workflow_executions() + + now = datetime.datetime.utcnow() + + # This execution has a parent wf and testing that we are + # querying only for parent wfs. + exec_child = db_api.get_workflow_execution('expired_but_not_a_parent') + + self.assertEqual('running_not_expired', exec_child.task_execution_id) + + # Call for all expired wfs execs. + execs = db_api.get_executions_to_clean(now) + + # Should be only 5, the RUNNING execution shouldn't return, + # so the child wf (that has parent task id). + self.assertEqual(5, len(execs)) + + # Switch context to Admin since expiration policy running as Admin. + _switch_context(True, True) + + _set_expiration_policy_config(1, 30, None) + expiration_policy.run_execution_expiration_policy(self, ctx) + + # Only non_expired available (update_at < older_than). + execs = db_api.get_executions_to_clean(now) + + self.assertEqual(2, len(execs)) + self.assertListEqual( + [ + 'cancelled_not_expired', + 'success_not_expired' + ], + sorted([ex.id for ex in execs]) + ) + + _set_expiration_policy_config(1, 5, None) + expiration_policy.run_execution_expiration_policy(self, ctx) + execs = db_api.get_executions_to_clean(now) + + self.assertEqual(0, len(execs)) + + def test_deletion_of_expired_executions_with_batch_size_scenario1(self): + """scenario1 + + This test will use batch_size of 3, + 5 expired executions and different values of "older_than" + which is 30 and 5 minutes respectively. + Expected_result: All expired executions are successfully deleted. + """ + + cfg.CONF.set_default( + 'batch_size', + 3, + group='execution_expiration_policy' + ) + + _create_workflow_executions() + + _set_expiration_policy_config(1, 30, None) + + # Call for all expired wfs execs. + now = datetime.datetime.utcnow() + execs = db_api.get_executions_to_clean(now) + + # Should be only 5, the RUNNING execution shouldn't return, + # so the child wf (that has parent task id). + self.assertEqual(5, len(execs)) + + older_than = cfg.CONF.execution_expiration_policy.older_than + exp_time = (datetime.datetime.utcnow() + - datetime.timedelta(minutes=older_than)) + batch_size = cfg.CONF.execution_expiration_policy.batch_size + mfe = cfg.CONF.execution_expiration_policy.max_finished_executions + expiration_policy._delete_executions( + batch_size, + exp_time, + mfe + ) + execs = db_api.get_executions_to_clean(now) + self.assertEqual(2, len(execs)) + + _set_expiration_policy_config(1, 5, None) + expiration_policy.run_execution_expiration_policy(self, ctx) + execs = db_api.get_executions_to_clean(now) + self.assertEqual(0, len(execs)) + + def test_deletion_of_expired_executions_with_batch_size_scenario2(self): + """scenario2 + + This test will use batch_size of 2, 5 expired executions + with value of "older_than" that is 5 minutes. + Expected_result: All expired executions are successfully deleted. + """ + + cfg.CONF.set_default( + 'batch_size', + 2, + group='execution_expiration_policy' + ) + + _create_workflow_executions() + + _set_expiration_policy_config(1, 5, None) + + # Call for all expired wfs execs. + now = datetime.datetime.utcnow() + execs = db_api.get_executions_to_clean(now) + + # Should be only 5, the RUNNING execution shouldn't return, + # so the child wf (that has parent task id). + self.assertEqual(5, len(execs)) + + older_than = cfg.CONF.execution_expiration_policy.older_than + exp_time = (datetime.datetime.utcnow() + - datetime.timedelta(minutes=older_than)) + batch_size = cfg.CONF.execution_expiration_policy.batch_size + mfe = cfg.CONF.execution_expiration_policy.max_finished_executions + expiration_policy._delete_executions( + batch_size, + exp_time, + mfe + ) + execs = db_api.get_executions_to_clean(now) + self.assertEqual(0, len(execs)) + + def test_expiration_policy_for_executions_with_max_executions_scen1(self): + """scenario1 + + Tests the max_executions logic with + max_finished_executions = + 'total not expired and completed executions' - 1 + """ + + _create_workflow_executions() + + now = datetime.datetime.utcnow() + + # Call for all expired wfs execs. + execs = db_api.get_executions_to_clean(now) + + # Should be only 5, the RUNNING execution shouldn't return, + # so the child wf (that has parent task id). + self.assertEqual(5, len(execs)) + + _set_expiration_policy_config(1, 30, 1) + expiration_policy.run_execution_expiration_policy(self, ctx) + + # Assert the two running executions + # (running_not_expired, running_not_expired2), + # the sub execution (expired_but_not_a_parent) and the one allowed + # finished execution (success_not_expired) are there. + execs = db_api.get_workflow_executions() + self.assertEqual(4, len(execs)) + self.assertListEqual( + [ + 'expired_but_not_a_parent', + 'running_not_expired', + 'running_not_expired2', + 'success_not_expired' + ], + sorted([ex.id for ex in execs]) + ) + + def test_expiration_policy_for_executions_with_max_executions_scen2(self): + """scenario2 + + Tests the max_executions logic with: + max_finished_executions > total completed executions + """ + + _create_workflow_executions() + + now = datetime.datetime.utcnow() + + # Call for all expired wfs execs. + execs = db_api.get_executions_to_clean(now) + + # Should be only 5, the RUNNING execution shouldn't return, + # so the child wf (that has parent task id). + self.assertEqual(5, len(execs)) + + _set_expiration_policy_config(1, 30, 100) + expiration_policy.run_execution_expiration_policy(self, ctx) + + # Assert the two running executions + # (running_not_expired, running_not_expired2), the sub execution + # (expired_but_not_a_parent) and the all finished execution + # (success_not_expired, 'cancelled_not_expired') are there. + execs = db_api.get_workflow_executions() + self.assertEqual(5, len(execs)) + self.assertListEqual( + [ + 'cancelled_not_expired', + 'expired_but_not_a_parent', + 'running_not_expired', + 'running_not_expired2', + 'success_not_expired' + ], + sorted([ex.id for ex in execs]) + ) + + def test_negative_wrong_conf_values(self): + _set_expiration_policy_config(None, None, None) + e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) + + self.assertDictEqual({}, e_policy._periodic_spacing) + self.assertListEqual([], e_policy._periodic_tasks) + + _set_expiration_policy_config(None, 60, None) + e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) + + self.assertDictEqual({}, e_policy._periodic_spacing) + self.assertListEqual([], e_policy._periodic_tasks) + + _set_expiration_policy_config(60, None, None) + e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) + + self.assertDictEqual({}, e_policy._periodic_spacing) + self.assertListEqual([], e_policy._periodic_tasks) + + def test_periodic_task_parameters(self): + _set_expiration_policy_config(17, 13, None) + + e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) + + self.assertEqual( + 17 * 60, + e_policy._periodic_spacing['run_execution_expiration_policy'] + ) + + def test_periodic_task_scheduling(self): + def _assert_scheduling(expiration_policy_config, should_schedule): + ExecutionExpirationPolicy._periodic_tasks = [] + _set_expiration_policy_config(*expiration_policy_config) + e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) + + if should_schedule: + self.assertTrue( + e_policy._periodic_tasks, + "Periodic task should have been created." + ) + else: + self.assertFalse( + e_policy._periodic_tasks, + "Periodic task shouldn't have been created." + ) + + _assert_scheduling([1, 1, None], True) + _assert_scheduling([1, None, 1], True) + _assert_scheduling([1, 1, 1], True) + _assert_scheduling([1, None, None], False) + _assert_scheduling([None, 1, 1], False) + + def tearDown(self): + """Restores the size limit config to default.""" + super(ExpirationPolicyTest, self).tearDown() + + cfg.CONF.set_default('auth_enable', False, group='pecan') + + ctx.set_ctx(None) + + _set_expiration_policy_config(None, None, None) + + +def _set_expiration_policy_config(evaluation_interval, older_than, mfe): + cfg.CONF.set_default( + 'evaluation_interval', + evaluation_interval, + group='execution_expiration_policy' + ) + cfg.CONF.set_default( + 'older_than', + older_than, + group='execution_expiration_policy' + ) + cfg.CONF.set_default( + 'max_finished_executions', + mfe, + group='execution_expiration_policy' + ) diff --git a/mistral/tests/unit/services/test_expired_executions_policy.py b/mistral/tests/unit/services/test_expired_executions_policy.py deleted file mode 100644 index 592c31db..00000000 --- a/mistral/tests/unit/services/test_expired_executions_policy.py +++ /dev/null @@ -1,298 +0,0 @@ -# Copyright 2015 - Alcatel-lucent, Inc. -# Copyright 2015 - StackStorm, Inc. -# Copyright 2016 - Brocade Communications Systems, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import datetime - -from mistral import context as ctx -from mistral.db.v2 import api as db_api -from mistral.services import expiration_policy -from mistral.tests.unit import base -from oslo_config import cfg - - -def _create_workflow_executions(): - time_now = datetime.datetime.utcnow() - - wf_execs = [ - { - 'id': '123', - 'name': 'success_expired', - 'created_at': time_now - datetime.timedelta(minutes=60), - 'updated_at': time_now - datetime.timedelta(minutes=59), - 'workflow_name': 'test_exec', - 'state': "SUCCESS", - }, - { - 'id': '456', - 'name': 'error_expired', - 'created_at': time_now - datetime.timedelta(days=3, minutes=10), - 'updated_at': time_now - datetime.timedelta(days=3), - 'workflow_name': 'test_exec', - 'state': "ERROR", - }, - { - 'id': '789', - 'name': 'running_not_expired', - 'created_at': time_now - datetime.timedelta(days=3, minutes=10), - 'updated_at': time_now - datetime.timedelta(days=3), - 'workflow_name': 'test_exec', - 'state': "RUNNING", - }, - { - 'id': '987', - 'name': 'success_not_expired', - 'created_at': time_now - datetime.timedelta(minutes=15), - 'updated_at': time_now - datetime.timedelta(minutes=5), - 'workflow_name': 'test_exec', - 'state': "SUCCESS", - }, - { - 'id': 'abc', - 'name': 'cancelled_expired', - 'created_at': time_now - datetime.timedelta(minutes=60), - 'updated_at': time_now - datetime.timedelta(minutes=59), - 'workflow_name': 'test_exec', - 'state': "CANCELLED", - }, - { - 'id': 'def', - 'name': 'cancelled_not_expired', - 'created_at': time_now - datetime.timedelta(minutes=15), - 'updated_at': time_now - datetime.timedelta(minutes=5), - 'workflow_name': 'test_exec', - 'state': "CANCELLED", - } - ] - - for wf_exec in wf_execs: - db_api.create_workflow_execution(wf_exec) - - # Create a nested workflow execution. - - db_api.create_task_execution( - { - 'id': '789', - 'workflow_execution_id': '987', - 'name': 'my_task' - } - ) - - db_api.create_workflow_execution( - { - 'id': '654', - 'name': 'expired but not a parent', - 'created_at': time_now - datetime.timedelta(days=15), - 'updated_at': time_now - datetime.timedelta(days=10), - 'workflow_name': 'test_exec', - 'state': "SUCCESS", - 'task_execution_id': '789' - } - ) - - -def _switch_context(project_id, is_admin): - _ctx = ctx.MistralContext( - user_id=None, - project_id=project_id, - auth_token=None, - is_admin=is_admin - ) - - ctx.set_ctx(_ctx) - - -class ExpirationPolicyTest(base.DbTestCase): - def test_expiration_policy_for_executions(self): - # Delete execution uses a secured filtering and we need - # to verify that admin able to do that for other projects. - cfg.CONF.set_default('auth_enable', True, group='pecan') - - # Since we are removing other projects execution, - # we want to load the executions with other project_id. - _switch_context('non_admin_project', False) - - _create_workflow_executions() - - now = datetime.datetime.utcnow() - - # This execution has a parent wf and testing that we are - # querying only for parent wfs. - exec_child = db_api.get_workflow_execution('654') - - self.assertEqual('789', exec_child.task_execution_id) - - # Call for all expired wfs execs. - execs = db_api.get_expired_executions(now) - - # Should be only 5, the RUNNING execution shouldn't return, - # so the child wf (that has parent task id). - self.assertEqual(5, len(execs)) - - # Switch context to Admin since expiration policy running as Admin. - _switch_context(None, True) - - _set_expiration_policy_config(1, 30) - expiration_policy.run_execution_expiration_policy(self, ctx) - - # Only non_expired available (update_at < older_than). - execs = db_api.get_expired_executions(now) - - self.assertEqual(2, len(execs)) - self.assertListEqual(['987', 'def'], sorted([ex.id for ex in execs])) - - _set_expiration_policy_config(1, 5) - expiration_policy.run_execution_expiration_policy(self, ctx) - execs = db_api.get_expired_executions(now) - - self.assertEqual(0, len(execs)) - - def test_deletion_of_expired_executions_with_batch_size_scenario1(self): - # Scenario1 test will test with batch_size of 3, - # 5 expired executions and different values of "older_than" - # which is 30 and 5 minutes respectively. - # Expected_result: All expired executions are successfully deleted. - - cfg.CONF.set_default( - 'batch_size', - 3, - group='execution_expiration_policy' - ) - - # Since we are removing other projects execution, - # we want to load the executions with other project_id. - _switch_context('non_admin_project', False) - - _create_workflow_executions() - - # Switch context to Admin since expiration policy running as Admin. - _switch_context(None, True) - - _set_expiration_policy_config(1, 30) - - # Call for all expired wfs execs. - now = datetime.datetime.now() - execs = db_api.get_expired_executions(now) - - # Should be only 5, the RUNNING execution shouldn't return, - # so the child wf (that has parent task id). - self.assertEqual(5, len(execs)) - - older_than = cfg.CONF.execution_expiration_policy.older_than - exp_time = (datetime.datetime.now() - - datetime.timedelta(minutes=older_than)) - batch_size = cfg.CONF.execution_expiration_policy.batch_size - expiration_policy._delete_expired_executions( - batch_size, - exp_time - ) - execs = db_api.get_expired_executions(now) - self.assertEqual(2, len(execs)) - - _set_expiration_policy_config(1, 5) - expiration_policy.run_execution_expiration_policy(self, ctx) - execs = db_api.get_expired_executions(now) - self.assertEqual(0, len(execs)) - - def test_deletion_of_expired_executions_with_batch_size_scenario2(self): - # Scenario2 will test with batch_size of 2, 5 expired executions - # with value of "older_than" that is 5 minutes. - # Expected_result: All expired executions are successfully deleted. - - cfg.CONF.set_default( - 'batch_size', - 2, - group='execution_expiration_policy' - ) - - # Since we are removing other projects execution, - # we want to load the executions with other project_id. - _switch_context('non_admin_project', False) - - _create_workflow_executions() - - # Switch context to Admin since expiration policy running as Admin. - _switch_context(None, True) - - _set_expiration_policy_config(1, 5) - - # Call for all expired wfs execs. - now = datetime.datetime.now() - execs = db_api.get_expired_executions(now) - - # Should be only 5, the RUNNING execution shouldn't return, - # so the child wf (that has parent task id). - self.assertEqual(5, len(execs)) - - older_than = cfg.CONF.execution_expiration_policy.older_than - exp_time = (datetime.datetime.now() - - datetime.timedelta(minutes=older_than)) - batch_size = cfg.CONF.execution_expiration_policy.batch_size - expiration_policy._delete_expired_executions( - batch_size, - exp_time - ) - execs = db_api.get_expired_executions(now) - self.assertEqual(0, len(execs)) - - def test_negative_wrong_conf_values(self): - _set_expiration_policy_config(None, None) - e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) - - self.assertDictEqual({}, e_policy._periodic_spacing) - self.assertListEqual([], e_policy._periodic_tasks) - - _set_expiration_policy_config(None, 60) - e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) - - self.assertDictEqual({}, e_policy._periodic_spacing) - self.assertListEqual([], e_policy._periodic_tasks) - - _set_expiration_policy_config(60, None) - e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) - - self.assertDictEqual({}, e_policy._periodic_spacing) - self.assertListEqual([], e_policy._periodic_tasks) - - def test_periodic_task_parameters(self): - _set_expiration_policy_config(17, 13) - - e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) - - self.assertEqual(17 * 60, e_policy._periodic_spacing - ['run_execution_expiration_policy']) - - def tearDown(self): - """Restores the size limit config to default.""" - super(ExpirationPolicyTest, self).tearDown() - - cfg.CONF.set_default('auth_enable', False, group='pecan') - - ctx.set_ctx(None) - - _set_expiration_policy_config(None, None) - - -def _set_expiration_policy_config(evaluation_interval, older_than): - cfg.CONF.set_default( - 'evaluation_interval', - evaluation_interval, - group='execution_expiration_policy' - ) - cfg.CONF.set_default( - 'older_than', - older_than, - group='execution_expiration_policy' - )