On-demand garbage collection

Garbage collection is triggered by io.murano.system.GC.collect()
static method. Garbage collector destroys all object that are not
reachable anymore. GC can handle objects with cross-references and
isolated object graphs. When portion of object model becomes not
reachable it destroyed in predictable order such that child objects get
destroyed before their parents and, when possible, before objects
that are subscribed to their destruction notifications.

Internally, both pre-deployment garbage collection (that was done by
comparision of ``Objects`` and ``ObjectsCopy``) and post-deployment
orphan object collection are now done through the new GC.

MuranoPL GC utilizes Python GC to track and collect objects.
The main idea is to make ObjectStore have weak links only and
prevent hard links in other dsl places so that only links between objects
remain. Then prevent Python GC to automatically delete objects that
are not used anymore and use gc.collect() to obtain a list of
objects that are both not reachable and cannot be destroyed and then
destroy them in the correct order.

Targets-blueprint: dependency-driven-resource-deallocation
Closes-Bug: #1619635
Closes-Bug: #1597747
Change-Id: I653d8f71f003afa0a1ea588c9d14198571f5ad14
This commit is contained in:
Stan Lagun 2016-08-30 11:56:51 -07:00
parent 3ab0be1f3e
commit 584571c3d8
23 changed files with 681 additions and 257 deletions

View File

@ -221,19 +221,22 @@ class TaskExecutor(object):
try:
obj = executor.load(self.model)
except Exception as e:
executor.finalize()
return self.exception_result(e, None, '<load>')
if obj is not None:
try:
self._validate_model(obj.object, pkg_loader, executor)
except Exception as e:
executor.finalize()
return self.exception_result(e, obj, '<validate>')
try:
LOG.debug('Invoking pre-cleanup hooks')
self.session.start()
executor.cleanup(self._model)
executor.object_store.cleanup()
except Exception as e:
executor.finalize()
return self.exception_result(e, obj, '<GC>')
finally:
LOG.debug('Invoking post-cleanup hooks')
@ -249,15 +252,11 @@ class TaskExecutor(object):
action_result = self._invoke(executor)
finally:
try:
self._model, alive_object_ids = \
serializer.serialize_model(obj, executor)
LOG.debug('Cleaning up orphan objects')
n = executor.cleanup_orphans(alive_object_ids)
LOG.debug('{} orphan objects were destroyed'.format(n))
self._model = executor.finalize(obj)
except Exception as e:
return self.exception_result(e, None, '<model>')
except Exception as e:
executor.finalize()
return self.exception_result(e, obj, self.action['method'])
finally:
LOG.debug('Invoking post-execution hooks')

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import six
from murano.dsl import dsl_types
@ -19,7 +21,7 @@ from murano.dsl import dsl_types
class AttributeStore(object):
def __init__(self):
self._attributes = {}
self._attributes = collections.defaultdict(lambda: {})
@staticmethod
def _get_attribute_key(tagged_object, owner_type, name):
@ -27,32 +29,39 @@ class AttributeStore(object):
owner_type = owner_type.type
if isinstance(tagged_object, dsl_types.MuranoObjectInterface):
tagged_object = tagged_object.object
return tagged_object.object_id, owner_type.name, name
return tagged_object.object_id, (owner_type.name, name)
def get(self, tagged_object, owner_type, name):
return self._attributes.get(self._get_attribute_key(
tagged_object, owner_type, name))
key1, key2 = self._get_attribute_key(tagged_object, owner_type, name)
return self._attributes[key1].get(key2)
def set(self, tagged_object, owner_type, name, value):
if isinstance(value, dsl_types.MuranoObjectInterface):
value = value.id
elif isinstance(value, dsl_types.MuranoObject):
value = value.object_id
key = self._get_attribute_key(tagged_object, owner_type, name)
key1, key2 = self._get_attribute_key(tagged_object, owner_type, name)
if value is None:
self._attributes.pop(key, None)
self._attributes[key1].pop(key2, None)
else:
self._attributes[key] = value
self._attributes[key1][key2] = value
def serialize(self, known_objects):
return [
[key[0], key[1], key[2], value]
for key, value
in six.iteritems(self._attributes)
if key[0] in known_objects
[key1, key2[0], key2[1], value]
for key1, inner in six.iteritems(self._attributes)
for key2, value in six.iteritems(inner)
if key1 in known_objects
]
def load(self, data):
for item in data:
if item[3] is not None:
self._attributes[(item[0], item[1], item[2])] = item[3]
self._attributes[item[0]][(item[1], item[2])] = item[3]
def forget_object(self, obj):
if isinstance(obj, dsl_types.MuranoObjectInterface):
obj = obj.id
elif isinstance(obj, dsl_types.MuranoObject):
obj = obj.object_id
self._attributes.pop(obj, None)

View File

