Use "passive_deletes=True" in ORM relationships
* Using "passive_deletes=True" when configuring relationships with SQLAlchemy significantly reduces memory footprint when deleting a graph of objects (e.g. root workflow execution with all dependent tasks, actions and sub workflows) from DB. This happens because in this case SQLAlchemy doesn't load those dependent objects that are not yet in the session and rather relies on cascading mechanisms of databases which are activated by the proper configuration of foreign key columns. For example, "ON DELETE CASCADE" clause in the foreign key definition in case of for MySQL. Particularly, this change reduces memory footprint of API processes when deleting large graphs of executions and execution expiration policy mechanism where the same happens but on the engine side. * Added a test to make sure that cascaded deletion works well with this new property. Related-Bug: #1757966 Change-Id: I36988817fb8e7695094ef98958b8aa151208fadb
This commit is contained in:
parent
6652a01efc
commit
afb0f584c5
@ -101,6 +101,7 @@ def launch_notifier():
|
|||||||
|
|
||||||
def launch_api():
|
def launch_api():
|
||||||
server = api_service.WSGIService('mistral_api')
|
server = api_service.WSGIService('mistral_api')
|
||||||
|
|
||||||
launch_process(server, workers=server.workers)
|
launch_process(server, workers=server.workers)
|
||||||
|
|
||||||
|
|
||||||
|
@ -290,7 +290,8 @@ TaskExecution.action_executions = relationship(
|
|||||||
backref=backref('task_execution', remote_side=[TaskExecution.id]),
|
backref=backref('task_execution', remote_side=[TaskExecution.id]),
|
||||||
cascade='all, delete-orphan',
|
cascade='all, delete-orphan',
|
||||||
foreign_keys=ActionExecution.task_execution_id,
|
foreign_keys=ActionExecution.task_execution_id,
|
||||||
lazy='select'
|
lazy='select',
|
||||||
|
passive_deletes=True
|
||||||
)
|
)
|
||||||
|
|
||||||
sa.Index(
|
sa.Index(
|
||||||
@ -311,7 +312,8 @@ TaskExecution.workflow_executions = relationship(
|
|||||||
backref=backref('task_execution', remote_side=[TaskExecution.id]),
|
backref=backref('task_execution', remote_side=[TaskExecution.id]),
|
||||||
cascade='all, delete-orphan',
|
cascade='all, delete-orphan',
|
||||||
foreign_keys=WorkflowExecution.task_execution_id,
|
foreign_keys=WorkflowExecution.task_execution_id,
|
||||||
lazy='select'
|
lazy='select',
|
||||||
|
passive_deletes=True
|
||||||
)
|
)
|
||||||
|
|
||||||
sa.Index(
|
sa.Index(
|
||||||
@ -339,7 +341,8 @@ WorkflowExecution.task_executions = relationship(
|
|||||||
backref=backref('workflow_execution', remote_side=[WorkflowExecution.id]),
|
backref=backref('workflow_execution', remote_side=[WorkflowExecution.id]),
|
||||||
cascade='all, delete-orphan',
|
cascade='all, delete-orphan',
|
||||||
foreign_keys=TaskExecution.workflow_execution_id,
|
foreign_keys=TaskExecution.workflow_execution_id,
|
||||||
lazy='select'
|
lazy='select',
|
||||||
|
passive_deletes=True
|
||||||
)
|
)
|
||||||
|
|
||||||
sa.Index(
|
sa.Index(
|
||||||
|
@ -21,6 +21,7 @@ from mistral import context as auth_context
|
|||||||
from mistral.db.v2 import api as db_api
|
from mistral.db.v2 import api as db_api
|
||||||
from mistral import exceptions as exc
|
from mistral import exceptions as exc
|
||||||
from mistral.services import workbooks as wb_service
|
from mistral.services import workbooks as wb_service
|
||||||
|
from mistral.services import workflows as wf_service
|
||||||
from mistral.tests.unit.engine import base
|
from mistral.tests.unit.engine import base
|
||||||
from mistral.workflow import states
|
from mistral.workflow import states
|
||||||
|
|
||||||
@ -455,3 +456,44 @@ class SubworkflowsTest(base.EngineTestCase):
|
|||||||
self.assertIsNone(wf1_ex.root_execution_id, None)
|
self.assertIsNone(wf1_ex.root_execution_id, None)
|
||||||
self.assertEqual(wf2_ex.root_execution_id, wf1_ex.id)
|
self.assertEqual(wf2_ex.root_execution_id, wf1_ex.id)
|
||||||
self.assertEqual(wf3_ex.root_execution_id, wf1_ex.id)
|
self.assertEqual(wf3_ex.root_execution_id, wf1_ex.id)
|
||||||
|
|
||||||
|
def test_cascade_delete(self):
|
||||||
|
wf_text = """
|
||||||
|
version: 2.0
|
||||||
|
|
||||||
|
wf:
|
||||||
|
tasks:
|
||||||
|
task1:
|
||||||
|
workflow: sub_wf1
|
||||||
|
|
||||||
|
task2:
|
||||||
|
workflow: sub_wf2
|
||||||
|
|
||||||
|
sub_wf1:
|
||||||
|
tasks:
|
||||||
|
task1:
|
||||||
|
action: std.noop
|
||||||
|
|
||||||
|
sub_wf2:
|
||||||
|
tasks:
|
||||||
|
task1:
|
||||||
|
action: std.noop
|
||||||
|
"""
|
||||||
|
|
||||||
|
wf_service.create_workflows(wf_text)
|
||||||
|
|
||||||
|
wf_ex = self.engine.start_workflow('wf')
|
||||||
|
|
||||||
|
self.await_workflow_success(wf_ex.id)
|
||||||
|
|
||||||
|
self.assertEqual(3, len(db_api.get_workflow_executions()))
|
||||||
|
self.assertEqual(4, len(db_api.get_task_executions()))
|
||||||
|
self.assertEqual(2, len(db_api.get_action_executions()))
|
||||||
|
|
||||||
|
# Now delete the root workflow execution and make sure that
|
||||||
|
# all dependent objects are deleted as well.
|
||||||
|
db_api.delete_workflow_execution(wf_ex.id)
|
||||||
|
|
||||||
|
self.assertEqual(0, len(db_api.get_workflow_executions()))
|
||||||
|
self.assertEqual(0, len(db_api.get_task_executions()))
|
||||||
|
self.assertEqual(0, len(db_api.get_action_executions()))
|
||||||
|
Loading…
Reference in New Issue
Block a user