Copy cached WF spec stored by definition id into WF execution cache

* When we start a workflow we don't need to reparse a workflow
  specification if we request it by workflow execution id for the
  first time. Instead, we can just update workflow execution
  specification cache with the value from workflow definition
  specification cache. For large workflows, it significantly
  reduces time of workflow start.

Change-Id: Ibe4c788040996e3f119c96cb130fdc95d46286d9
This commit is contained in:
Renat Akhmerov 2016-09-12 17:12:10 +03:00
parent 698248363c
commit a0f6c7ae3f
3 changed files with 90 additions and 0 deletions

View File

@ -228,6 +228,11 @@ class Workflow(object):
data_flow.add_environment_to_context(self.wf_ex)
data_flow.add_workflow_variables_to_context(self.wf_ex, self.wf_spec)
spec_parser.cache_workflow_spec_by_execution_id(
self.wf_ex.id,
self.wf_spec
)
@profiler.trace('workflow-set-state')
def set_state(self, state, state_info=None, recursive=False):
assert self.wf_ex

View File

@ -17,6 +17,7 @@ from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import states
class SpecificationCachingTest(base.DbTestCase):
@ -156,3 +157,82 @@ class SpecificationCachingTest(base.DbTestCase):
self.assertEqual(2, len(wf_spec.get_tasks()))
self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size())
def test_update_workflow_spec_for_execution(self):
wf_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.echo output="Echo"
"""
wfs = wf_service.create_workflows(wf_text)
self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size())
wf_def = wfs[0]
wf_spec = spec_parser.get_workflow_spec_by_definition_id(
wf_def.id,
wf_def.updated_at
)
self.assertEqual(1, len(wf_spec.get_tasks()))
self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
wf_ex = db_api.create_workflow_execution({
'id': '1-2-3-4',
'name': 'wf',
'workflow_id': wf_def.id,
'spec': wf_spec.to_dict(),
'state': states.RUNNING
})
# Check that we can get a valid spec by execution id.
wf_spec_by_exec_id = spec_parser.get_workflow_spec_by_execution_id(
wf_ex.id
)
self.assertEqual(1, len(wf_spec_by_exec_id.get_tasks()))
# Now update workflow definition and check that cache is updated too.
wf_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.echo output="1"
task2:
action: std.echo output="2"
"""
wfs = wf_service.update_workflows(wf_text)
self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
wf_spec = spec_parser.get_workflow_spec_by_definition_id(
wfs[0].id,
wfs[0].updated_at
)
self.assertEqual(2, len(wf_spec.get_tasks()))
self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size())
self.assertEqual(1, spec_parser.get_wf_execution_spec_cache_size())
# Now finally update execution cache and check that we can
# get a valid spec by execution id.
spec_parser.cache_workflow_spec_by_execution_id(wf_ex.id, wf_spec)
wf_spec_by_exec_id = spec_parser.get_workflow_spec_by_execution_id(
wf_ex.id
)
self.assertEqual(2, len(wf_spec_by_exec_id.get_tasks()))

View File

@ -234,6 +234,11 @@ def get_workflow_spec_by_definition_id(wf_def_id, wf_def_updated_at):
return get_workflow_spec(wf_def.spec)
def cache_workflow_spec_by_execution_id(wf_ex_id, wf_spec):
with _WF_EX_CACHE_LOCK:
_WF_EX_CACHE[cachetools.hashkey(wf_ex_id)] = wf_spec
def get_wf_execution_spec_cache_size():
return len(_WF_EX_CACHE)