@ -202,11 +202,8 @@ class Owned(contracts.ContractMethod):
if self.value is None or isinstance(self.value, contracts.ObjRef):
return self.value
if isinstance(self.value, dsl_types.MuranoObject):
p = self.value.owner
while p is not None:
if p is self.this:
return self.value
p = p.owner
if helpers.find_object_owner(self.value, lambda t: t is self.this):
return self.value
raise exceptions.ContractViolationException(
'Object {0} violates owned() contract'.format(self.value))
raise exceptions.ContractViolationException(
@ -227,13 +224,10 @@ class NotOwned(contracts.ContractMethod):
if self.value is None or isinstance(self.value, contracts.ObjRef):
return self.value
if isinstance(self.value, dsl_types.MuranoObject):
p = self.value.owner
while p is not None:
if p is self.this:
raise exceptions.ContractViolationException(
'Object {0} violates notOwned() contract'.format(
self.value))
p = p.owner
if helpers.find_object_owner(self.value, lambda t: t is self.this):
raise exceptions.ContractViolationException(
'Object {0} violates notOwned() contract'.format(
self.value))
return self.value
raise exceptions.ContractViolationException(
'Value {0} is not an object'.format(self.value))

View File

@ -170,7 +170,7 @@ class MuranoObjectInterface(dsl_types.MuranoObjectInterface):
def func(*args, **kwargs):
self._insert_instruction()
with helpers.with_object_store(
self.__object_interface.object_store):
self.__object_interface._object_store):
context = helpers.get_context()
obj = self.__object_interface.object
return to_mutable(obj.type.invoke(
@ -190,8 +190,8 @@ class MuranoObjectInterface(dsl_types.MuranoObjectInterface):
frame[4][0].strip(), location)
def __init__(self, mpl_object):
self.__object = mpl_object
self.__object_store = helpers.get_object_store()
self._object = mpl_object
self._object_store = helpers.get_object_store()
@staticmethod
def create(mpl_object):
@ -201,11 +201,7 @@ class MuranoObjectInterface(dsl_types.MuranoObjectInterface):
@property
def object(self):
return self.__object
@property
def object_store(self):
return self.__object_store
return self._object
@property
def id(self):
@ -221,11 +217,10 @@ class MuranoObjectInterface(dsl_types.MuranoObjectInterface):
type = helpers.get_class(type)
elif isinstance(type, dsl_types.MuranoTypeReference):
type = type.type
p = self.object.owner
while p is not None:
if type.is_compatible(p):
return MuranoObjectInterface(p)
p = p.owner
owner = helpers.find_object_owner(
self.object, lambda t: type.is_compatible(t))
if owner:
return MuranoObjectInterface(owner)
if not optional:
raise ValueError('Object is not owned by any instance of type '
'{0}'.format(type.name))

View File

@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import itertools
import traceback
import eventlet
import eventlet.event
@ -28,11 +28,13 @@ from murano.common.i18n import _LW
from murano.dsl import attribute_store
from murano.dsl import constants
from murano.dsl import dsl
from murano.dsl import dsl_exception
from murano.dsl import dsl_types
from murano.dsl import exceptions as dsl_exceptions
from murano.dsl import helpers
from murano.dsl import object_store
from murano.dsl.principal_objects import stack_trace
from murano.dsl import serializer
from murano.dsl import yaql_integration
LOG = logging.getLogger(__name__)
@ -47,6 +49,7 @@ class MuranoDslExecutor(object):
self._object_store = object_store.ObjectStore(self)
self._locks = {}
self._root_context_cache = {}
self._static_properties = {}
@property
def object_store(self):
@ -310,122 +313,66 @@ class MuranoDslExecutor(object):
self._attribute_store.load(data.get(constants.DM_ATTRIBUTES) or [])
model = data.get(constants.DM_OBJECTS)
if model is None:
return None
result = self._object_store.load(model, None, keep_ids=True)
result = None
else:
result = self._object_store.load(model, None, keep_ids=True)
model_copy = data.get(constants.DM_OBJECTS_COPY)
if model_copy:
self._object_store.load(model_copy, None, keep_ids=True)
return dsl.MuranoObjectInterface.create(result)
def cleanup(self, data):
objects_copy = data.get(constants.DM_OBJECTS_COPY)
if not objects_copy:
def signal_destruction_dependencies(self, *objects):
if not objects:
return
gc_object_store = object_store.ObjectStore(self, self.object_store)
with helpers.with_object_store(gc_object_store):
gc_object_store.load(objects_copy, None, keep_ids=True)
objects_to_clean = []
for object_id in self._list_potential_object_ids(objects_copy):
if (gc_object_store.has(object_id, False) and
not self._object_store.has(object_id, False)):
objects_to_clean.append(object_id)
if objects_to_clean:
self._clean_garbage(gc_object_store, objects_to_clean)
elif len(objects) > 1:
return helpers.parallel_select(
objects, self.signal_destruction_dependencies)
def cleanup_orphans(self, alive_object_ids):
orphan_ids = self._collect_orphans(alive_object_ids)
self._destroy_orphans(orphan_ids)
return len(orphan_ids)
obj = objects[0]
if obj.destroyed:
return
for dependency in obj.dependencies.get('onDestruction', []):
try:
handler = dependency['handler']
if handler:
subscriber = dependency['subscriber']
if subscriber:
subscriber = subscriber()
if (subscriber and
subscriber.initialized and
not subscriber.destroyed):
method = subscriber.type.find_single_method(handler)
self.invoke_method(
method, subscriber, None, [obj], {},
invoke_action=False)
except Exception as e:
LOG.warning(_LW(
'Muted exception during destruction dependency '
'execution in {0}: {1}').format(obj, e), exc_info=True)
obj.dependencies.pop('onDestruction', None)
def _collect_orphans(self, alive_object_ids):
orphan_ids = []
for obj_id in self._object_store.iterate():
if obj_id not in alive_object_ids:
orphan_ids.append(obj_id)
return orphan_ids
def destroy_objects(self, *objects):
if not objects:
return
elif len(objects) > 1:
return helpers.parallel_select(
objects, self.destroy_objects)
def _destroy_orphans(self, orphan_ids):
with helpers.with_object_store(self.object_store):
list_of_destroyed = self._clean_garbage(self.object_store,
orphan_ids)
for obj_id in list_of_destroyed:
self._object_store.remove(obj_id)
def _destroy_object(self, obj):
obj = objects[0]
if obj.destroyed:
return
methods = obj.type.find_methods(lambda m: m.name == '.destroy')
for method in methods:
try:
method.invoke(obj, (), {}, None)
except Exception as e:
if isinstance(e, dsl_exception.MuranoPlException):
tb = e.format(prefix=' ')
else:
tb = traceback.format_exc()
LOG.warning(_LW(
'Muted exception during execution of .destroy'
'on {0}: {1}').format(obj, e), exc_info=True)
def _clean_garbage(self, object_store, d_list):
d_set = set(d_list)
dd_graph = {}
# NOTE(starodubcevna): construct a graph which looks like:
# {
# obj1_id: {subscriber1_id, subscriber2_id},
# obj2_id: {subscriber2_id, subscriber3_id}
# }
for obj_id in d_set:
obj = object_store.get(obj_id)
dds = obj.dependencies.get('onDestruction', [])
subscribers_set = {dd['subscriber'] for dd in dds}
dd_graph[obj_id] = subscribers_set
def topological(graph):
"""Topological sort implementation
This implementation will work even if we have cycle dependencies,
e.g. [a->b, b->c, c->a]. In this case the order of deletion will be
undified and it's okay.
"""
result = []
def dfs(obj_id):
for subscriber_id in graph[obj_id]:
if subscriber_id in graph:
dfs(subscriber_id)
result.append(obj_id)
del graph[obj_id]
while graph:
dfs(next(iter(graph)))
return result
deletion_order = topological(dd_graph)
destroyed = set()
for obj_id in deletion_order:
obj = object_store.get(obj_id)
dependencies = obj.dependencies.get('onDestruction', [])
for dependency in dependencies:
subscriber_id = dependency['subscriber']
if object_store.has(subscriber_id):
subscriber = object_store.get(subscriber_id)
handler = dependency['handler']
if handler:
method = subscriber.type.find_single_method(handler)
self.invoke_method(method, subscriber, None, [], {},
invoke_action=False)
self._destroy_object(obj)
destroyed.add(obj_id)
return destroyed
def _list_potential_object_ids(self, data):
if isinstance(data, dict):
for val in six.itervalues(data):
for res in self._list_potential_object_ids(val):
yield res
sys_dict = data.get('?')
if (isinstance(sys_dict, dict) and
sys_dict.get('id') and sys_dict.get('type')):
yield sys_dict['id']
elif isinstance(data, collections.Iterable) and not isinstance(
data, six.string_types):
for val in data:
for res in self._list_potential_object_ids(val):
yield res
'Muted exception during execution of .destroy '
'on {0}: {1}').format(obj, tb), exc_info=True)
def create_root_context(self, runtime_version):
context = self._root_context_cache.get(runtime_version)
@ -497,3 +444,31 @@ class MuranoDslExecutor(object):
def run(self, cls, method_name, this, args, kwargs):
with helpers.with_object_store(self.object_store):
return cls.invoke(method_name, this, args, kwargs)
def get_static_property(self, murano_type, name, context):
prop = murano_type.find_static_property(name)
cls = prop.declaring_type
value = self._static_properties.get(prop, prop.default)
return prop.transform(value, cls, None, context)
def set_static_property(self, murano_type, name, value,
context, dry_run=False):
prop = murano_type.find_static_property(name)
cls = prop.declaring_type
value = prop.transform(value, cls, None, context)
if not dry_run:
self._static_properties[prop] = prop.finalize(
value, cls, context)
def finalize(self, model_root=None):
if model_root:
used_objects = serializer.collect_objects(model_root)
self.object_store.prepare_finalize(used_objects)
model = serializer.serialize_model(model_root, self)
self.object_store.finalize()
else:
model = None
self.object_store.prepare_finalize(None)
self.object_store.finalize()
self._static_properties.clear()
return model

