Fix specification caching mechanism

* The problem was that each Mistral instance created its own local
  cache instance, so invalidating the cache on one node never reached
  the others (a known limitation of local cache implementations). The
  solution is twofold. First, specifications are now cached by workflow
  execution id, so that we get a consistent spec value for the entire
  lifetime of a workflow execution. Second, when a specification has to
  be built from a workflow definition id, 'updated_at' is included in
  the cache key, so that a changed workflow definition always produces
  a cache miss and a freshly parsed spec. Stale entries are simply
  kicked out of the cache by the LRU algorithm as it runs out of space.
  The resulting two lookup functions are sketched just below.
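
  The scheme, as an editor's sketch (not part of the patch; names
  mirror the new spec_parser API shown in the diff below, bodies are
  simplified, and get_workflow_spec() is spec_parser's own parsing
  entry point, so this only runs inside that module's context):

    from threading import RLock

    from cachetools import cached, LRUCache

    from mistral.db.v2 import api as db_api

    _WF_EX_CACHE = LRUCache(maxsize=100)
    _WF_EX_CACHE_LOCK = RLock()

    _WF_DEF_CACHE = LRUCache(maxsize=100)
    _WF_DEF_CACHE_LOCK = RLock()

    @cached(_WF_EX_CACHE, lock=_WF_EX_CACHE_LOCK)
    def get_workflow_spec_by_execution_id(wf_ex_id):
        # The execution row carries a snapshot of the spec, so every
        # engine node sees the same value for the execution's lifetime.
        wf_ex = db_api.get_workflow_execution(wf_ex_id)

        return get_workflow_spec(wf_ex.spec)

    @cached(_WF_DEF_CACHE, lock=_WF_DEF_CACHE_LOCK)
    def get_workflow_spec_by_definition_id(wf_def_id, wf_def_updated_at):
        # 'updated_at' participates only in the cache key: an updated
        # definition yields a different key, i.e. a guaranteed miss.
        wf_def = db_api.get_workflow_definition(wf_def_id)

        return get_workflow_spec(wf_def.spec)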

Change-Id: If97b2e47d8adcbd7b5d5844b56e24eac1b1ae6c1
Renat Akhmerov 2016-08-09 18:56:50 +07:00
parent 297fe921e1
commit f2445acb28
12 changed files with 234 additions and 129 deletions


@@ -36,6 +36,63 @@ from mistral import utils
 LOG = logging.getLogger(__name__)
 
 
+def _get_hash_function_by(column_name):
+    def calc_hash(context):
+        val = context.current_parameters[column_name] or {}
+
+        if isinstance(val, dict):
+            # If the value is a dictionary we need to make sure to have
+            # keys in the same order in a string representation.
+            hash_base = json.dumps(sorted(val.items()))
+        else:
+            hash_base = str(val)
+
+        return hashlib.sha256(hash_base.encode('utf-8')).hexdigest()
+
+    return calc_hash
+
+
+def validate_long_type_length(cls, field_name, value):
+    """Makes sure the value does not exceeds the maximum size."""
+    if value:
+        # Get the configured limit.
+        size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb
+
+        # If the size is unlimited.
+        if size_limit_kb < 0:
+            return
+
+        size_kb = int(sys.getsizeof(str(value)) / 1024)
+
+        if size_kb > size_limit_kb:
+            LOG.error(
+                "Size limit %dKB exceed for class [%s], "
+                "field %s of size %dKB.",
+                size_limit_kb, str(cls), field_name, size_kb
+            )
+
+            raise exc.SizeLimitExceededException(
+                field_name,
+                size_kb,
+                size_limit_kb
+            )
+
+
+def register_length_validator(attr_name):
+    """Register an event listener on the attribute.
+
+    This event listener will validate the size every
+    time a 'set' occurs.
+    """
+    for cls in utils.iter_subclasses(Execution):
+        if hasattr(cls, attr_name):
+            event.listen(
+                getattr(cls, attr_name),
+                'set',
+                lambda t, v, o, i: validate_long_type_length(cls, attr_name, v)
+            )
+
+
 class Definition(mb.MistralSecureModelBase):
     __abstract__ = True
@@ -200,46 +257,6 @@ for cls in utils.iter_subclasses(Execution):
         )
 
 
-def validate_long_type_length(cls, field_name, value):
-    """Makes sure the value does not exceeds the maximum size."""
-    if value:
-        # Get the configured limit.
-        size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb
-
-        # If the size is unlimited.
-        if size_limit_kb < 0:
-            return
-
-        size_kb = int(sys.getsizeof(str(value)) / 1024)
-
-        if size_kb > size_limit_kb:
-            LOG.error(
-                "Size limit %dKB exceed for class [%s], "
-                "field %s of size %dKB.",
-                size_limit_kb, str(cls), field_name, size_kb
-            )
-
-            raise exc.SizeLimitExceededException(
-                field_name,
-                size_kb,
-                size_limit_kb
-            )
-
-
-def register_length_validator(attr_name):
-    """Register an event listener on the attribute.
-
-    This event listener will validate the size every
-    time a 'set' occurs.
-    """
-    for cls in utils.iter_subclasses(Execution):
-        if hasattr(cls, attr_name):
-            event.listen(
-                getattr(cls, attr_name),
-                'set',
-                lambda t, v, o, i: validate_long_type_length(cls, attr_name, v)
-            )
-
-
 # Many-to-one for 'ActionExecution' and 'TaskExecution'.
 
 ActionExecution.task_execution_id = sa.Column(
@@ -350,16 +367,6 @@ class Environment(mb.MistralSecureModelBase):
     variables = sa.Column(st.JsonDictType())
 
 
-def _get_hash_function_by(column_name):
-    def calc_hash(context):
-        d = context.current_parameters[column_name] or {}
-
-        return hashlib.sha256(json.dumps(sorted(d.items())).
-                              encode('utf-8')).hexdigest()
-
-    return calc_hash
-
-
 class CronTrigger(mb.MistralSecureModelBase):
     """Contains info about cron triggers."""


@@ -61,7 +61,9 @@ def _build_action(action_ex):
     if action_ex.workflow_name:
         wf_name = action_ex.workflow_name
-        wf_spec = spec_parser.get_workflow_spec_by_id(action_ex.workflow_id)
+        wf_spec = spec_parser.get_workflow_spec_by_execution_id(
+            action_ex.task_execution.workflow_execution_id
+        )
         wf_spec_name = wf_spec.get_name()
 
     adhoc_action_name = action_ex.runtime_context.get('adhoc_action_name')


@@ -451,8 +451,8 @@ class WorkflowAction(Action):
         assert not self.action_ex
 
         parent_wf_ex = self.task_ex.workflow_execution
-        parent_wf_spec = spec_parser.get_workflow_spec_by_id(
-            parent_wf_ex.workflow_id
+        parent_wf_spec = spec_parser.get_workflow_spec_by_execution_id(
+            parent_wf_ex.id
         )
 
         task_spec = spec_parser.get_task_spec(self.task_ex.spec)
@@ -465,7 +465,10 @@ class WorkflowAction(Action):
             wf_spec_name
         )
 
-        wf_spec = spec_parser.get_workflow_spec_by_id(wf_def.id)
+        wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+            wf_def.id,
+            wf_def.updated_at
+        )
 
         wf_params = {
             'task_execution_id': self.task_ex.id,


@@ -97,7 +97,7 @@ def _on_action_complete(action_ex):
 
     task = _create_task(
         wf_ex,
-        spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id),
+        spec_parser.get_workflow_spec_by_execution_id(wf_ex.id),
        task_spec,
         task_ex.in_context,
         task_ex
@@ -125,22 +125,24 @@ def _on_action_complete(action_ex):
 
 def fail_task(task_ex, msg):
-    task = _build_task_from_execution(
-        spec_parser.get_workflow_spec_by_id(task_ex.workflow_id),
-        task_ex
+    wf_spec = spec_parser.get_workflow_spec_by_execution_id(
+        task_ex.workflow_execution_id
     )
 
+    task = _build_task_from_execution(wf_spec, task_ex)
+
     task.set_state(states.ERROR, msg)
 
     wf_handler.fail_workflow(task_ex.workflow_execution, msg)
 
 
 def continue_task(task_ex):
-    task = _build_task_from_execution(
-        spec_parser.get_workflow_spec_by_id(task_ex.workflow_id),
-        task_ex
+    wf_spec = spec_parser.get_workflow_spec_by_execution_id(
+        task_ex.workflow_execution_id
     )
 
+    task = _build_task_from_execution(wf_spec, task_ex)
+
     try:
         task.set_state(states.RUNNING, None)
@@ -166,11 +168,12 @@ def continue_task(task_ex):
 
 def complete_task(task_ex, state, state_info):
-    task = _build_task_from_execution(
-        spec_parser.get_workflow_spec_by_id(task_ex.workflow_id),
-        task_ex
+    wf_spec = spec_parser.get_workflow_spec_by_execution_id(
+        task_ex.workflow_execution_id
     )
 
+    task = _build_task_from_execution(wf_spec, task_ex)
+
     try:
         task.complete(state, state_info)
     except exc.MistralException as e:
@@ -263,9 +266,13 @@ def _check_task_start_allowed(task_ex_id):
     with db_api.transaction():
         task_ex = db_api.get_task_execution(task_ex_id)
 
+        wf_spec = spec_parser.get_workflow_spec_by_execution_id(
+            task_ex.workflow_execution_id
+        )
+
         wf_ctrl = wf_base.get_controller(
             task_ex.workflow_execution,
-            spec_parser.get_workflow_spec_by_id(task_ex.workflow_id)
+            wf_spec
         )
 
         if wf_ctrl.is_task_start_allowed(task_ex):
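
All of the handler changes above rely on the same property, shown here
as a toy, self-contained model (hypothetical names; in Mistral the
snapshot is the 'spec' column on the workflow execution row):

    # The execution snapshots its spec at start time, so later updates
    # to the definition cannot change what a running execution sees.
    definitions = {'wf1': 'spec-v1'}
    executions = {}

    def start_execution(ex_id, wf_name):
        executions[ex_id] = {'spec': definitions[wf_name]}

    def spec_by_execution_id(ex_id):
        return executions[ex_id]['spec']

    start_execution('ex-1', 'wf1')
    definitions['wf1'] = 'spec-v2'  # definition updated mid-flight

    assert spec_by_execution_id('ex-1') == 'spec-v1'  # still consistent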


@@ -56,7 +56,10 @@ class Workflow(object):
     def __init__(self, wf_def, wf_ex=None):
         self.wf_def = wf_def
         self.wf_ex = wf_ex
-        self.wf_spec = spec_parser.get_workflow_spec_by_id(wf_def.id)
+        self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+            wf_def.id,
+            wf_def.updated_at
+        )
 
     @profiler.trace('workflow-start')
     def start(self, input_dict, desc='', params=None):


@@ -46,10 +46,6 @@ def update_workbook_v2(definition, scope='private'):
 
     _, db_wfs = _on_workbook_update(wb_db, wb_spec)
 
-    # Once transaction has committed we need to update specification cache.
-    for db_wf, wf_spec in zip(db_wfs, wb_spec.get_workflows()):
-        spec_parser.update_workflow_cache(db_wf.id, wf_spec)
-
     return wb_db


@@ -102,11 +102,6 @@ def update_workflows(definition, scope='private', identifier=None):
             identifier=identifier
         ))
 
-    # Once transaction has committed we need to update specification cache.
-    for db_wf, wf_spec in zip(db_wfs, wf_list_spec.get_workflows()):
-        spec_parser.update_workflow_cache(db_wf.id, wf_spec)
-
     return db_wfs
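
The removed loops above were the explicit cache pushes; they become
unnecessary once 'updated_at' is part of the cache key. A minimal,
self-contained sketch of the effect (parse_spec and spec_for are
hypothetical stand-ins, not Mistral functions):

    from threading import RLock

    from cachetools import cached, LRUCache

    _CACHE = LRUCache(maxsize=100)

    def parse_spec(wf_def_id):
        return 'spec-of-%s' % wf_def_id  # stand-in for real parsing

    @cached(_CACHE, lock=RLock())
    def spec_for(wf_def_id, updated_at):
        return parse_spec(wf_def_id)

    spec_for('wf-1', 't1')  # parsed and cached under ('wf-1', 't1')
    spec_for('wf-1', 't1')  # cache hit
    spec_for('wf-1', 't2')  # definition changed: new key, parsed again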


@@ -32,12 +32,17 @@ class SpecificationCachingTest(base.DbTestCase):
         wfs = wf_service.create_workflows(wf_text)
 
-        self.assertEqual(0, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size())
 
-        wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id)
+        wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+            wfs[0].id,
+            wfs[0].updated_at
+        )
 
         self.assertIsNotNone(wf_spec)
-        self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
+        self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
 
     def test_workflow_spec_cache_update_via_workflow_service(self):
         wf_text = """
@@ -51,12 +56,17 @@ class SpecificationCachingTest(base.DbTestCase):
         wfs = wf_service.create_workflows(wf_text)
 
-        self.assertEqual(0, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size())
 
-        wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id)
+        wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+            wfs[0].id,
+            wfs[0].updated_at
+        )
 
         self.assertEqual(1, len(wf_spec.get_tasks()))
-        self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
+        self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
 
         # Now update workflow definition and check that cache is updated too.
@@ -74,12 +84,16 @@ class SpecificationCachingTest(base.DbTestCase):
         wfs = wf_service.update_workflows(wf_text)
 
-        self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
 
-        wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id)
+        wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+            wfs[0].id,
+            wfs[0].updated_at
+        )
 
         self.assertEqual(2, len(wf_spec.get_tasks()))
-        self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
 
     def test_workflow_spec_cache_update_via_workbook_service(self):
         wb_text = """
@@ -96,14 +110,19 @@ class SpecificationCachingTest(base.DbTestCase):
         wb_service.create_workbook_v2(wb_text)
 
-        self.assertEqual(0, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_definition_spec_cache_size())
 
         wf = db_api.get_workflow_definition('wb.wf')
 
-        wf_spec = spec_parser.get_workflow_spec_by_id(wf.id)
+        wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+            wf.id,
+            wf.updated_at
+        )
 
         self.assertEqual(1, len(wf_spec.get_tasks()))
-        self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
+        self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
 
         # Now update workflow definition and check that cache is updated too.
@@ -124,9 +143,16 @@ class SpecificationCachingTest(base.DbTestCase):
         wb_service.update_workbook_v2(wb_text)
 
-        self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
+        self.assertEqual(1, spec_parser.get_wf_definition_spec_cache_size())
 
-        wf_spec = spec_parser.get_workflow_spec_by_id(wf.id)
+        wf = db_api.get_workflow_definition(wf.id)
+
+        wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+            wf.id,
+            wf.updated_at
+        )
 
         self.assertEqual(2, len(wf_spec.get_tasks()))
-        self.assertEqual(1, spec_parser.get_workflow_spec_cache_size())
+        self.assertEqual(0, spec_parser.get_wf_execution_spec_cache_size())
+        self.assertEqual(2, spec_parser.get_wf_definition_spec_cache_size())
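
The final '2' above is the old and the new (id, updated_at) entries
coexisting; stale entries are only ever dropped by LRU eviction. A
self-contained demonstration of that eviction order:

    from cachetools import LRUCache

    cache = LRUCache(maxsize=2)

    cache[('wf-1', 't1')] = 'spec-v1'
    cache[('wf-1', 't2')] = 'spec-v2'  # update: the old entry lingers
    cache[('wf-2', 't1')] = 'spec'     # cache full: LRU entry evicted

    assert ('wf-1', 't1') not in cache  # the stale entry went first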


@@ -27,7 +27,10 @@ from mistral.workflow import states
 class DirectWorkflowControllerTest(base.DbTestCase):
     def _prepare_test(self, wf_text):
         wfs = wf_service.create_workflows(wf_text)
-        wf_spec = spec_parser.get_workflow_spec_by_id(wfs[0].id)
+        wf_spec = spec_parser.get_workflow_spec_by_definition_id(
+            wfs[0].id,
+            wfs[0].updated_at
+        )
 
         wf_ex = models.WorkflowExecution(
             id='1-2-3-4',
@@ -38,7 +41,8 @@ class DirectWorkflowControllerTest(base.DbTestCase):
 
         self.wf_ex = wf_ex
         self.wf_spec = wf_spec
-        self.wf_ctrl = d_wf.DirectWorkflowController(wf_ex)
+
+        return wf_ex
 
     def _create_task_execution(self, name, state):
         tasks_spec = self.wf_spec.get_tasks()
@@ -54,8 +58,10 @@ class DirectWorkflowControllerTest(base.DbTestCase):
 
         return task_ex
 
+    @mock.patch.object(db_api, 'get_workflow_execution')
     @mock.patch.object(db_api, 'get_task_execution')
-    def test_continue_workflow(self, get_task_execution):
+    def test_continue_workflow(self, get_task_execution,
+                               get_workflow_execution):
         wf_text = """---
         version: '2.0'
@@ -78,16 +84,20 @@ class DirectWorkflowControllerTest(base.DbTestCase):
               action: std.echo output="Hoy"
         """
 
-        self._prepare_test(wf_text)
+        wf_ex = self._prepare_test(wf_text)
+
+        get_workflow_execution.return_value = wf_ex
+
+        wf_ctrl = d_wf.DirectWorkflowController(wf_ex)
 
         # Workflow execution is in initial step. No running tasks.
-        cmds = self.wf_ctrl.continue_workflow()
+        cmds = wf_ctrl.continue_workflow()
 
         self.assertEqual(1, len(cmds))
 
         cmd = cmds[0]
 
-        self.assertIs(self.wf_ctrl.wf_ex, cmd.wf_ex)
+        self.assertIs(wf_ctrl.wf_ex, cmd.wf_ex)
         self.assertIsNotNone(cmd.task_spec)
         self.assertEqual('task1', cmd.task_spec.get_name())
         self.assertEqual(states.RUNNING, self.wf_ex.state)
@@ -109,7 +119,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
             )
         )
 
-        cmds = self.wf_ctrl.continue_workflow()
+        cmds = wf_ctrl.continue_workflow()
 
         task1_ex.processed = True
@@ -131,7 +141,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
             )
         )
 
-        cmds = self.wf_ctrl.continue_workflow()
+        cmds = wf_ctrl.continue_workflow()
 
         task2_ex.processed = True


@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import mock
+
 from mistral.db.v2 import api as db_api
 from mistral.db.v2.sqlalchemy import models
 from mistral import exceptions as exc
@@ -63,7 +65,6 @@ class ReverseWorkflowControllerTest(base.DbTestCase):
 
         self.wf_ex = wf_ex
         self.wb_spec = wb_spec
-        self.wf_ctrl = reverse_wf.ReverseWorkflowController(wf_ex)
 
     def _create_task_execution(self, name, state):
         tasks_spec = self.wb_spec.get_workflows()['wf'].get_tasks()
@@ -78,29 +79,46 @@ class ReverseWorkflowControllerTest(base.DbTestCase):
 
         return task_ex
 
-    def test_start_workflow_task2(self):
+    @mock.patch.object(db_api, 'get_workflow_execution')
+    def test_start_workflow_task2(self, get_workflow_execution):
+        get_workflow_execution.return_value = self.wf_ex
+
+        wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex)
+
         self.wf_ex.params = {'task_name': 'task2'}
 
-        cmds = self.wf_ctrl.continue_workflow()
+        cmds = wf_ctrl.continue_workflow()
 
         self.assertEqual(1, len(cmds))
         self.assertEqual('task1', cmds[0].task_spec.get_name())
 
-    def test_start_workflow_task1(self):
+    @mock.patch.object(db_api, 'get_workflow_execution')
+    def test_start_workflow_task1(self, get_workflow_execution):
+        get_workflow_execution.return_value = self.wf_ex
+
+        wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex)
+
         self.wf_ex.params = {'task_name': 'task1'}
 
-        cmds = self.wf_ctrl.continue_workflow()
+        cmds = wf_ctrl.continue_workflow()
 
         self.assertEqual(1, len(cmds))
         self.assertEqual('task1', cmds[0].task_spec.get_name())
 
-    def test_start_workflow_without_task(self):
-        self.assertRaises(
-            exc.WorkflowException,
-            self.wf_ctrl.continue_workflow
-        )
+    @mock.patch.object(db_api, 'get_workflow_execution')
+    def test_start_workflow_without_task(self, get_workflow_execution):
+        get_workflow_execution.return_value = self.wf_ex
+
+        wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex)
+
+        self.assertRaises(exc.WorkflowException, wf_ctrl.continue_workflow)
 
-    def test_continue_workflow(self):
+    @mock.patch.object(db_api, 'get_workflow_execution')
+    def test_continue_workflow(self, get_workflow_execution):
+        get_workflow_execution.return_value = self.wf_ex
+
+        wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex)
+
         self.wf_ex.params = {'task_name': 'task2'}
 
         # Assume task1 completed.
@@ -115,7 +133,7 @@ class ReverseWorkflowControllerTest(base.DbTestCase):
             )
         )
 
-        cmds = self.wf_ctrl.continue_workflow()
+        cmds = wf_ctrl.continue_workflow()
 
         task1_ex.processed = True
@@ -134,7 +152,7 @@ class ReverseWorkflowControllerTest(base.DbTestCase):
             )
         )
 
-        cmds = self.wf_ctrl.continue_workflow()
+        cmds = wf_ctrl.continue_workflow()
 
         task1_ex.processed = True
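
All four tests above gain the same preamble because the controller now
resolves its spec through spec_parser.get_workflow_spec_by_execution_id(),
which loads the execution via db_api.get_workflow_execution() (see the
WorkflowController hunk at the end of this diff). The pattern in
isolation (hypothetical test name):

    @mock.patch.object(db_api, 'get_workflow_execution')
    def test_example(self, get_workflow_execution):
        # The controller fetches the execution from the DB to read its
        # spec snapshot; hand it our in-memory object instead.
        get_workflow_execution.return_value = self.wf_ex

        wf_ctrl = reverse_wf.ReverseWorkflowController(self.wf_ex)

        cmds = wf_ctrl.continue_workflow()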


@@ -14,7 +14,6 @@
 # limitations under the License.
 
 from cachetools import cached
-from cachetools import hashkey
 from cachetools import LRUCache
 from threading import RLock
 
 import yaml
@@ -35,8 +34,11 @@ V2_0 = '2.0'
 
 ALL_VERSIONS = [V2_0]
 
-_WF_CACHE = LRUCache(maxsize=100)
-_WF_CACHE_LOCK = RLock()
+_WF_EX_CACHE = LRUCache(maxsize=100)
+_WF_EX_CACHE_LOCK = RLock()
+
+_WF_DEF_CACHE = LRUCache(maxsize=100)
+_WF_DEF_CACHE_LOCK = RLock()
 
 
 def parse_yaml(text):
def parse_yaml(text):
@@ -108,9 +110,9 @@ def get_workflow_spec(spec_dict):
     """Get workflow specification object from dictionary.
 
     NOTE: For large workflows this method can work very long (seconds).
-    For this reason, method 'get_workflow_spec_by_id' should be used
-    whenever possible because it caches specification objects by
-    workflow definition id.
+    For this reason, method 'get_workflow_spec_by_definition_id' or
+    'get_workflow_spec_by_execution_id' should be used whenever possible
+    because they cache specification objects.
 
     :param spec_dict: Raw specification dictionary.
     """
@@ -188,9 +190,43 @@ def _parse_def_from_wb(wb_def, section_name, item_name):
 # Methods for obtaining specifications in a more efficient way using
 # caching techniques.
 
-@cached(_WF_CACHE, lock=_WF_CACHE_LOCK)
-def get_workflow_spec_by_id(wf_def_id):
+@cached(_WF_EX_CACHE, lock=_WF_EX_CACHE_LOCK)
+def get_workflow_spec_by_execution_id(wf_ex_id):
+    """Gets workflow specification by workflow execution id.
+
+    The idea is that when a workflow execution is running we
+    must be getting the same workflow specification even if
+    the workflow definition has already changed.
+
+    :param wf_ex_id: Workflow execution id.
+    :return: Workflow specification.
+    """
+    if not wf_ex_id:
+        return None
+
+    wf_ex = db_api.get_workflow_execution(wf_ex_id)
+
+    return get_workflow_spec(wf_ex.spec)
+
+
+@cached(_WF_DEF_CACHE, lock=_WF_DEF_CACHE_LOCK)
+def get_workflow_spec_by_definition_id(wf_def_id, wf_def_updated_at):
+    """Gets specification by workflow definition id and its 'updated_at'.
+
+    The idea of this method is to return a cached specification for the
+    given workflow id and workflow definition 'updated_at'. As long as
+    the given workflow definition remains the same in DB, users of this
+    method will be getting a cached value. Once the workflow definition
+    has changed, clients will be providing a different 'updated_at'
+    value, hence this method will be called and the spec will be updated
+    for this combination of parameters. Old cached values will be kicked
+    out by the LRU algorithm if the cache runs out of space.
+
+    :param wf_def_id: Workflow definition id.
+    :param wf_def_updated_at: Workflow definition 'updated_at' value. It
+        serves only as part of the cache key and is not explicitly used
+        in the method.
+
+    :return: Workflow specification.
+    """
     if not wf_def_id:
         return None
@@ -199,16 +235,18 @@ def get_workflow_spec_by_id(wf_def_id):
     return get_workflow_spec(wf_def.spec)
 
 
-def get_workflow_spec_cache_size():
-    return len(_WF_CACHE)
+def get_wf_execution_spec_cache_size():
+    return len(_WF_EX_CACHE)
+
+
+def get_wf_definition_spec_cache_size():
+    return len(_WF_DEF_CACHE)
 
 
 def clear_caches():
     """Clears all specification caches."""
-    _WF_CACHE.clear()
+    with _WF_EX_CACHE_LOCK:
+        _WF_EX_CACHE.clear()
 
-
-def update_workflow_cache(wf_def_id, spec):
-    with _WF_CACHE_LOCK:
-        # We have to use hashkey function because @cached uses it implicitly.
-        _WF_CACHE[hashkey(wf_def_id)] = spec
+    with _WF_DEF_CACHE_LOCK:
+        _WF_DEF_CACHE.clear()
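
One detail of the new clear_caches() worth noting: @cached(cache,
lock=...) only serializes the decorated function's own cache accesses,
so out-of-band mutations such as clearing must take the same lock
explicitly. A small, self-contained sketch:

    from threading import RLock

    from cachetools import cached, LRUCache

    cache = LRUCache(maxsize=100)
    lock = RLock()

    @cached(cache, lock=lock)
    def parse(wf_def_id, updated_at):
        return 'spec-of-%s' % wf_def_id  # stand-in for real parsing

    parse('wf-1', 't1')

    with lock:  # same discipline as clear_caches() above
        cache.clear()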


@@ -43,7 +43,7 @@ def get_controller(wf_ex, wf_spec=None):
     """
 
     if not wf_spec:
-        wf_spec = spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id)
+        wf_spec = spec_parser.get_workflow_spec_by_execution_id(wf_ex.id)
 
     wf_type = wf_spec.get_type()
 
@@ -81,7 +81,7 @@ class WorkflowController(object):
         self.wf_ex = wf_ex
 
         if wf_spec is None:
-            wf_spec = spec_parser.get_workflow_spec_by_id(wf_ex.workflow_id)
+            wf_spec = spec_parser.get_workflow_spec_by_execution_id(wf_ex.id)
 
         self.wf_spec = wf_spec