Merge "Move action caching to db layer"
This commit is contained in:
commit
ea4681500a
|
@ -13,8 +13,11 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import cachetools
|
||||
import contextlib
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_db import api as db_api
|
||||
|
||||
|
||||
|
@ -24,6 +27,15 @@ _BACKEND_MAPPING = {
|
|||
|
||||
IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
_ACTION_DEF_CACHE = cachetools.TTLCache(
|
||||
maxsize=1000,
|
||||
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
|
||||
)
|
||||
|
||||
_ACTION_DEF_CACHE_LOCK = threading.RLock()
|
||||
|
||||
|
||||
def setup_db():
|
||||
IMPL.setup_db()
|
||||
|
@ -179,7 +191,20 @@ def get_action_definition(name, fields=()):
|
|||
|
||||
def load_action_definition(name, fields=()):
|
||||
"""Unlike get_action_definition this method is allowed to return None."""
|
||||
return IMPL.load_action_definition(name, fields=fields)
|
||||
with _ACTION_DEF_CACHE_LOCK:
|
||||
action_def = _ACTION_DEF_CACHE.get(name)
|
||||
|
||||
if action_def:
|
||||
return action_def
|
||||
|
||||
action_def = IMPL.load_action_definition(name, fields=fields)
|
||||
|
||||
with _ACTION_DEF_CACHE_LOCK:
|
||||
_ACTION_DEF_CACHE[name] = (
|
||||
action_def.get_clone() if action_def else None
|
||||
)
|
||||
|
||||
return action_def
|
||||
|
||||
|
||||
def get_action_definitions(limit=None, marker=None, sort_keys=None,
|
||||
|
|
|
@ -14,10 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import threading
|
||||
|
||||
import abc
|
||||
import cachetools
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
@ -45,36 +42,6 @@ LOG = logging.getLogger(__name__)
|
|||
CONF = cfg.CONF
|
||||
|
||||
|
||||
_ACTION_DEF_CACHE = cachetools.TTLCache(
|
||||
maxsize=1000,
|
||||
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
|
||||
)
|
||||
|
||||
_ACTION_DEF_CACHE_LOCK = threading.RLock()
|
||||
|
||||
|
||||
def _find_action_definition_by_name(action_name):
|
||||
"""Find action definition name.
|
||||
|
||||
:param action_name: Action name.
|
||||
:return: Action definition (possibly a cached value).
|
||||
"""
|
||||
with _ACTION_DEF_CACHE_LOCK:
|
||||
action_def = _ACTION_DEF_CACHE.get(action_name)
|
||||
|
||||
if action_def:
|
||||
return action_def
|
||||
|
||||
action_def = db_api.load_action_definition(action_name)
|
||||
|
||||
with _ACTION_DEF_CACHE_LOCK:
|
||||
_ACTION_DEF_CACHE[action_name] = (
|
||||
action_def.get_clone() if action_def else None
|
||||
)
|
||||
|
||||
return action_def
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Action(object):
|
||||
"""Action.
|
||||
|
@ -426,7 +393,7 @@ class AdHocAction(PythonAction):
|
|||
wf_ctx=None):
|
||||
self.action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
base_action_def = _find_action_definition_by_name(
|
||||
base_action_def = db_api.load_action_definition(
|
||||
self.action_spec.get_base()
|
||||
)
|
||||
|
||||
|
@ -704,10 +671,10 @@ def resolve_action_definition(action_spec_name, wf_name=None,
|
|||
|
||||
action_full_name = "%s.%s" % (wb_name, action_spec_name)
|
||||
|
||||
action_db = _find_action_definition_by_name(action_full_name)
|
||||
action_db = db_api.load_action_definition(action_full_name)
|
||||
|
||||
if not action_db:
|
||||
action_db = _find_action_definition_by_name(action_spec_name)
|
||||
action_db = db_api.load_action_definition(action_spec_name)
|
||||
|
||||
if not action_db:
|
||||
raise exc.InvalidActionException(
|
||||
|
|
|
@ -30,7 +30,6 @@ from mistral import context as auth_context
|
|||
from mistral.db.sqlalchemy import base as db_sa_base
|
||||
from mistral.db.sqlalchemy import sqlite_lock
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import actions
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral.services import action_manager
|
||||
from mistral.services import security
|
||||
|
@ -279,7 +278,7 @@ class DbTestCase(BaseTest):
|
|||
action_manager.sync_db()
|
||||
|
||||
def _clean_db(self):
|
||||
actions._ACTION_DEF_CACHE.clear()
|
||||
db_api._ACTION_DEF_CACHE.clear()
|
||||
|
||||
contexts = [
|
||||
get_context(default=False),
|
||||
|
|
|
@ -18,7 +18,6 @@ import cachetools
|
|||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import actions
|
||||
from mistral.services import actions as action_service
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
|
@ -80,7 +79,7 @@ class LookupUtilsTest(base.EngineTestCase):
|
|||
ttl=5 # 5 seconds
|
||||
)
|
||||
cache_patch = mock.patch.object(
|
||||
actions, '_ACTION_DEF_CACHE', new_cache)
|
||||
db_api, '_ACTION_DEF_CACHE', new_cache)
|
||||
cache_patch.start()
|
||||
self.addCleanup(cache_patch.stop)
|
||||
|
||||
|
@ -90,24 +89,24 @@ class LookupUtilsTest(base.EngineTestCase):
|
|||
self.await_workflow_paused(wf_ex.id)
|
||||
|
||||
# Check that 'action1' 'echo' and 'noop' are cached.
|
||||
self.assertEqual(3, len(actions._ACTION_DEF_CACHE))
|
||||
self.assertIn('action1', actions._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.noop', actions._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.echo', actions._ACTION_DEF_CACHE)
|
||||
self.assertEqual(3, len(db_api._ACTION_DEF_CACHE))
|
||||
self.assertIn('action1', db_api._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.noop', db_api._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.echo', db_api._ACTION_DEF_CACHE)
|
||||
|
||||
# Wait some time until cache expires
|
||||
self._await(
|
||||
lambda: len(actions._ACTION_DEF_CACHE) == 0,
|
||||
lambda: len(db_api._ACTION_DEF_CACHE) == 0,
|
||||
fail_message="No triggers were found"
|
||||
)
|
||||
|
||||
self.assertEqual(0, len(actions._ACTION_DEF_CACHE))
|
||||
self.assertEqual(0, len(db_api._ACTION_DEF_CACHE))
|
||||
|
||||
self.engine.resume_workflow(wf_ex.id)
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
# Check all actions are cached again.
|
||||
self.assertEqual(2, len(actions._ACTION_DEF_CACHE))
|
||||
self.assertIn('action1', actions._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.echo', actions._ACTION_DEF_CACHE)
|
||||
self.assertEqual(2, len(db_api._ACTION_DEF_CACHE))
|
||||
self.assertIn('action1', db_api._ACTION_DEF_CACHE)
|
||||
self.assertIn('std.echo', db_api._ACTION_DEF_CACHE)
|
||||
|
|
Loading…
Reference in New Issue