View File

@ -15,6 +15,7 @@
import collections
import contextlib
import functools
import gc
import inspect
import itertools
import re
@ -621,8 +622,26 @@ def weak_proxy(obj):
def weak_ref(obj):
class MuranoObjectWeakRef(weakref.ReferenceType):
def __init__(self, murano_object):
self.ref = weakref.ref(murano_object)
self.object_id = murano_object.object_id
def __call__(self):
res = self.ref()
if not res:
object_store = get_object_store()
if object_store:
res = object_store.get(self.object_id)
if res:
self.ref = weakref.ref(res)
return res
if obj is None or isinstance(obj, weakref.ReferenceType):
return obj
if isinstance(obj, dsl_types.MuranoObject):
return MuranoObjectWeakRef(obj)
return weakref.ref(obj)
@ -673,3 +692,38 @@ def format_scalar(value):
def is_passkey(value):
passkey = get_contract_passkey()
return passkey is not None and value is passkey
def find_object_owner(obj, predicate):
p = obj.owner
while p:
if predicate(p):
return p
p = p.owner
return None
# This function is not intended to be used in the code but is very useful
# for debugging object reference leaks
def walk_gc(obj, towards, handler):
visited = set()
queue = collections.deque([(obj, [])])
while queue:
item, trace = queue.popleft()
if id(item) in visited:
continue
if handler(item):
if towards:
yield trace + [item]
else:
yield [item] + trace
visited.add(id(item))
if towards:
queue.extend(
[(t, trace + [item]) for t in gc.get_referrers(item)]
)
else:
queue.extend(
[(t, [item] + trace) for t in gc.get_referents(item)]
)

View File

