Removed NOT IN query from expiration policy.

NOT IN subquery with limit is not supported by some databases.

Closes-bug: #1712585
Change-Id: Ieeb6629cc6c2fc0794ac9a0ef731816c2b6ea59b
This commit is contained in:
Kupai József 2017-09-18 16:05:15 +02:00
parent 7627c5d9f9
commit 5c0b720da2
4 changed files with 99 additions and 139 deletions

View File

@ -393,12 +393,20 @@ def get_next_cron_triggers(time):
return IMPL.get_next_cron_triggers(time) return IMPL.get_next_cron_triggers(time)
def get_executions_to_clean(expiration_time, limit=None, def get_expired_executions(expiration_time, limit=None, columns=(),
max_finished_executions=None, columns=()): session=None):
return IMPL.get_executions_to_clean( return IMPL.get_expired_executions(
expiration_time, expiration_time,
limit, limit,
columns
)
def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
session=None):
return IMPL.get_superfluous_executions(
max_finished_executions, max_finished_executions,
limit,
columns columns
) )

View File

@ -35,7 +35,6 @@ from mistral.db.sqlalchemy import sqlite_lock
from mistral.db import utils as m_dbutils from mistral.db import utils as m_dbutils
from mistral.db.v2.sqlalchemy import filters as db_filters from mistral.db.v2.sqlalchemy import filters as db_filters
from mistral.db.v2.sqlalchemy import models from mistral.db.v2.sqlalchemy import models
from mistral.db.v2.sqlalchemy.models import WorkflowExecution
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.services import security from mistral.services import security
from mistral import utils from mistral import utils
@ -1099,30 +1098,31 @@ def delete_delayed_calls(session=None, **kwargs):
@b.session_aware() @b.session_aware()
def get_executions_to_clean(expiration_time, limit=None, def get_expired_executions(expiration_time, limit=None, columns=(),
max_finished_executions=None, columns=(), session=None):
session=None): query = _get_completed_root_executions_query(columns)
# Get the ids of the executions that won't be deleted. query = query.filter(models.WorkflowExecution.updated_at < expiration_time)
# These are the not expired executions,
# limited by the new max_finished_executions constraint.
query = _get_completed_root_executions_query((WorkflowExecution.id,))
query = query.filter(
models.WorkflowExecution.updated_at >= expiration_time
)
query = query.order_by(models.WorkflowExecution.updated_at.desc())
if max_finished_executions:
query = query.limit(max_finished_executions)
# And take the inverse of that set.
inverse = _get_completed_root_executions_query(columns)
inverse = inverse.filter(~WorkflowExecution.id.in_(query))
inverse = inverse.order_by(models.WorkflowExecution.updated_at.asc())
if limit: if limit:
inverse.limit(limit) query = query.limit(limit)
return inverse.all() return query.all()
@b.session_aware()
def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
session=None):
if not max_finished_executions:
return []
query = _get_completed_root_executions_query(columns)
query = query.order_by(models.WorkflowExecution.updated_at.desc())
query = query.offset(max_finished_executions)
if limit:
query = query.limit(limit)
return query.all()
def _get_completed_root_executions_query(columns): def _get_completed_root_executions_query(columns):

View File

@ -61,16 +61,24 @@ class ExecutionExpirationPolicy(periodic_task.PeriodicTasks):
def _delete_executions(batch_size, expiration_time, def _delete_executions(batch_size, expiration_time,
max_finished_executions): max_finished_executions):
_delete_until_depleted(
lambda: db_api.get_expired_executions(
expiration_time,
batch_size
)
)
_delete_until_depleted(
lambda: db_api.get_superfluous_executions(
max_finished_executions,
batch_size
)
)
def _delete_until_depleted(fetch_func):
while True: while True:
with db_api.transaction(): with db_api.transaction():
# TODO(gpaz): In the future should use generic method with execs = fetch_func()
# filters params and not specific method that filter by time.
execs = db_api.get_executions_to_clean(
expiration_time,
limit=batch_size,
max_finished_executions=max_finished_executions
)
if not execs: if not execs:
break break
_delete(execs) _delete(execs)

View File

