Fix workflow execution cascade delete error
If the cascade deletion error is identified, then resort to recursively deleting the subworkflows before deleting the given execution. Closes-Bug: https://bugs.launchpad.net/mistral/+bug/1832300 Change-Id: I6e0d9db93e103eaf3a329dfa1b4153975c135008
This commit is contained in:
parent
74b2fffec2
commit
f5f9a1dffd
|
@ -16,6 +16,7 @@
|
|||
|
||||
import contextlib
|
||||
import datetime
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
|
||||
|
@ -889,13 +890,59 @@ def delete_workflow_execution(id, session=None):
|
|||
insecure = context.ctx().is_admin
|
||||
query = b.model_query(model) if insecure else _secure_query(model)
|
||||
|
||||
count = query.filter(
|
||||
models.WorkflowExecution.id == id).delete()
|
||||
try:
|
||||
count = query.filter(
|
||||
models.WorkflowExecution.id == id
|
||||
).delete()
|
||||
|
||||
if count == 0:
|
||||
raise exc.DBEntityNotFoundError(
|
||||
"WorkflowExecution not found [id=%s]" % id
|
||||
)
|
||||
if count == 0:
|
||||
raise exc.DBEntityNotFoundError(
|
||||
"WorkflowExecution not found [id=%s]" % id
|
||||
)
|
||||
except db_exc.DBError as e:
|
||||
if is_mysql_max_depth_error(e) or is_mariadb_max_depth_error(e):
|
||||
# https://bugs.launchpad.net/mistral/+bug/1832300
|
||||
# mysql cascade delete error
|
||||
delete_workflow_execution_recurse(id)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def is_mysql_max_depth_error(e):
|
||||
pattern = ".*3008.*Foreign key cascade delete" \
|
||||
"/update exceeds max depth of 15.*"
|
||||
return re.match(pattern, str(e))
|
||||
|
||||
|
||||
def is_mariadb_max_depth_error(e):
|
||||
pattern = ".*Got error 193.*ON DELETE CASCADE.*"
|
||||
return re.match(pattern, str(e))
|
||||
|
||||
|
||||
def delete_workflow_execution_recurse(wf_ex_id):
|
||||
sub_wf_ex_ids = _get_all_direct_subworkflows(wf_ex_id)
|
||||
|
||||
for sub_wf_ex_id in sub_wf_ex_ids:
|
||||
delete_workflow_execution(sub_wf_ex_id)
|
||||
|
||||
delete_workflow_execution(wf_ex_id)
|
||||
|
||||
|
||||
def _get_all_direct_subworkflows(wf_ex_id):
|
||||
model = models.WorkflowExecution
|
||||
insecure = context.ctx().is_admin
|
||||
if insecure:
|
||||
query = b.model_query(model, columns=['id'])
|
||||
else:
|
||||
query = _secure_query(model, model.id)
|
||||
query = query.join(
|
||||
models.TaskExecution,
|
||||
models.WorkflowExecution.task_execution_id == models.TaskExecution.id
|
||||
).filter(
|
||||
models.TaskExecution.workflow_execution_id == wf_ex_id
|
||||
)
|
||||
|
||||
return [i[0] for i in query.all()]
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
|
|
|
@ -3376,3 +3376,35 @@ class LockTest(SQLAlchemyTest):
|
|||
|
||||
# Make sure that outside 'with' section the lock record does not exist.
|
||||
self.assertEqual(0, len(db_api.get_named_locks()))
|
||||
|
||||
def test_internal_get_direct_subworkflows(self):
|
||||
def wex(wex_id, tex_id=None):
|
||||
db_api.create_workflow_execution(
|
||||
{'id': wex_id, 'name': wex_id, 'task_execution_id': tex_id}
|
||||
)
|
||||
|
||||
def tex(tex_id, wex_id):
|
||||
db_api.create_task_execution(
|
||||
{'id': tex_id, 'name': tex_id, 'workflow_execution_id': wex_id}
|
||||
)
|
||||
|
||||
def assert_subworkflows(expected):
|
||||
self.assertEqual(
|
||||
set(expected), set(db_api._get_all_direct_subworkflows('root'))
|
||||
)
|
||||
|
||||
wex('root')
|
||||
tex('t1', 'root')
|
||||
wex('sub1', 't1')
|
||||
|
||||
assert_subworkflows(['sub1'])
|
||||
|
||||
tex('t2', 'root')
|
||||
wex('sub2', 't1')
|
||||
|
||||
assert_subworkflows(['sub1', 'sub2'])
|
||||
|
||||
tex('sub1t1', 'sub1')
|
||||
wex('sub1sub1', 'sub1t1')
|
||||
|
||||
assert_subworkflows(['sub1', 'sub2'])
|
||||
|
|
|
@ -501,3 +501,38 @@ class SubworkflowsTest(base.EngineTestCase):
|
|||
self.assertEqual(0, len(db_api.get_workflow_executions()))
|
||||
self.assertEqual(0, len(db_api.get_task_executions()))
|
||||
self.assertEqual(0, len(db_api.get_action_executions()))
|
||||
|
||||
def test_cascade_delete_deep(self):
|
||||
wf_text = """
|
||||
version: 2.0
|
||||
|
||||
wf:
|
||||
input:
|
||||
- level
|
||||
tasks:
|
||||
initial:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- recurse: <% $.level > 0 %>
|
||||
|
||||
recurse:
|
||||
workflow: wf
|
||||
input:
|
||||
level: <% $.level - 1 %>
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf', wf_input={"level": 7})
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
self.assertEqual(8, len(db_api.get_workflow_executions()))
|
||||
|
||||
# Now delete the root workflow execution and make sure that
|
||||
# all dependent objects are deleted as well.
|
||||
db_api.delete_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(0, len(db_api.get_workflow_executions()))
|
||||
self.assertEqual(0, len(db_api.get_task_executions()))
|
||||
self.assertEqual(0, len(db_api.get_action_executions()))
|
||||
|
|
Loading…
Reference in New Issue