@ -23,6 +23,7 @@ from murano.dsl import constants
from murano.dsl import dsl
from murano.dsl import dsl_types
from murano.dsl import exceptions
from murano.dsl import helpers
from murano.dsl import yaql_functions
from murano.dsl import yaql_integration
@ -62,7 +63,8 @@ def _prepare_context():
mc = src.type
else:
mc = src
mc.set_property(key, value, context['#root_context'])
helpers.get_executor().set_static_property(
mc, key, value, context['#root_context'])
else:
raise ValueError(
'attribution may only be applied to '
@ -76,7 +78,15 @@ def _prepare_context():
return src.get_property(key, context['#root_context'])
except exceptions.UninitializedPropertyAccessError:
return {}
elif isinstance(src, (
dsl_types.MuranoTypeReference,
dsl_types.MuranoType)):
if isinstance(src, dsl_types.MuranoTypeReference):
mc = src.type
else:
mc = src
return helpers.get_executor().get_static_property(
mc, key, context['#root_context'])
else:
raise ValueError(
'attribution may only be applied to '

View File

@ -61,7 +61,8 @@ class MetaData(MetaProvider):
def instantiate(context):
obj = helpers.get_object_store().load(
template, owner=None,
context=context, scope_type=declaring_type)
context=context, scope_type=declaring_type,
bypass_store=True)
obj.declaring_type = declaring_type
return obj
return instantiate

View File

@ -25,64 +25,67 @@ from murano.dsl import yaql_integration
class MuranoObject(dsl_types.MuranoObject):
def __init__(self, murano_class, owner, object_id=None, name=None,
known_classes=None, this=None):
self.__initialized = False
self._initialized = False
self._destroyed = False
if known_classes is None:
known_classes = {}
self.__owner = helpers.weak_ref(owner.real_this if owner else None)
self.__object_id = object_id or helpers.generate_id()
self.__type = murano_class
self.__properties = {}
self.__parents = {}
self.__this = this
self.__name = name
self.__extension = None
self.__config = murano_class.package.get_class_config(
if this is None:
self._owner = owner
self._object_id = object_id or helpers.generate_id()
self._type = murano_class
self._properties = {}
self._parents = {}
self._this = this
self._name = name
self._extension = None
self._executor = helpers.weak_ref(helpers.get_executor())
self._config = murano_class.package.get_class_config(
murano_class.name)
if not isinstance(self.__config, dict):
self.__config = {}
if not isinstance(self._config, dict):
self._config = {}
known_classes[murano_class.name] = self
for parent_class in murano_class.parents:
name = parent_class.name
if name not in known_classes:
obj = MuranoObject(
parent_class, owner,
object_id=self.__object_id,
object_id=self._object_id,
known_classes=known_classes, this=self.real_this)
self.__parents[name] = known_classes[name] = obj
self._parents[name] = known_classes[name] = obj
else:
self.__parents[name] = known_classes[name]
self.__dependencies = {}
self._parents[name] = known_classes[name]
self._dependencies = {}
@property
def extension(self):
return self.__extension
return self._extension
@property
def name(self):
return self.real_this.__name
return self.real_this._name
@extension.setter
def extension(self, value):
self.__extension = value
self._extension = value
def initialize(self, context, params, used_names=None):
context = context.create_child_context()
context[constants.CTX_ALLOW_PROPERTY_WRITES] = True
object_store = helpers.get_object_store()
for property_name in self.__type.properties:
spec = self.__type.properties[property_name]
for property_name in self.type.properties:
spec = self.type.properties[property_name]
if spec.usage == dsl_types.PropertyUsages.Config:
if property_name in self.__config:
property_value = self.__config[property_name]
if property_name in self._config:
property_value = self._config[property_name]
else:
property_value = dsl.NO_VALUE
self.set_property(property_name, property_value,
dry_run=self.__initialized)
dry_run=self._initialized)
init = self.type.methods.get('.init')
used_names = used_names or set()
names = set(self.__type.properties)
names = set(self.type.properties)
if init:
names.update(six.iterkeys(init.arguments_scheme))
last_errors = len(names)
@ -94,7 +97,7 @@ class MuranoObject(dsl_types.MuranoObject):
spec = init.arguments_scheme[property_name]
is_init_arg = True
else:
spec = self.__type.properties[property_name]
spec = self.type.properties[property_name]
is_init_arg = False
if property_name in used_names:
@ -116,7 +119,7 @@ class MuranoObject(dsl_types.MuranoObject):
else:
self.set_property(
property_name, property_value, context,
dry_run=self.__initialized)
dry_run=self._initialized)
used_names.add(property_name)
except exceptions.UninitializedPropertyAccessError:
errors += 1
@ -130,8 +133,8 @@ class MuranoObject(dsl_types.MuranoObject):
last_errors = errors
if (not object_store.initializing and
self.__extension is None and
not self.__initialized and
self._extension is None and
not self._initialized and
not helpers.is_objects_dry_run_mode()):
method = self.type.methods.get('__init__')
if method:
@ -140,7 +143,7 @@ class MuranoObject(dsl_types.MuranoObject):
yield lambda: method.invoke(
self, filtered_params[0], filtered_params[1], context)
for parent in self.__parents.values():
for parent in self._parents.values():
for t in parent.initialize(context, params, used_names):
yield t
@ -149,41 +152,46 @@ class MuranoObject(dsl_types.MuranoObject):
context[constants.CTX_ARGUMENT_OWNER] = self.real_this
init.invoke(self.real_this, (), init_args,
context.create_child_context())
self.__initialized = True
self._initialized = True
if (not object_store.initializing
and not helpers.is_objects_dry_run_mode()
and not self.__initialized):
and not self._initialized):
yield run_init
@property
def object_id(self):
return self.__object_id
return self._object_id
@property
def type(self):
return self.__type
return self._type
@property
def owner(self):
if self.__owner is None:
return None
return self.__owner()
if self._this is None:
return self._owner
else:
return self.real_this.owner
@property
def real_this(self):
return self.__this or self
return self._this or self
@property
def executor(self):
return self._executor()
@property
def initialized(self):
return self.__initialized
return self._initialized
@property
def dependencies(self):
return self.__dependencies
return self._dependencies
def get_property(self, name, context=None):
start_type, derived = self.__type, False
start_type, derived = self.type, False
caller_class = None if not context else helpers.get_type(context)
if caller_class is not None and caller_class.is_compatible(self):
start_type, derived = caller_class, True
@ -193,7 +201,8 @@ class MuranoObject(dsl_types.MuranoObject):
if len(declared_properties) > 0:
spec = self.real_this.type.find_single_property(name)
if spec.usage == dsl_types.PropertyUsages.Static:
return spec.declaring_type.get_property(name, context)
return self.executor.get_static_property(
spec.declaring_type, name, context)
else:
return self.real_this._get_property_value(name)
elif derived:
@ -203,10 +212,10 @@ class MuranoObject(dsl_types.MuranoObject):
def _get_property_value(self, name):
try:
return self.__properties[name]
return self._properties[name]
except KeyError:
raise exceptions.UninitializedPropertyAccessError(
name, self.__type)
name, self.type)
def set_property(self, name, value, context=None, dry_run=False):
start_type, derived = self.real_this.type, False
@ -214,7 +223,7 @@ class MuranoObject(dsl_types.MuranoObject):
if caller_class is not None and caller_class.is_compatible(self):
start_type, derived = caller_class, True
if context is None:
context = helpers.get_executor().create_object_context(self)
context = self.executor.create_object_context(self)
declared_properties = start_type.find_properties(
lambda p: p.name == name)
if len(declared_properties) > 0:
@ -230,7 +239,7 @@ class MuranoObject(dsl_types.MuranoObject):
if spec.usage == dsl_types.PropertyUsages.Static:
default = None
else:
default = self.__config.get(name, spec.default)
default = self._config.get(name, spec.default)
if spec is ultimate_spec:
value = spec.transform(
@ -242,26 +251,27 @@ class MuranoObject(dsl_types.MuranoObject):
if len(property_list) > 1:
value = ultimate_spec.finalize(value, self.real_this, context)
if ultimate_spec.usage == dsl_types.PropertyUsages.Static:
ultimate_spec.declaring_type.set_property(name, value, context,
dry_run=dry_run)
self.executor.set_static_property(
ultimate_spec.declaring_type, name, value, context,
dry_run=dry_run)
elif not dry_run:
self.real_this.__properties[name] = value
self.real_this._properties[name] = value
elif derived:
if not dry_run:
obj = self.cast(caller_class)
obj.__properties[name] = value
obj._properties[name] = value
else:
raise exceptions.PropertyWriteError(name, start_type)
def cast(self, cls):
for p in helpers.traverse(self, lambda t: t.__parents.values()):
for p in helpers.traverse(self, lambda t: t._parents.values()):
if p.type == cls:
return p
raise TypeError('Cannot cast {0} to {1}'.format(self.type, cls))
def _list_properties(self, name):
for p in helpers.traverse(
self.real_this, lambda t: t.__parents.values()):
self.real_this, lambda t: t._parents.values()):
if name in p.type.properties:
yield p.type.properties[name]
@ -274,17 +284,17 @@ class MuranoObject(dsl_types.MuranoObject):
allow_refs=False):
context = helpers.get_context()
result = {}
for parent in self.__parents.values():
for parent in self._parents.values():
result.update(parent.to_dictionary(
include_hidden, dsl_types.DumpTypes.Serializable,
allow_refs))
skip_usages = (dsl_types.PropertyUsages.Runtime,
dsl_types.PropertyUsages.Config)
for property_name in self.type.properties:
if property_name in self.real_this.__properties:
if property_name in self.real_this._properties:
spec = self.type.properties[property_name]
if spec.usage not in skip_usages or include_hidden:
prop_value = self.real_this.__properties[property_name]
prop_value = self.real_this._properties[property_name]
if isinstance(prop_value, MuranoObject) and allow_refs:
meta = [m for m in spec.get_meta(context)
if m.type.name == ('io.murano.metadata.'
@ -313,3 +323,49 @@ class MuranoObject(dsl_types.MuranoObject):
'name': self.name
}})
return result
def mark_destroyed(self, clear_data=False):
self._destroyed = True
self._suppress__del__ = None
if clear_data or not self.initialized:
self._extension = None
self._properties = None
self._owner = None
self._this = None
for p in six.itervalues(self._parents):
p.mark_destroyed(clear_data)
@property
def destroyed(self):
return self._destroyed
class RecyclableMuranoObject(MuranoObject):
def __init__(self, *args, **kwargs):
# Create self-reference to prevent __del__ from being called
# automatically when there are no other objects referring to this one.
# Without this reference __del__ will get called immediately after
# reference counter will go to 0 and the object will put itself into
# pending list creating another reference to itself and thus preventing
# its child objects from being deleted. After the .destroy method
# child objects will become eligible for destruction but will be
# unable to use find() method since their owner will be destroyed
# and collected at that point. With this reference gc.collect()
# will collect the whole object graph at once and then we could
# sort it and destroy in the correct order so that child objects
# will be destroyed first.
self._suppress__del__ = self
super(RecyclableMuranoObject, self).__init__(*args, **kwargs)
def __del__(self):
# For Py2 the purpose of __del__ (in combination with _suppress__del__)
# method is just to prevent object from being released automatically.
# In Py3 the gc.collect list will be empty and __del__ will be called
# for objects that were not destroyed yet.
if self._this is None and self._initialized and not self._destroyed:
self.executor.object_store.schedule_object_destruction(self)
def mark_destroyed(self, clear_data=False):
self.executor.attribute_store.forget_object(self)
super(RecyclableMuranoObject, self).mark_destroyed(clear_data)

