Merge "Limit the number of finished executions."
This commit is contained in:
commit
001c8b1c4e
@ -288,27 +288,37 @@ can be resolved by setting an expiration policy.
|
|||||||
|
|
||||||
**By default this feature is disabled.**
|
**By default this feature is disabled.**
|
||||||
|
|
||||||
When enabled, the policy will define the maximum age of an execution in
|
This policy defines the maximum age of an execution since the last updated time
|
||||||
minutes since the last updated time. To enable and set a policy, edit the
|
(in minutes) and the maximum number of finished executions. Each evaluation will
|
||||||
Mistral configuration file and specify ``older_than`` and
|
satisfy these conditions, so the expired executions (older than specified) will
|
||||||
``evaluation_interval`` in minutes.
|
be deleted, and the number of execution in finished state (regardless of expiration)
|
||||||
|
will be limited to max_finished_executions.
|
||||||
|
|
||||||
|
To enable the policy, edit the Mistral configuration file and specify
|
||||||
|
``evaluation_interval`` and at least one of the ``older_than``
|
||||||
|
or ``evaluation_interval`` options.
|
||||||
|
|
||||||
.. code-block:: cfg
|
.. code-block:: cfg
|
||||||
|
|
||||||
[execution_expiration_policy]
|
[execution_expiration_policy]
|
||||||
older_than = 10080 # 1 week
|
|
||||||
evaluation_interval = 120 # 2 hours
|
evaluation_interval = 120 # 2 hours
|
||||||
|
older_than = 10080 # 1 week
|
||||||
For the expiration policy to be enabled, both of these configuration options
|
max_finished_executions = 500
|
||||||
must be set.
|
|
||||||
|
|
||||||
- **older_than**
|
|
||||||
|
|
||||||
This defines the maximum age of an execution in minutes since it was last
|
|
||||||
updated. It must be greater or equal to ``1``.
|
|
||||||
|
|
||||||
- **evaluation_interval**
|
- **evaluation_interval**
|
||||||
|
|
||||||
The evaluation interval defines how frequently Mistral will check and expire
|
The evaluation interval defines how frequently Mistral will check and ensure
|
||||||
old executions. In the above example it is set to two hours, so every two
|
the above mentioned constraints. In the above example it is set to two hours,
|
||||||
hours Mistral will clean up and look for expired executions.
|
so every two hours Mistral will remove executions older than 1 week, and
|
||||||
|
keep only the 500 latest finished executions.
|
||||||
|
|
||||||
|
- **older_than**
|
||||||
|
|
||||||
|
Defines the maximum age of an execution in minutes since it was last
|
||||||
|
updated. It must be greater or equal to ``1``.
|
||||||
|
|
||||||
|
- **max_finished_executions**
|
||||||
|
|
||||||
|
Defines the maximum number of finished executions.
|
||||||
|
It must be greater or equal to ``1``.
|
||||||
|
|
||||||
|
@ -213,16 +213,27 @@ execution_expiration_policy_opts = [
|
|||||||
'evaluation_interval',
|
'evaluation_interval',
|
||||||
help=_('How often will the executions be evaluated '
|
help=_('How often will the executions be evaluated '
|
||||||
'(in minutes). For example for value 120 the interval '
|
'(in minutes). For example for value 120 the interval '
|
||||||
'will be 2 hours (every 2 hours).')
|
'will be 2 hours (every 2 hours).'
|
||||||
|
'Note that only final state executions will be removed: '
|
||||||
|
'( SUCCESS / ERROR / CANCELLED ).')
|
||||||
),
|
),
|
||||||
cfg.IntOpt(
|
cfg.IntOpt(
|
||||||
'older_than',
|
'older_than',
|
||||||
help=_('Evaluate from which time remove executions in minutes. '
|
help=_('Evaluate from which time remove executions in minutes. '
|
||||||
'For example when older_than = 60, remove all executions '
|
'For example when older_than = 60, remove all executions '
|
||||||
'that finished a 60 minutes ago or more. '
|
'that finished a 60 minutes ago or more. '
|
||||||
'Minimum value is 1. '
|
'Minimum value is 1.')
|
||||||
'Note that only final state execution will remove '
|
),
|
||||||
'( SUCCESS / ERROR ).')
|
cfg.IntOpt(
|
||||||
|
'max_finished_executions',
|
||||||
|
default=0,
|
||||||
|
help=_('The maximum number of finished workflow executions'
|
||||||
|
'to be stored. For example when max_finished_executions = 100,'
|
||||||
|
'only the 100 latest finished executions will be preserved.'
|
||||||
|
'This means that even unexpired executions are eligible'
|
||||||
|
'for deletion, to decrease the number of executions in the'
|
||||||
|
'database. The default value is 0. If it is set to 0,'
|
||||||
|
'this constraint won\'t be applied.')
|
||||||
),
|
),
|
||||||
cfg.IntOpt(
|
cfg.IntOpt(
|
||||||
'batch_size',
|
'batch_size',
|
||||||
|
@ -393,8 +393,14 @@ def get_next_cron_triggers(time):
|
|||||||
return IMPL.get_next_cron_triggers(time)
|
return IMPL.get_next_cron_triggers(time)
|
||||||
|
|
||||||
|
|
||||||
def get_expired_executions(time, limit=None):
|
def get_executions_to_clean(expiration_time, limit=None,
|
||||||
return IMPL.get_expired_executions(time, limit=limit)
|
max_finished_executions=None, columns=()):
|
||||||
|
return IMPL.get_executions_to_clean(
|
||||||
|
expiration_time,
|
||||||
|
limit,
|
||||||
|
max_finished_executions,
|
||||||
|
columns
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def create_cron_trigger(values):
|
def create_cron_trigger(values):
|
||||||
|
@ -35,6 +35,7 @@ from mistral.db.sqlalchemy import sqlite_lock
|
|||||||
from mistral.db import utils as m_dbutils
|
from mistral.db import utils as m_dbutils
|
||||||
from mistral.db.v2.sqlalchemy import filters as db_filters
|
from mistral.db.v2.sqlalchemy import filters as db_filters
|
||||||
from mistral.db.v2.sqlalchemy import models
|
from mistral.db.v2.sqlalchemy import models
|
||||||
|
from mistral.db.v2.sqlalchemy.models import WorkflowExecution
|
||||||
from mistral import exceptions as exc
|
from mistral import exceptions as exc
|
||||||
from mistral.services import security
|
from mistral.services import security
|
||||||
from mistral import utils
|
from mistral import utils
|
||||||
@ -1034,13 +1035,37 @@ def delete_delayed_calls(session=None, **kwargs):
|
|||||||
|
|
||||||
|
|
||||||
@b.session_aware()
|
@b.session_aware()
|
||||||
def get_expired_executions(time, session=None, limit=None):
|
def get_executions_to_clean(expiration_time, limit=None,
|
||||||
query = b.model_query(models.WorkflowExecution)
|
max_finished_executions=None, columns=(),
|
||||||
|
session=None):
|
||||||
|
# Get the ids of the executions that won't be deleted.
|
||||||
|
# These are the not expired executions,
|
||||||
|
# limited by the new max_finished_executions constraint.
|
||||||
|
query = _get_completed_root_executions_query((WorkflowExecution.id,))
|
||||||
|
query = query.filter(
|
||||||
|
models.WorkflowExecution.updated_at >= expiration_time
|
||||||
|
)
|
||||||
|
query = query.order_by(models.WorkflowExecution.updated_at.desc())
|
||||||
|
|
||||||
|
if max_finished_executions:
|
||||||
|
query = query.limit(max_finished_executions)
|
||||||
|
|
||||||
|
# And take the inverse of that set.
|
||||||
|
inverse = _get_completed_root_executions_query(columns)
|
||||||
|
inverse = inverse.filter(~WorkflowExecution.id.in_(query))
|
||||||
|
inverse = inverse.order_by(models.WorkflowExecution.updated_at.asc())
|
||||||
|
|
||||||
|
if limit:
|
||||||
|
inverse.limit(limit)
|
||||||
|
|
||||||
|
return inverse.all()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_completed_root_executions_query(columns):
|
||||||
|
query = b.model_query(models.WorkflowExecution, columns=columns)
|
||||||
# Only WorkflowExecution that are not a child of other WorkflowExecution.
|
# Only WorkflowExecution that are not a child of other WorkflowExecution.
|
||||||
query = query.filter(models.WorkflowExecution.
|
query = query.filter(models.WorkflowExecution.
|
||||||
task_execution_id == sa.null())
|
task_execution_id == sa.null())
|
||||||
query = query.filter(models.WorkflowExecution.updated_at < time)
|
|
||||||
query = query.filter(
|
query = query.filter(
|
||||||
sa.or_(
|
sa.or_(
|
||||||
models.WorkflowExecution.state == states.SUCCESS,
|
models.WorkflowExecution.state == states.SUCCESS,
|
||||||
@ -1048,14 +1073,8 @@ def get_expired_executions(time, session=None, limit=None):
|
|||||||
models.WorkflowExecution.state == states.CANCELLED
|
models.WorkflowExecution.state == states.CANCELLED
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
return query
|
||||||
|
|
||||||
if limit:
|
|
||||||
query = query.limit(limit)
|
|
||||||
|
|
||||||
return query.all()
|
|
||||||
|
|
||||||
|
|
||||||
# Cron triggers.
|
|
||||||
|
|
||||||
@b.session_aware()
|
@b.session_aware()
|
||||||
def get_cron_trigger(name, session=None):
|
def get_cron_trigger(name, session=None):
|
||||||
|
@ -41,9 +41,10 @@ class ExecutionExpirationPolicy(periodic_task.PeriodicTasks):
|
|||||||
super(ExecutionExpirationPolicy, self).__init__(conf)
|
super(ExecutionExpirationPolicy, self).__init__(conf)
|
||||||
|
|
||||||
interval = CONF.execution_expiration_policy.evaluation_interval
|
interval = CONF.execution_expiration_policy.evaluation_interval
|
||||||
older_than = CONF.execution_expiration_policy.older_than
|
ot = CONF.execution_expiration_policy.older_than
|
||||||
|
mfe = CONF.execution_expiration_policy.max_finished_executions
|
||||||
|
|
||||||
if (interval and older_than and older_than >= 1):
|
if interval and ((ot and ot >= 1) or (mfe and mfe >= 1)):
|
||||||
_periodic_task = periodic_task.periodic_task(
|
_periodic_task = periodic_task.periodic_task(
|
||||||
spacing=interval * 60,
|
spacing=interval * 60,
|
||||||
run_immediately=True
|
run_immediately=True
|
||||||
@ -54,31 +55,29 @@ class ExecutionExpirationPolicy(periodic_task.PeriodicTasks):
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
LOG.debug("Expiration policy disabled. Evaluation_interval "
|
LOG.debug("Expiration policy disabled. Evaluation_interval "
|
||||||
"is not configured or older_than < '1'.")
|
"is not configured or both older_than and "
|
||||||
|
"max_finished_executions < '1'.")
|
||||||
|
|
||||||
|
|
||||||
def _delete_expired_executions(batch_size, expired_time):
|
def _delete_executions(batch_size, expiration_time,
|
||||||
if batch_size:
|
max_finished_executions):
|
||||||
while True:
|
while True:
|
||||||
with db_api.transaction():
|
with db_api.transaction():
|
||||||
# TODO(gpaz): In the future should use generic method with
|
# TODO(gpaz): In the future should use generic method with
|
||||||
# filters params and not specific method that filter by time.
|
# filters params and not specific method that filter by time.
|
||||||
expired_executions = db_api.get_expired_executions(
|
execs = db_api.get_executions_to_clean(
|
||||||
expired_time,
|
expiration_time,
|
||||||
limit=batch_size
|
limit=batch_size,
|
||||||
|
max_finished_executions=max_finished_executions
|
||||||
)
|
)
|
||||||
|
|
||||||
if not expired_executions:
|
if not execs:
|
||||||
break
|
break
|
||||||
_delete(expired_executions)
|
_delete(execs)
|
||||||
else:
|
|
||||||
with db_api.transaction():
|
|
||||||
expired_executions = db_api.get_expired_executions(expired_time)
|
|
||||||
_delete(expired_executions)
|
|
||||||
|
|
||||||
|
|
||||||
def _delete(exp_executions):
|
def _delete(executions):
|
||||||
for execution in exp_executions:
|
for execution in executions:
|
||||||
try:
|
try:
|
||||||
# Setup project_id for _secure_query delete execution.
|
# Setup project_id for _secure_query delete execution.
|
||||||
# TODO(tuan_luong): Manipulation with auth_ctx should be
|
# TODO(tuan_luong): Manipulation with auth_ctx should be
|
||||||
@ -110,15 +109,16 @@ def run_execution_expiration_policy(self, ctx):
|
|||||||
LOG.debug("Starting expiration policy.")
|
LOG.debug("Starting expiration policy.")
|
||||||
|
|
||||||
older_than = CONF.execution_expiration_policy.older_than
|
older_than = CONF.execution_expiration_policy.older_than
|
||||||
exp_time = (datetime.datetime.now()
|
exp_time = (datetime.datetime.utcnow()
|
||||||
- datetime.timedelta(minutes=older_than))
|
- datetime.timedelta(minutes=older_than))
|
||||||
|
|
||||||
batch_size = CONF.execution_expiration_policy.batch_size
|
batch_size = CONF.execution_expiration_policy.batch_size
|
||||||
|
max_executions = CONF.execution_expiration_policy.max_finished_executions
|
||||||
|
|
||||||
# The default value of batch size is 0
|
# The default value of batch size is 0
|
||||||
# If it is not set, size of batch will be the size
|
# If it is not set, size of batch will be the size
|
||||||
# of total number of expired executions.
|
# of total number of expired executions.
|
||||||
_delete_expired_executions(batch_size, exp_time)
|
_delete_executions(batch_size, exp_time, max_executions)
|
||||||
|
|
||||||
|
|
||||||
def setup():
|
def setup():
|
||||||
|
409
mistral/tests/unit/services/test_expiration_policy.py
Normal file
409
mistral/tests/unit/services/test_expiration_policy.py
Normal file
@ -0,0 +1,409 @@
|
|||||||
|
# Copyright 2015 - Alcatel-lucent, Inc.
|
||||||
|
# Copyright 2015 - StackStorm, Inc.
|
||||||
|
# Copyright 2016 - Brocade Communications Systems, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
from mistral import context as ctx
|
||||||
|
from mistral.db.v2 import api as db_api
|
||||||
|
from mistral.services import expiration_policy
|
||||||
|
from mistral.services.expiration_policy import ExecutionExpirationPolicy
|
||||||
|
from mistral.tests.unit import base
|
||||||
|
from mistral.tests.unit.base import get_context
|
||||||
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
|
||||||
|
def _create_workflow_executions():
|
||||||
|
time_now = datetime.datetime.utcnow()
|
||||||
|
|
||||||
|
wf_execs = [
|
||||||
|
{
|
||||||
|
'id': 'success_expired',
|
||||||
|
'name': 'success_expired',
|
||||||
|
'created_at': time_now - datetime.timedelta(minutes=60),
|
||||||
|
'updated_at': time_now - datetime.timedelta(minutes=59),
|
||||||
|
'workflow_name': 'test_exec',
|
||||||
|
'state': "SUCCESS",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'error_expired',
|
||||||
|
'name': 'error_expired',
|
||||||
|
'created_at': time_now - datetime.timedelta(days=3, minutes=10),
|
||||||
|
'updated_at': time_now - datetime.timedelta(days=3),
|
||||||
|
'workflow_name': 'test_exec',
|
||||||
|
'state': "ERROR",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'running_not_expired',
|
||||||
|
'name': 'running_not_expired',
|
||||||
|
'created_at': time_now - datetime.timedelta(days=3, minutes=10),
|
||||||
|
'updated_at': time_now - datetime.timedelta(days=3),
|
||||||
|
'workflow_name': 'test_exec',
|
||||||
|
'state': "RUNNING",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'running_not_expired2',
|
||||||
|
'name': 'running_not_expired2',
|
||||||
|
'created_at': time_now - datetime.timedelta(days=3, minutes=10),
|
||||||
|
'updated_at': time_now - datetime.timedelta(days=4),
|
||||||
|
'workflow_name': 'test_exec',
|
||||||
|
'state': "RUNNING",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'success_not_expired',
|
||||||
|
'name': 'success_not_expired',
|
||||||
|
'created_at': time_now - datetime.timedelta(minutes=15),
|
||||||
|
'updated_at': time_now - datetime.timedelta(minutes=5),
|
||||||
|
'workflow_name': 'test_exec',
|
||||||
|
'state': "SUCCESS",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'abc',
|
||||||
|
'name': 'cancelled_expired',
|
||||||
|
'created_at': time_now - datetime.timedelta(minutes=60),
|
||||||
|
'updated_at': time_now - datetime.timedelta(minutes=59),
|
||||||
|
'workflow_name': 'test_exec',
|
||||||
|
'state': "CANCELLED",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'cancelled_not_expired',
|
||||||
|
'name': 'cancelled_not_expired',
|
||||||
|
'created_at': time_now - datetime.timedelta(minutes=15),
|
||||||
|
'updated_at': time_now - datetime.timedelta(minutes=6),
|
||||||
|
'workflow_name': 'test_exec',
|
||||||
|
'state': "CANCELLED",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
for wf_exec in wf_execs:
|
||||||
|
db_api.create_workflow_execution(wf_exec)
|
||||||
|
|
||||||
|
# Create a nested workflow execution.
|
||||||
|
|
||||||
|
db_api.create_task_execution(
|
||||||
|
{
|
||||||
|
'id': 'running_not_expired',
|
||||||
|
'workflow_execution_id': 'success_not_expired',
|
||||||
|
'name': 'my_task'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
db_api.create_workflow_execution(
|
||||||
|
{
|
||||||
|
'id': 'expired_but_not_a_parent',
|
||||||
|
'name': 'expired_but_not_a_parent',
|
||||||
|
'created_at': time_now - datetime.timedelta(days=15),
|
||||||
|
'updated_at': time_now - datetime.timedelta(days=10),
|
||||||
|
'workflow_name': 'test_exec',
|
||||||
|
'state': "SUCCESS",
|
||||||
|
'task_execution_id': 'running_not_expired'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _switch_context(is_default, is_admin):
|
||||||
|
ctx.set_ctx(get_context(is_default, is_admin))
|
||||||
|
|
||||||
|
|
||||||
|
class ExpirationPolicyTest(base.DbTestCase):
|
||||||
|
def test_expiration_policy_for_executions_with_different_project_id(self):
|
||||||
|
# Delete execution uses a secured filtering and we need
|
||||||
|
# to verify that admin able to do that for other projects.
|
||||||
|
cfg.CONF.set_default('auth_enable', True, group='pecan')
|
||||||
|
|
||||||
|
# Since we are removing other projects execution,
|
||||||
|
# we want to load the executions with other project_id.
|
||||||
|
_switch_context(False, False)
|
||||||
|
|
||||||
|
_create_workflow_executions()
|
||||||
|
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
|
||||||
|
# This execution has a parent wf and testing that we are
|
||||||
|
# querying only for parent wfs.
|
||||||
|
exec_child = db_api.get_workflow_execution('expired_but_not_a_parent')
|
||||||
|
|
||||||
|
self.assertEqual('running_not_expired', exec_child.task_execution_id)
|
||||||
|
|
||||||
|
# Call for all expired wfs execs.
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
|
||||||
|
# Should be only 5, the RUNNING execution shouldn't return,
|
||||||
|
# so the child wf (that has parent task id).
|
||||||
|
self.assertEqual(5, len(execs))
|
||||||
|
|
||||||
|
# Switch context to Admin since expiration policy running as Admin.
|
||||||
|
_switch_context(True, True)
|
||||||
|
|
||||||
|
_set_expiration_policy_config(1, 30, None)
|
||||||
|
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||||
|
|
||||||
|
# Only non_expired available (update_at < older_than).
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
|
||||||
|
self.assertEqual(2, len(execs))
|
||||||
|
self.assertListEqual(
|
||||||
|
[
|
||||||
|
'cancelled_not_expired',
|
||||||
|
'success_not_expired'
|
||||||
|
],
|
||||||
|
sorted([ex.id for ex in execs])
|
||||||
|
)
|
||||||
|
|
||||||
|
_set_expiration_policy_config(1, 5, None)
|
||||||
|
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
|
||||||
|
self.assertEqual(0, len(execs))
|
||||||
|
|
||||||
|
def test_deletion_of_expired_executions_with_batch_size_scenario1(self):
|
||||||
|
"""scenario1
|
||||||
|
|
||||||
|
This test will use batch_size of 3,
|
||||||
|
5 expired executions and different values of "older_than"
|
||||||
|
which is 30 and 5 minutes respectively.
|
||||||
|
Expected_result: All expired executions are successfully deleted.
|
||||||
|
"""
|
||||||
|
|
||||||
|
cfg.CONF.set_default(
|
||||||
|
'batch_size',
|
||||||
|
3,
|
||||||
|
group='execution_expiration_policy'
|
||||||
|
)
|
||||||
|
|
||||||
|
_create_workflow_executions()
|
||||||
|
|
||||||
|
_set_expiration_policy_config(1, 30, None)
|
||||||
|
|
||||||
|
# Call for all expired wfs execs.
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
|
||||||
|
# Should be only 5, the RUNNING execution shouldn't return,
|
||||||
|
# so the child wf (that has parent task id).
|
||||||
|
self.assertEqual(5, len(execs))
|
||||||
|
|
||||||
|
older_than = cfg.CONF.execution_expiration_policy.older_than
|
||||||
|
exp_time = (datetime.datetime.utcnow()
|
||||||
|
- datetime.timedelta(minutes=older_than))
|
||||||
|
batch_size = cfg.CONF.execution_expiration_policy.batch_size
|
||||||
|
mfe = cfg.CONF.execution_expiration_policy.max_finished_executions
|
||||||
|
expiration_policy._delete_executions(
|
||||||
|
batch_size,
|
||||||
|
exp_time,
|
||||||
|
mfe
|
||||||
|
)
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
self.assertEqual(2, len(execs))
|
||||||
|
|
||||||
|
_set_expiration_policy_config(1, 5, None)
|
||||||
|
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
self.assertEqual(0, len(execs))
|
||||||
|
|
||||||
|
def test_deletion_of_expired_executions_with_batch_size_scenario2(self):
|
||||||
|
"""scenario2
|
||||||
|
|
||||||
|
This test will use batch_size of 2, 5 expired executions
|
||||||
|
with value of "older_than" that is 5 minutes.
|
||||||
|
Expected_result: All expired executions are successfully deleted.
|
||||||
|
"""
|
||||||
|
|
||||||
|
cfg.CONF.set_default(
|
||||||
|
'batch_size',
|
||||||
|
2,
|
||||||
|
group='execution_expiration_policy'
|
||||||
|
)
|
||||||
|
|
||||||
|
_create_workflow_executions()
|
||||||
|
|
||||||
|
_set_expiration_policy_config(1, 5, None)
|
||||||
|
|
||||||
|
# Call for all expired wfs execs.
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
|
||||||
|
# Should be only 5, the RUNNING execution shouldn't return,
|
||||||
|
# so the child wf (that has parent task id).
|
||||||
|
self.assertEqual(5, len(execs))
|
||||||
|
|
||||||
|
older_than = cfg.CONF.execution_expiration_policy.older_than
|
||||||
|
exp_time = (datetime.datetime.utcnow()
|
||||||
|
- datetime.timedelta(minutes=older_than))
|
||||||
|
batch_size = cfg.CONF.execution_expiration_policy.batch_size
|
||||||
|
mfe = cfg.CONF.execution_expiration_policy.max_finished_executions
|
||||||
|
expiration_policy._delete_executions(
|
||||||
|
batch_size,
|
||||||
|
exp_time,
|
||||||
|
mfe
|
||||||
|
)
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
self.assertEqual(0, len(execs))
|
||||||
|
|
||||||
|
def test_expiration_policy_for_executions_with_max_executions_scen1(self):
|
||||||
|
"""scenario1
|
||||||
|
|
||||||
|
Tests the max_executions logic with
|
||||||
|
max_finished_executions =
|
||||||
|
'total not expired and completed executions' - 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
_create_workflow_executions()
|
||||||
|
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
|
||||||
|
# Call for all expired wfs execs.
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
|
||||||
|
# Should be only 5, the RUNNING execution shouldn't return,
|
||||||
|
# so the child wf (that has parent task id).
|
||||||
|
self.assertEqual(5, len(execs))
|
||||||
|
|
||||||
|
_set_expiration_policy_config(1, 30, 1)
|
||||||
|
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||||
|
|
||||||
|
# Assert the two running executions
|
||||||
|
# (running_not_expired, running_not_expired2),
|
||||||
|
# the sub execution (expired_but_not_a_parent) and the one allowed
|
||||||
|
# finished execution (success_not_expired) are there.
|
||||||
|
execs = db_api.get_workflow_executions()
|
||||||
|
self.assertEqual(4, len(execs))
|
||||||
|
self.assertListEqual(
|
||||||
|
[
|
||||||
|
'expired_but_not_a_parent',
|
||||||
|
'running_not_expired',
|
||||||
|
'running_not_expired2',
|
||||||
|
'success_not_expired'
|
||||||
|
],
|
||||||
|
sorted([ex.id for ex in execs])
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_expiration_policy_for_executions_with_max_executions_scen2(self):
|
||||||
|
"""scenario2
|
||||||
|
|
||||||
|
Tests the max_executions logic with:
|
||||||
|
max_finished_executions > total completed executions
|
||||||
|
"""
|
||||||
|
|
||||||
|
_create_workflow_executions()
|
||||||
|
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
|
||||||
|
# Call for all expired wfs execs.
|
||||||
|
execs = db_api.get_executions_to_clean(now)
|
||||||
|
|
||||||
|
# Should be only 5, the RUNNING execution shouldn't return,
|
||||||
|
# so the child wf (that has parent task id).
|
||||||
|
self.assertEqual(5, len(execs))
|
||||||
|
|
||||||
|
_set_expiration_policy_config(1, 30, 100)
|
||||||
|
expiration_policy.run_execution_expiration_policy(self, ctx)
|
||||||
|
|
||||||
|
# Assert the two running executions
|
||||||
|
# (running_not_expired, running_not_expired2), the sub execution
|
||||||
|
# (expired_but_not_a_parent) and the all finished execution
|
||||||
|
# (success_not_expired, 'cancelled_not_expired') are there.
|
||||||
|
execs = db_api.get_workflow_executions()
|
||||||
|
self.assertEqual(5, len(execs))
|
||||||
|
self.assertListEqual(
|
||||||
|
[
|
||||||
|
'cancelled_not_expired',
|
||||||
|
'expired_but_not_a_parent',
|
||||||
|
'running_not_expired',
|
||||||
|
'running_not_expired2',
|
||||||
|
'success_not_expired'
|
||||||
|
],
|
||||||
|
sorted([ex.id for ex in execs])
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_negative_wrong_conf_values(self):
|
||||||
|
_set_expiration_policy_config(None, None, None)
|
||||||
|
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
||||||
|
|
||||||
|
self.assertDictEqual({}, e_policy._periodic_spacing)
|
||||||
|
self.assertListEqual([], e_policy._periodic_tasks)
|
||||||
|
|
||||||
|
_set_expiration_policy_config(None, 60, None)
|
||||||
|
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
||||||
|
|
||||||
|
self.assertDictEqual({}, e_policy._periodic_spacing)
|
||||||
|
self.assertListEqual([], e_policy._periodic_tasks)
|
||||||
|
|
||||||
|
_set_expiration_policy_config(60, None, None)
|
||||||
|
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
||||||
|
|
||||||
|
self.assertDictEqual({}, e_policy._periodic_spacing)
|
||||||
|
self.assertListEqual([], e_policy._periodic_tasks)
|
||||||
|
|
||||||
|
def test_periodic_task_parameters(self):
|
||||||
|
_set_expiration_policy_config(17, 13, None)
|
||||||
|
|
||||||
|
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
17 * 60,
|
||||||
|
e_policy._periodic_spacing['run_execution_expiration_policy']
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_periodic_task_scheduling(self):
|
||||||
|
def _assert_scheduling(expiration_policy_config, should_schedule):
|
||||||
|
ExecutionExpirationPolicy._periodic_tasks = []
|
||||||
|
_set_expiration_policy_config(*expiration_policy_config)
|
||||||
|
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
||||||
|
|
||||||
|
if should_schedule:
|
||||||
|
self.assertTrue(
|
||||||
|
e_policy._periodic_tasks,
|
||||||
|
"Periodic task should have been created."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.assertFalse(
|
||||||
|
e_policy._periodic_tasks,
|
||||||
|
"Periodic task shouldn't have been created."
|
||||||
|
)
|
||||||
|
|
||||||
|
_assert_scheduling([1, 1, None], True)
|
||||||
|
_assert_scheduling([1, None, 1], True)
|
||||||
|
_assert_scheduling([1, 1, 1], True)
|
||||||
|
_assert_scheduling([1, None, None], False)
|
||||||
|
_assert_scheduling([None, 1, 1], False)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
"""Restores the size limit config to default."""
|
||||||
|
super(ExpirationPolicyTest, self).tearDown()
|
||||||
|
|
||||||
|
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||||
|
|
||||||
|
ctx.set_ctx(None)
|
||||||
|
|
||||||
|
_set_expiration_policy_config(None, None, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _set_expiration_policy_config(evaluation_interval, older_than, mfe):
|
||||||
|
cfg.CONF.set_default(
|
||||||
|
'evaluation_interval',
|
||||||
|
evaluation_interval,
|
||||||
|
group='execution_expiration_policy'
|
||||||
|
)
|
||||||
|
cfg.CONF.set_default(
|
||||||
|
'older_than',
|
||||||
|
older_than,
|
||||||
|
group='execution_expiration_policy'
|
||||||
|
)
|
||||||
|
cfg.CONF.set_default(
|
||||||
|
'max_finished_executions',
|
||||||
|
mfe,
|
||||||
|
group='execution_expiration_policy'
|
||||||
|
)
|
@ -1,298 +0,0 @@
|
|||||||
# Copyright 2015 - Alcatel-lucent, Inc.
|
|
||||||
# Copyright 2015 - StackStorm, Inc.
|
|
||||||
# Copyright 2016 - Brocade Communications Systems, Inc.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
import datetime
|
|
||||||
|
|
||||||
from mistral import context as ctx
|
|
||||||
from mistral.db.v2 import api as db_api
|
|
||||||
from mistral.services import expiration_policy
|
|
||||||
from mistral.tests.unit import base
|
|
||||||
from oslo_config import cfg
|
|
||||||
|
|
||||||
|
|
||||||
def _create_workflow_executions():
|
|
||||||
time_now = datetime.datetime.utcnow()
|
|
||||||
|
|
||||||
wf_execs = [
|
|
||||||
{
|
|
||||||
'id': '123',
|
|
||||||
'name': 'success_expired',
|
|
||||||
'created_at': time_now - datetime.timedelta(minutes=60),
|
|
||||||
'updated_at': time_now - datetime.timedelta(minutes=59),
|
|
||||||
'workflow_name': 'test_exec',
|
|
||||||
'state': "SUCCESS",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'id': '456',
|
|
||||||
'name': 'error_expired',
|
|
||||||
'created_at': time_now - datetime.timedelta(days=3, minutes=10),
|
|
||||||
'updated_at': time_now - datetime.timedelta(days=3),
|
|
||||||
'workflow_name': 'test_exec',
|
|
||||||
'state': "ERROR",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'id': '789',
|
|
||||||
'name': 'running_not_expired',
|
|
||||||
'created_at': time_now - datetime.timedelta(days=3, minutes=10),
|
|
||||||
'updated_at': time_now - datetime.timedelta(days=3),
|
|
||||||
'workflow_name': 'test_exec',
|
|
||||||
'state': "RUNNING",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'id': '987',
|
|
||||||
'name': 'success_not_expired',
|
|
||||||
'created_at': time_now - datetime.timedelta(minutes=15),
|
|
||||||
'updated_at': time_now - datetime.timedelta(minutes=5),
|
|
||||||
'workflow_name': 'test_exec',
|
|
||||||
'state': "SUCCESS",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'id': 'abc',
|
|
||||||
'name': 'cancelled_expired',
|
|
||||||
'created_at': time_now - datetime.timedelta(minutes=60),
|
|
||||||
'updated_at': time_now - datetime.timedelta(minutes=59),
|
|
||||||
'workflow_name': 'test_exec',
|
|
||||||
'state': "CANCELLED",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'id': 'def',
|
|
||||||
'name': 'cancelled_not_expired',
|
|
||||||
'created_at': time_now - datetime.timedelta(minutes=15),
|
|
||||||
'updated_at': time_now - datetime.timedelta(minutes=5),
|
|
||||||
'workflow_name': 'test_exec',
|
|
||||||
'state': "CANCELLED",
|
|
||||||
}
|
|
||||||
]
|
|
||||||
|
|
||||||
for wf_exec in wf_execs:
|
|
||||||
db_api.create_workflow_execution(wf_exec)
|
|
||||||
|
|
||||||
# Create a nested workflow execution.
|
|
||||||
|
|
||||||
db_api.create_task_execution(
|
|
||||||
{
|
|
||||||
'id': '789',
|
|
||||||
'workflow_execution_id': '987',
|
|
||||||
'name': 'my_task'
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
db_api.create_workflow_execution(
|
|
||||||
{
|
|
||||||
'id': '654',
|
|
||||||
'name': 'expired but not a parent',
|
|
||||||
'created_at': time_now - datetime.timedelta(days=15),
|
|
||||||
'updated_at': time_now - datetime.timedelta(days=10),
|
|
||||||
'workflow_name': 'test_exec',
|
|
||||||
'state': "SUCCESS",
|
|
||||||
'task_execution_id': '789'
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _switch_context(project_id, is_admin):
|
|
||||||
_ctx = ctx.MistralContext(
|
|
||||||
user_id=None,
|
|
||||||
project_id=project_id,
|
|
||||||
auth_token=None,
|
|
||||||
is_admin=is_admin
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx.set_ctx(_ctx)
|
|
||||||
|
|
||||||
|
|
||||||
class ExpirationPolicyTest(base.DbTestCase):
|
|
||||||
def test_expiration_policy_for_executions(self):
|
|
||||||
# Delete execution uses a secured filtering and we need
|
|
||||||
# to verify that admin able to do that for other projects.
|
|
||||||
cfg.CONF.set_default('auth_enable', True, group='pecan')
|
|
||||||
|
|
||||||
# Since we are removing other projects execution,
|
|
||||||
# we want to load the executions with other project_id.
|
|
||||||
_switch_context('non_admin_project', False)
|
|
||||||
|
|
||||||
_create_workflow_executions()
|
|
||||||
|
|
||||||
now = datetime.datetime.utcnow()
|
|
||||||
|
|
||||||
# This execution has a parent wf and testing that we are
|
|
||||||
# querying only for parent wfs.
|
|
||||||
exec_child = db_api.get_workflow_execution('654')
|
|
||||||
|
|
||||||
self.assertEqual('789', exec_child.task_execution_id)
|
|
||||||
|
|
||||||
# Call for all expired wfs execs.
|
|
||||||
execs = db_api.get_expired_executions(now)
|
|
||||||
|
|
||||||
# Should be only 5, the RUNNING execution shouldn't return,
|
|
||||||
# so the child wf (that has parent task id).
|
|
||||||
self.assertEqual(5, len(execs))
|
|
||||||
|
|
||||||
# Switch context to Admin since expiration policy running as Admin.
|
|
||||||
_switch_context(None, True)
|
|
||||||
|
|
||||||
_set_expiration_policy_config(1, 30)
|
|
||||||
expiration_policy.run_execution_expiration_policy(self, ctx)
|
|
||||||
|
|
||||||
# Only non_expired available (update_at < older_than).
|
|
||||||
execs = db_api.get_expired_executions(now)
|
|
||||||
|
|
||||||
self.assertEqual(2, len(execs))
|
|
||||||
self.assertListEqual(['987', 'def'], sorted([ex.id for ex in execs]))
|
|
||||||
|
|
||||||
_set_expiration_policy_config(1, 5)
|
|
||||||
expiration_policy.run_execution_expiration_policy(self, ctx)
|
|
||||||
execs = db_api.get_expired_executions(now)
|
|
||||||
|
|
||||||
self.assertEqual(0, len(execs))
|
|
||||||
|
|
||||||
def test_deletion_of_expired_executions_with_batch_size_scenario1(self):
|
|
||||||
# Scenario1 test will test with batch_size of 3,
|
|
||||||
# 5 expired executions and different values of "older_than"
|
|
||||||
# which is 30 and 5 minutes respectively.
|
|
||||||
# Expected_result: All expired executions are successfully deleted.
|
|
||||||
|
|
||||||
cfg.CONF.set_default(
|
|
||||||
'batch_size',
|
|
||||||
3,
|
|
||||||
group='execution_expiration_policy'
|
|
||||||
)
|
|
||||||
|
|
||||||
# Since we are removing other projects execution,
|
|
||||||
# we want to load the executions with other project_id.
|
|
||||||
_switch_context('non_admin_project', False)
|
|
||||||
|
|
||||||
_create_workflow_executions()
|
|
||||||
|
|
||||||
# Switch context to Admin since expiration policy running as Admin.
|
|
||||||
_switch_context(None, True)
|
|
||||||
|
|
||||||
_set_expiration_policy_config(1, 30)
|
|
||||||
|
|
||||||
# Call for all expired wfs execs.
|
|
||||||
now = datetime.datetime.now()
|
|
||||||
execs = db_api.get_expired_executions(now)
|
|
||||||
|
|
||||||
# Should be only 5, the RUNNING execution shouldn't return,
|
|
||||||
# so the child wf (that has parent task id).
|
|
||||||
self.assertEqual(5, len(execs))
|
|
||||||
|
|
||||||
older_than = cfg.CONF.execution_expiration_policy.older_than
|
|
||||||
exp_time = (datetime.datetime.now()
|
|
||||||
- datetime.timedelta(minutes=older_than))
|
|
||||||
batch_size = cfg.CONF.execution_expiration_policy.batch_size
|
|
||||||
expiration_policy._delete_expired_executions(
|
|
||||||
batch_size,
|
|
||||||
exp_time
|
|
||||||
)
|
|
||||||
execs = db_api.get_expired_executions(now)
|
|
||||||
self.assertEqual(2, len(execs))
|
|
||||||
|
|
||||||
_set_expiration_policy_config(1, 5)
|
|
||||||
expiration_policy.run_execution_expiration_policy(self, ctx)
|
|
||||||
execs = db_api.get_expired_executions(now)
|
|
||||||
self.assertEqual(0, len(execs))
|
|
||||||
|
|
||||||
def test_deletion_of_expired_executions_with_batch_size_scenario2(self):
|
|
||||||
# Scenario2 will test with batch_size of 2, 5 expired executions
|
|
||||||
# with value of "older_than" that is 5 minutes.
|
|
||||||
# Expected_result: All expired executions are successfully deleted.
|
|
||||||
|
|
||||||
cfg.CONF.set_default(
|
|
||||||
'batch_size',
|
|
||||||
2,
|
|
||||||
group='execution_expiration_policy'
|
|
||||||
)
|
|
||||||
|
|
||||||
# Since we are removing other projects execution,
|
|
||||||
# we want to load the executions with other project_id.
|
|
||||||
_switch_context('non_admin_project', False)
|
|
||||||
|
|
||||||
_create_workflow_executions()
|
|
||||||
|
|
||||||
# Switch context to Admin since expiration policy running as Admin.
|
|
||||||
_switch_context(None, True)
|
|
||||||
|
|
||||||
_set_expiration_policy_config(1, 5)
|
|
||||||
|
|
||||||
# Call for all expired wfs execs.
|
|
||||||
now = datetime.datetime.now()
|
|
||||||
execs = db_api.get_expired_executions(now)
|
|
||||||
|
|
||||||
# Should be only 5, the RUNNING execution shouldn't return,
|
|
||||||
# so the child wf (that has parent task id).
|
|
||||||
self.assertEqual(5, len(execs))
|
|
||||||
|
|
||||||
older_than = cfg.CONF.execution_expiration_policy.older_than
|
|
||||||
exp_time = (datetime.datetime.now()
|
|
||||||
- datetime.timedelta(minutes=older_than))
|
|
||||||
batch_size = cfg.CONF.execution_expiration_policy.batch_size
|
|
||||||
expiration_policy._delete_expired_executions(
|
|
||||||
batch_size,
|
|
||||||
exp_time
|
|
||||||
)
|
|
||||||
execs = db_api.get_expired_executions(now)
|
|
||||||
self.assertEqual(0, len(execs))
|
|
||||||
|
|
||||||
def test_negative_wrong_conf_values(self):
|
|
||||||
_set_expiration_policy_config(None, None)
|
|
||||||
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
|
||||||
|
|
||||||
self.assertDictEqual({}, e_policy._periodic_spacing)
|
|
||||||
self.assertListEqual([], e_policy._periodic_tasks)
|
|
||||||
|
|
||||||
_set_expiration_policy_config(None, 60)
|
|
||||||
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
|
||||||
|
|
||||||
self.assertDictEqual({}, e_policy._periodic_spacing)
|
|
||||||
self.assertListEqual([], e_policy._periodic_tasks)
|
|
||||||
|
|
||||||
_set_expiration_policy_config(60, None)
|
|
||||||
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
|
||||||
|
|
||||||
self.assertDictEqual({}, e_policy._periodic_spacing)
|
|
||||||
self.assertListEqual([], e_policy._periodic_tasks)
|
|
||||||
|
|
||||||
def test_periodic_task_parameters(self):
|
|
||||||
_set_expiration_policy_config(17, 13)
|
|
||||||
|
|
||||||
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
|
|
||||||
|
|
||||||
self.assertEqual(17 * 60, e_policy._periodic_spacing
|
|
||||||
['run_execution_expiration_policy'])
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
"""Restores the size limit config to default."""
|
|
||||||
super(ExpirationPolicyTest, self).tearDown()
|
|
||||||
|
|
||||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
|
||||||
|
|
||||||
ctx.set_ctx(None)
|
|
||||||
|
|
||||||
_set_expiration_policy_config(None, None)
|
|
||||||
|
|
||||||
|
|
||||||
def _set_expiration_policy_config(evaluation_interval, older_than):
|
|
||||||
cfg.CONF.set_default(
|
|
||||||
'evaluation_interval',
|
|
||||||
evaluation_interval,
|
|
||||||
group='execution_expiration_policy'
|
|
||||||
)
|
|
||||||
cfg.CONF.set_default(
|
|
||||||
'older_than',
|
|
||||||
older_than,
|
|
||||||
group='execution_expiration_policy'
|
|
||||||
)
|
|
Loading…
Reference in New Issue
Block a user