Merge "Make sure there are no duplicates in the spec cache w/o restarts"
This commit is contained in:
commit
bfb6548027
@ -33,9 +33,11 @@ V2_0 = '2.0'
|
||||
ALL_VERSIONS = [V2_0]
|
||||
|
||||
|
||||
# {workflow execution id => workflow specification}.
|
||||
_WF_EX_CACHE = cachetools.LRUCache(maxsize=100)
|
||||
_WF_EX_CACHE_LOCK = threading.RLock()
|
||||
|
||||
# {(workflow def id, workflow def updated at) => workflow specification}.
|
||||
_WF_DEF_CACHE = cachetools.LRUCache(maxsize=100)
|
||||
_WF_DEF_CACHE_LOCK = threading.RLock()
|
||||
|
||||
@ -203,9 +205,12 @@ def get_workflow_spec_by_execution_id(wf_ex_id):
|
||||
|
||||
The idea is that when a workflow execution is running we
|
||||
must be getting the same workflow specification even if
|
||||
the workflow definition has already changed. However, note
|
||||
that this is true only if the current engine instance didn't
|
||||
restart during the entire workflow execution run.
|
||||
|
||||
:param wf_ex_id: Workflow execution id.
|
||||
:return: Workflow specification.
|
||||
:return: Workflow specification.
|
||||
"""
|
||||
if not wf_ex_id:
|
||||
return None
|
||||
@ -230,8 +235,8 @@ def get_workflow_spec_by_definition_id(wf_def_id, wf_def_updated_at):
|
||||
|
||||
:param wf_def_id: Workflow definition id.
|
||||
:param wf_def_updated_at: Workflow definition 'updated_at' value. It
|
||||
serves only as part of cache key and is not explicitly used in the
|
||||
method.
|
||||
serves only as part of cache key and is not explicitly used in the
|
||||
method.
|
||||
:return: Workflow specification.
|
||||
"""
|
||||
if not wf_def_id:
|
||||
|
@ -17,6 +17,7 @@ from mistral.lang import parser as spec_parser
|
||||
from mistral.services import workbooks as wb_service
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit import base
|
||||
from mistral.tests.unit.engine import base as engine_base
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
@ -236,3 +237,61 @@ class SpecificationCachingTest(base.DbTestCase):
|
||||
)
|
||||
|
||||
self.assertEqual(2, len(wf_spec_by_exec_id.get_tasks()))
|
||||
|
||||
|
||||
class SpecificationCachingEngineTest(engine_base.EngineTestCase):
|
||||
def test_cache_workflow_spec_no_duplicates(self):
|
||||
wfs_text = """
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task2
|
||||
- task3
|
||||
|
||||
task2:
|
||||
workflow: sub_wf my_param="val1"
|
||||
|
||||
task3:
|
||||
workflow: sub_wf my_param="val2"
|
||||
|
||||
sub_wf:
|
||||
input:
|
||||
- my_param
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output="Param value is <% $.my_param %>"
|
||||
"""
|
||||
|
||||
wfs = wf_service.create_workflows(wfs_text)
|
||||
|
||||
self.assertEqual(2, len(wfs))
|
||||
|
||||
self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
|
||||
self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size())
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf')
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
# We expect to have a cache entry for every workflow execution
|
||||
# but two of them should refer to the same object.
|
||||
self.assertEqual(3, spec_parser.get_wf_execution_spec_cache_size())
|
||||
self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size())
|
||||
|
||||
sub_wf_execs = db_api.get_workflow_executions(name='sub_wf')
|
||||
|
||||
self.assertEqual(2, len(sub_wf_execs))
|
||||
|
||||
spec1 = spec_parser.get_workflow_spec_by_execution_id(
|
||||
sub_wf_execs[0].id
|
||||
)
|
||||
spec2 = spec_parser.get_workflow_spec_by_execution_id(
|
||||
sub_wf_execs[1].id
|
||||
)
|
||||
|
||||
self.assertIs(spec1, spec2)
|
||||
|
Loading…
Reference in New Issue
Block a user