View File

@ -88,7 +88,6 @@ class MuranoClass(dsl_types.MuranoClass, MuranoType, dslmeta.MetaProvider):
self._parents = self._adjusted_parents(remappings)
self._context = None
self._exported_context = None
self._property_values = {}
self._meta = dslmeta.MetaData(meta, dsl_types.MetaTargets.Type, self)
self._meta_values = None
self._imports = list(self._resolve_imports(imports))
@ -426,19 +425,6 @@ class MuranoClass(dsl_types.MuranoClass, MuranoType, dslmeta.MetaProvider):
m.static_stub, name=m.static_stub.name)
return self._exported_context
def get_property(self, name, context):
prop = self.find_static_property(name)
cls = prop.declaring_type
value = cls._property_values.get(name, prop.default)
return prop.transform(value, cls, None, context)
def set_property(self, name, value, context, dry_run=False):
prop = self.find_static_property(name)
cls = prop.declaring_type
value = prop.transform(value, cls, None, context)
if not dry_run:
cls._property_values[name] = value
def get_meta(self, context):
if self._meta_values is None:
executor = helpers.get_executor()

View File

@ -12,8 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import gc
import weakref
from oslo_log import log as logging
import six
from murano.dsl import dsl_types
@ -21,12 +24,16 @@ from murano.dsl import helpers
from murano.dsl import murano_object
LOG = logging.getLogger(__name__)
class ObjectStore(object):
def __init__(self, executor, parent_store=None):
def __init__(self, executor, parent_store=None, weak_store=True):
self._parent_store = parent_store
self._store = {}
self._store = weakref.WeakValueDictionary() if weak_store else {}
self._designer_attributes_store = {}
self._executor = weakref.ref(executor)
self._pending_destruction = set()
@property
def executor(self):
@ -35,7 +42,7 @@ class ObjectStore(object):
def get(self, object_id, check_parent_store=True):
result = self._store.get(object_id)
if not result and self._parent_store and check_parent_store:
return self._parent_store.get(object_id)
return self._parent_store.get(object_id, check_parent_store)
return result
def has(self, object_id, check_parent_store=True):
@ -48,6 +55,10 @@ class ObjectStore(object):
def put(self, murano_object, object_id=None):
self._store[object_id or murano_object.object_id] = murano_object
def schedule_object_destruction(self, murano_object):
self._pending_destruction.add(murano_object)
self._store[murano_object.object_id] = murano_object
def iterate(self):
return six.iterkeys(self._store)
@ -55,7 +66,8 @@ class ObjectStore(object):
self._store.pop(object_id)
def load(self, value, owner, default_type=None,
scope_type=None, context=None, keep_ids=False):
scope_type=None, context=None, keep_ids=False,
bypass_store=False):
# do the object model load in a temporary object store and copy
# loaded objects here after that
model_store = InitializationObjectStore(
@ -67,7 +79,8 @@ class ObjectStore(object):
for obj_id in model_store.iterate():
obj = model_store.get(obj_id)
if obj.initialized:
self.put(obj)
if not bypass_store:
self.put(obj)
return result
@staticmethod
@ -86,6 +99,126 @@ class ObjectStore(object):
def parent_store(self):
return self._parent_store
def cleanup(self):
LOG.debug('Cleaning up orphan objects')
with helpers.with_object_store(self):
n = self._collect_garbage()
LOG.debug('{} orphan objects were destroyed'.format(n))
return n
def prepare_finalize(self, used_objects):
used_objects = set(used_objects) if used_objects else []
sentenced_objects = [
obj for obj in six.itervalues(self._store)
if obj not in used_objects
]
with helpers.with_object_store(self):
if sentenced_objects:
for __ in self._destroy_garbage(sentenced_objects):
pass
def finalize(self):
with helpers.with_object_store(self):
for t in self._store.values():
t.mark_destroyed(True)
self._pending_destruction.clear()
self._store.clear()
def _collect_garbage(self):
repeat = True
count = 0
while repeat:
repeat = False
gc.collect()
for obj in gc.garbage:
if (isinstance(obj, murano_object.RecyclableMuranoObject) and
obj.executor is self._executor()):
repeat = True
if obj.initialized and not obj.destroyed:
self.schedule_object_destruction(obj)
else:
obj.mark_destroyed(True)
obj = None
del gc.garbage[:]
if self._pending_destruction:
for obj in self._destroy_garbage(self._pending_destruction):
if obj in self._pending_destruction:
repeat = True
obj.mark_destroyed()
self._pending_destruction.remove(obj)
count += 1
return count
def _destroy_garbage(self, sentenced_objects):
dd_graph = {}
# NOTE(starodubcevna): construct a graph which looks like:
# {
# obj1: [subscriber1, subscriber2],
# obj2: [subscriber2, subscriber3]
# }
for obj in sentenced_objects:
obj_subscribers = [obj.owner]
dds = obj.dependencies.get('onDestruction', [])
for dd in dds:
subscriber = dd['subscriber']
if subscriber:
subscriber = subscriber()
if subscriber and subscriber not in obj_subscribers:
obj_subscribers.append(subscriber)
dd_graph[obj] = obj_subscribers
def topological(graph):
"""Topological sort implementation
This implementation will work even if we have cycle dependencies,
e.g. [a->b, b->c, c->a]. In this case the order of deletion will be
undefined and it's okay.
"""
visited = collections.defaultdict(int)
indexes = collections.defaultdict(int)
def dfs(obj):
visited[obj] += 1
subscribers = graph.get(obj)
if subscribers is not None:
m = 0
for i, subscriber in enumerate(subscribers):
if i == 0 or not visited[subscriber]:
for t in dfs(subscriber):
yield t
m = max(m, indexes[subscriber])
if visited.get(obj, 0) <= 2:
visited[obj] += 1
indexes[obj] = m + 1
yield obj, m + 1
for i, obj in enumerate(graph.keys()):
if not visited[obj]:
for t in dfs(obj):
yield t
visited.clear()
indexes.clear()
order = collections.defaultdict(list)
for obj, index in topological(dd_graph):
order[index].append(obj)
for key in sorted(order):
group = order[key]
self.executor.signal_destruction_dependencies(*group)
for key in sorted(order, reverse=True):
group = order[key]
self.executor.destroy_objects(*group)
for t in group:
yield t
# Temporary ObjectStore to load object graphs. Does 2-phase load
# and maintains internal state on what phase is currently running
@ -97,7 +230,7 @@ class ObjectStore(object):
class InitializationObjectStore(ObjectStore):
def __init__(self, root_owner, parent_store, keep_ids):
super(InitializationObjectStore, self).__init__(
parent_store.executor, parent_store)
parent_store.executor, parent_store, weak_store=False)
self._initializing = False
self._root_owner = root_owner
self._keep_ids = keep_ids
@ -124,10 +257,17 @@ class InitializationObjectStore(ObjectStore):
if isinstance(class_obj, dsl_types.MuranoTypeReference):
class_obj = class_obj.type
object_id = parsed['id']
is_tmp_object = (object_id is None and
owner is not self._root_owner and
self._initializing)
obj = None if object_id is None else self.get(
object_id, self._keep_ids)
if not obj:
obj = murano_object.MuranoObject(
if is_tmp_object or helpers.is_objects_dry_run_mode():
mo_type = murano_object.MuranoObject
else:
mo_type = murano_object.RecyclableMuranoObject
obj = mo_type(
class_obj, owner,
name=parsed['name'],
object_id=object_id if self._keep_ids else None)

View File

@ -18,6 +18,7 @@ from yaql import yaqlization
from murano.dsl import dsl
from murano.dsl import dsl_types
from murano.dsl import helpers
from murano.dsl import meta
@ -109,8 +110,8 @@ def property_owner(murano_property):
@specs.method
def property_get_value(context, property_, object_):
if object_ is None:
return property_.declaring_type.get_property(
name=property_.name, context=context)
return helpers.get_executor().get_static_property(
property_.declaring_type, name=property_.name, context=context)
return object_.cast(property_.declaring_type).get_property(
name=property_.name, context=context)
@ -122,7 +123,8 @@ def property_get_value(context, property_, object_):
@specs.method
def property_set_value(context, property_, object_, value):
if object_ is None:
property_.declaring_type.set_property(
helpers.get_executor().set_static_property(
property_.declaring_type,
name=property_.name, value=value, context=context)
else:
object_.cast(property_.declaring_type).set_property(

View File

@ -34,7 +34,7 @@ def serialize(obj, executor,
make_copy=False,
serialize_attributes=False,
serialize_actions=False,
serialization_type=serialization_type)[0]['Objects']
serialization_type=serialization_type)['Objects']
def _serialize_object(root_object, designer_attributes, allow_refs,
@ -66,7 +66,6 @@ def serialize_model(root_object, executor,
tree = None
tree_copy = None
attributes = []
serialized_objects = set()
else:
with helpers.with_object_store(executor.object_store):
tree, serialized_objects = _serialize_object(
@ -84,7 +83,7 @@ def serialize_model(root_object, executor,
'Objects': tree,
'ObjectsCopy': tree_copy,
'Attributes': attributes
}, serialized_objects
}
def _serialize_available_action(obj, current_actions, executor):
@ -223,3 +222,27 @@ def is_nested_in(obj, ancestor):
if obj is None:
return False
obj = obj.owner
def collect_objects(root_object):
visited = set()
def rec(obj):
if utils.is_sequence(obj) or isinstance(obj, utils.SetType):
for value in obj:
for t in rec(value):
yield t
elif isinstance(obj, utils.MappingType):
for value in six.itervalues(obj):
for t in rec(value):
yield t
elif isinstance(obj, dsl_types.MuranoObjectInterface):
for t in rec(obj.object):
yield t
elif isinstance(obj, dsl_types.MuranoObject) and obj not in visited:
visited.add(obj)
yield obj
for t in rec(obj.to_dictionary()):
yield t
return rec(root_object)

View File

@ -165,7 +165,8 @@ def obj_attribution(context, obj, property_name):
@specs.parameter('property_name', yaqltypes.Keyword())
@specs.name('#operator_.')
def obj_attribution_static(context, cls, property_name):
return cls.type.get_property(property_name, context)
return helpers.get_executor().get_static_property(
cls.type, property_name, context)
@specs.parameter('receiver', dsl.MuranoObjectParameter(decorate=False))

View File

@ -30,30 +30,51 @@ class GarbageCollector(object):
@specs.parameter('handler', yaqltypes.String(nullable=True))
def subscribe_destruction(target, handler=None):
caller_context = helpers.get_caller_context()
target_this = target.object
target_this = target.object.real_this
receiver = helpers.get_this(caller_context)
if handler:
receiver.type.find_single_method(handler)
dependency = {'subscriber': receiver.object_id,
'handler': handler}
target_this.dependencies.setdefault(
'onDestruction', []).append(dependency)
dependency = GarbageCollector._find_dependency(
target_this, receiver, handler)
if not dependency:
dependency = {'subscriber': helpers.weak_ref(receiver),
'handler': handler}
target_this.dependencies.setdefault(
'onDestruction', []).append(dependency)
@staticmethod
@specs.parameter('target', dsl.MuranoObjectParameter())
@specs.parameter('handler', yaqltypes.String(nullable=True))
def unsubscibe_destruction(target, handler=None):
def unsubscribe_destruction(target, handler=None):
caller_context = helpers.get_caller_context()
target_this = target.object
target_this = target.object.real_this
receiver = helpers.get_this(caller_context)
if handler:
receiver.type.find_single_method(handler)
dependency = {'subscriber': receiver.object_id,
'handler': handler}
dds = target_this.dependencies.get('onDestruction', [])
if dependency in dds:
dependency = GarbageCollector._find_dependency(
target_this, receiver, handler)
if dependency:
dds.remove(dependency)
@staticmethod
def _find_dependency(target, subscriber, handler):
dds = target.dependencies.get('onDestruction', [])
for dd in dds:
if dd['handler'] != handler:
continue
d_subscriber = dd['subscriber']
if d_subscriber:
d_subscriber = d_subscriber()
if d_subscriber == subscriber:
return dd
@staticmethod
def collect():
helpers.get_executor().object_store.cleanup()

View File

@ -83,9 +83,11 @@ class Runner(object):
self.executor = executor.MuranoDslExecutor(
package_loader, TestContextManager(functions),
execution_session.ExecutionSession())
self._root = self.executor.load(model).object
self._root = self.executor.load(model)
if self._root:
self._root = self._root.object
if 'ObjectsCopy' in model:
self.executor.cleanup(model)
self.executor.object_store.cleanup()
def _execute(self, name, obj, *args, **kwargs):
try:
@ -137,7 +139,7 @@ class Runner(object):
@property
def serialized_model(self):
return serializer.serialize_model(self._root, self.executor)[0]
return serializer.serialize_model(self._root, self.executor)
@property
def preserve_exception(self):

View File

@ -39,10 +39,18 @@ class DslTestCase(base.MuranoTestCase):
self.register_function(
lambda data: self._traces.append(data), 'trace')
self._traces = []
self._runners = []
eventlet.debug.hub_exceptions(False)
def new_runner(self, model):
return runner.Runner(model, self.package_loader, self._functions)
r = runner.Runner(model, self.package_loader, self._functions)
self._runners.append(r)
return r
def tearDown(self):
super(DslTestCase, self).tearDown()
for r in self._runners:
r.executor.finalize(r.root)
@property
def traces(self):

View File

@ -0,0 +1,50 @@
Name: TestGCNode
Extends: Node
Methods:
.init:
Body:
- $.find(Node)
.destroy:
Body:
- trace($.value)
---
Namespaces:
sys: io.murano.system
Name: TestGC
Methods:
testObjectsCollect:
Body:
- $model:
:TestGCNode:
value: A
nodes:
- :TestGCNode:
value: B
- new($model)
- sys:GC.collect()
testObjectsCollectWithSubscription:
Body:
- $model:
:TestGCNode:
value: A
nodes:
:TestGCNode:
value: B
- $x: new($model)
- sys:GC.subscribeDestruction($x, _handler)
- sys:GC.subscribeDestruction($x.nodes[0], _handler)
- $x: null
- sys:GC.collect()
_handler:
Arguments:
- obj:
Contract: $.class(TestGCNode).notNull()
Body:
- trace('Destroy ' + $obj.value)

View File

@ -33,6 +33,10 @@ Properties:
Usage: Static
Default: xxx
staticDictProperty:
Contract: {}
Usage: Static
conflictingStaticProperty:
Contract: $.string()
Default: 'conflictingStaticProperty-child'
@ -172,6 +176,16 @@ Methods:
- type('test.TestStatics').staticProperty: qq
- Return: type('test.TestStatics').staticProperty
testModifyStaticDictProperty:
Body:
Return: $.modifyStaticDictProperty()
modifyStaticDictProperty:
Usage: Static
Body:
- :TestStatics.staticDictProperty.key: value
- Return: $.staticDictProperty
testPropertyIsStatic:
Body:
Return: $.modifyStaticPropertyOnInstance()

View File

@ -0,0 +1,56 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from murano.engine.system import garbage_collector
from murano.tests.unit.dsl.foundation import object_model as om
from murano.tests.unit.dsl.foundation import test_case
class TestGC(test_case.DslTestCase):
def setUp(self):
super(TestGC, self).setUp()
self.package_loader.load_package('io.murano', None).register_class(
garbage_collector.GarbageCollector)
self.runner = self.new_runner(om.Object('TestGC'))
def test_model_destroyed(self):
model = om.Object(
'TestGCNode', 'root',
value='root',
nodes=[
om.Object(
'TestGCNode', 'node1',
value='node1',
nodes=['root', 'node2']
),
om.Object(
'TestGCNode', 'node2',
value='node2',
nodes=['root', 'node1']
),
]
)
model = {'Objects': None, 'ObjectsCopy': model}
self.new_runner(model)
self.assertItemsEqual(['node1', 'node2'], self.traces[:2])
self.assertEqual('root', self.traces[-1])
def test_collect_from_code(self):
self.runner.testObjectsCollect()
self.assertEqual(['B', 'A'], self.traces)
def test_collect_with_subscription(self):
self.runner.testObjectsCollectWithSubscription()
self.assertEqual(['Destroy A', 'Destroy B', 'B', 'A'], self.traces)

View File

@ -100,6 +100,10 @@ class TestStatics(test_case.DslTestCase):
self.assertEqual(
'qq', self._runner.testModifyStaticPropertyUsingTypeFunc())
def test_modify_static_dict_property(self):
self.assertEqual(
{'key': 'value'}, self._runner.testModifyStaticDictProperty())
def test_property_is_static(self):
self.assertEqual('qq', self._runner.testPropertyIsStatic())

View File

@ -12,7 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import _weakref
import mock
from testtools import matchers
import unittest
from murano.dsl import dsl
@ -42,25 +45,33 @@ class TestGarbageCollector(base.MuranoTestCase):
def test_set_dd(self, this, caller_context):
this.return_value = self.mock_subscriber
target_0 = mock.MagicMock(spec=dsl.MuranoObjectInterface)
target_0.object = mock.MagicMock(murano_object.MuranoObject)
target_0.object = self.mock_subscriber
target_0.object.real_this = self.mock_subscriber
target_0.object.dependencies = {}
garbage_collector.GarbageCollector.subscribe_destruction(
target_0, handler="mockHandler")
self.assertEqual([{"subscriber": "1234", "handler": "mockHandler"}],
target_0.object.dependencies["onDestruction"])
dep = self.mock_subscriber.dependencies["onDestruction"]
self.assertThat(dep, matchers.HasLength(1))
dep = dep[0]
self.assertEqual("mockHandler", dep["handler"])
self.assertEqual(self.mock_subscriber, dep["subscriber"]())
@mock.patch("murano.dsl.helpers.get_caller_context")
@mock.patch("murano.dsl.helpers.get_this")
def test_unset_dd(self, this, caller_context):
this.return_value = self.mock_subscriber
target_1 = mock.MagicMock(spec=dsl.MuranoObjectInterface)
target_1.object = mock.MagicMock(murano_object.MuranoObject)
target_1.object = self.mock_subscriber
target_1.object.real_this = self.mock_subscriber
target_1.object.dependencies = (
{"onDestruction": [{"subscriber": "1234",
"handler": "mockHandler"}]})
garbage_collector.GarbageCollector.unsubscibe_destruction(
{"onDestruction": [{
"subscriber": _weakref.ref(self.mock_subscriber),
"handler": "mockHandler"
}]})
garbage_collector.GarbageCollector.unsubscribe_destruction(
target_1, handler="mockHandler")
self.assertEqual([], target_1.object.dependencies["onDestruction"])
self.assertEqual(
[], self.mock_subscriber.dependencies["onDestruction"])
@unittest.skip("WIP")
@mock.patch("murano.dsl.helpers.get_caller_context")

View File

@ -0,0 +1,13 @@
---
features:
- New on-request garbage collector for MuranoPL objects were implemented.
Garbage collection is triggered by io.murano.system.GC.collect()
static method. Garbage collector destroys all object that are not
reachable anymore. GC can handle objects with cross-references and
isolated object graphs. When portion of object model becomes not
reachable it destroyed in predictable order such that child objects get
destroyed before their parents and, when possible, before objects
that are subscribed to their destruction notifications.
- Internally, both pre-deployment garbage collection (that was done by
comparision of ``Objects`` and ``ObjectsCopy``) and post-deployment
orphan object collection are now done through the new GC.