From 09c08864dc5e13dc8b7bdff5898ca03c48df8229 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Fri, 29 Jul 2016 17:13:03 +0700 Subject: [PATCH] Invalidate workflow spec cache on workflow definition updates * Workflow spec cache is updated on update of workbook and workflow * Fixed scheduler docstring Change-Id: I64042db4aa902166d2e80ea30adb7cbca1515c39 --- mistral/services/scheduler.py | 16 ++-- mistral/services/workbooks.py | 47 +++++++--- mistral/services/workflows.py | 5 + .../tests/unit/workbook/test_spec_caching.py | 94 +++++++++++++++++++ mistral/workbook/parser.py | 11 ++- 5 files changed, 148 insertions(+), 25 deletions(-) diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index 6bc56021..04339ddc 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -105,15 +105,11 @@ class CallScheduler(periodic_task.PeriodicTasks): time_filter = datetime.datetime.now() + datetime.timedelta( seconds=1) - # Wrap delayed calls processing in transaction to - # guarantee that calls will be processed just once. - # Do delete query to DB first to force hanging up all - # parallel transactions. - # It should work on isolation level 'READ-COMMITTED', - # 'REPEATABLE-READ' and above. - # - # 'REPEATABLE-READ' is by default in MySQL and - # 'READ-COMMITTED is by default in PostgreSQL. + # Wrap delayed calls processing in transaction to guarantee that calls + # will be processed just once. Do delete query to DB first to force + # hanging up all parallel transactions. + # It should work with transactions which run at least 'READ-COMMITTED' + # mode. delayed_calls = [] with db_api.transaction(): @@ -128,7 +124,7 @@ class CallScheduler(periodic_task.PeriodicTasks): result, number_of_updated = db_api.update_delayed_call( id=call.id, values={'processing': True}, - query_filter={"processing": False} + query_filter={'processing': False} ) # If number_of_updated != 1 other scheduler already diff --git a/mistral/services/workbooks.py b/mistral/services/workbooks.py index 00379c82..f8a981f4 100644 --- a/mistral/services/workbooks.py +++ b/mistral/services/workbooks.py @@ -20,8 +20,10 @@ from mistral.workbook import parser as spec_parser def create_workbook_v2(definition, scope='private'): + wb_spec = spec_parser.get_workbook_spec_from_yaml(definition) + wb_values = _get_workbook_values( - spec_parser.get_workbook_spec_from_yaml(definition), + wb_spec, definition, scope ) @@ -29,34 +31,38 @@ def create_workbook_v2(definition, scope='private'): with db_api_v2.transaction(): wb_db = db_api_v2.create_workbook(wb_values) - _on_workbook_update(wb_db, wb_values) + _on_workbook_update(wb_db, wb_spec) return wb_db def update_workbook_v2(definition, scope='private'): - values = _get_workbook_values( - spec_parser.get_workbook_spec_from_yaml(definition), - definition, - scope - ) + wb_spec = spec_parser.get_workbook_spec_from_yaml(definition) + + values = _get_workbook_values(wb_spec, definition, scope) with db_api_v2.transaction(): wb_db = db_api_v2.update_workbook(values['name'], values) - _on_workbook_update(wb_db, values) + _, db_wfs = _on_workbook_update(wb_db, wb_spec) + + # Once transaction has committed we need to update specification cache. + for db_wf, wf_spec in zip(db_wfs, wb_spec.get_workflows()): + spec_parser.update_workflow_cache(db_wf.id, wf_spec) return wb_db -def _on_workbook_update(wb_db, values): - wb_spec = spec_parser.get_workbook_spec(values['spec']) +def _on_workbook_update(wb_db, wb_spec): + db_actions = _create_or_update_actions(wb_db, wb_spec.get_actions()) + db_wfs = _create_or_update_workflows(wb_db, wb_spec.get_workflows()) - _create_or_update_actions(wb_db, wb_spec.get_actions()) - _create_or_update_workflows(wb_db, wb_spec.get_workflows()) + return db_actions, db_wfs def _create_or_update_actions(wb_db, actions_spec): + db_actions = [] + if actions_spec: for action_spec in actions_spec: action_name = '%s.%s' % (wb_db.name, action_spec.get_name()) @@ -76,10 +82,19 @@ def _create_or_update_actions(wb_db, actions_spec): 'project_id': wb_db.project_id } - db_api_v2.create_or_update_action_definition(action_name, values) + db_actions.append( + db_api_v2.create_or_update_action_definition( + action_name, + values + ) + ) + + return db_actions def _create_or_update_workflows(wb_db, workflows_spec): + db_wfs = [] + if workflows_spec: for wf_spec in workflows_spec: wf_name = '%s.%s' % (wb_db.name, wf_spec.get_name()) @@ -93,7 +108,11 @@ def _create_or_update_workflows(wb_db, workflows_spec): 'tags': wf_spec.get_tags() } - db_api_v2.create_or_update_workflow_definition(wf_name, values) + db_wfs.append( + db_api_v2.create_or_update_workflow_definition(wf_name, values) + ) + + return db_wfs def _get_workbook_values(wb_spec, definition, scope): diff --git a/mistral/services/workflows.py b/mistral/services/workflows.py index 25e375b3..233a90d0 100644 --- a/mistral/services/workflows.py +++ b/mistral/services/workflows.py @@ -101,6 +101,11 @@ def update_workflows(definition, scope='private', identifier=None): identifier=identifier )) + # Once transaction has committed we need to update specification cache. + + for db_wf, wf_spec in zip(db_wfs, wf_list_spec.get_workflows()): + spec_parser.update_workflow_cache(db_wf.id, wf_spec) + return db_wfs diff --git a/mistral/tests/unit/workbook/test_spec_caching.py b/mistral/tests/unit/workbook/test_spec_caching.py index 8e8996f1..5d863516 100644 --- a/mistral/tests/unit/workbook/test_spec_caching.py +++ b/mistral/tests/unit/workbook/test_spec_caching.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from mistral.db.v2 import api as db_api +from mistral.services import workbooks as wb_service from mistral.services import workflows as wf_service from mistral.tests.unit import base from mistral.workbook import parser as spec_parser @@ -36,3 +38,95 @@ class SpecificationCachingTest(base.DbTestCase): self.assertIsNotNone(wf_spec) self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + + def test_workflow_spec_cache_update_via_workflow_service(self): + wf_text = """ + version: '2.0' + + wf: + tasks: + task1: + action: std.echo output="Echo" + """ + + wfs = wf_service.create_workflows(wf_text) + + self.assertEqual(0, spec_parser.get_workflow_spec_cache_size()) + + wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id) + + self.assertEqual(1, len(wf_spec.get_tasks())) + self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + + # Now update workflow definition and check that cache is updated too. + + wf_text = """ + version: '2.0' + + wf: + tasks: + task1: + action: std.echo output="1" + + task2: + action: std.echo output="2" + """ + + wfs = wf_service.update_workflows(wf_text) + + self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + + wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id) + + self.assertEqual(2, len(wf_spec.get_tasks())) + self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + + def test_workflow_spec_cache_update_via_workbook_service(self): + wb_text = """ + version: '2.0' + + name: wb + + workflows: + wf: + tasks: + task1: + action: std.echo output="Echo" + """ + + wb_service.create_workbook_v2(wb_text) + + self.assertEqual(0, spec_parser.get_workflow_spec_cache_size()) + + wf = db_api.get_workflow_definition('wb.wf') + + wf_spec = spec_parser.get_workflow_spec_by_id(wf.id) + + self.assertEqual(1, len(wf_spec.get_tasks())) + self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + + # Now update workflow definition and check that cache is updated too. + + wb_text = """ + version: '2.0' + + name: wb + + workflows: + wf: + tasks: + task1: + action: std.echo output="1" + + task2: + action: std.echo output="2" + """ + + wb_service.update_workbook_v2(wb_text) + + self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) + + wf_spec = spec_parser.get_workflow_spec_by_id(wf.id) + + self.assertEqual(2, len(wf_spec.get_tasks())) + self.assertEqual(1, spec_parser.get_workflow_spec_cache_size()) diff --git a/mistral/workbook/parser.py b/mistral/workbook/parser.py index a12760c3..86af531a 100644 --- a/mistral/workbook/parser.py +++ b/mistral/workbook/parser.py @@ -14,6 +14,7 @@ # limitations under the License. from cachetools import cached +from cachetools import hashkey from cachetools import LRUCache from threading import RLock import yaml @@ -35,6 +36,7 @@ ALL_VERSIONS = [V2_0] _WF_CACHE = LRUCache(maxsize=100) +_WF_CACHE_LOCK = RLock() def parse_yaml(text): @@ -186,7 +188,8 @@ def _parse_def_from_wb(wb_def, section_name, item_name): # Methods for obtaining specifications in a more efficient way using # caching techniques. -@cached(_WF_CACHE, lock=RLock()) + +@cached(_WF_CACHE, lock=_WF_CACHE_LOCK) def get_workflow_spec_by_id(wf_def_id): if not wf_def_id: return None @@ -203,3 +206,9 @@ def get_workflow_spec_cache_size(): def clear_caches(): """Clears all specification caches.""" _WF_CACHE.clear() + + +def update_workflow_cache(wf_def_id, spec): + with _WF_CACHE_LOCK: + # We have to use hashkey function because @cached uses it implicitly. + _WF_CACHE[hashkey(wf_def_id)] = spec