Merge "Cache action definitions"

This commit is contained in:
Zuul 2018-02-15 10:14:19 +00:00 committed by Gerrit Code Review
commit 1f83e5d5a2
5 changed files with 153 additions and 13 deletions

View File

@ -172,6 +172,12 @@ engine_opts = [
' will be restored automatically. If this property is' ' will be restored automatically. If this property is'
' set to a negative value Mistral will never be doing ' ' set to a negative value Mistral will never be doing '
' this check.') ' this check.')
),
cfg.IntOpt(
'action_definition_cache_time',
default=60,
help=_('A number of seconds that indicates how long action '
'definitions should be stored in the local cache.')
) )
] ]

View File

@ -32,6 +32,7 @@ from mistral.services import security
from mistral import utils from mistral import utils
from mistral.utils import wf_trace from mistral.utils import wf_trace
from mistral.workflow import data_flow from mistral.workflow import data_flow
from mistral.workflow import lookup_utils
from mistral.workflow import states from mistral.workflow import states
from mistral_lib import actions as ml_actions from mistral_lib import actions as ml_actions
@ -367,11 +368,11 @@ class AdHocAction(PythonAction):
wf_ctx=None): wf_ctx=None):
self.action_spec = spec_parser.get_action_spec(action_def.spec) self.action_spec = spec_parser.get_action_spec(action_def.spec)
try: base_action_def = lookup_utils.find_action_definition_by_name(
base_action_def = db_api.get_action_definition( self.action_spec.get_base()
self.action_spec.get_base() )
)
except exc.DBEntityNotFoundError: if not base_action_def:
raise exc.InvalidActionException( raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" % "Failed to find action [action_name=%s]" %
self.action_spec.get_base() self.action_spec.get_base()
@ -607,10 +608,14 @@ def resolve_action_definition(action_spec_name, wf_name=None,
action_full_name = "%s.%s" % (wb_name, action_spec_name) action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = db_api.load_action_definition(action_full_name) action_db = lookup_utils.find_action_definition_by_name(
action_full_name
)
if not action_db: if not action_db:
action_db = db_api.load_action_definition(action_spec_name) action_db = lookup_utils.find_action_definition_by_name(
action_spec_name
)
if not action_db: if not action_db:
raise exc.InvalidActionException( raise exc.InvalidActionException(

View File

@ -12,9 +12,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import time
import cachetools
from oslo_config import cfg from oslo_config import cfg
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.services import actions as action_service
from mistral.services import workflows as wf_service from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base from mistral.tests.unit.engine import base
from mistral.workflow import lookup_utils from mistral.workflow import lookup_utils
@ -80,3 +84,81 @@ class LookupUtilsTest(base.EngineTestCase):
# Expecting that the cache size is 0 because the workflow has # Expecting that the cache size is 0 because the workflow has
# finished and invalidated corresponding cache entry. # finished and invalidated corresponding cache entry.
self.assertEqual(0, lookup_utils.get_task_execution_cache_size()) self.assertEqual(0, lookup_utils.get_task_execution_cache_size())
def test_action_definition_cache_ttl(self):
action = """---
version: '2.0'
action1:
base: std.echo output='Hi'
output:
result: $
"""
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: action1
on-success: join_task
task2:
action: action1
on-success: join_task
join_task:
join: all
on-success: task4
task4:
action: action1
pause-before: true
"""
wf_service.create_workflows(wf_text)
# Create an action.
db_actions = action_service.create_actions(action)
self.assertEqual(1, len(db_actions))
self._assert_single_item(db_actions, name='action1')
# Explicitly mark the action to be deleted after the test execution.
self.addCleanup(db_api.delete_action_definitions, name='action1')
# Reinitialise the cache with reduced action_definition_cache_time
# to make the test faster.
# Save the existing cache into a temporary variable and restore
# the value when the test passed.
old_cache = lookup_utils._ACTION_DEF_CACHE
lookup_utils._ACTION_DEF_CACHE = cachetools.TTLCache(
maxsize=1000,
ttl=5 # 5 seconds
)
self.addCleanup(setattr, lookup_utils, '_ACTION_DEF_CACHE', old_cache)
# Start workflow.
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_paused(wf_ex.id)
# Check that 'action1' 'echo' and 'noop' are cached.
self.assertEqual(3, lookup_utils.get_action_definition_cache_size())
self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.noop', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE)
# Wait some time until cache expires
time.sleep(7)
self.assertEqual(0, lookup_utils.get_action_definition_cache_size())
self.engine.resume_workflow(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Check all actions are cached again.
self.assertEqual(2, lookup_utils.get_action_definition_cache_size())
self.assertIn('action1', lookup_utils._ACTION_DEF_CACHE)
self.assertIn('std.echo', lookup_utils._ACTION_DEF_CACHE)

View File

@ -29,13 +29,18 @@ Mostly, they are useful for doing any kind of fast lookups with in order
to make some decision based on their state. to make some decision based on their state.
""" """
import cachetools
import threading import threading
import cachetools
from oslo_config import cfg
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.workflow import states from mistral.workflow import states
CONF = cfg.CONF
def _create_lru_cache_for_workflow_execution(wf_ex_id): def _create_lru_cache_for_workflow_execution(wf_ex_id):
return cachetools.LRUCache(maxsize=500) return cachetools.LRUCache(maxsize=500)
@ -49,7 +54,33 @@ _TASK_EX_CACHE = cachetools.LRUCache(
missing=_create_lru_cache_for_workflow_execution missing=_create_lru_cache_for_workflow_execution
) )
_CACHE_LOCK = threading.RLock() _ACTION_DEF_CACHE = cachetools.TTLCache(
maxsize=1000,
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
)
_TASK_EX_CACHE_LOCK = threading.RLock()
_ACTION_DEF_CACHE_LOCK = threading.RLock()
def find_action_definition_by_name(action_name):
"""Find action definition name.
:param action_name: Action name.
:return: Action definition (possibly a cached value).
"""
with _ACTION_DEF_CACHE_LOCK:
action_definition = _ACTION_DEF_CACHE.get(action_name)
if action_definition:
return action_definition
action_definition = db_api.load_action_definition(action_name)
with _ACTION_DEF_CACHE_LOCK:
_ACTION_DEF_CACHE[action_name] = action_definition
return action_definition
def find_task_executions_by_name(wf_ex_id, task_name): def find_task_executions_by_name(wf_ex_id, task_name):
@ -59,7 +90,7 @@ def find_task_executions_by_name(wf_ex_id, task_name):
:param task_name: Task name. :param task_name: Task name.
:return: Task executions (possibly a cached value). :return: Task executions (possibly a cached value).
""" """
with _CACHE_LOCK: with _TASK_EX_CACHE_LOCK:
t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name) t_execs = _TASK_EX_CACHE[wf_ex_id].get(task_name)
if t_execs: if t_execs:
@ -78,7 +109,7 @@ def find_task_executions_by_name(wf_ex_id, task_name):
) )
if all_finished: if all_finished:
with _CACHE_LOCK: with _TASK_EX_CACHE_LOCK:
_TASK_EX_CACHE[wf_ex_id][task_name] = t_execs _TASK_EX_CACHE[wf_ex_id][task_name] = t_execs
return t_execs return t_execs
@ -124,12 +155,19 @@ def get_task_execution_cache_size():
return len(_TASK_EX_CACHE) return len(_TASK_EX_CACHE)
def get_action_definition_cache_size():
return len(_ACTION_DEF_CACHE)
def invalidate_cached_task_executions(wf_ex_id): def invalidate_cached_task_executions(wf_ex_id):
with _CACHE_LOCK: with _TASK_EX_CACHE_LOCK:
if wf_ex_id in _TASK_EX_CACHE: if wf_ex_id in _TASK_EX_CACHE:
del _TASK_EX_CACHE[wf_ex_id] del _TASK_EX_CACHE[wf_ex_id]
def clear_caches(): def clear_caches():
with _CACHE_LOCK: with _TASK_EX_CACHE_LOCK:
_TASK_EX_CACHE.clear() _TASK_EX_CACHE.clear()
with _ACTION_DEF_CACHE_LOCK:
_ACTION_DEF_CACHE.clear()

View File

@ -0,0 +1,9 @@
---
features:
- |
Enable caching of action definitions in local memory. Now, instead of
downloading the definitions from the database every time, mistral engine
will store them in a local cache. This should reduce the number of
database requests and improve the whole performance of the the system.
Cache ttl can be configured with ``action_definition_cache_time`` option
from [engine] group. The default value is 60 seconds.