diff --git a/murano/common/config.py b/murano/common/config.py index d3d34e62..fd69ca93 100644 --- a/murano/common/config.py +++ b/murano/common/config.py @@ -220,7 +220,7 @@ packages_opts = [ cfg.StrOpt('packages_cache', help='Location (directory) for Murano package cache.'), - cfg.BoolOpt('enable_package_cache', default=True, + cfg.BoolOpt('enable_packages_cache', default=True, help=_('Enables murano-engine to persist on disk ' 'packages downloaded during deployments. ' 'The packages would be re-used for consequent ' diff --git a/murano/engine/package_loader.py b/murano/engine/package_loader.py index dad42c6b..df8cf02f 100644 --- a/murano/engine/package_loader.py +++ b/murano/engine/package_loader.py @@ -14,7 +14,7 @@ # limitations under the License. import collections -import errno +import itertools import os import os.path import shutil @@ -24,7 +24,6 @@ import uuid import eventlet from muranoclient.common import exceptions as muranoclient_exc -from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging import six @@ -39,11 +38,13 @@ from murano.engine.system import system_objects from murano.engine import yaql_yaml_loader from murano.packages import exceptions as pkg_exc from murano.packages import load_utils +from murano import utils as m_utils CONF = cfg.CONF LOG = logging.getLogger(__name__) -download_greenlocks = collections.defaultdict(lockutils.ReaderWriterLock) +download_mem_locks = collections.defaultdict(m_utils.ReaderWriterLock) +usage_mem_locks = collections.defaultdict(m_utils.ReaderWriterLock) class ApiPackageLoader(package_loader.MuranoPackageLoader): @@ -55,6 +56,10 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): self._package_cache = {} self._root_loader = root_loader or self + self._mem_locks = [] + self._ipc_locks = [] + self._downloaded = [] + def load_class_package(self, class_name, version_spec): packages = self._class_cache.get(class_name) if packages: @@ -67,6 +72,7 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): version_spec)} try: package_definition = self._get_definition(filter_opts) + self._lock_usage(package_definition) except LookupError: exc_info = sys.exc_info() raise (exceptions.NoPackageForClassFound(class_name), @@ -86,6 +92,7 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): version_spec)} try: package_definition = self._get_definition(filter_opts) + self._lock_usage(package_definition) except LookupError: exc_info = sys.exc_info() six.reraise(exceptions.NoPackageFound(package_name), @@ -106,21 +113,14 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): os.path.join(tempfile.gettempdir(), 'murano-packages-cache') ) - if CONF.packages_opts.enable_package_cache: + if CONF.packages_opts.enable_packages_cache: directory = os.path.abspath(base_directory) else: directory = os.path.abspath(os.path.join(base_directory, uuid.uuid4().hex)) if not os.path.isdir(directory): - # NOTE(kzaitsev): in case we want packages to persist on - # disk and subsequent call to makedirs might fail if 2+ loaders - # from different processes would attempt to call it simultaneously - try: - os.makedirs(directory) - except OSError as e: - if e.errno != errno.EEXIST: - raise + m_utils.ensure_tree(directory) LOG.debug('Cache for package loader is located at: {dir}'.format( dir=directory)) @@ -163,23 +163,25 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): package_directory = os.path.join( self._cache_directory, package_def.fully_qualified_name, + getattr(package_def, 'version', '0.0.0'), package_id) if os.path.isdir(package_directory): try: return load_utils.load_from_dir(package_directory) except pkg_exc.PackageLoadError: - LOG.error(_LE('Unable to load package from cache. Clean-up.')) + LOG.exception( + _LE('Unable to load package from cache. Clean-up.')) shutil.rmtree(package_directory, ignore_errors=True) # the package is not yet in cache, let's try and download it. - download_flock_path = os.path.join( + download_lock_path = os.path.join( self._cache_directory, '{}_download.lock'.format(package_id)) - download_flock = lockutils.InterProcessLock( - path=download_flock_path, sleep_func=eventlet.sleep) + download_ipc_lock = m_utils.ExclusiveInterProcessLock( + path=download_lock_path, sleep_func=eventlet.sleep) - with download_greenlocks[package_id].write_lock(),\ - download_flock: + with download_mem_locks[package_id].write_lock(),\ + download_ipc_lock: # NOTE(kzaitsev): # in case there were 2 concurrent threads/processes one might have @@ -216,6 +218,11 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): LOG.info(_LI( "Successfully downloaded and unpacked package {} {}") .format(package_def.fully_qualified_name, package_id)) + self._downloaded.append(app_package) + + self.try_cleanup_cache( + os.path.split(package_directory)[0], + current_id=package_id) return app_package except IOError: msg = 'Unable to extract package data for %s' % package_id @@ -228,6 +235,42 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): except OSError: pass + def try_cleanup_cache(self, package_directory=None, current_id=None): + if not package_directory: + return + + pkg_ids_listed = set() + try: + pkg_ids_listed = set(os.listdir(package_directory)) + except OSError: + # No directory for this package, probably someone + # already deleted everything. Anyway nothing to delete + return + + # if current_id was given: ensure it's not checked for removal + pkg_ids_listed -= {current_id} + + for pkg_id in pkg_ids_listed: + stale_directory = os.path.join( + package_directory, + pkg_id) + if os.path.isdir(package_directory): + + usage_lock_path = os.path.join( + self._cache_directory, + '{}_usage.lock'.format(current_id)) + ipc_lock = m_utils.ExclusiveInterProcessLock( + path=usage_lock_path, sleep_func=eventlet.sleep) + + with usage_mem_locks[pkg_id].write_lock(False) as acquired: + if not acquired: + continue + acquired_ipc_lock = ipc_lock.acquire(blocking=False) + if acquired_ipc_lock: + shutil.rmtree(stale_directory, + ignore_errors=True) + ipc_lock.release() + def _get_best_package_match(self, packages): public = None other = [] @@ -243,9 +286,43 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): elif other: return other[0] + def _lock_usage(self, package_definition): + """Locks package for usage""" + + # do not lock anything if we do not persist packages on disk + if not CONF.packages_opts.enable_packages_cache: + return + + # A work around the fact that read_lock only supports `with` syntax. + mem_lock = _with_to_generator( + usage_mem_locks[package_definition].read_lock()) + + package_id = package_definition.id + usage_lock_path = os.path.join(self._cache_directory, + '{}_usage.lock'.format(package_id)) + ipc_lock = m_utils.SharedInterProcessLock( + path=usage_lock_path, + sleep_func=eventlet.sleep + ) + ipc_lock = _with_to_generator(ipc_lock) + + next(mem_lock) + next(ipc_lock) + self._mem_locks.append(mem_lock) + self._ipc_locks.append(ipc_lock) + def cleanup(self): - if not CONF.packages_opts.enable_package_cache: + """Cleans up any lock we had acquired and removes any stale packages""" + + if not CONF.packages_opts.enable_packages_cache: shutil.rmtree(self._cache_directory, ignore_errors=True) + return + + for lock in itertools.chain(self._mem_locks, self._ipc_locks): + try: + next(lock) + except StopIteration: + continue def __enter__(self): return self @@ -338,6 +415,10 @@ class DirectoryPackageLoader(package_loader.MuranoPackageLoader): packages.add(folder) yield folder + def cleanup(self): + """A stub for possible cleanup""" + pass + class CombinedPackageLoader(package_loader.MuranoPackageLoader): def __init__(self, murano_client_factory, tenant_id, root_loader=None): @@ -376,12 +457,24 @@ class CombinedPackageLoader(package_loader.MuranoPackageLoader): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.api_loader.cleanup() + self.cleanup() return False + def cleanup(self): + """Calls cleanup method of all loaders we combine""" + self.api_loader.cleanup() + for d_loader in self.directory_loaders: + d_loader.cleanup() + def get_class(package, name): version = package.runtime_version loader = yaql_yaml_loader.get_loader(version) contents, file_id = package.get_class(name) return loader(contents, file_id) + + +def _with_to_generator(context_obj): + with context_obj as obj: + yield obj + yield diff --git a/murano/tests/unit/engine/test_package_loader.py b/murano/tests/unit/engine/test_package_loader.py index 320de767..8a82a7a8 100644 --- a/murano/tests/unit/engine/test_package_loader.py +++ b/murano/tests/unit/engine/test_package_loader.py @@ -11,6 +11,8 @@ # under the License. import os +import shutil +import tempfile import mock from oslo_config import cfg @@ -19,10 +21,106 @@ import semantic_version from murano.dsl import murano_package as dsl_package from murano.engine import package_loader from murano.tests.unit import base +from murano_tempest_tests import utils CONF = cfg.CONF +class TestPackageCache(base.MuranoTestCase): + + def setUp(self): + super(TestPackageCache, self).setUp() + + self.location = tempfile.mkdtemp() + CONF.set_override('enable_packages_cache', True, 'packages_opts') + self.old_location = CONF.packages_opts.packages_cache + CONF.set_override('packages_cache', self.location, 'packages_opts') + + self.murano_client = mock.MagicMock() + self.murano_client_factory = mock.MagicMock( + return_value=self.murano_client) + self.loader = package_loader.ApiPackageLoader( + self.murano_client_factory, 'test_tenant_id') + + def tearDown(self): + CONF.set_override('packages_cache', self.old_location, 'packages_opts') + shutil.rmtree(self.location, ignore_errors=True) + super(TestPackageCache, self).tearDown() + + def test_load_package(self): + fqn = 'io.murano.apps.test' + path, name = utils.compose_package( + 'test', + os.path.join(self.location, 'manifest.yaml'), + self.location, archive_dir=self.location) + with open(path) as f: + package_data = f.read() + spec = semantic_version.Spec('*') + + old_id, new_id = '123', '456' + package = mock.MagicMock() + package.fully_qualified_name = fqn + package.id = old_id + package.version = '0.0.1' + + self.murano_client.packages.filter = mock.MagicMock( + return_value=[package]) + self.murano_client.packages.download = mock.MagicMock( + return_value=package_data) + + # load the package + self.loader.load_class_package(fqn, spec) + + # assert that everything got created + self.assertTrue(os.path.isdir(os.path.join( + self.location, fqn))) + self.assertTrue(os.path.isdir(os.path.join( + self.location, fqn, package.version))) + self.assertTrue(os.path.isdir(os.path.join( + self.location, fqn, package.version, old_id))) + self.assertTrue(os.path.isfile(os.path.join( + self.location, fqn, package.version, old_id, 'manifest.yaml'))) + + # assert, that we called download + self.assertEqual(self.murano_client.packages.download.call_count, 1) + + # now that the cache is in place, call it for the 2d time + self.loader._package_cache = {} + self.loader._class_cache = {} + self.loader.load_class_package(fqn, spec) + + # check that we didn't download a thing + self.assertEqual(self.murano_client.packages.download.call_count, 1) + + # changing id, new package would be downloaded. + package.id = new_id + self.loader._package_cache = {} + self.loader._class_cache = {} + self.loader.load_class_package(fqn, spec) + + # check that we didn't download a thing + self.assertEqual(self.murano_client.packages.download.call_count, 2) + + # check that old directories got deleted + self.assertFalse(os.path.isdir(os.path.join( + self.location, fqn, package.version, old_id))) + + # check that new directories got created correctly + self.assertTrue(os.path.isdir(os.path.join( + self.location, fqn))) + self.assertTrue(os.path.isdir(os.path.join( + self.location, fqn, package.version))) + self.assertTrue(os.path.isdir(os.path.join( + self.location, fqn, package.version, new_id))) + self.assertTrue(os.path.isfile(os.path.join( + self.location, fqn, package.version, new_id, 'manifest.yaml'))) + + self.assertTrue(os.path.isdir(os.path.join( + self.location, fqn, package.version))) + self.assertTrue(os.path.isdir(os.path.join( + self.location, fqn, package.version, new_id))) + + class TestCombinedPackageLoader(base.MuranoTestCase): @classmethod def setUpClass(cls): diff --git a/murano/utils.py b/murano/utils.py index a4e740e4..061cbfb4 100644 --- a/murano/utils.py +++ b/murano/utils.py @@ -12,8 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib +import errno +import fcntl import functools +import os +from oslo_concurrency import lockutils from oslo_log import log as logging from webob import exc @@ -124,3 +129,103 @@ def verify_session(func): raise exc.HTTPForbidden(explanation=msg) return func(self, request, *args, **kwargs) return __inner + + +def ensure_tree(path): + """Create a directory (and any ancestor directories required). + + :param path: Directory to create + :return: bool, whether the directory has actually been created + """ + try: + os.makedirs(path) + except OSError as e: + if e.errno == errno.EEXIST: + if not os.path.isdir(path): + raise + else: + return False + elif e.errno == errno.EISDIR: + return False + else: + raise + else: + return True + + +ExclusiveInterProcessLock = lockutils.InterProcessLock +if os.name == 'nt': + # no shared locks on windows + SharedInterProcessLock = lockutils.InterProcessLock +else: + + class SharedInterProcessLock(lockutils.InterProcessLock): + def trylock(self): + # LOCK_EX instead of LOCK_EX + fcntl.lockf(self.lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB) + + def _do_open(self): + # the file has to be open in read mode, therefore this method has + # to be overriden + basedir = os.path.dirname(self.path) + if basedir: + made_basedir = ensure_tree(basedir) + if made_basedir: + self.logger.debug( + 'Created lock base path `%s`', basedir) + # The code here is mostly copy-pasted from oslo_concurrency and + # fasteners. The file has to be open with read permissions to be + # suitable for shared locking + if self.lockfile is None or self.lockfile.closed: + try: + # ensure the file is there, but do not obtain an extra file + # descriptor, as closing it would release fcntl lock + fd = os.open(self.path, os.O_CREAT | os.O_EXCL) + os.close(fd) + except OSError: + pass + self.lockfile = open(self.path, 'r') + + +class ReaderWriterLock(lockutils.ReaderWriterLock): + + @contextlib.contextmanager + def write_lock(self, blocking=True): + """Context manager that grants a write lock. + Will wait until no active readers. Blocks readers after acquiring. + Raises a ``RuntimeError`` if an active reader attempts to acquire + a lock. + """ + timeout = None if blocking else 0.00001 + me = self._current_thread() + i_am_writer = self.is_writer(check_pending=False) + if self.is_reader() and not i_am_writer: + raise RuntimeError("Reader %s to writer privilege" + " escalation not allowed" % me) + if i_am_writer: + # Already the writer; this allows for basic reentrancy. + yield self + else: + with self._cond: + self._pending_writers.append(me) + while True: + # No readers, and no active writer, am I next?? + if len(self._readers) == 0 and self._writer is None: + if self._pending_writers[0] == me: + self._writer = self._pending_writers.popleft() + break + + # NOTE(kzaitsev): this actually means, that we can wait + # more than timeout times, since if we get True value we + # would get another spin inside of the while loop + # Should we do anything about it? + acquired = self._cond.wait(timeout) + if not acquired: + yield False + return + try: + yield True + finally: + with self._cond: + self._writer = None + self._cond.notify_all() diff --git a/releasenotes/notes/package_cache-68495dcde223c167.yaml b/releasenotes/notes/package_cache-68495dcde223c167.yaml new file mode 100644 index 00000000..9a64cb82 --- /dev/null +++ b/releasenotes/notes/package_cache-68495dcde223c167.yaml @@ -0,0 +1,7 @@ +--- +features: + - Murano engine is now capable of caching packages on disk for reuse. + This is controlled by `packages_cache` directory path and boolean parameter + `enable_packages_cache` (true by default). The packages are cached in a + eventlet/inter-process safe manner and are cleaned up as soon as newer + version of tha package available (unless it's used by ongoing deployment)