@ -138,7 +138,7 @@ class ExpirationPolicyTest(base.DbTestCase):
self.assertEqual('running_not_expired', exec_child.task_execution_id) self.assertEqual('running_not_expired', exec_child.task_execution_id)
# Call for all expired wfs execs. # Call for all expired wfs execs.
execs = db_api.get_executions_to_clean(now) execs = db_api.get_expired_executions(now)
# Should be only 5, the RUNNING execution shouldn't return, # Should be only 5, the RUNNING execution shouldn't return,
# so the child wf (that has parent task id). # so the child wf (that has parent task id).
@ -147,11 +147,11 @@ class ExpirationPolicyTest(base.DbTestCase):
# Switch context to Admin since expiration policy running as Admin. # Switch context to Admin since expiration policy running as Admin.
_switch_context(True, True) _switch_context(True, True)
_set_expiration_policy_config(1, 30, None) _set_expiration_policy_config(evaluation_interval=1, older_than=30)
expiration_policy.run_execution_expiration_policy(self, ctx) expiration_policy.run_execution_expiration_policy(self, ctx)
# Only non_expired available (update_at < older_than). # Only non_expired available (update_at < older_than).
execs = db_api.get_executions_to_clean(now) execs = db_api.get_expired_executions(now)
self.assertEqual(2, len(execs)) self.assertEqual(2, len(execs))
self.assertListEqual( self.assertListEqual(
@ -162,9 +162,9 @@ class ExpirationPolicyTest(base.DbTestCase):
sorted([ex.id for ex in execs]) sorted([ex.id for ex in execs])
) )
_set_expiration_policy_config(1, 5, None) _set_expiration_policy_config(evaluation_interval=1, older_than=5)
expiration_policy.run_execution_expiration_policy(self, ctx) expiration_policy.run_execution_expiration_policy(self, ctx)
execs = db_api.get_executions_to_clean(now) execs = db_api.get_expired_executions(now)
self.assertEqual(0, len(execs)) self.assertEqual(0, len(execs))
@ -177,40 +177,21 @@ class ExpirationPolicyTest(base.DbTestCase):
Expected_result: All expired executions are successfully deleted. Expected_result: All expired executions are successfully deleted.
""" """
cfg.CONF.set_default(
'batch_size',
3,
group='execution_expiration_policy'
)
_create_workflow_executions() _create_workflow_executions()
_set_expiration_policy_config(1, 30, None)
# Call for all expired wfs execs.
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
execs = db_api.get_executions_to_clean(now)
# Should be only 5, the RUNNING execution shouldn't return, _set_expiration_policy_config(
# so the child wf (that has parent task id). evaluation_interval=1,
self.assertEqual(5, len(execs)) older_than=30,
batch_size=3
older_than = cfg.CONF.execution_expiration_policy.older_than
exp_time = (datetime.datetime.utcnow()
- datetime.timedelta(minutes=older_than))
batch_size = cfg.CONF.execution_expiration_policy.batch_size
mfe = cfg.CONF.execution_expiration_policy.max_finished_executions
expiration_policy._delete_executions(
batch_size,
exp_time,
mfe
) )
execs = db_api.get_executions_to_clean(now) expiration_policy.run_execution_expiration_policy(self, ctx)
execs = db_api.get_expired_executions(now)
self.assertEqual(2, len(execs)) self.assertEqual(2, len(execs))
_set_expiration_policy_config(1, 5, None) _set_expiration_policy_config(evaluation_interval=1, older_than=5)
expiration_policy.run_execution_expiration_policy(self, ctx) expiration_policy.run_execution_expiration_policy(self, ctx)
execs = db_api.get_executions_to_clean(now) execs = db_api.get_expired_executions(now)
self.assertEqual(0, len(execs)) self.assertEqual(0, len(execs))
def test_deletion_of_expired_executions_with_batch_size_scenario2(self): def test_deletion_of_expired_executions_with_batch_size_scenario2(self):
@ -221,35 +202,16 @@ class ExpirationPolicyTest(base.DbTestCase):
Expected_result: All expired executions are successfully deleted. Expected_result: All expired executions are successfully deleted.
""" """
cfg.CONF.set_default(
'batch_size',
2,
group='execution_expiration_policy'
)
_create_workflow_executions() _create_workflow_executions()
_set_expiration_policy_config(1, 5, None)
# Call for all expired wfs execs.
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
execs = db_api.get_executions_to_clean(now)
# Should be only 5, the RUNNING execution shouldn't return, _set_expiration_policy_config(
# so the child wf (that has parent task id). evaluation_interval=1,
self.assertEqual(5, len(execs)) older_than=5,
batch_size=2
older_than = cfg.CONF.execution_expiration_policy.older_than
exp_time = (datetime.datetime.utcnow()
- datetime.timedelta(minutes=older_than))
batch_size = cfg.CONF.execution_expiration_policy.batch_size
mfe = cfg.CONF.execution_expiration_policy.max_finished_executions
expiration_policy._delete_executions(
batch_size,
exp_time,
mfe
) )
execs = db_api.get_executions_to_clean(now) expiration_policy.run_execution_expiration_policy(self, ctx)
execs = db_api.get_expired_executions(now)
self.assertEqual(0, len(execs)) self.assertEqual(0, len(execs))
def test_expiration_policy_for_executions_with_max_executions_scen1(self): def test_expiration_policy_for_executions_with_max_executions_scen1(self):
@ -261,17 +223,11 @@ class ExpirationPolicyTest(base.DbTestCase):
""" """
_create_workflow_executions() _create_workflow_executions()
_set_expiration_policy_config(
now = datetime.datetime.utcnow() evaluation_interval=1,
older_than=30,
# Call for all expired wfs execs. mfe=1
execs = db_api.get_executions_to_clean(now) )
# Should be only 5, the RUNNING execution shouldn't return,
# so the child wf (that has parent task id).
self.assertEqual(5, len(execs))
_set_expiration_policy_config(1, 30, 1)
expiration_policy.run_execution_expiration_policy(self, ctx) expiration_policy.run_execution_expiration_policy(self, ctx)
# Assert the two running executions # Assert the two running executions
@ -298,17 +254,11 @@ class ExpirationPolicyTest(base.DbTestCase):
""" """
_create_workflow_executions() _create_workflow_executions()
_set_expiration_policy_config(
now = datetime.datetime.utcnow() evaluation_interval=1,
older_than=30,
# Call for all expired wfs execs. mfe=100
execs = db_api.get_executions_to_clean(now) )
# Should be only 5, the RUNNING execution shouldn't return,
# so the child wf (that has parent task id).
self.assertEqual(5, len(execs))
_set_expiration_policy_config(1, 30, 100)
expiration_policy.run_execution_expiration_policy(self, ctx) expiration_policy.run_execution_expiration_policy(self, ctx)
# Assert the two running executions # Assert the two running executions
@ -328,27 +278,11 @@ class ExpirationPolicyTest(base.DbTestCase):
sorted([ex.id for ex in execs]) sorted([ex.id for ex in execs])
) )
def test_negative_wrong_conf_values(self):
_set_expiration_policy_config(None, None, None)
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
self.assertDictEqual({}, e_policy._periodic_spacing)
self.assertListEqual([], e_policy._periodic_tasks)
_set_expiration_policy_config(None, 60, None)
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
self.assertDictEqual({}, e_policy._periodic_spacing)
self.assertListEqual([], e_policy._periodic_tasks)
_set_expiration_policy_config(60, None, None)
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
self.assertDictEqual({}, e_policy._periodic_spacing)
self.assertListEqual([], e_policy._periodic_tasks)
def test_periodic_task_parameters(self): def test_periodic_task_parameters(self):
_set_expiration_policy_config(17, 13, None) _set_expiration_policy_config(
evaluation_interval=17,
older_than=13
)
e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF) e_policy = expiration_policy.ExecutionExpirationPolicy(cfg.CONF)
@ -374,11 +308,15 @@ class ExpirationPolicyTest(base.DbTestCase):
"Periodic task shouldn't have been created." "Periodic task shouldn't have been created."
) )
_assert_scheduling([1, 1, None], True) _assert_scheduling([1, 1, None, None], True)
_assert_scheduling([1, None, 1], True) _assert_scheduling([1, None, 1, None], True)
_assert_scheduling([1, 1, 1], True) _assert_scheduling([1, 1, 1, None], True)
_assert_scheduling([1, None, None], False) _assert_scheduling([1, None, None, None], False)
_assert_scheduling([None, 1, 1], False) _assert_scheduling([None, 1, 1, None], False)
_assert_scheduling([None, 1, 1, None], False)
_assert_scheduling([1, 0, 0, 0], False)
_assert_scheduling([0, 1, 1, 0], False)
_assert_scheduling([0, 1, 1, 0], False)
def tearDown(self): def tearDown(self):
"""Restores the size limit config to default.""" """Restores the size limit config to default."""
@ -388,10 +326,11 @@ class ExpirationPolicyTest(base.DbTestCase):
ctx.set_ctx(None) ctx.set_ctx(None)
_set_expiration_policy_config(None, None, None) _set_expiration_policy_config(None, None, None, None)
def _set_expiration_policy_config(evaluation_interval, older_than, mfe): def _set_expiration_policy_config(evaluation_interval, older_than, mfe=0,
batch_size=0):
cfg.CONF.set_default( cfg.CONF.set_default(
'evaluation_interval', 'evaluation_interval',
evaluation_interval, evaluation_interval,
@ -407,3 +346,8 @@ def _set_expiration_policy_config(evaluation_interval, older_than, mfe):
mfe, mfe,
group='execution_expiration_policy' group='execution_expiration_policy'
) )
cfg.CONF.set_default(
'batch_size',
batch_size,
group='execution_expiration_policy'
)